Source code for adapya.entirex.broker

#! /usr/bin/env python
# -*- coding: latin1 -*-

"""broker.py is a Python interface to Webmethods EntireX Broker

The module loads the EntireX Broker stub:

* broker.dll for 64-bit Python otherwise broker32.dll (on Windows)
* libbroker.so shared library (on Unix)

broker.py defines the Broker class using the Advanced Communication
interface (ACI) for communicating with the EntireX broker.
"""

from __future__ import print_function          # PY3

from . import acierror
# from exceptions import Exception             # PY3
from datetime import datetime
# import string PY3
import struct
import sys
import types

import ctypes
from ctypes import c_char_p

from adapya.base.defs import Abuf
from adapya.base.dump import dump
from adapya.base.datamap import Datamap, String, Bytes, Filler, Uint1, Uint4, \
    T_IN, T_OUT, T_INOUT, str_str


[docs]class BrokerException(Exception): """ Instance will have set the following values: self.value is the EXX Broker response string self.etb is the Broker ACI call parameters that were used when the error occurred Example on how to call it:: try: raise BrokerException(value,etb) except BrokerException as e: adalog.warning('BrokerException', e.value, e.__class__) dump(e.etb.error_buffer,log=adalog.warning) """ def __init__(self, value, etb): if not value or value.startswith(' '): # indicator no explanation in acierror.py value+='\nError-Text: %s' %(etb.errtext_buffer.value) self.value = value self.etb = etb def __str__(self): return repr(self.value)
[docs]class BrokerError(BrokerException): "Subclass of BrokerException for Broker Error Responses" pass
[docs]class BrokerTimeOut(BrokerException): "Subclass of BrokerException for timeouts of requests to Broker" pass
[docs]class InterfaceError(BrokerException): "Subclass of BrokerException for Broker Interface Errors" pass
if sys.platform == 'zos': # or != ? # MVS load lib search order: STEPLIB, JOBLIB, LPA and Link List etblnk=ctypes.cdll.LoadLibrary('//BROKER2') else: if sys.platform in ('win32','cli'): # CPython or IronPython import ctypes.util etbname = ctypes.util.find_library('broker') # get full path of broker.dll print('broker=%r' % etbname) else: etbname = 'libbroker.so' try: etblnk=ctypes.cdll.LoadLibrary(etbname) except OSError as e: print('Running Python Version %s\n\ton platform %s, %d bit, byteorder=%s' % ( sys.version, sys.platform, sizeof(c_char_p)*8, sys.byteorder )) print('"%s" could not be loaded: check that EntireX bin directory' %(etbname,)) print("Exception '%r' occured" % e.value) raise if 0: if sys.platform.startswith('win'): if ctypes.sizeof(c_char_p) == 8: etblnk=ctypes.cdll.broker # 64 bit broker DLL else: etblnk=ctypes.cdll.broker32 # 32 bit broker DLL else: etblnk=ctypes.cdll.LoadLibrary('libbroker.so') etblnk.broker.argtypes = [c_char_p,c_char_p,c_char_p,c_char_p] # --- EntireX Broker API Type Constants (api_type) ----------------- API_TYPE1= 1 # --- EntireX Broker API Version Constants (api_version) ----------- API_VERS1= 1 API_VERS2= 2 API_VERS3= 3 API_VERS4= 4 API_VERS5= 5 API_VERS6= 6 API_VERS7= 7 API_VERS8= 8 API_VERS9= 9 API_VERS10= 10 API_VERS_HIGHEST= 10 # Broker V9.9 single conv. mode # --- EntireX Broker API API Function Constants (function) --------- FCT_SEND = 1 FCT_RECEIVE = 2 FCT_UNDO = 4 FCT_EOC = 5 FCT_REGISTER = 6 FCT_DEREGISTER = 7 FCT_VERSION = 8 FCT_LOGON = 9 FCT_LOGOFF = 10 FCT_SET = 11 FCT_GET = 12 FCT_SYNCPOINT = 13 FCT_KERNELVERS = 14 FCT_LOCTRANS = 15 # deprecated FCT_SETSSLPARMS = 16 FCT_SENDPUBLICATION = 17 FCT_RECVPUBLICATION = 18 FCT_SUBSCRIBE = 19 FCT_UNSUBSCRIBE = 20 FCT_CNTLPUBLICATION = 21 FCT_REPYERROR = 22 function_str = lambda i: str_str(i, {1:'SEND', 2:'RECEIVE', 4:'UNDO', 5:'EOC', 6:'REGISTER', 7:'DEREGISTER', 8:'VERSION', 9:'LOGON', 10:'LOGOFF', 11:'SET', 12:'GET', 13:'SYNCPOINT', 14:'KERNELVERS', 15:'LOCTRANS', 16:'SETSSLPARMS', 17:'SENDPUBLICATION', 18:'RECVPUBLICATION', 19:'SUBSCRIBE', 20:'UNSUBSCRIBE', 21:'CNTLPUBLICATION', 22:'REPLYERROR'} ) # --- EntireX Broker API Option Constants (option) ----------------- OPT_MSG = 0x01 OPT_HOLD = 0x02 OPT_IMMED = 0x03 OPT_QUIESCE = 0x04 OPT_EOC = 0x05 OPT_CANCEL = 0x06 OPT_LAST = 0x07 OPT_NEXT = 0x08 OPT_PREVIEW = 0x09 OPT_COMMIT = 0x0a OPT_BACKOUT = 0x0b OPT_SYNC = 0x0c OPT_ATTACH = 0x0d OPT_DELETE = 0x0e OPT_EOCCANCEL = 0x0f OPT_QUERY = 0x10 OPT_SETUSTATUS = 0x11 OPT_ANY = 0x12 OPT_TERMINATE = 0x13 OPT_DURABLE = 0x14 OPT_CHECKSERVICE = 0x15 OPT_EXTENDED = 0x16 option_str = lambda i: str_str(i, {1:'MSG', 2:'HOLD', 3:'IMMED', 4:'QUIESCE', 5:'EOC', 6:'CANCEL', 7:'LAST', 8:'NEXT', 9:'PREVIEW', 10:'COMMIT', 11:'BACKOUT', 12:'SYNC', 13:'ATTACH', 14:'DELETE', 15:'EOCCANCEL', 16:'QUERY', 17:'SETUSTATUS', 18:'ANY', 19:'TERMINATE', 20:'DURABLE', 21:'CHECKSERVICE', 22:'EXTENDED',}) # --- EntireX Broker Conversation Status Constants (conv_stat) ----- CONVSTAT_NEW = 1 CONVSTAT_OLD = 2 CONVSTAT_NONE = 3 convstat_str = lambda i: str_str(i, {1:'NEW',2:'OLD',3:'None'}) # --- EntireX Broker Store Constants (store) ----------------------- STORE_OFF= '\x01' STORE_BROKER= '\x02' # --- EntireX Broker Status Constants (status) --------------------- STAT_OFF= '\x01' STAT_STORED= '\x02' STAT_DELIVERY_ATTEMP= '\x03' STAT_DELIVERED= '\x04' STAT_PROCESSED= '\x05' STAT_DEAD= '\x06' texts_STAT = ('n/a','OFF','STORED','DELIVERY_ATTEMP','DELIVERED', 'PROCESSED','DEAD') # --- EntireX Broker UOW Status Constants (uowStatus) -------------- RECV_NONE = 0 RECEIVED = 1 ACCEPTED = 2 DELIVERED = 3 BACKEDOUT = 4 PROCESSED = 5 CANCELLED = 6 TIMEOUT = 7 DISCARDED = 8 RECV_FIRST = 9 RECV_MIDDLE = 10 RECV_LAST = 11 RECV_ONLY = 12 uowStatus_str = lambda i: str_str(i, {0:'RECV_NONE',1:'RECEIVED', 2:'ACCEPTED',3:'DELIVERED',4:'BACKEDOUT',5:'PROCESSED', 6:'CANCELLED',7:'TIMEOUT',8:'DISCARDED',9:'RECV_FIRST', 10:'RECV_MIDDLE',11:'RECV_LAST',12:'RECV_ONLY'}) # --- EntireX Broker UOW Status Persist (uowStatusPersist) --------- UWSTATP_DEFAULT= '\x00' # use default from Broker UWSTATP_NO= '\xff' # status is not persistent # --- EntireX Broker Architecture Constants (data_arch) ------------ ACODE_HIGH_ASCII_IBM= '\x01' ACODE_LOW__ASCII_IBM= '\x02' ACODE_HIGH_EBCDIC_IBM= '\x03' ACODE_LOW__EBCDIC_IBM= '\x04' ACODE_HIGH_ASCII_VAX= '\x05' ACODE_LOW__ASCII_VAX= '\x06' ACODE_HIGH_EBCDIC_VAX= '\x07' ACODE_LOW__EBCDIC_VAX= '\x08' ACODE_HIGH_ASCII_IEEE= '\x09' ACODE_LOW__ASCII_IEEE= '\x0a' ACODE_HIGH_EBCDIC_IEEE= '\x0b' ACODE_LOW__EBCDIC_IEEE= '\x0c' ACODE_HIGHEST_VALUE= '\x0d' # --- EntireX Broker Force Logon Constants (forceLogon) ------------ FORCE_LOGON_NO= 'N' FORCE_LOGON_YES= 'Y' FORCE_LOGON_S= 'S' # --- EntireX Broker Encryption Level Constants (encryptionLevel) -- ENCLEVEL_NONE= '\x00' ENCLEVEL_TO_BROKER= '\x01' ENCLEVEL_TO_TARGET= '\x02' # --- EntireX Broker Kernel Security Constants (kernelSecurity) ---- KERNEL_SECURITY_NO= 'N' KERNEL_SECURITY_YES= 'Y' KERNEL_SECURITY_USER= 'U' KERNEL_SECURITY_LIGHT= 'L' # --- EntireX Broker Compression Level Constants (compress) -------- COMPRESS_LEVEL_0= '0' COMPRESS_LEVEL_1= '1' COMPRESS_LEVEL_2= '2' COMPRESS_LEVEL_3= '3' COMPRESS_LEVEL_4= '4' COMPRESS_LEVEL_5= '5' COMPRESS_LEVEL_6= '6' COMPRESS_LEVEL_7= '7' COMPRESS_LEVEL_8= '8' COMPRESS_LEVEL_9= '9' COMPRESS_LEVEL_NO= 'N' COMPRESS_LEVEL_YES= 'Y' # --- EntireX Broker API Size of fields ---------------------------- S_ADAPTERR= 8 S_APPLICATION_NAME= 64 S_APPLICATION_TYPE= 8 S_BROKER_ID= 32 S_CLIENTUID= 32 S_COMMIT_TIME= 17 S_CONV_ID= 16 S_ENVIRONMENT= 32 S_ERROR_CODE= 8 S_ERROR_CLASS= 4 S_ERROR_NUMBER= 4 S_LOCALE= 40 S_MSGID= 32 S_MSGTYPE= 16 S_PASSWORD= 32 S_PLATFORM= 8 S_PRODUCT_VERSION= 16 S_PTIME= 8 S_PUBLICATION_ID= 16 S_PUID= 28 S_SECURITY_TOKEN= 32 S_SERVER_CLASS= 32 S_SERVER_NAME= 32 S_SERVICE= 32 S_T_NAME= 8 S_TOKEN= 32 S_TOPIC= 96 S_TXT= 40 S_U_STATUS= 32 S_UOW_ID= 16 S_USER_ID= 32 S_USRDATA= 16 S_VERS= 8 S_WAIT= 8 S_BROKER_URL= 512 ETB_CODEPAGE_USE_PLATFORM_DEFAULT= "LOCAL" # EntireX Broker control block etbcbfields = ( Uint1('api_type', opt=T_IN), Uint1('api_version', opt=T_IN), Uint1('function', opt=T_IN), Uint1('option', opt=T_IN), Filler('reserved1', 16), Uint4('send_length', opt=T_IN), Uint4('receive_length', opt=T_IN), # maximum receive length Uint4('return_length', opt=T_OUT), # length of returned data Uint4('errtext_length', opt=T_IN), # error text buffer len String('broker_id', 32, opt=T_IN), # service name String('server_class', 32), String('server_name', 32), String('service', 32), String('user_id', 32, opt=T_IN), String('password', 32, opt=T_IN), # may contain binary data String('token', 32, opt=T_IN), Bytes('security_token', 32), String('conv_id', 16), #0x124 conversational/non. String('wait', 8, opt=T_IN), #blocked/non-blocked, time in sec String('error_code', 8, opt=T_OUT), String('environment', 32, opt=T_IN), #translation parm # V2 following Uint4('adcount', opt=T_OUT), # attempted deliv. count Bytes('user_data', 16, opt=T_OUT), Bytes('msg_id', 32), String('msg_type', 16), String('ptime', 8, opt=T_IN), # future use Bytes('newpassword', 32, opt=T_IN), String('adapt_err', 8, opt=T_OUT), String('client_uid', 32, opt=T_OUT), Uint1('conv_stat', opt=T_OUT), Bytes('store', 1), # UOW is persistent Bytes('status', 1), # future use # V3 following Uint1('uowStatus'), String('uowTime', 8, opt=T_IN), # lifetime of UOW (sec) String('uowID', 16), String('userStatus', 32), Bytes('uowStatusPersist', 1, opt=T_IN), # persist flag Filler('reserved2', 3), # V4 following String('locale_string', 40, opt=T_IN), # callers set_locale Bytes('data_arch', 1, opt=T_IN), # architecture # V6 following String('forceLogon', 1, opt=T_IN), Bytes('encryptionLevel', 1), # V7 following String('kernelsecurity', 1), String('commitTime', 17, opt=T_OUT), String('compress', 1, opt=T_IN), # Filler('reserved3', 114), # size=744 ACI V6/V7 # V8 following (=Broker V7.2) Filler('reserved4', 6), # align String('uowStatusTime', 8), # UOW status life time String('topic', 96), # String('publicationID', 16), # size = 756 ACI V8 # V9 following (=Broker V8.1) Filler('reserved5', 32), # was bid (partner broker id) Filler('reserved6', 12), # align Uint4('clientId', opt=T_OUT), # unique client id set on RECV/SEND w. WAIT Filler('reserved7', 32), # align String('logCommand', 1), # String('credentialsType', 1), # default: userid/passw, '1'=IAF auth. Filler('reserved8', 34), # size=872 V9 # V10 following (=Broker V9.7) Uint4('varlist_offset'), # Uint4('long_broker_id_length'), # size = 880 / 0x350 LETBCB10 ) LETBCB9 = 872 LETBCB10 = 880 def pptime(timestring): if timestring.strip() and timestring.strip('\x00') : # string not blank or zero return "%04s-%02s-%02s %02s:%02s:%02s.%03s UTC+0" % (timestring[0:4], timestring[4:6], timestring[6:8], timestring[8:10],timestring[10:12],timestring[12:14], timestring[14:]) else: return ''
[docs]class Etbcb(Datamap): "Defines Broker control block with its attributes and Broker call()" def __init__(self, send_length=0, receive_length=0, use_api_version=API_VERS7,**kw): self.__dict__['errtext_buffer'] = Abuf(80) self.__dict__['send_buffer'] = None self.__dict__['receive_buffer'] = None self.__dict__['trace'] = None self.__dict__['use_api_version'] = use_api_version Datamap.__init__(self, 'Etbcb', *etbcbfields, **kw) self.buffer=Abuf(self.dmlen) self.errtext_length=80 self.api_type=API_TYPE1 self.api_version=use_api_version if receive_length > 0: self.receive_buffer=Abuf(receive_length) self.receive_length=receive_length if send_length > 0: self.send_buffer=Abuf(send_length) self.send_length=send_length def call(self): if self.trace&1: print('Before Broker call') dump(self.buffer, header='ETBCB') dump(self.receive_buffer, header='Receive Buffer') # print(repr(self.send_buffer), len(self.send_buffer), self.send_length) dump(self.send_buffer, header='Send Buffer') dump(self.errtext_buffer, header='Error Text Buffer') # self.error_code='' if self.trace&4: print('\n%s == EXX %s%s%s%s' % ( datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f'), function_str( self.function), ' option='+option_str(self.option) if self.option else '', ' wait='+self.wait.strip('\x00 ') if self.function==FCT_RECEIVE and self.wait > 8*' ' else '', ' user=%s token=%s Service=%s/%s/%s'%(self.user_id,self.token, self.server_class,self.server_name,self.service) if self.function==FCT_LOGON else '' )) if 0: # self.function==FCT_SEND: print(repr(self.send_buffer), len(self.send_buffer), self.send_length) dump(self.send_buffer[0:self.send_length], header=' send_buffer',prefix=' ') i = etblnk.broker(self.buffer, self.send_buffer, self.receive_buffer, self.errtext_buffer ) if self.trace&2: print('After Broker call') dump(self.buffer, header='ETBCB') # dump(self.receive_buffer, header='Receive Buffer') if self.return_length>0: dump(self.receive_buffer[0:self.return_length], header=' receive_buffer', prefix=' ') # print(repr(self.send_buffer), len(self.send_buffer), self.send_length) dump(self.send_buffer, header='Send Buffer') dump(self.errtext_buffer, header='Error Text Buffer') if self.trace&4: print(' conv_id=%-16s conv_stat=%s return_length=%d' %( self.conv_id.strip('\x00 '), convstat_str(self.conv_stat), self.return_length)) print(' uowID=%-16s uowStatus=%-9s commitTime=%s' % ( self.uowID.strip('\x00 '), uowStatus_str(self.uowStatus), pptime(self.commitTime))) if self.error_code > '00000000': if self.error_code in ( '00740074', # Wait timeout '02150373'): # Transport timeout (new with single conv?) raise BrokerTimeOut( acierror.geterror(self.error_code), self) raise BrokerError(acierror.geterror(self.error_code),self) if i != 0: raise InterfaceError(acierror.geterror('0020%04d' %i), self)
[docs]class Broker(Etbcb): """Defines the essential Broker ACI functions using the Etbcb. For reference see EntireX Broker ACI Programming. """ def __init__(self, broker_id='localhost', user_id='monty', token=None, receive_length=2048, send_length=2048): Etbcb.__init__(self, receive_length=receive_length, send_length=send_length) self.broker_id=broker_id self.user_id=user_id if token != None: self.token=token
[docs] def backout(self): "Backout UOW but continue conversation" self.function=FCT_SYNCPOINT self.option=OPT_BACKOUT self.call() self.uowID='' # reset some fields after commit self.uowStatus=RECV_NONE
[docs] def commit(self): "Commit UOW but continue conversation" self.function=FCT_SYNCPOINT self.option=OPT_COMMIT self.call() self.uowID='' # reset some fields after commit self.uowStatus=RECV_NONE
[docs] def commitEndConversation(self): "Commit UOW and end conversation" self.function=FCT_SYNCPOINT self.option=OPT_EOC self.call() self.conv_id='' # reset some fields after commit self.uowID='' self.uowStatus=RECV_NONE
[docs] def kernelVersion(self): "Determine Broker kernel version" self.function=FCT_KERNELVERS self.option = OPT_EXTENDED self.call() print('\nMAX-MSG is %d' % self.return_length) # maxmsg is returned with the OPT_EXTENDED option # obviously works with use_api_version V4 # unrelated to single conversation mode if self.api_version > self.use_api_version: self.api_version=self.use_api_version # return self.errtext_buffer[:].strip(b'\x00 ') # remove blanks and x00 return self.errtext_buffer.buf2str() # remove blanks and x00
[docs] def deregister(self): "A server can deregister a service from EntireX Broker" self.function=FCT_DEREGISTER self.option=OPT_QUIESCE self.call()
[docs] def endConversation(self, option=0): """A client or server can terminate one or more conversations. This is the EOC function in ACI terms """ self.function=FCT_EOC self.option=option # set to CANCEL to abort conversation self.call()
[docs] def logon(self, password=None, newpassword=None): "Establish communication with a Broker kernel" self.function=FCT_LOGON if password: self.password=password self.option=0 self.call()
[docs] def logoff(self): "Terminate communication with Broker kernel" self.function=FCT_LOGOFF self.option=OPT_HOLD self.call()
[docs] def receive(self, conv_id='', option=0, wait=''): """Used by clients to receive incoming messages and by servers to receive incoming requests """ self.function=FCT_RECEIVE if conv_id!='': self.conv_id=conv_id if option!=0: self.option=option if wait!='': self.wait=wait self.call()
[docs] def receiveNew(self, wait=''): """Receive any message from new conversation""" self.function=FCT_RECEIVE self.conv_id='NEW' self.option=OPT_ANY if wait!='': self.wait=wait self.call()
[docs] def register(self, option=0): """Used by servers to inform EntireX Broker that a service is available """ self.function=FCT_REGISTER self.option = option # only valid option: ATTACH self.call()
[docs] def send(self, conv_id='', option=0): """Used by clients to send requests and servers to send replies""" self.function=FCT_SEND if conv_id!='': self.conv_id=conv_id if option!=0: self.option=option self.call() if self.option == OPT_COMMIT: # self.conv_id='' # keep conv_id self.uowID='' # reset uow fields self.uowStatus=RECV_NONE elif self.option == OPT_EOC: self.conv_id='' # reset conv/uow fields self.uowID='' self.uowStatus=RECV_NONE
[docs] def syncpoint(self,option=0): """ Function allows to manage Units of Work (UOWs)""" self.function=FCT_SYNCPOINT if option!=0: self.option=option self.call() if self.option == OPT_COMMIT: # reset conv/uow fields # self.conv_id='' # leave conversation open self.uowID='' self.uowStatus=RECV_NONE
[docs] def undo(self): """Remove messages that have been sent out but not received""" self.function=FCT_UNDO self.call()
[docs] def version(self): """return the version of the EntireX Broker Stub""" self.function=FCT_VERSION self.call() if self.api_version > self.use_api_version: self.api_version=self.use_api_version if self.return_length > 0: return self.receive_buffer.buf2str()[0:self.return_length-1] else: return ''
[docs]class ParmsBrokerService: "Defines parameters for a Broker service" def __init__(self,broker_id='',user_id='',\ server_class='',server_name='',service=''): self.broker_id=broker_id self.user_id=user_id self.server_class=server_class self.server_name=server_name self.service=service
__version__ = 'v.r.l' if __version__ == 'v.r.l': _svndate='$Date: 2023-01-04 10:56:59 +0100 (Wed, 04 Jan 2023) $' _svnrev='$Rev: 1050 $' __version__ = 'Dev ' + _svnrev.strip('$') + \ ' '.join(_svndate.strip('$').split()[0:3]) # Copyright 2004-ThisYear Software AG # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.