http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py index b1e577a..accb94f 100644 --- a/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/standalone-metastore/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -11532,6 +11532,154 @@ class CommitTxnRequest: def __ne__(self, other): return not (self == other) +class ReplTblWriteIdStateRequest: + """ + Attributes: + - validWriteIdlist + - user + - hostName + - dbName + - tableName + - partNames + """ + + thrift_spec = ( + None, # 0 + (1, TType.STRING, 'validWriteIdlist', None, None, ), # 1 + (2, TType.STRING, 'user', None, None, ), # 2 + (3, TType.STRING, 'hostName', None, None, ), # 3 + (4, TType.STRING, 'dbName', None, None, ), # 4 + (5, TType.STRING, 'tableName', None, None, ), # 5 + (6, TType.LIST, 'partNames', (TType.STRING,None), None, ), # 6 + ) + + def __init__(self, validWriteIdlist=None, user=None, hostName=None, dbName=None, tableName=None, partNames=None,): + self.validWriteIdlist = validWriteIdlist + self.user = user + self.hostName = hostName + self.dbName = dbName + self.tableName = tableName + self.partNames = partNames + + def read(self, iprot): + if iprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None and fastbinary is not None: + fastbinary.decode_binary(self, iprot.trans, (self.__class__, self.thrift_spec)) + return + iprot.readStructBegin() + while True: + (fname, ftype, fid) = iprot.readFieldBegin() + if ftype == TType.STOP: + break + if fid == 1: + if ftype == TType.STRING: + self.validWriteIdlist = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 2: + if ftype == TType.STRING: + self.user = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 3: + if ftype == TType.STRING: + self.hostName = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 4: + if ftype == TType.STRING: + self.dbName = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 5: + if ftype == TType.STRING: + self.tableName = iprot.readString() + else: + iprot.skip(ftype) + elif fid == 6: + if ftype == TType.LIST: + self.partNames = [] + (_etype526, _size523) = iprot.readListBegin() + for _i527 in xrange(_size523): + _elem528 = iprot.readString() + self.partNames.append(_elem528) + iprot.readListEnd() + else: + iprot.skip(ftype) + else: + iprot.skip(ftype) + iprot.readFieldEnd() + iprot.readStructEnd() + + def write(self, oprot): + if oprot.__class__ == TBinaryProtocol.TBinaryProtocolAccelerated and self.thrift_spec is not None and fastbinary is not None: + oprot.trans.write(fastbinary.encode_binary(self, (self.__class__, self.thrift_spec))) + return + oprot.writeStructBegin('ReplTblWriteIdStateRequest') + if self.validWriteIdlist is not None: + oprot.writeFieldBegin('validWriteIdlist', TType.STRING, 1) + oprot.writeString(self.validWriteIdlist) + oprot.writeFieldEnd() + if self.user is not None: + oprot.writeFieldBegin('user', TType.STRING, 2) + oprot.writeString(self.user) + oprot.writeFieldEnd() + if self.hostName is not None: + oprot.writeFieldBegin('hostName', TType.STRING, 3) + oprot.writeString(self.hostName) + oprot.writeFieldEnd() + if self.dbName is not None: + oprot.writeFieldBegin('dbName', TType.STRING, 4) + oprot.writeString(self.dbName) + oprot.writeFieldEnd() + if self.tableName is not None: + oprot.writeFieldBegin('tableName', TType.STRING, 5) + oprot.writeString(self.tableName) + oprot.writeFieldEnd() + if self.partNames is not None: + oprot.writeFieldBegin('partNames', TType.LIST, 6) + oprot.writeListBegin(TType.STRING, len(self.partNames)) + for iter529 in self.partNames: + oprot.writeString(iter529) + oprot.writeListEnd() + oprot.writeFieldEnd() + oprot.writeFieldStop() + oprot.writeStructEnd() + + def validate(self): + if self.validWriteIdlist is None: + raise TProtocol.TProtocolException(message='Required field validWriteIdlist is unset!') + if self.user is None: + raise TProtocol.TProtocolException(message='Required field user is unset!') + if self.hostName is None: + raise TProtocol.TProtocolException(message='Required field hostName is unset!') + if self.dbName is None: + raise TProtocol.TProtocolException(message='Required field dbName is unset!') + if self.tableName is None: + raise TProtocol.TProtocolException(message='Required field tableName is unset!') + return + + + def __hash__(self): + value = 17 + value = (value * 31) ^ hash(self.validWriteIdlist) + value = (value * 31) ^ hash(self.user) + value = (value * 31) ^ hash(self.hostName) + value = (value * 31) ^ hash(self.dbName) + value = (value * 31) ^ hash(self.tableName) + value = (value * 31) ^ hash(self.partNames) + return value + + def __repr__(self): + L = ['%s=%r' % (key, value) + for key, value in self.__dict__.iteritems()] + return '%s(%s)' % (self.__class__.__name__, ', '.join(L)) + + def __eq__(self, other): + return isinstance(other, self.__class__) and self.__dict__ == other.__dict__ + + def __ne__(self, other): + return not (self == other) + class GetValidWriteIdsRequest: """ Attributes: @@ -11561,10 +11709,10 @@ class GetValidWriteIdsRequest: if fid == 1: if ftype == TType.LIST: self.fullTableNames = [] - (_etype526, _size523) = iprot.readListBegin() - for _i527 in xrange(_size523): - _elem528 = iprot.readString() - self.fullTableNames.append(_elem528) + (_etype533, _size530) = iprot.readListBegin() + for _i534 in xrange(_size530): + _elem535 = iprot.readString() + self.fullTableNames.append(_elem535) iprot.readListEnd() else: iprot.skip(ftype) @@ -11586,8 +11734,8 @@ class GetValidWriteIdsRequest: if self.fullTableNames is not None: oprot.writeFieldBegin('fullTableNames', TType.LIST, 1) oprot.writeListBegin(TType.STRING, len(self.fullTableNames)) - for iter529 in self.fullTableNames: - oprot.writeString(iter529) + for iter536 in self.fullTableNames: + oprot.writeString(iter536) oprot.writeListEnd() oprot.writeFieldEnd() if self.validTxnList is not None: @@ -11670,10 +11818,10 @@ class TableValidWriteIds: elif fid == 3: if ftype == TType.LIST: self.invalidWriteIds = [] - (_etype533, _size530) = iprot.readListBegin() - for _i534 in xrange(_size530): - _elem535 = iprot.readI64() - self.invalidWriteIds.append(_elem535) + (_etype540, _size537) = iprot.readListBegin() + for _i541 in xrange(_size537): + _elem542 = iprot.readI64() + self.invalidWriteIds.append(_elem542) iprot.readListEnd() else: iprot.skip(ftype) @@ -11708,8 +11856,8 @@ class TableValidWriteIds: if self.invalidWriteIds is not None: oprot.writeFieldBegin('invalidWriteIds', TType.LIST, 3) oprot.writeListBegin(TType.I64, len(self.invalidWriteIds)) - for iter536 in self.invalidWriteIds: - oprot.writeI64(iter536) + for iter543 in self.invalidWriteIds: + oprot.writeI64(iter543) oprot.writeListEnd() oprot.writeFieldEnd() if self.minOpenWriteId is not None: @@ -11781,11 +11929,11 @@ class GetValidWriteIdsResponse: if fid == 1: if ftype == TType.LIST: self.tblValidWriteIds = [] - (_etype540, _size537) = iprot.readListBegin() - for _i541 in xrange(_size537): - _elem542 = TableValidWriteIds() - _elem542.read(iprot) - self.tblValidWriteIds.append(_elem542) + (_etype547, _size544) = iprot.readListBegin() + for _i548 in xrange(_size544): + _elem549 = TableValidWriteIds() + _elem549.read(iprot) + self.tblValidWriteIds.append(_elem549) iprot.readListEnd() else: iprot.skip(ftype) @@ -11802,8 +11950,8 @@ class GetValidWriteIdsResponse: if self.tblValidWriteIds is not None: oprot.writeFieldBegin('tblValidWriteIds', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.tblValidWriteIds)) - for iter543 in self.tblValidWriteIds: - iter543.write(oprot) + for iter550 in self.tblValidWriteIds: + iter550.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -11879,10 +12027,10 @@ class AllocateTableWriteIdsRequest: elif fid == 3: if ftype == TType.LIST: self.txnIds = [] - (_etype547, _size544) = iprot.readListBegin() - for _i548 in xrange(_size544): - _elem549 = iprot.readI64() - self.txnIds.append(_elem549) + (_etype554, _size551) = iprot.readListBegin() + for _i555 in xrange(_size551): + _elem556 = iprot.readI64() + self.txnIds.append(_elem556) iprot.readListEnd() else: iprot.skip(ftype) @@ -11894,11 +12042,11 @@ class AllocateTableWriteIdsRequest: elif fid == 5: if ftype == TType.LIST: self.srcTxnToWriteIdList = [] - (_etype553, _size550) = iprot.readListBegin() - for _i554 in xrange(_size550): - _elem555 = TxnToWriteId() - _elem555.read(iprot) - self.srcTxnToWriteIdList.append(_elem555) + (_etype560, _size557) = iprot.readListBegin() + for _i561 in xrange(_size557): + _elem562 = TxnToWriteId() + _elem562.read(iprot) + self.srcTxnToWriteIdList.append(_elem562) iprot.readListEnd() else: iprot.skip(ftype) @@ -11923,8 +12071,8 @@ class AllocateTableWriteIdsRequest: if self.txnIds is not None: oprot.writeFieldBegin('txnIds', TType.LIST, 3) oprot.writeListBegin(TType.I64, len(self.txnIds)) - for iter556 in self.txnIds: - oprot.writeI64(iter556) + for iter563 in self.txnIds: + oprot.writeI64(iter563) oprot.writeListEnd() oprot.writeFieldEnd() if self.replPolicy is not None: @@ -11934,8 +12082,8 @@ class AllocateTableWriteIdsRequest: if self.srcTxnToWriteIdList is not None: oprot.writeFieldBegin('srcTxnToWriteIdList', TType.LIST, 5) oprot.writeListBegin(TType.STRUCT, len(self.srcTxnToWriteIdList)) - for iter557 in self.srcTxnToWriteIdList: - iter557.write(oprot) + for iter564 in self.srcTxnToWriteIdList: + iter564.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -12077,11 +12225,11 @@ class AllocateTableWriteIdsResponse: if fid == 1: if ftype == TType.LIST: self.txnToWriteIds = [] - (_etype561, _size558) = iprot.readListBegin() - for _i562 in xrange(_size558): - _elem563 = TxnToWriteId() - _elem563.read(iprot) - self.txnToWriteIds.append(_elem563) + (_etype568, _size565) = iprot.readListBegin() + for _i569 in xrange(_size565): + _elem570 = TxnToWriteId() + _elem570.read(iprot) + self.txnToWriteIds.append(_elem570) iprot.readListEnd() else: iprot.skip(ftype) @@ -12098,8 +12246,8 @@ class AllocateTableWriteIdsResponse: if self.txnToWriteIds is not None: oprot.writeFieldBegin('txnToWriteIds', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.txnToWriteIds)) - for iter564 in self.txnToWriteIds: - iter564.write(oprot) + for iter571 in self.txnToWriteIds: + iter571.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -12327,11 +12475,11 @@ class LockRequest: if fid == 1: if ftype == TType.LIST: self.component = [] - (_etype568, _size565) = iprot.readListBegin() - for _i569 in xrange(_size565): - _elem570 = LockComponent() - _elem570.read(iprot) - self.component.append(_elem570) + (_etype575, _size572) = iprot.readListBegin() + for _i576 in xrange(_size572): + _elem577 = LockComponent() + _elem577.read(iprot) + self.component.append(_elem577) iprot.readListEnd() else: iprot.skip(ftype) @@ -12368,8 +12516,8 @@ class LockRequest: if self.component is not None: oprot.writeFieldBegin('component', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.component)) - for iter571 in self.component: - iter571.write(oprot) + for iter578 in self.component: + iter578.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() if self.txnid is not None: @@ -13067,11 +13215,11 @@ class ShowLocksResponse: if fid == 1: if ftype == TType.LIST: self.locks = [] - (_etype575, _size572) = iprot.readListBegin() - for _i576 in xrange(_size572): - _elem577 = ShowLocksResponseElement() - _elem577.read(iprot) - self.locks.append(_elem577) + (_etype582, _size579) = iprot.readListBegin() + for _i583 in xrange(_size579): + _elem584 = ShowLocksResponseElement() + _elem584.read(iprot) + self.locks.append(_elem584) iprot.readListEnd() else: iprot.skip(ftype) @@ -13088,8 +13236,8 @@ class ShowLocksResponse: if self.locks is not None: oprot.writeFieldBegin('locks', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.locks)) - for iter578 in self.locks: - iter578.write(oprot) + for iter585 in self.locks: + iter585.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -13304,20 +13452,20 @@ class HeartbeatTxnRangeResponse: if fid == 1: if ftype == TType.SET: self.aborted = set() - (_etype582, _size579) = iprot.readSetBegin() - for _i583 in xrange(_size579): - _elem584 = iprot.readI64() - self.aborted.add(_elem584) + (_etype589, _size586) = iprot.readSetBegin() + for _i590 in xrange(_size586): + _elem591 = iprot.readI64() + self.aborted.add(_elem591) iprot.readSetEnd() else: iprot.skip(ftype) elif fid == 2: if ftype == TType.SET: self.nosuch = set() - (_etype588, _size585) = iprot.readSetBegin() - for _i589 in xrange(_size585): - _elem590 = iprot.readI64() - self.nosuch.add(_elem590) + (_etype595, _size592) = iprot.readSetBegin() + for _i596 in xrange(_size592): + _elem597 = iprot.readI64() + self.nosuch.add(_elem597) iprot.readSetEnd() else: iprot.skip(ftype) @@ -13334,15 +13482,15 @@ class HeartbeatTxnRangeResponse: if self.aborted is not None: oprot.writeFieldBegin('aborted', TType.SET, 1) oprot.writeSetBegin(TType.I64, len(self.aborted)) - for iter591 in self.aborted: - oprot.writeI64(iter591) + for iter598 in self.aborted: + oprot.writeI64(iter598) oprot.writeSetEnd() oprot.writeFieldEnd() if self.nosuch is not None: oprot.writeFieldBegin('nosuch', TType.SET, 2) oprot.writeSetBegin(TType.I64, len(self.nosuch)) - for iter592 in self.nosuch: - oprot.writeI64(iter592) + for iter599 in self.nosuch: + oprot.writeI64(iter599) oprot.writeSetEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -13439,11 +13587,11 @@ class CompactionRequest: elif fid == 6: if ftype == TType.MAP: self.properties = {} - (_ktype594, _vtype595, _size593 ) = iprot.readMapBegin() - for _i597 in xrange(_size593): - _key598 = iprot.readString() - _val599 = iprot.readString() - self.properties[_key598] = _val599 + (_ktype601, _vtype602, _size600 ) = iprot.readMapBegin() + for _i604 in xrange(_size600): + _key605 = iprot.readString() + _val606 = iprot.readString() + self.properties[_key605] = _val606 iprot.readMapEnd() else: iprot.skip(ftype) @@ -13480,9 +13628,9 @@ class CompactionRequest: if self.properties is not None: oprot.writeFieldBegin('properties', TType.MAP, 6) oprot.writeMapBegin(TType.STRING, TType.STRING, len(self.properties)) - for kiter600,viter601 in self.properties.items(): - oprot.writeString(kiter600) - oprot.writeString(viter601) + for kiter607,viter608 in self.properties.items(): + oprot.writeString(kiter607) + oprot.writeString(viter608) oprot.writeMapEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -13917,11 +14065,11 @@ class ShowCompactResponse: if fid == 1: if ftype == TType.LIST: self.compacts = [] - (_etype605, _size602) = iprot.readListBegin() - for _i606 in xrange(_size602): - _elem607 = ShowCompactResponseElement() - _elem607.read(iprot) - self.compacts.append(_elem607) + (_etype612, _size609) = iprot.readListBegin() + for _i613 in xrange(_size609): + _elem614 = ShowCompactResponseElement() + _elem614.read(iprot) + self.compacts.append(_elem614) iprot.readListEnd() else: iprot.skip(ftype) @@ -13938,8 +14086,8 @@ class ShowCompactResponse: if self.compacts is not None: oprot.writeFieldBegin('compacts', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.compacts)) - for iter608 in self.compacts: - iter608.write(oprot) + for iter615 in self.compacts: + iter615.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -14028,10 +14176,10 @@ class AddDynamicPartitions: elif fid == 5: if ftype == TType.LIST: self.partitionnames = [] - (_etype612, _size609) = iprot.readListBegin() - for _i613 in xrange(_size609): - _elem614 = iprot.readString() - self.partitionnames.append(_elem614) + (_etype619, _size616) = iprot.readListBegin() + for _i620 in xrange(_size616): + _elem621 = iprot.readString() + self.partitionnames.append(_elem621) iprot.readListEnd() else: iprot.skip(ftype) @@ -14069,8 +14217,8 @@ class AddDynamicPartitions: if self.partitionnames is not None: oprot.writeFieldBegin('partitionnames', TType.LIST, 5) oprot.writeListBegin(TType.STRING, len(self.partitionnames)) - for iter615 in self.partitionnames: - oprot.writeString(iter615) + for iter622 in self.partitionnames: + oprot.writeString(iter622) oprot.writeListEnd() oprot.writeFieldEnd() if self.operationType is not None: @@ -14300,10 +14448,10 @@ class CreationMetadata: elif fid == 4: if ftype == TType.SET: self.tablesUsed = set() - (_etype619, _size616) = iprot.readSetBegin() - for _i620 in xrange(_size616): - _elem621 = iprot.readString() - self.tablesUsed.add(_elem621) + (_etype626, _size623) = iprot.readSetBegin() + for _i627 in xrange(_size623): + _elem628 = iprot.readString() + self.tablesUsed.add(_elem628) iprot.readSetEnd() else: iprot.skip(ftype) @@ -14337,8 +14485,8 @@ class CreationMetadata: if self.tablesUsed is not None: oprot.writeFieldBegin('tablesUsed', TType.SET, 4) oprot.writeSetBegin(TType.STRING, len(self.tablesUsed)) - for iter622 in self.tablesUsed: - oprot.writeString(iter622) + for iter629 in self.tablesUsed: + oprot.writeString(iter629) oprot.writeSetEnd() oprot.writeFieldEnd() if self.validTxnList is not None: @@ -14650,11 +14798,11 @@ class NotificationEventResponse: if fid == 1: if ftype == TType.LIST: self.events = [] - (_etype626, _size623) = iprot.readListBegin() - for _i627 in xrange(_size623): - _elem628 = NotificationEvent() - _elem628.read(iprot) - self.events.append(_elem628) + (_etype633, _size630) = iprot.readListBegin() + for _i634 in xrange(_size630): + _elem635 = NotificationEvent() + _elem635.read(iprot) + self.events.append(_elem635) iprot.readListEnd() else: iprot.skip(ftype) @@ -14671,8 +14819,8 @@ class NotificationEventResponse: if self.events is not None: oprot.writeFieldBegin('events', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.events)) - for iter629 in self.events: - iter629.write(oprot) + for iter636 in self.events: + iter636.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -14966,20 +15114,20 @@ class InsertEventRequestData: elif fid == 2: if ftype == TType.LIST: self.filesAdded = [] - (_etype633, _size630) = iprot.readListBegin() - for _i634 in xrange(_size630): - _elem635 = iprot.readString() - self.filesAdded.append(_elem635) + (_etype640, _size637) = iprot.readListBegin() + for _i641 in xrange(_size637): + _elem642 = iprot.readString() + self.filesAdded.append(_elem642) iprot.readListEnd() else: iprot.skip(ftype) elif fid == 3: if ftype == TType.LIST: self.filesAddedChecksum = [] - (_etype639, _size636) = iprot.readListBegin() - for _i640 in xrange(_size636): - _elem641 = iprot.readString() - self.filesAddedChecksum.append(_elem641) + (_etype646, _size643) = iprot.readListBegin() + for _i647 in xrange(_size643): + _elem648 = iprot.readString() + self.filesAddedChecksum.append(_elem648) iprot.readListEnd() else: iprot.skip(ftype) @@ -15000,15 +15148,15 @@ class InsertEventRequestData: if self.filesAdded is not None: oprot.writeFieldBegin('filesAdded', TType.LIST, 2) oprot.writeListBegin(TType.STRING, len(self.filesAdded)) - for iter642 in self.filesAdded: - oprot.writeString(iter642) + for iter649 in self.filesAdded: + oprot.writeString(iter649) oprot.writeListEnd() oprot.writeFieldEnd() if self.filesAddedChecksum is not None: oprot.writeFieldBegin('filesAddedChecksum', TType.LIST, 3) oprot.writeListBegin(TType.STRING, len(self.filesAddedChecksum)) - for iter643 in self.filesAddedChecksum: - oprot.writeString(iter643) + for iter650 in self.filesAddedChecksum: + oprot.writeString(iter650) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -15166,10 +15314,10 @@ class FireEventRequest: elif fid == 5: if ftype == TType.LIST: self.partitionVals = [] - (_etype647, _size644) = iprot.readListBegin() - for _i648 in xrange(_size644): - _elem649 = iprot.readString() - self.partitionVals.append(_elem649) + (_etype654, _size651) = iprot.readListBegin() + for _i655 in xrange(_size651): + _elem656 = iprot.readString() + self.partitionVals.append(_elem656) iprot.readListEnd() else: iprot.skip(ftype) @@ -15207,8 +15355,8 @@ class FireEventRequest: if self.partitionVals is not None: oprot.writeFieldBegin('partitionVals', TType.LIST, 5) oprot.writeListBegin(TType.STRING, len(self.partitionVals)) - for iter650 in self.partitionVals: - oprot.writeString(iter650) + for iter657 in self.partitionVals: + oprot.writeString(iter657) oprot.writeListEnd() oprot.writeFieldEnd() if self.catName is not None: @@ -15400,12 +15548,12 @@ class GetFileMetadataByExprResult: if fid == 1: if ftype == TType.MAP: self.metadata = {} - (_ktype652, _vtype653, _size651 ) = iprot.readMapBegin() - for _i655 in xrange(_size651): - _key656 = iprot.readI64() - _val657 = MetadataPpdResult() - _val657.read(iprot) - self.metadata[_key656] = _val657 + (_ktype659, _vtype660, _size658 ) = iprot.readMapBegin() + for _i662 in xrange(_size658): + _key663 = iprot.readI64() + _val664 = MetadataPpdResult() + _val664.read(iprot) + self.metadata[_key663] = _val664 iprot.readMapEnd() else: iprot.skip(ftype) @@ -15427,9 +15575,9 @@ class GetFileMetadataByExprResult: if self.metadata is not None: oprot.writeFieldBegin('metadata', TType.MAP, 1) oprot.writeMapBegin(TType.I64, TType.STRUCT, len(self.metadata)) - for kiter658,viter659 in self.metadata.items(): - oprot.writeI64(kiter658) - viter659.write(oprot) + for kiter665,viter666 in self.metadata.items(): + oprot.writeI64(kiter665) + viter666.write(oprot) oprot.writeMapEnd() oprot.writeFieldEnd() if self.isSupported is not None: @@ -15499,10 +15647,10 @@ class GetFileMetadataByExprRequest: if fid == 1: if ftype == TType.LIST: self.fileIds = [] - (_etype663, _size660) = iprot.readListBegin() - for _i664 in xrange(_size660): - _elem665 = iprot.readI64() - self.fileIds.append(_elem665) + (_etype670, _size667) = iprot.readListBegin() + for _i671 in xrange(_size667): + _elem672 = iprot.readI64() + self.fileIds.append(_elem672) iprot.readListEnd() else: iprot.skip(ftype) @@ -15534,8 +15682,8 @@ class GetFileMetadataByExprRequest: if self.fileIds is not None: oprot.writeFieldBegin('fileIds', TType.LIST, 1) oprot.writeListBegin(TType.I64, len(self.fileIds)) - for iter666 in self.fileIds: - oprot.writeI64(iter666) + for iter673 in self.fileIds: + oprot.writeI64(iter673) oprot.writeListEnd() oprot.writeFieldEnd() if self.expr is not None: @@ -15609,11 +15757,11 @@ class GetFileMetadataResult: if fid == 1: if ftype == TType.MAP: self.metadata = {} - (_ktype668, _vtype669, _size667 ) = iprot.readMapBegin() - for _i671 in xrange(_size667): - _key672 = iprot.readI64() - _val673 = iprot.readString() - self.metadata[_key672] = _val673 + (_ktype675, _vtype676, _size674 ) = iprot.readMapBegin() + for _i678 in xrange(_size674): + _key679 = iprot.readI64() + _val680 = iprot.readString() + self.metadata[_key679] = _val680 iprot.readMapEnd() else: iprot.skip(ftype) @@ -15635,9 +15783,9 @@ class GetFileMetadataResult: if self.metadata is not None: oprot.writeFieldBegin('metadata', TType.MAP, 1) oprot.writeMapBegin(TType.I64, TType.STRING, len(self.metadata)) - for kiter674,viter675 in self.metadata.items(): - oprot.writeI64(kiter674) - oprot.writeString(viter675) + for kiter681,viter682 in self.metadata.items(): + oprot.writeI64(kiter681) + oprot.writeString(viter682) oprot.writeMapEnd() oprot.writeFieldEnd() if self.isSupported is not None: @@ -15698,10 +15846,10 @@ class GetFileMetadataRequest: if fid == 1: if ftype == TType.LIST: self.fileIds = [] - (_etype679, _size676) = iprot.readListBegin() - for _i680 in xrange(_size676): - _elem681 = iprot.readI64() - self.fileIds.append(_elem681) + (_etype686, _size683) = iprot.readListBegin() + for _i687 in xrange(_size683): + _elem688 = iprot.readI64() + self.fileIds.append(_elem688) iprot.readListEnd() else: iprot.skip(ftype) @@ -15718,8 +15866,8 @@ class GetFileMetadataRequest: if self.fileIds is not None: oprot.writeFieldBegin('fileIds', TType.LIST, 1) oprot.writeListBegin(TType.I64, len(self.fileIds)) - for iter682 in self.fileIds: - oprot.writeI64(iter682) + for iter689 in self.fileIds: + oprot.writeI64(iter689) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -15825,20 +15973,20 @@ class PutFileMetadataRequest: if fid == 1: if ftype == TType.LIST: self.fileIds = [] - (_etype686, _size683) = iprot.readListBegin() - for _i687 in xrange(_size683): - _elem688 = iprot.readI64() - self.fileIds.append(_elem688) + (_etype693, _size690) = iprot.readListBegin() + for _i694 in xrange(_size690): + _elem695 = iprot.readI64() + self.fileIds.append(_elem695) iprot.readListEnd() else: iprot.skip(ftype) elif fid == 2: if ftype == TType.LIST: self.metadata = [] - (_etype692, _size689) = iprot.readListBegin() - for _i693 in xrange(_size689): - _elem694 = iprot.readString() - self.metadata.append(_elem694) + (_etype699, _size696) = iprot.readListBegin() + for _i700 in xrange(_size696): + _elem701 = iprot.readString() + self.metadata.append(_elem701) iprot.readListEnd() else: iprot.skip(ftype) @@ -15860,15 +16008,15 @@ class PutFileMetadataRequest: if self.fileIds is not None: oprot.writeFieldBegin('fileIds', TType.LIST, 1) oprot.writeListBegin(TType.I64, len(self.fileIds)) - for iter695 in self.fileIds: - oprot.writeI64(iter695) + for iter702 in self.fileIds: + oprot.writeI64(iter702) oprot.writeListEnd() oprot.writeFieldEnd() if self.metadata is not None: oprot.writeFieldBegin('metadata', TType.LIST, 2) oprot.writeListBegin(TType.STRING, len(self.metadata)) - for iter696 in self.metadata: - oprot.writeString(iter696) + for iter703 in self.metadata: + oprot.writeString(iter703) oprot.writeListEnd() oprot.writeFieldEnd() if self.type is not None: @@ -15976,10 +16124,10 @@ class ClearFileMetadataRequest: if fid == 1: if ftype == TType.LIST: self.fileIds = [] - (_etype700, _size697) = iprot.readListBegin() - for _i701 in xrange(_size697): - _elem702 = iprot.readI64() - self.fileIds.append(_elem702) + (_etype707, _size704) = iprot.readListBegin() + for _i708 in xrange(_size704): + _elem709 = iprot.readI64() + self.fileIds.append(_elem709) iprot.readListEnd() else: iprot.skip(ftype) @@ -15996,8 +16144,8 @@ class ClearFileMetadataRequest: if self.fileIds is not None: oprot.writeFieldBegin('fileIds', TType.LIST, 1) oprot.writeListBegin(TType.I64, len(self.fileIds)) - for iter703 in self.fileIds: - oprot.writeI64(iter703) + for iter710 in self.fileIds: + oprot.writeI64(iter710) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -16226,11 +16374,11 @@ class GetAllFunctionsResponse: if fid == 1: if ftype == TType.LIST: self.functions = [] - (_etype707, _size704) = iprot.readListBegin() - for _i708 in xrange(_size704): - _elem709 = Function() - _elem709.read(iprot) - self.functions.append(_elem709) + (_etype714, _size711) = iprot.readListBegin() + for _i715 in xrange(_size711): + _elem716 = Function() + _elem716.read(iprot) + self.functions.append(_elem716) iprot.readListEnd() else: iprot.skip(ftype) @@ -16247,8 +16395,8 @@ class GetAllFunctionsResponse: if self.functions is not None: oprot.writeFieldBegin('functions', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.functions)) - for iter710 in self.functions: - iter710.write(oprot) + for iter717 in self.functions: + iter717.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -16300,10 +16448,10 @@ class ClientCapabilities: if fid == 1: if ftype == TType.LIST: self.values = [] - (_etype714, _size711) = iprot.readListBegin() - for _i715 in xrange(_size711): - _elem716 = iprot.readI32() - self.values.append(_elem716) + (_etype721, _size718) = iprot.readListBegin() + for _i722 in xrange(_size718): + _elem723 = iprot.readI32() + self.values.append(_elem723) iprot.readListEnd() else: iprot.skip(ftype) @@ -16320,8 +16468,8 @@ class ClientCapabilities: if self.values is not None: oprot.writeFieldBegin('values', TType.LIST, 1) oprot.writeListBegin(TType.I32, len(self.values)) - for iter717 in self.values: - oprot.writeI32(iter717) + for iter724 in self.values: + oprot.writeI32(iter724) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -16566,10 +16714,10 @@ class GetTablesRequest: elif fid == 2: if ftype == TType.LIST: self.tblNames = [] - (_etype721, _size718) = iprot.readListBegin() - for _i722 in xrange(_size718): - _elem723 = iprot.readString() - self.tblNames.append(_elem723) + (_etype728, _size725) = iprot.readListBegin() + for _i729 in xrange(_size725): + _elem730 = iprot.readString() + self.tblNames.append(_elem730) iprot.readListEnd() else: iprot.skip(ftype) @@ -16601,8 +16749,8 @@ class GetTablesRequest: if self.tblNames is not None: oprot.writeFieldBegin('tblNames', TType.LIST, 2) oprot.writeListBegin(TType.STRING, len(self.tblNames)) - for iter724 in self.tblNames: - oprot.writeString(iter724) + for iter731 in self.tblNames: + oprot.writeString(iter731) oprot.writeListEnd() oprot.writeFieldEnd() if self.capabilities is not None: @@ -16667,11 +16815,11 @@ class GetTablesResult: if fid == 1: if ftype == TType.LIST: self.tables = [] - (_etype728, _size725) = iprot.readListBegin() - for _i729 in xrange(_size725): - _elem730 = Table() - _elem730.read(iprot) - self.tables.append(_elem730) + (_etype735, _size732) = iprot.readListBegin() + for _i736 in xrange(_size732): + _elem737 = Table() + _elem737.read(iprot) + self.tables.append(_elem737) iprot.readListEnd() else: iprot.skip(ftype) @@ -16688,8 +16836,8 @@ class GetTablesResult: if self.tables is not None: oprot.writeFieldBegin('tables', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.tables)) - for iter731 in self.tables: - iter731.write(oprot) + for iter738 in self.tables: + iter738.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -17003,10 +17151,10 @@ class Materialization: if fid == 1: if ftype == TType.SET: self.tablesUsed = set() - (_etype735, _size732) = iprot.readSetBegin() - for _i736 in xrange(_size732): - _elem737 = iprot.readString() - self.tablesUsed.add(_elem737) + (_etype742, _size739) = iprot.readSetBegin() + for _i743 in xrange(_size739): + _elem744 = iprot.readString() + self.tablesUsed.add(_elem744) iprot.readSetEnd() else: iprot.skip(ftype) @@ -17038,8 +17186,8 @@ class Materialization: if self.tablesUsed is not None: oprot.writeFieldBegin('tablesUsed', TType.SET, 1) oprot.writeSetBegin(TType.STRING, len(self.tablesUsed)) - for iter738 in self.tablesUsed: - oprot.writeString(iter738) + for iter745 in self.tablesUsed: + oprot.writeString(iter745) oprot.writeSetEnd() oprot.writeFieldEnd() if self.validTxnList is not None: @@ -17944,44 +18092,44 @@ class WMFullResourcePlan: elif fid == 2: if ftype == TType.LIST: self.pools = [] - (_etype742, _size739) = iprot.readListBegin() - for _i743 in xrange(_size739): - _elem744 = WMPool() - _elem744.read(iprot) - self.pools.append(_elem744) + (_etype749, _size746) = iprot.readListBegin() + for _i750 in xrange(_size746): + _elem751 = WMPool() + _elem751.read(iprot) + self.pools.append(_elem751) iprot.readListEnd() else: iprot.skip(ftype) elif fid == 3: if ftype == TType.LIST: self.mappings = [] - (_etype748, _size745) = iprot.readListBegin() - for _i749 in xrange(_size745): - _elem750 = WMMapping() - _elem750.read(iprot) - self.mappings.append(_elem750) + (_etype755, _size752) = iprot.readListBegin() + for _i756 in xrange(_size752): + _elem757 = WMMapping() + _elem757.read(iprot) + self.mappings.append(_elem757) iprot.readListEnd() else: iprot.skip(ftype) elif fid == 4: if ftype == TType.LIST: self.triggers = [] - (_etype754, _size751) = iprot.readListBegin() - for _i755 in xrange(_size751): - _elem756 = WMTrigger() - _elem756.read(iprot) - self.triggers.append(_elem756) + (_etype761, _size758) = iprot.readListBegin() + for _i762 in xrange(_size758): + _elem763 = WMTrigger() + _elem763.read(iprot) + self.triggers.append(_elem763) iprot.readListEnd() else: iprot.skip(ftype) elif fid == 5: if ftype == TType.LIST: self.poolTriggers = [] - (_etype760, _size757) = iprot.readListBegin() - for _i761 in xrange(_size757): - _elem762 = WMPoolTrigger() - _elem762.read(iprot) - self.poolTriggers.append(_elem762) + (_etype767, _size764) = iprot.readListBegin() + for _i768 in xrange(_size764): + _elem769 = WMPoolTrigger() + _elem769.read(iprot) + self.poolTriggers.append(_elem769) iprot.readListEnd() else: iprot.skip(ftype) @@ -18002,29 +18150,29 @@ class WMFullResourcePlan: if self.pools is not None: oprot.writeFieldBegin('pools', TType.LIST, 2) oprot.writeListBegin(TType.STRUCT, len(self.pools)) - for iter763 in self.pools: - iter763.write(oprot) + for iter770 in self.pools: + iter770.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() if self.mappings is not None: oprot.writeFieldBegin('mappings', TType.LIST, 3) oprot.writeListBegin(TType.STRUCT, len(self.mappings)) - for iter764 in self.mappings: - iter764.write(oprot) + for iter771 in self.mappings: + iter771.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() if self.triggers is not None: oprot.writeFieldBegin('triggers', TType.LIST, 4) oprot.writeListBegin(TType.STRUCT, len(self.triggers)) - for iter765 in self.triggers: - iter765.write(oprot) + for iter772 in self.triggers: + iter772.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() if self.poolTriggers is not None: oprot.writeFieldBegin('poolTriggers', TType.LIST, 5) oprot.writeListBegin(TType.STRUCT, len(self.poolTriggers)) - for iter766 in self.poolTriggers: - iter766.write(oprot) + for iter773 in self.poolTriggers: + iter773.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -18498,11 +18646,11 @@ class WMGetAllResourcePlanResponse: if fid == 1: if ftype == TType.LIST: self.resourcePlans = [] - (_etype770, _size767) = iprot.readListBegin() - for _i771 in xrange(_size767): - _elem772 = WMResourcePlan() - _elem772.read(iprot) - self.resourcePlans.append(_elem772) + (_etype777, _size774) = iprot.readListBegin() + for _i778 in xrange(_size774): + _elem779 = WMResourcePlan() + _elem779.read(iprot) + self.resourcePlans.append(_elem779) iprot.readListEnd() else: iprot.skip(ftype) @@ -18519,8 +18667,8 @@ class WMGetAllResourcePlanResponse: if self.resourcePlans is not None: oprot.writeFieldBegin('resourcePlans', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.resourcePlans)) - for iter773 in self.resourcePlans: - iter773.write(oprot) + for iter780 in self.resourcePlans: + iter780.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -18824,20 +18972,20 @@ class WMValidateResourcePlanResponse: if fid == 1: if ftype == TType.LIST: self.errors = [] - (_etype777, _size774) = iprot.readListBegin() - for _i778 in xrange(_size774): - _elem779 = iprot.readString() - self.errors.append(_elem779) + (_etype784, _size781) = iprot.readListBegin() + for _i785 in xrange(_size781): + _elem786 = iprot.readString() + self.errors.append(_elem786) iprot.readListEnd() else: iprot.skip(ftype) elif fid == 2: if ftype == TType.LIST: self.warnings = [] - (_etype783, _size780) = iprot.readListBegin() - for _i784 in xrange(_size780): - _elem785 = iprot.readString() - self.warnings.append(_elem785) + (_etype790, _size787) = iprot.readListBegin() + for _i791 in xrange(_size787): + _elem792 = iprot.readString() + self.warnings.append(_elem792) iprot.readListEnd() else: iprot.skip(ftype) @@ -18854,15 +19002,15 @@ class WMValidateResourcePlanResponse: if self.errors is not None: oprot.writeFieldBegin('errors', TType.LIST, 1) oprot.writeListBegin(TType.STRING, len(self.errors)) - for iter786 in self.errors: - oprot.writeString(iter786) + for iter793 in self.errors: + oprot.writeString(iter793) oprot.writeListEnd() oprot.writeFieldEnd() if self.warnings is not None: oprot.writeFieldBegin('warnings', TType.LIST, 2) oprot.writeListBegin(TType.STRING, len(self.warnings)) - for iter787 in self.warnings: - oprot.writeString(iter787) + for iter794 in self.warnings: + oprot.writeString(iter794) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -19439,11 +19587,11 @@ class WMGetTriggersForResourePlanResponse: if fid == 1: if ftype == TType.LIST: self.triggers = [] - (_etype791, _size788) = iprot.readListBegin() - for _i792 in xrange(_size788): - _elem793 = WMTrigger() - _elem793.read(iprot) - self.triggers.append(_elem793) + (_etype798, _size795) = iprot.readListBegin() + for _i799 in xrange(_size795): + _elem800 = WMTrigger() + _elem800.read(iprot) + self.triggers.append(_elem800) iprot.readListEnd() else: iprot.skip(ftype) @@ -19460,8 +19608,8 @@ class WMGetTriggersForResourePlanResponse: if self.triggers is not None: oprot.writeFieldBegin('triggers', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.triggers)) - for iter794 in self.triggers: - iter794.write(oprot) + for iter801 in self.triggers: + iter801.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop() @@ -20645,11 +20793,11 @@ class SchemaVersion: elif fid == 4: if ftype == TType.LIST: self.cols = [] - (_etype798, _size795) = iprot.readListBegin() - for _i799 in xrange(_size795): - _elem800 = FieldSchema() - _elem800.read(iprot) - self.cols.append(_elem800) + (_etype805, _size802) = iprot.readListBegin() + for _i806 in xrange(_size802): + _elem807 = FieldSchema() + _elem807.read(iprot) + self.cols.append(_elem807) iprot.readListEnd() else: iprot.skip(ftype) @@ -20709,8 +20857,8 @@ class SchemaVersion: if self.cols is not None: oprot.writeFieldBegin('cols', TType.LIST, 4) oprot.writeListBegin(TType.STRUCT, len(self.cols)) - for iter801 in self.cols: - iter801.write(oprot) + for iter808 in self.cols: + iter808.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() if self.state is not None: @@ -20965,11 +21113,11 @@ class FindSchemasByColsResp: if fid == 1: if ftype == TType.LIST: self.schemaVersions = [] - (_etype805, _size802) = iprot.readListBegin() - for _i806 in xrange(_size802): - _elem807 = SchemaVersionDescriptor() - _elem807.read(iprot) - self.schemaVersions.append(_elem807) + (_etype812, _size809) = iprot.readListBegin() + for _i813 in xrange(_size809): + _elem814 = SchemaVersionDescriptor() + _elem814.read(iprot) + self.schemaVersions.append(_elem814) iprot.readListEnd() else: iprot.skip(ftype) @@ -20986,8 +21134,8 @@ class FindSchemasByColsResp: if self.schemaVersions is not None: oprot.writeFieldBegin('schemaVersions', TType.LIST, 1) oprot.writeListBegin(TType.STRUCT, len(self.schemaVersions)) - for iter808 in self.schemaVersions: - iter808.write(oprot) + for iter815 in self.schemaVersions: + iter815.write(oprot) oprot.writeListEnd() oprot.writeFieldEnd() oprot.writeFieldStop()
http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb index 2687ce5..21dc708 100644 --- a/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/standalone-metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -2572,6 +2572,37 @@ class CommitTxnRequest ::Thrift::Struct.generate_accessors self end +class ReplTblWriteIdStateRequest + include ::Thrift::Struct, ::Thrift::Struct_Union + VALIDWRITEIDLIST = 1 + USER = 2 + HOSTNAME = 3 + DBNAME = 4 + TABLENAME = 5 + PARTNAMES = 6 + + FIELDS = { + VALIDWRITEIDLIST => {:type => ::Thrift::Types::STRING, :name => 'validWriteIdlist'}, + USER => {:type => ::Thrift::Types::STRING, :name => 'user'}, + HOSTNAME => {:type => ::Thrift::Types::STRING, :name => 'hostName'}, + DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbName'}, + TABLENAME => {:type => ::Thrift::Types::STRING, :name => 'tableName'}, + PARTNAMES => {:type => ::Thrift::Types::LIST, :name => 'partNames', :element => {:type => ::Thrift::Types::STRING}, :optional => true} + } + + def struct_fields; FIELDS; end + + def validate + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field validWriteIdlist is unset!') unless @validWriteIdlist + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field user is unset!') unless @user + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field hostName is unset!') unless @hostName + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field dbName is unset!') unless @dbName + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field tableName is unset!') unless @tableName + end + + ::Thrift::Struct.generate_accessors self +end + class GetValidWriteIdsRequest include ::Thrift::Struct, ::Thrift::Struct_Union FULLTABLENAMES = 1 http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb index 4de8bd3..7946b6c 100644 --- a/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb +++ b/standalone-metastore/src/gen/thrift/gen-rb/thrift_hive_metastore.rb @@ -2437,6 +2437,20 @@ module ThriftHiveMetastore return end + def repl_tbl_writeid_state(rqst) + send_repl_tbl_writeid_state(rqst) + recv_repl_tbl_writeid_state() + end + + def send_repl_tbl_writeid_state(rqst) + send_message('repl_tbl_writeid_state', Repl_tbl_writeid_state_args, :rqst => rqst) + end + + def recv_repl_tbl_writeid_state() + result = receive_message(Repl_tbl_writeid_state_result) + return + end + def get_valid_write_ids(rqst) send_get_valid_write_ids(rqst) return recv_get_valid_write_ids() @@ -5273,6 +5287,13 @@ module ThriftHiveMetastore write_result(result, oprot, 'commit_txn', seqid) end + def process_repl_tbl_writeid_state(seqid, iprot, oprot) + args = read_args(iprot, Repl_tbl_writeid_state_args) + result = Repl_tbl_writeid_state_result.new() + @handler.repl_tbl_writeid_state(args.rqst) + write_result(result, oprot, 'repl_tbl_writeid_state', seqid) + end + def process_get_valid_write_ids(seqid, iprot, oprot) args = read_args(iprot, Get_valid_write_ids_args) result = Get_valid_write_ids_result.new() @@ -11467,6 +11488,37 @@ module ThriftHiveMetastore ::Thrift::Struct.generate_accessors self end + class Repl_tbl_writeid_state_args + include ::Thrift::Struct, ::Thrift::Struct_Union + RQST = 1 + + FIELDS = { + RQST => {:type => ::Thrift::Types::STRUCT, :name => 'rqst', :class => ::ReplTblWriteIdStateRequest} + } + + def struct_fields; FIELDS; end + + def validate + end + + ::Thrift::Struct.generate_accessors self + end + + class Repl_tbl_writeid_state_result + include ::Thrift::Struct, ::Thrift::Struct_Union + + FIELDS = { + + } + + def struct_fields; FIELDS; end + + def validate + end + + ::Thrift::Struct.generate_accessors self + end + class Get_valid_write_ids_args include ::Thrift::Struct, ::Thrift::Struct_Union RQST = 1 http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java index 397a081..92cd131 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java @@ -7022,6 +7022,11 @@ public class HiveMetaStore extends ThriftHiveMetastore { } @Override + public void repl_tbl_writeid_state(ReplTblWriteIdStateRequest rqst) throws TException { + getTxnHandler().replTableWriteIdState(rqst); + } + + @Override public GetValidWriteIdsResponse get_valid_write_ids(GetValidWriteIdsRequest rqst) throws TException { return getTxnHandler().getValidWriteIds(rqst); } http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java index 1138ed3..70080b1 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java @@ -2487,6 +2487,33 @@ public class HiveMetaStoreClient implements IMetaStoreClient, AutoCloseable { } @Override + public void replTableWriteIdState(String validWriteIdList, String dbName, String tableName, List<String> partNames) + throws TException { + String user; + try { + user = UserGroupInformation.getCurrentUser().getUserName(); + } catch (IOException e) { + LOG.error("Unable to resolve current user name " + e.getMessage()); + throw new RuntimeException(e); + } + + String hostName; + try { + hostName = InetAddress.getLocalHost().getHostName(); + } catch (UnknownHostException e) { + LOG.error("Unable to resolve my host name " + e.getMessage()); + throw new RuntimeException(e); + } + + ReplTblWriteIdStateRequest rqst + = new ReplTblWriteIdStateRequest(validWriteIdList, user, hostName, dbName, tableName); + if (partNames != null) { + rqst.setPartNames(partNames); + } + client.repl_tbl_writeid_state(rqst); + } + + @Override public long allocateTableWriteId(long txnId, String dbName, String tableName) throws TException { return allocateTableWriteIdsBatch(Collections.singletonList(txnId), dbName, tableName).get(0).getWriteId(); } http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index 72b814d..0f302a6 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -2820,14 +2820,14 @@ public interface IMetaStoreClient { /** * Rollback a transaction. This will also unlock any locks associated with * this transaction. - * @param txnid id of transaction to be rolled back. + * @param srcTxnid id of transaction at source while is rolled back and to be replicated. * @param replPolicy the replication policy to identify the source cluster * @throws NoSuchTxnException if the requested transaction does not exist. * Note that this can result from the transaction having timed out and been * deleted. * @throws TException */ - void replRollbackTxn(long txnid, String replPolicy) throws NoSuchTxnException, TException; + void replRollbackTxn(long srcTxnid, String replPolicy) throws NoSuchTxnException, TException; /** * Commit a transaction. This will also unlock any locks associated with @@ -2846,7 +2846,7 @@ public interface IMetaStoreClient { /** * Commit a transaction. This will also unlock any locks associated with * this transaction. - * @param txnid id of transaction to be committed. + * @param srcTxnid id of transaction at source which is committed and to be replicated. * @param replPolicy the replication policy to identify the source cluster * @throws NoSuchTxnException if the requested transaction does not exist. * This can result fro the transaction having timed out and been deleted by @@ -2855,7 +2855,7 @@ public interface IMetaStoreClient { * aborted. This can result from the transaction timing out. * @throws TException */ - void replCommitTxn(long txnid, String replPolicy) + void replCommitTxn(long srcTxnid, String replPolicy) throws NoSuchTxnException, TxnAbortedException, TException; /** @@ -2874,6 +2874,17 @@ public interface IMetaStoreClient { long allocateTableWriteId(long txnId, String dbName, String tableName) throws TException; /** + * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark. + * @param validWriteIdList Snapshot of writeid list when the table/partition is dumped. + * @param dbName Database name + * @param tableName Table which is written. + * @param partNames List of partitions being written. + * @throws TException in case of failure to replicate the writeid state + */ + void replTableWriteIdState(String validWriteIdList, String dbName, String tableName, List<String> partNames) + throws TException; + + /** * Allocate a per table write ID and associate it with the given transaction. * @param txnIds ids of transaction batchto which the allocated write ID to be associated. * @param dbName name of DB in which the table belongs. http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java index 7c1d5f5..abe1226 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java @@ -64,25 +64,24 @@ public class ReplChangeManager { } public static class FileInfo { - FileSystem srcFs; - Path sourcePath; - Path cmPath; - String checkSum; - boolean useSourcePath; - - public FileInfo(FileSystem srcFs, Path sourcePath) { - this.srcFs = srcFs; - this.sourcePath = sourcePath; - this.cmPath = null; - this.checkSum = null; - this.useSourcePath = true; + private FileSystem srcFs; + private Path sourcePath; + private Path cmPath; + private String checkSum; + private boolean useSourcePath; + private String subDir; + + public FileInfo(FileSystem srcFs, Path sourcePath, String subDir) { + this(srcFs, sourcePath, null, null, true, subDir); } - public FileInfo(FileSystem srcFs, Path sourcePath, Path cmPath, String checkSum, boolean useSourcePath) { + public FileInfo(FileSystem srcFs, Path sourcePath, Path cmPath, + String checkSum, boolean useSourcePath, String subDir) { this.srcFs = srcFs; this.sourcePath = sourcePath; this.cmPath = cmPath; this.checkSum = checkSum; this.useSourcePath = useSourcePath; + this.subDir = subDir; } public FileSystem getSrcFs() { return srcFs; @@ -102,6 +101,9 @@ public class ReplChangeManager { public void setIsUseSourcePath(boolean useSourcePath) { this.useSourcePath = useSourcePath; } + public String getSubDir() { + return subDir; + } public Path getEffectivePath() { if (useSourcePath) { return sourcePath; @@ -301,20 +303,21 @@ public class ReplChangeManager { * matches, return the file; otherwise, use chksumString to retrieve it from cmroot * @param src Original file location * @param checksumString Checksum of the original file + * @param subDir Sub directory to which the source file belongs to * @param conf * @return Corresponding FileInfo object */ - public static FileInfo getFileInfo(Path src, String checksumString, Configuration conf) + public static FileInfo getFileInfo(Path src, String checksumString, String subDir, Configuration conf) throws MetaException { try { FileSystem srcFs = src.getFileSystem(conf); if (checksumString == null) { - return new FileInfo(srcFs, src); + return new FileInfo(srcFs, src, subDir); } Path cmPath = getCMPath(conf, src.getName(), checksumString); if (!srcFs.exists(src)) { - return new FileInfo(srcFs, src, cmPath, checksumString, false); + return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir); } String currentChecksumString; @@ -322,12 +325,12 @@ public class ReplChangeManager { currentChecksumString = checksumFor(src, srcFs); } catch (IOException ex) { // If the file is missing or getting modified, then refer CM path - return new FileInfo(srcFs, src, cmPath, checksumString, false); + return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir); } if ((currentChecksumString == null) || checksumString.equals(currentChecksumString)) { - return new FileInfo(srcFs, src, cmPath, checksumString, true); + return new FileInfo(srcFs, src, cmPath, checksumString, true, subDir); } else { - return new FileInfo(srcFs, src, cmPath, checksumString, false); + return new FileInfo(srcFs, src, cmPath, checksumString, false, subDir); } } catch (IOException e) { throw new MetaException(StringUtils.stringifyException(e)); @@ -335,19 +338,24 @@ public class ReplChangeManager { } /*** - * Concatenate filename and checksum with "#" + * Concatenate filename, checksum and subdirectory with "#" * @param fileUriStr Filename string * @param fileChecksum Checksum string + * @param encodedSubDir sub directory path into which this file belongs to. Here encoded means, + * the multiple levels of subdirectories are concatenated with path separator "/" * @return Concatenated Uri string */ // TODO: this needs to be enhanced once change management based filesystem is implemented - // Currently using fileuri#checksum as the format - static public String encodeFileUri(String fileUriStr, String fileChecksum) { + // Currently using fileuri#checksum#subdirs as the format + public static String encodeFileUri(String fileUriStr, String fileChecksum, String encodedSubDir) { + String encodedUri = fileUriStr; if (fileChecksum != null) { - return fileUriStr + URI_FRAGMENT_SEPARATOR + fileChecksum; - } else { - return fileUriStr; + encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + fileChecksum; + } + if (encodedSubDir != null) { + encodedUri = encodedUri + URI_FRAGMENT_SEPARATOR + encodedSubDir; } + return encodedUri; } /*** @@ -357,11 +365,14 @@ public class ReplChangeManager { */ static public String[] getFileWithChksumFromURI(String fileURIStr) { String[] uriAndFragment = fileURIStr.split(URI_FRAGMENT_SEPARATOR); - String[] result = new String[2]; + String[] result = new String[3]; result[0] = uriAndFragment[0]; if (uriAndFragment.length>1) { result[1] = uriAndFragment[1]; } + if (uriAndFragment.length>2) { + result[2] = uriAndFragment[2]; + } return result; } http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index db596a6..c513b4d 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -62,6 +62,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidReaderWriteIdList; import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.common.classification.RetrySemantics; @@ -104,6 +105,7 @@ import org.apache.hadoop.hive.metastore.api.NoSuchTxnException; import org.apache.hadoop.hive.metastore.api.OpenTxnRequest; import org.apache.hadoop.hive.metastore.api.OpenTxnsResponse; import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.ReplTblWriteIdStateRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; import org.apache.hadoop.hive.metastore.api.ShowCompactResponse; import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement; @@ -557,7 +559,6 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { try { Connection dbConn = null; Statement stmt = null; - ResultSet rs = null; try { lockInternal(); /** @@ -583,111 +584,124 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { if (numTxns > maxTxns) numTxns = maxTxns; stmt = dbConn.createStatement(); + List<Long> txnIds = openTxns(dbConn, stmt, rqst); - if (rqst.isSetReplPolicy()) { - List<Long> targetTxnIdList = getTargetTxnIdList(rqst.getReplPolicy(), rqst.getReplSrcTxnIds(), stmt); - if (!targetTxnIdList.isEmpty()) { - if (targetTxnIdList.size() != rqst.getReplSrcTxnIds().size()) { - LOG.warn("target txn id number " + targetTxnIdList.toString() + - " is not matching with source txn id number " + rqst.getReplSrcTxnIds().toString()); - } - LOG.info("Target transactions " + targetTxnIdList.toString() + " are present for repl policy :" + - rqst.getReplPolicy() + " and Source transaction id : " + rqst.getReplSrcTxnIds().toString()); - return new OpenTxnsResponse(targetTxnIdList); - } - } - - String s = sqlGenerator.addForUpdateClause("select ntxn_next from NEXT_TXN_ID"); - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new MetaException("Transaction database not properly " + - "configured, can't find next transaction id."); - } - long first = rs.getLong(1); - s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns); - LOG.debug("Going to execute update <" + s + ">"); - stmt.executeUpdate(s); + LOG.debug("Going to commit"); + dbConn.commit(); + return new OpenTxnsResponse(txnIds); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "openTxns(" + rqst + ")"); + throw new MetaException("Unable to select from transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(null, stmt, dbConn); + unlockInternal(); + } + } catch (RetryException e) { + return openTxns(rqst); + } + } - long now = getDbTime(dbConn); - List<Long> txnIds = new ArrayList<>(numTxns); + private List<Long> openTxns(Connection dbConn, Statement stmt, OpenTxnRequest rqst) + throws SQLException, MetaException { + int numTxns = rqst.getNum_txns(); + ResultSet rs = null; + try { + if (rqst.isSetReplPolicy()) { + List<Long> targetTxnIdList = getTargetTxnIdList(rqst.getReplPolicy(), rqst.getReplSrcTxnIds(), stmt); - List<String> rows = new ArrayList<>(); - for (long i = first; i < first + numTxns; i++) { - txnIds.add(i); - rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + "," + quoteString(rqst.getUser()) + "," + quoteString(rqst.getHostname())); - } - List<String> queries = sqlGenerator.createInsertValuesStmt( - "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host)", rows); - for (String q : queries) { - LOG.debug("Going to execute update <" + q + ">"); - stmt.execute(q); + if (!targetTxnIdList.isEmpty()) { + if (targetTxnIdList.size() != rqst.getReplSrcTxnIds().size()) { + LOG.warn("target txn id number " + targetTxnIdList.toString() + + " is not matching with source txn id number " + rqst.getReplSrcTxnIds().toString()); + } + LOG.info("Target transactions " + targetTxnIdList.toString() + " are present for repl policy :" + + rqst.getReplPolicy() + " and Source transaction id : " + rqst.getReplSrcTxnIds().toString()); + return targetTxnIdList; } + } - // Need to register minimum open txnid for current transactions into MIN_HISTORY table. - s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_OPEN); - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); - if (!rs.next()) { - throw new IllegalStateException("Scalar query returned no rows?!?!!"); - } + String s = sqlGenerator.addForUpdateClause("select ntxn_next from NEXT_TXN_ID"); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new MetaException("Transaction database not properly " + + "configured, can't find next transaction id."); + } + long first = rs.getLong(1); + s = "update NEXT_TXN_ID set ntxn_next = " + (first + numTxns); + LOG.debug("Going to execute update <" + s + ">"); + stmt.executeUpdate(s); - // TXNS table should have atleast one entry because we just inserted the newly opened txns. - // So, min(txn_id) would be a non-zero txnid. - long minOpenTxnId = rs.getLong(1); - assert(minOpenTxnId > 0); - rows.clear(); - for (long txnId = first; txnId < first + numTxns; txnId++) { - rows.add(txnId + ", " + minOpenTxnId); - } + long now = getDbTime(dbConn); + List<Long> txnIds = new ArrayList<>(numTxns); - // Insert transaction entries into MIN_HISTORY_LEVEL. - List<String> inserts = sqlGenerator.createInsertValuesStmt( - "MIN_HISTORY_LEVEL (mhl_txnid, mhl_min_open_txnid)", rows); - for (String insert : inserts) { - LOG.debug("Going to execute insert <" + insert + ">"); - stmt.execute(insert); - } - LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds - + ") with min_open_txn: " + minOpenTxnId); + List<String> rows = new ArrayList<>(); + for (long i = first; i < first + numTxns; i++) { + txnIds.add(i); + rows.add(i + "," + quoteChar(TXN_OPEN) + "," + now + "," + now + "," + + quoteString(rqst.getUser()) + "," + quoteString(rqst.getHostname())); + } + List<String> queries = sqlGenerator.createInsertValuesStmt( + "TXNS (txn_id, txn_state, txn_started, txn_last_heartbeat, txn_user, txn_host)", rows); + for (String q : queries) { + LOG.debug("Going to execute update <" + q + ">"); + stmt.execute(q); + } - if (rqst.isSetReplPolicy()) { - List<String> rowsRepl = new ArrayList<>(); + // Need to register minimum open txnid for current transactions into MIN_HISTORY table. + s = "select min(txn_id) from TXNS where txn_state = " + quoteChar(TXN_OPEN); + LOG.debug("Going to execute query <" + s + ">"); + rs = stmt.executeQuery(s); + if (!rs.next()) { + throw new IllegalStateException("Scalar query returned no rows?!?!!"); + } - for (int i = 0; i < numTxns; i++) { - rowsRepl.add( - quoteString(rqst.getReplPolicy()) + "," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i)); - } + // TXNS table should have atleast one entry because we just inserted the newly opened txns. + // So, min(txn_id) would be a non-zero txnid. + long minOpenTxnId = rs.getLong(1); + assert (minOpenTxnId > 0); + rows.clear(); + for (long txnId = first; txnId < first + numTxns; txnId++) { + rows.add(txnId + ", " + minOpenTxnId); + } - List<String> queriesRepl = sqlGenerator.createInsertValuesStmt( - "REPL_TXN_MAP (RTM_REPL_POLICY, RTM_SRC_TXN_ID, RTM_TARGET_TXN_ID)", rowsRepl); + // Insert transaction entries into MIN_HISTORY_LEVEL. + List<String> inserts = sqlGenerator.createInsertValuesStmt( + "MIN_HISTORY_LEVEL (mhl_txnid, mhl_min_open_txnid)", rows); + for (String insert : inserts) { + LOG.debug("Going to execute insert <" + insert + ">"); + stmt.execute(insert); + } + LOG.info("Added entries to MIN_HISTORY_LEVEL for current txns: (" + txnIds + + ") with min_open_txn: " + minOpenTxnId); - for (String query : queriesRepl) { - LOG.info("Going to execute insert <" + query + ">"); - stmt.execute(query); - } + if (rqst.isSetReplPolicy()) { + List<String> rowsRepl = new ArrayList<>(); + + for (int i = 0; i < numTxns; i++) { + rowsRepl.add( + quoteString(rqst.getReplPolicy()) + "," + rqst.getReplSrcTxnIds().get(i) + "," + txnIds.get(i)); } - if (transactionalListeners != null) { - MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, - EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, null), dbConn, sqlGenerator); + List<String> queriesRepl = sqlGenerator.createInsertValuesStmt( + "REPL_TXN_MAP (RTM_REPL_POLICY, RTM_SRC_TXN_ID, RTM_TARGET_TXN_ID)", rowsRepl); + + for (String query : queriesRepl) { + LOG.info("Going to execute insert <" + query + ">"); + stmt.execute(query); } + } - LOG.debug("Going to commit"); - dbConn.commit(); - return new OpenTxnsResponse(txnIds); - } catch (SQLException e) { - LOG.debug("Going to rollback"); - rollbackDBConn(dbConn); - checkRetryable(dbConn, e, "openTxns(" + rqst + ")"); - throw new MetaException("Unable to select from transaction database " - + StringUtils.stringifyException(e)); - } finally { - close(rs, stmt, dbConn); - unlockInternal(); + if (transactionalListeners != null) { + MetaStoreListenerNotifier.notifyEventWithDirectSql(transactionalListeners, + EventMessage.EventType.OPEN_TXN, new OpenTxnEvent(txnIds, null), dbConn, sqlGenerator); } - } catch (RetryException e) { - return openTxns(rqst); + return txnIds; + } finally { + close(rs); } } @@ -1097,6 +1111,136 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { } } + /** + * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark. + * @param rqst info on table/partitions and writeid snapshot to replicate. + * @throws MetaException + */ + @Override + @RetrySemantics.Idempotent("No-op if already replicated the writeid state") + public void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaException { + String dbName = rqst.getDbName().toLowerCase(); + String tblName = rqst.getTableName().toLowerCase(); + ValidWriteIdList validWriteIdList = new ValidReaderWriteIdList(rqst.getValidWriteIdlist()); + + // Get the abortedWriteIds which are already sorted in ascending order. + List<Long> abortedWriteIds = getAbortedWriteIds(validWriteIdList); + int numAbortedWrites = abortedWriteIds.size(); + try { + Connection dbConn = null; + Statement stmt = null; + ResultSet rs = null; + TxnStore.MutexAPI.LockHandle handle = null; + try { + lockInternal(); + dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); + stmt = dbConn.createStatement(); + + // Clean the txn to writeid map/TXN_COMPONENTS for the given table as we bootstrap here + String sql = "delete from TXN_TO_WRITE_ID where t2w_database = " + quoteString(dbName) + + " and t2w_table = " + quoteString(tblName); + LOG.debug("Going to execute delete <" + sql + ">"); + stmt.executeUpdate(sql); + + if (numAbortedWrites > 0) { + // Allocate/Map one txn per aborted writeId and abort the txn to mark writeid as aborted. + List<Long> txnIds = openTxns(dbConn, stmt, + new OpenTxnRequest(numAbortedWrites, rqst.getUser(), rqst.getHostName())); + assert(numAbortedWrites == txnIds.size()); + + // Map each aborted write id with each allocated txn. + List<String> rows = new ArrayList<>(); + int i = 0; + for (long txn : txnIds) { + long writeId = abortedWriteIds.get(i++); + rows.add(txn + ", " + quoteString(dbName) + ", " + quoteString(tblName) + ", " + writeId); + LOG.info("Allocated writeID: " + writeId + " for txnId: " + txn); + } + + // Insert entries to TXN_TO_WRITE_ID for aborted write ids + List<String> inserts = sqlGenerator.createInsertValuesStmt( + "TXN_TO_WRITE_ID (t2w_txnid, t2w_database, t2w_table, t2w_writeid)", rows); + for (String insert : inserts) { + LOG.debug("Going to execute insert <" + insert + ">"); + stmt.execute(insert); + } + + // Abort all the allocated txns so that the mapped write ids are referred as aborted ones. + int numAborts = abortTxns(dbConn, txnIds, true); + assert(numAborts == numAbortedWrites); + } + handle = getMutexAPI().acquireLock(MUTEX_KEY.WriteIdAllocator.name()); + + // There are some txns in the list which has no write id allocated and hence go ahead and do it. + // Get the next write id for the given table and update it with new next write id. + // This is select for update query which takes a lock if the table entry is already there in NEXT_WRITE_ID + sql = sqlGenerator.addForUpdateClause( + "select nwi_next from NEXT_WRITE_ID where nwi_database = " + quoteString(dbName) + + " and nwi_table = " + quoteString(tblName)); + LOG.debug("Going to execute query <" + sql + ">"); + + long nextWriteId = validWriteIdList.getHighWatermark() + 1; + rs = stmt.executeQuery(sql); + if (!rs.next()) { + // First allocation of write id (hwm+1) should add the table to the next_write_id meta table. + sql = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (" + + quoteString(dbName) + "," + quoteString(tblName) + "," + + Long.toString(nextWriteId) + ")"; + LOG.debug("Going to execute insert <" + sql + ">"); + stmt.execute(sql); + } else { + // Update the NEXT_WRITE_ID for the given table with hwm+1 from source + sql = "update NEXT_WRITE_ID set nwi_next = " + (nextWriteId) + + " where nwi_database = " + quoteString(dbName) + + " and nwi_table = " + quoteString(tblName); + LOG.debug("Going to execute update <" + sql + ">"); + stmt.executeUpdate(sql); + } + + LOG.debug("Going to commit"); + dbConn.commit(); + } catch (SQLException e) { + LOG.debug("Going to rollback"); + rollbackDBConn(dbConn); + checkRetryable(dbConn, e, "replTableWriteIdState(" + rqst + ")"); + throw new MetaException("Unable to update transaction database " + + StringUtils.stringifyException(e)); + } finally { + close(rs, stmt, dbConn); + if(handle != null) { + handle.releaseLocks(); + } + unlockInternal(); + } + } catch (RetryException e) { + replTableWriteIdState(rqst); + } + + // Schedule Major compaction on all the partitions/table to clean aborted data + if (numAbortedWrites > 0) { + CompactionRequest compactRqst = new CompactionRequest(rqst.getDbName(), rqst.getTableName(), + CompactionType.MAJOR); + if (rqst.isSetPartNames()) { + for (String partName : rqst.getPartNames()) { + compactRqst.setPartitionname(partName); + compact(compactRqst); + } + } else { + compact(compactRqst); + } + } + } + + private List<Long> getAbortedWriteIds(ValidWriteIdList validWriteIdList) { + List<Long> abortedWriteIds = new ArrayList<>(); + for (long writeId : validWriteIdList.getInvalidWriteIds()) { + if (validWriteIdList.isWriteIdAborted(writeId)) { + abortedWriteIds.add(writeId); + } + } + return abortedWriteIds; + } + @Override @RetrySemantics.ReadOnly public GetValidWriteIdsResponse getValidWriteIds(GetValidWriteIdsRequest rqst) @@ -1336,7 +1480,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { // The initial value for write id should be 1 and hence we add 1 with number of write ids allocated here writeId = 1; s = "insert into NEXT_WRITE_ID (nwi_database, nwi_table, nwi_next) values (" - + quoteString(dbName) + "," + quoteString(tblName) + "," + String.valueOf(numOfWriteIds + 1) + ")"; + + quoteString(dbName) + "," + quoteString(tblName) + "," + Long.toString(numOfWriteIds + 1) + ")"; LOG.debug("Going to execute insert <" + s + ">"); stmt.execute(s); } else { http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 6d8b845..b8e398f 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -21,7 +21,6 @@ import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.common.classification.RetrySemantics; import org.apache.hadoop.hive.metastore.api.*; @@ -117,13 +116,21 @@ public interface TxnStore extends Configurable { throws NoSuchTxnException, TxnAbortedException, MetaException; /** + * Replicate Table Write Ids state to mark aborted write ids and writeid high water mark. + * @param rqst info on table/partitions and writeid snapshot to replicate. + * @throws MetaException in case of failure + */ + @RetrySemantics.Idempotent + void replTableWriteIdState(ReplTblWriteIdStateRequest rqst) throws MetaException; + + /** * Get the first transaction corresponding to given database and table after transactions * referenced in the transaction snapshot. * @return * @throws MetaException */ @RetrySemantics.Idempotent - public BasicTxnInfo getFirstCompletedTransactionForTableAfterCommit( + BasicTxnInfo getFirstCompletedTransactionForTableAfterCommit( String inputDbName, String inputTableName, ValidWriteIdList txnList) throws MetaException; /** http://git-wip-us.apache.org/repos/asf/hive/blob/4c0475ff/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java ---------------------------------------------------------------------- diff --git a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java index 1880d44..fa291d5 100644 --- a/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java +++ b/standalone-metastore/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java @@ -36,6 +36,7 @@ import org.apache.hadoop.hive.metastore.utils.JavaUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; import java.util.ArrayList; import java.util.Arrays; import java.util.BitSet; @@ -56,24 +57,46 @@ public class TxnUtils { * @return a valid txn list. */ public static ValidTxnList createValidReadTxnList(GetOpenTxnsResponse txns, long currentTxn) { - /*todo: should highWater be min(currentTxn,txns.getTxn_high_water_mark()) assuming currentTxn>0 + /* + * The highWaterMark should be min(currentTxn,txns.getTxn_high_water_mark()) assuming currentTxn>0 * otherwise if currentTxn=7 and 8 commits before 7, then 7 will see result of 8 which - * doesn't make sense for Snapshot Isolation. Of course for Read Committed, the list should - * inlude the latest committed set.*/ - long highWater = txns.getTxn_high_water_mark(); - List<Long> open = txns.getOpen_txns(); - BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits()); - long[] exceptions = new long[open.size() - (currentTxn > 0 ? 1 : 0)]; + * doesn't make sense for Snapshot Isolation. Of course for Read Committed, the list should + * include the latest committed set. + */ + long highWaterMark = (currentTxn > 0) ? Math.min(currentTxn, txns.getTxn_high_water_mark()) + : txns.getTxn_high_water_mark(); + + // Open txns are already sorted in ascending order. This list may or may not include HWM + // but it is guaranteed that list won't have txn > HWM. But, if we overwrite the HWM with currentTxn + // then need to truncate the exceptions list accordingly. + List<Long> openTxns = txns.getOpen_txns(); + + // We care only about open/aborted txns below currentTxn and hence the size should be determined + // for the exceptions list. The currentTxn will be missing in openTxns list only in rare case like + // txn is aborted by AcidHouseKeeperService and compactor actually cleans up the aborted txns. + // So, for such cases, we get negative value for sizeToHwm with found position for currentTxn, and so, + // we just negate it to get the size. + int sizeToHwm = (currentTxn > 0) ? Collections.binarySearch(openTxns, currentTxn) : openTxns.size(); + sizeToHwm = (sizeToHwm < 0) ? (-sizeToHwm) : sizeToHwm; + long[] exceptions = new long[sizeToHwm]; + BitSet inAbortedBits = BitSet.valueOf(txns.getAbortedBits()); + BitSet outAbortedBits = new BitSet(); + long minOpenTxnId = Long.MAX_VALUE; int i = 0; - for (long txn : open) { - if (currentTxn > 0 && currentTxn == txn) continue; + for (long txn : openTxns) { + // For snapshot isolation, we don't care about txns greater than current txn and so stop here. + // Also, we need not include current txn to exceptions list. + if ((currentTxn > 0) && (txn >= currentTxn)) { + break; + } + if (inAbortedBits.get(i)) { + outAbortedBits.set(i); + } else if (minOpenTxnId == Long.MAX_VALUE) { + minOpenTxnId = txn; + } exceptions[i++] = txn; } - if (txns.isSetMin_open_txn()) { - return new ValidReadTxnList(exceptions, abortedBits, highWater, txns.getMin_open_txn()); - } else { - return new ValidReadTxnList(exceptions, abortedBits, highWater); - } + return new ValidReadTxnList(exceptions, outAbortedBits, highWaterMark, minOpenTxnId); } /**