Source code for adapya.era.tapada

#! /usr/bin/env python
# -*- coding: latin1 -*-
""" Replication Target Adapter for Adabas

o to do:
  transaction continuation
  stats() move into reptor URBS handler
  sversion checking
"""
from __future__ import print_function          # PY3


import sys
import datetime as mdt # requires Python 2.3

from adapya.base import stck
from adapya.base.defs import log,LOGBEFORE,LOGCMD,LOGCB,LOGRB,LOGRSP,LOGFB
log(LOGCB+LOGFB+LOGRB)

from adapya.adabas.api import Adabas
from adapya.adabas.api import DatabaseError, adaSetTimeout


# reptor.Replicator.instance.logging
LOGTAPA = 1 << 16
LOGTAPA2 = 1 << 17
LOGTAPA4 = 1 << 18

from adapya.era import reptor,urb
from adapya.era.urb import URBDEYE,URBEEYE,URBHEYE,URBREYE,URBTEYE

dt=mdt.datetime(mdt.MINYEAR,1,1)
DELTA0=mdt.timedelta(0)

# set to 1 if delete record before update or insert
SAFEUPDATE=0


[docs]class UexAdabas(Adabas): # Special for testing lnkuex_0 exits # overrides adabas() call with call to lnkuex_() when lnkuex parm is given # lnkuexso=None
[docs] def call(self, **cbfields): """ issue Call to lnkuex_0 with the Adabas control block class variables :param cbfields: list of key value pairs that can be used to set the control block before the call """ global totalCalls, lnkuexso cb=self.cb for (key, val) in cbfields.items(): setattr(cb,key,val) if self.dbid==0 and cb.dbid!=0: self.dbid=cb.dbid # remember dbid if not yet done if cb.typ==0x04: # physical call (default is 0x30) if self.nucid==0 and cb.pnucid!=0: self.nucid=cb.pnucid # remember dbid if not yet done cb.pdbid=self.dbid cb.pnucid=self.nucid cb.rsp=0 #o cb.dbid=0 else: cb.typ=0x30 # logical call (reset after call adaOS6.1) cb.dbid=self.dbid cb.pdbid=0 cb.pnucid=0 if adabas.logopt&LOGBEFORE: log.debug('Before Adabas call') self.logapa() # --- call lnkuex_0: exit's response is in i ---- i = self.lnkuexso.lnkuex_0(self.acb, self.fb,self.rb,self.sb,self.vb,self.ib) # ----------------------------------------------- if i: self.cb.rsp=i else: self.cb.rsp=0 # clear dbid if self.cb.rsp > 0 and self.cb.rsp != 3: # prepare subcode data and error texts if sys.byteorder == 'big': self.sub1=self.cb.ad2>>16 self.sub2=self.cb.ad2&0xFFFF else: self.sub2=self.cb.ad2>>16 self.sub1=self.cb.ad2&0xFFFF errtext=adaerror.rsptext(self.cb.rsp, self.sub1, self.sub2, cmd=self.cb.cmd, subcmd1=self.cb.op1, subcmd2=self.cb.op2), if adabas.logopt&LOGCMD or \ (adabas.logopt&LOGRSP and \ (self.cb.rsp not in (0,2,3)) and \ not (self.cb.rsp == 64 and self.cb.cmd == 'CL')): log.debug('After Adabas call') self.logapa() if self.expected_responses: xrsp,xsub = self.expected_responses.pop(0) if xsub == None: # only response code specified if adabas.logopt&LOGCMD: log.debug('Checking for expected response %d'%xrsp) assert xrsp == self.cb.rsp, \ 'Unexpected response %d, expected response %d'%( self.cb.rsp, xrsp) else: if adabas.logopt&LOGCMD: log.debug('Checking for expected response %d/%d'%(xrsp,xsub)) assert xrsp == self.cb.rsp and xsub == self.sub2, \ 'Unexpected response %d/subcode %d, expected response %d/%d'%( self.cb.rsp, self.sub2, xrsp, xsub) return if self.cb.rsp == 0: return elif self.cb.rsp == 2: # ignore DE truncation warning # self.cb.rsp = 0 return elif self.cb.rsp == 3: self.cb.rsp = 0 raise DataEnd("End of Data",self) elif self.cb.rsp > 0 and not \ (self.cb.rsp == 64 and self.cb.cmd == 'CL'): log.debug('Adabas Database Error: %s'% errtext) # do not raise if CL and rsp=64 raise DatabaseError(errtext,self)
[docs]class Tapada(reptor.Replicator): """Class implements the Adabas Database Update handlers and processes input data passed via the process() method """ def __init__(self,subsparm,lnkuexso=None): global dt super(Tapada,self).__init__() # initialize Replicator() self.setHandler(URBDEYE, self.metUrbd) self.setHandler(URBEEYE, self.metUrbe) self.setHandler(URBHEYE, self.metUrbh) self.setHandler(URBREYE, self.metUrbr) self.setHandler(URBTEYE, self.metUrbt) if lnkuexso: self.c1=UexAdabas(rbl=subsparm.rblmax,fbl=0) # also used for open/close self.c2=UexAdabas(rbl=1,fbl=1) # used for L4 self.c1.lnkuexso=lnkuexso self.c2.lnkuexso=lnkuexso else: self.c1=Adabas(rbl=subsparm.rblmax,fbl=0) # also used for open/close self.c2=Adabas(rbl=1,fbl=1) # used for L4 self.c1.cb.dbid=subsparm.tdbid # set target database self.c2.fb.value='.' self.c2.cb.dbid=subsparm.tdbid # set target database self.transSeq=0 # current TA sequence number self.rcnt=0 # count of records (URBRs) in transaction (urbtrcnt) # also indicator that an input transaction has started self.dcnt=0 # count of Data elements (URBD) for current record (URBR) self.rsnr=0 # current rec seq nr in transaction, if rcnt == rsnr: do ET # self.dsnr=0 # directly from URBD self.lastIsn=0 self.curIsn=0 self.rtyp='' # current record type D, I, U or R (initial state) self.psf = 0 # current parameter subscr. source file (ParmsSfile) self.psu = subsparm # subscription parameters from configuration module self.sfile = 0 # current source file # statistics self.numIsn=0 # number of records processed (urbrrsp==0) self.dt=dt.now() self.bytes=0 self.messages=0 self.records=0 # total number of records updated or inserted self.transactions=0 self.transUpdCnt=0 self.firstTransaction=None self.lastTransaction=None def terminate(self,printstats=0): if self.psu.tdbid and self.rcnt: self.perrors += 1 log.error('Error: Transaction %d not completed, BT-ing' %(self.transSeq, )) self.c1.bt() self.c1.close() if __debug__: if self.logging & LOGTAPA: log.info('Database %d closed' % (self.psu.tdbid,)) if printstats: print( '\t%10d records updated/inserted' % (self.records,)) print( '\t%10d transactions processed' % (self.transactions,)) if self.firstTransaction: sub, seq, tim = self.firstTransaction print( 'First transaction: sub=%s, seq=%d, ttime=%s' % (sub,seq, stck.sstckd(tim))) if self.lastTransaction: sub, seq, tim = self.lastTransaction print( 'Last transaction : sub=%s, seq=%d, ttime=%s' % (sub,seq, stck.sstckd(tim)))
[docs] def metUrbr(self, rr, substat): "URBR Record block handler" if not self.rcnt: # skipping records of this subscription self.perrors += 1 log.error('Error: URBR record received without Transaction (URBT)') return self.rsnr = rr.urbrrsnr # recno within transaction self.dcnt = rr.urbrdcnt # number of data elements following if rr.urbrrsp == 0: # only count successful records self.numIsn+=1 self.curIsn=0 # reset curIsn if record is ignored, used in urbd handler if rr.urbrfnr == self.sfile: self.curIsn=rr.urbrisn # set ISN self.rtyp=rr.urbrtyp else: # set new sfile fnr=rr.urbrfnr for sf in self.psu.sfiles: if fnr == sf.sfnr: # matched source file self.psf = sf self.curIsn = rr.urbrisn # set ISN self.sfile = sf.sfnr # remember source file nr self.rtyp = rr.urbrtyp self.c1.cb.fnr = sf.tfnr self.c2.cb.fnr = sf.tfnr # for L4/E1 self.c1.cb.isn = self.curIsn if self.psf.rbl: self.c1.cb.rbl = self.psf.rbl # set rbl for current file/format else: self.c1.cb.rbl = self.psu.rblmax self.c1.cb.fbl = len( self.psf.fb ) self.c1.fb = sf.fb break # for sf else: self.perrors += 1 log.error('Error: URBR record received with Response (URBR)')
[docs] def metUrbd(self, dd, substat): "URBD data block handler" holding=0 # ISN on hold if 0: log=log.debug('Enter URBD handler: %s %s, isn %d' %\ (self.rtyp, dd.urbdtyp, self.curIsn)) if not self.rcnt: # skipping data of this subscription self.perrors += 1 log.error('Error: URBD Data records received without Transaction (URBT)') return # ignore record if self.curIsn not zero (no previous URBR) if self.curIsn!=0: self.c1.cb.isn=self.curIsn if dd.urbdtyp=='B': # before image if self.rtyp=='D' : # delete // or self.rtyp=='U)' or update try: self.c1.call(cmd='E1') self.transUpdCnt+=1 except DatabaseError as e: if e.apa.cb.rsp == 113: # tolerate rsp-113 log.debug('Note: Before Image not found ISN %d' % self.curIsn ) pass else: raise elif dd.urbdtyp=='A': # after image if not (self.rtyp=='I' or self.rtyp=='R' or self.rtyp=='U'): # it is no (insert or Initial state or update) raise 'invalid after image for urbrtyp %s' % self.rtyp # elif self.rtyp=='I' or self.rtyp=='R': # insert or Initial state else: try: holding=0 if SAFEUPDATE: # delete record to be on the safe side self.c2.call(cmd='E1',isn=self.curIsn) self.transUpdCnt+=1 else: self.c2.call(cmd='L4',isn=self.curIsn) holding=self.curIsn except DatabaseError as e: if e.apa.cb.rsp == 113: # tolerate rsp-113 log.warning('Note: Before Image not found ISN %d' % self.curIsn) pass else: log.error(e.value) dump(e.apa.acb, header='Control Block',log=log.error) raise # insert self.records+=1 self.numIsn+=1 if dd.urbdlend > self.c1.cb.rbl: raise reptor.ReptorError( 'URBD record data length %d bytes > maximum ACBRBL %d .'\ % (dd.urbdlend, self.c1.cb.rbl)) else: # copy record dlen=dd.urbdlend self.c1.rb[0:dlen]=dd.buffer[dd.offset+URBDL:dd.offset+URBDL+dd.urbdlend] try: if 0: log.debug('self.c1.cb.isn %d, curIsn %d' % (self.c1.cb.isn, self.curIsn)) if holding: self.c1.call(cmd='A1',rbl=dlen) action='Updated' else: self.c1.call(cmd='N2',rbl=dlen) action='Inserted' self.transUpdCnt+=1 if __debug__: if self.logging & LOGTAPA: log.debug("%s record %d in file %d " % ( action, self.c1.cb.isn, self.c1.cb.fnr)) except DatabaseError as e: print( e.value) dump(e.apa.acb, header='Control Block') dump(e.apa.fb, header='Format Buffer') dump(e.apa.rb, header='Record Buffer') raise else: raise 'Invalid urbdtyp %s' % dd.urbdtyp # adarpe writes no URBE if not self.typ & reptor.typURBH and \ self.rcnt == self.rsnr and self.dcnt == dd.urbddsnr: try: if __debug__: if self.logging & LOGTAPA: log.debug('End of transaction %d for subscription %s after all %d records, current update count %d' %\ (self.transSeq, self.psu.subscription, self.rcnt, self.transUpdCnt)) self.c1.et() self.rcnt=0 # reset transaction record count self.dcnt=0 # reset data element count self.transactions += 1 except DatabaseError as e: print( e.value) dump(e.apa.acb, header='Control Block') raise if __debug__: if self.logging & LOGTAPA4: log.debug('URBD: %s %s, isn %d/%d, num recs %d' %\ (self.rtyp, dd.urbdtyp, self.curIsn,self.c1.cb.isn, self.numIsn))
def metUrbt(self, tt, substat): if not self.firstTransaction: self.firstTransaction = (tt.urbtsnam, tt.urbttsnr, tt.urbtttim ) self.lastTransaction = (tt.urbtsnam, tt.urbttsnr, tt.urbtttim ) # select only this subscription if tt.urbtsnam == self.psu.subscription: self.transSeq=tt.urbttsnr self.rcnt = tt.urbtrcnt self.transUpdCnt=0 if __debug__: if self.logging & LOGTAPA4: log.debug('URBT: Transaction %d for subscription %s with %d records, current update count %d' %\ (self.transSeq, tt.urbtsnam, self.rcnt, self.transUpdCnt)) else: if __debug__: if self.logging & LOGTAPA: log.debug('Skipping transaction for subscription %s\n' % tt.urbtsnam) def metUrbe(self, ee, substat): if __debug__: if self.logging & LOGTAPA4: log.debug('Enter URBE handler: %s tsnr %d, records in TA %d, cnt %d' %\ (ee.urbesnam, self.transSeq, self.rcnt, self.transUpdCnt)) if not self.rcnt: # skipping this subscription return if ee.urbesnam != self.psu.subscription: log.debug('Error: Unexpected subscription %s' % ee.urbesnam) elif self.rcnt and self.transUpdCnt>0: if __debug__: if self.logging & LOGTAPA4: log.debug('End Transaction %d for subscription %s with %d updates' %\ (self.transSeq, self.psu.subscription, self.transUpdCnt)) if self.transSeq != ee.urbetsnr: raise 'Error: Expected ta seq number %d, but received %d' %\ (self.transSeq, ee.urbetsnr) else: try: self.c1.et() self.rcnt=0 self.transactions += 1 except DatabaseError as e: print( e.value) dump(e.apa.acb, header='Control Block') raise if __debug__: if self.logging & LOGTAPA: log.debug('URBE: End of transaction %d for subscription %s with %d records, current update count %d' %\ (self.transSeq, ee.urbesnam, self.rcnt, self.transUpdCnt)) def metUrbh(self, hh, substat): self.messages+=1 self.bytes+=hh.urbhlent
__version__ = 'v.r.l' if __version__ == 'v.r.l': _svndate='$Date: 2018-05-30 17:31:49 +0200 (Mi, 30 Mai 2018) $' _svnrev='$Rev: 837 $' __version__ = 'Dev ' + _svnrev.strip('$') + \ ' '.join(_svndate.strip('$').split()[0:3]) # Copyright 2004-ThisYear Software AG # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.