This is an automated email from the ASF dual-hosted git repository. lpinter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 2080e47 HIVE-25916: Optimise updateCompactionMetricsData (#2989) (Laszlo Pinter, reviewed by Karen Coppage) 2080e47 is described below commit 2080e47a03673b46f102e383631d5c760eef31d4 Author: László Pintér <47777102+lcspin...@users.noreply.github.com> AuthorDate: Fri Feb 4 12:31:48 2022 +0100 HIVE-25916: Optimise updateCompactionMetricsData (#2989) (Laszlo Pinter, reviewed by Karen Coppage) --- .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 23 ++++ .../src/gen/thrift/gen-cpp/hive_metastore_types.h | 7 +- .../metastore/api/CompactionMetricsDataStruct.java | 96 ++++++++++++++++- .../metastore/CompactionMetricsDataStruct.php | 24 +++++ .../src/gen/thrift/gen-py/hive_metastore/ttypes.py | 16 ++- .../src/gen/thrift/gen-rb/hive_metastore_types.rb | 5 +- .../hadoop/hive/metastore/IMetaStoreClient.java | 13 ++- .../src/main/thrift/hive_metastore.thrift | 1 + .../hive/metastore/metrics/AcidMetricService.java | 85 ++++----------- .../hive/metastore/txn/CompactionMetricsData.java | 12 +++ .../txn/CompactionMetricsDataConverter.java | 16 +-- .../hive/metastore/txn/CompactionTxnHandler.java | 118 +++++++++++++-------- .../apache/hadoop/hive/metastore/txn/TxnStore.java | 19 +++- 13 files changed, 309 insertions(+), 126 deletions(-) diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp index 36b94a9..bca4d2c 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp @@ -27377,6 +27377,10 @@ void CompactionMetricsDataStruct::__set_metricvalue(const int32_t val) { void CompactionMetricsDataStruct::__set_version(const int32_t val) { this->version = val; } + +void CompactionMetricsDataStruct::__set_threshold(const int32_t val) { + this->threshold = val; +} std::ostream& operator<<(std::ostream& out, const CompactionMetricsDataStruct& obj) { obj.printTo(out); @@ -27401,6 +27405,7 @@ uint32_t CompactionMetricsDataStruct::read(::apache::thrift::protocol::TProtocol bool isset_type = false; bool isset_metricvalue = false; bool isset_version = false; + bool isset_threshold = false; while (true) { @@ -27460,6 +27465,14 @@ uint32_t CompactionMetricsDataStruct::read(::apache::thrift::protocol::TProtocol xfer += iprot->skip(ftype); } break; + case 7: + if (ftype == ::apache::thrift::protocol::T_I32) { + xfer += iprot->readI32(this->threshold); + isset_threshold = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -27479,6 +27492,8 @@ uint32_t CompactionMetricsDataStruct::read(::apache::thrift::protocol::TProtocol throw TProtocolException(TProtocolException::INVALID_DATA); if (!isset_version) throw TProtocolException(TProtocolException::INVALID_DATA); + if (!isset_threshold) + throw TProtocolException(TProtocolException::INVALID_DATA); return xfer; } @@ -27512,6 +27527,10 @@ uint32_t CompactionMetricsDataStruct::write(::apache::thrift::protocol::TProtoco xfer += oprot->writeI32(this->version); xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldBegin("threshold", ::apache::thrift::protocol::T_I32, 7); + xfer += oprot->writeI32(this->threshold); + xfer += oprot->writeFieldEnd(); + xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -27525,6 +27544,7 @@ void swap(CompactionMetricsDataStruct &a, CompactionMetricsDataStruct &b) { swap(a.type, b.type); swap(a.metricvalue, b.metricvalue); swap(a.version, b.version); + swap(a.threshold, b.threshold); swap(a.__isset, b.__isset); } @@ -27535,6 +27555,7 @@ CompactionMetricsDataStruct::CompactionMetricsDataStruct(const CompactionMetrics type = other986.type; metricvalue = other986.metricvalue; version = other986.version; + threshold = other986.threshold; __isset = other986.__isset; } CompactionMetricsDataStruct& CompactionMetricsDataStruct::operator=(const CompactionMetricsDataStruct& other987) { @@ -27544,6 +27565,7 @@ CompactionMetricsDataStruct& CompactionMetricsDataStruct::operator=(const Compac type = other987.type; metricvalue = other987.metricvalue; version = other987.version; + threshold = other987.threshold; __isset = other987.__isset; return *this; } @@ -27556,6 +27578,7 @@ void CompactionMetricsDataStruct::printTo(std::ostream& out) const { out << ", " << "type=" << to_string(type); out << ", " << "metricvalue=" << to_string(metricvalue); out << ", " << "version=" << to_string(version); + out << ", " << "threshold=" << to_string(threshold); out << ")"; } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h index 5de3042..fb9f87f 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h @@ -10376,7 +10376,7 @@ class CompactionMetricsDataStruct : public virtual ::apache::thrift::TBase { CompactionMetricsDataStruct(const CompactionMetricsDataStruct&); CompactionMetricsDataStruct& operator=(const CompactionMetricsDataStruct&); - CompactionMetricsDataStruct() : dbname(), tblname(), partitionname(), type((CompactionMetricsMetricType::type)0), metricvalue(0), version(0) { + CompactionMetricsDataStruct() : dbname(), tblname(), partitionname(), type((CompactionMetricsMetricType::type)0), metricvalue(0), version(0), threshold(0) { } virtual ~CompactionMetricsDataStruct() noexcept; @@ -10390,6 +10390,7 @@ class CompactionMetricsDataStruct : public virtual ::apache::thrift::TBase { CompactionMetricsMetricType::type type; int32_t metricvalue; int32_t version; + int32_t threshold; _CompactionMetricsDataStruct__isset __isset; @@ -10405,6 +10406,8 @@ class CompactionMetricsDataStruct : public virtual ::apache::thrift::TBase { void __set_version(const int32_t val); + void __set_threshold(const int32_t val); + bool operator == (const CompactionMetricsDataStruct & rhs) const { if (!(dbname == rhs.dbname)) @@ -10421,6 +10424,8 @@ class CompactionMetricsDataStruct : public virtual ::apache::thrift::TBase { return false; if (!(version == rhs.version)) return false; + if (!(threshold == rhs.threshold)) + return false; return true; } bool operator != (const CompactionMetricsDataStruct &rhs) const { diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionMetricsDataStruct.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionMetricsDataStruct.java index 1b024e7..7a5c1bf 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionMetricsDataStruct.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionMetricsDataStruct.java @@ -17,6 +17,7 @@ package org.apache.hadoop.hive.metastore.api; private static final org.apache.thrift.protocol.TField TYPE_FIELD_DESC = new org.apache.thrift.protocol.TField("type", org.apache.thrift.protocol.TType.I32, (short)4); private static final org.apache.thrift.protocol.TField METRICVALUE_FIELD_DESC = new org.apache.thrift.protocol.TField("metricvalue", org.apache.thrift.protocol.TType.I32, (short)5); private static final org.apache.thrift.protocol.TField VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("version", org.apache.thrift.protocol.TType.I32, (short)6); + private static final org.apache.thrift.protocol.TField THRESHOLD_FIELD_DESC = new org.apache.thrift.protocol.TField("threshold", org.apache.thrift.protocol.TType.I32, (short)7); private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new CompactionMetricsDataStructStandardSchemeFactory(); private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new CompactionMetricsDataStructTupleSchemeFactory(); @@ -27,6 +28,7 @@ package org.apache.hadoop.hive.metastore.api; private @org.apache.thrift.annotation.Nullable CompactionMetricsMetricType type; // required private int metricvalue; // required private int version; // required + private int threshold; // required /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -39,7 +41,8 @@ package org.apache.hadoop.hive.metastore.api; */ TYPE((short)4, "type"), METRICVALUE((short)5, "metricvalue"), - VERSION((short)6, "version"); + VERSION((short)6, "version"), + THRESHOLD((short)7, "threshold"); private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); @@ -67,6 +70,8 @@ package org.apache.hadoop.hive.metastore.api; return METRICVALUE; case 6: // VERSION return VERSION; + case 7: // THRESHOLD + return THRESHOLD; default: return null; } @@ -110,6 +115,7 @@ package org.apache.hadoop.hive.metastore.api; // isset id assignments private static final int __METRICVALUE_ISSET_ID = 0; private static final int __VERSION_ISSET_ID = 1; + private static final int __THRESHOLD_ISSET_ID = 2; private byte __isset_bitfield = 0; private static final _Fields optionals[] = {_Fields.PARTITIONNAME}; public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; @@ -127,6 +133,8 @@ package org.apache.hadoop.hive.metastore.api; new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); tmpMap.put(_Fields.VERSION, new org.apache.thrift.meta_data.FieldMetaData("version", org.apache.thrift.TFieldRequirementType.REQUIRED, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); + tmpMap.put(_Fields.THRESHOLD, new org.apache.thrift.meta_data.FieldMetaData("threshold", org.apache.thrift.TFieldRequirementType.REQUIRED, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CompactionMetricsDataStruct.class, metaDataMap); } @@ -139,7 +147,8 @@ package org.apache.hadoop.hive.metastore.api; java.lang.String tblname, CompactionMetricsMetricType type, int metricvalue, - int version) + int version, + int threshold) { this(); this.dbname = dbname; @@ -149,6 +158,8 @@ package org.apache.hadoop.hive.metastore.api; setMetricvalueIsSet(true); this.version = version; setVersionIsSet(true); + this.threshold = threshold; + setThresholdIsSet(true); } /** @@ -170,6 +181,7 @@ package org.apache.hadoop.hive.metastore.api; } this.metricvalue = other.metricvalue; this.version = other.version; + this.threshold = other.threshold; } public CompactionMetricsDataStruct deepCopy() { @@ -186,6 +198,8 @@ package org.apache.hadoop.hive.metastore.api; this.metricvalue = 0; setVersionIsSet(false); this.version = 0; + setThresholdIsSet(false); + this.threshold = 0; } @org.apache.thrift.annotation.Nullable @@ -336,6 +350,28 @@ package org.apache.hadoop.hive.metastore.api; __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __VERSION_ISSET_ID, value); } + public int getThreshold() { + return this.threshold; + } + + public void setThreshold(int threshold) { + this.threshold = threshold; + setThresholdIsSet(true); + } + + public void unsetThreshold() { + __isset_bitfield = org.apache.thrift.EncodingUtils.clearBit(__isset_bitfield, __THRESHOLD_ISSET_ID); + } + + /** Returns true if field threshold is set (has been assigned a value) and false otherwise */ + public boolean isSetThreshold() { + return org.apache.thrift.EncodingUtils.testBit(__isset_bitfield, __THRESHOLD_ISSET_ID); + } + + public void setThresholdIsSet(boolean value) { + __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __THRESHOLD_ISSET_ID, value); + } + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { case DBNAME: @@ -386,6 +422,14 @@ package org.apache.hadoop.hive.metastore.api; } break; + case THRESHOLD: + if (value == null) { + unsetThreshold(); + } else { + setThreshold((java.lang.Integer)value); + } + break; + } } @@ -410,6 +454,9 @@ package org.apache.hadoop.hive.metastore.api; case VERSION: return getVersion(); + case THRESHOLD: + return getThreshold(); + } throw new java.lang.IllegalStateException(); } @@ -433,6 +480,8 @@ package org.apache.hadoop.hive.metastore.api; return isSetMetricvalue(); case VERSION: return isSetVersion(); + case THRESHOLD: + return isSetThreshold(); } throw new java.lang.IllegalStateException(); } @@ -504,6 +553,15 @@ package org.apache.hadoop.hive.metastore.api; return false; } + boolean this_present_threshold = true; + boolean that_present_threshold = true; + if (this_present_threshold || that_present_threshold) { + if (!(this_present_threshold && that_present_threshold)) + return false; + if (this.threshold != that.threshold) + return false; + } + return true; } @@ -531,6 +589,8 @@ package org.apache.hadoop.hive.metastore.api; hashCode = hashCode * 8191 + version; + hashCode = hashCode * 8191 + threshold; + return hashCode; } @@ -602,6 +662,16 @@ package org.apache.hadoop.hive.metastore.api; return lastComparison; } } + lastComparison = java.lang.Boolean.compare(isSetThreshold(), other.isSetThreshold()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetThreshold()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.threshold, other.threshold); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -664,6 +734,10 @@ package org.apache.hadoop.hive.metastore.api; sb.append("version:"); sb.append(this.version); first = false; + if (!first) sb.append(", "); + sb.append("threshold:"); + sb.append(this.threshold); + first = false; sb.append(")"); return sb.toString(); } @@ -690,6 +764,10 @@ package org.apache.hadoop.hive.metastore.api; throw new org.apache.thrift.protocol.TProtocolException("Required field 'version' is unset! Struct:" + toString()); } + if (!isSetThreshold()) { + throw new org.apache.thrift.protocol.TProtocolException("Required field 'threshold' is unset! Struct:" + toString()); + } + // check for sub-struct validity } @@ -777,6 +855,14 @@ package org.apache.hadoop.hive.metastore.api; org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 7: // THRESHOLD + if (schemeField.type == org.apache.thrift.protocol.TType.I32) { + struct.threshold = iprot.readI32(); + struct.setThresholdIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -818,6 +904,9 @@ package org.apache.hadoop.hive.metastore.api; oprot.writeFieldBegin(VERSION_FIELD_DESC); oprot.writeI32(struct.version); oprot.writeFieldEnd(); + oprot.writeFieldBegin(THRESHOLD_FIELD_DESC); + oprot.writeI32(struct.threshold); + oprot.writeFieldEnd(); oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -840,6 +929,7 @@ package org.apache.hadoop.hive.metastore.api; oprot.writeI32(struct.type.getValue()); oprot.writeI32(struct.metricvalue); oprot.writeI32(struct.version); + oprot.writeI32(struct.threshold); java.util.BitSet optionals = new java.util.BitSet(); if (struct.isSetPartitionname()) { optionals.set(0); @@ -863,6 +953,8 @@ package org.apache.hadoop.hive.metastore.api; struct.setMetricvalueIsSet(true); struct.version = iprot.readI32(); struct.setVersionIsSet(true); + struct.threshold = iprot.readI32(); + struct.setThresholdIsSet(true); java.util.BitSet incoming = iprot.readBitSet(1); if (incoming.get(0)) { struct.partitionname = iprot.readString(); diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionMetricsDataStruct.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionMetricsDataStruct.php index 9eab694..2c05960 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionMetricsDataStruct.php +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionMetricsDataStruct.php @@ -52,6 +52,11 @@ class CompactionMetricsDataStruct 'isRequired' => true, 'type' => TType::I32, ), + 7 => array( + 'var' => 'threshold', + 'isRequired' => true, + 'type' => TType::I32, + ), ); /** @@ -78,6 +83,10 @@ class CompactionMetricsDataStruct * @var int */ public $version = null; + /** + * @var int + */ + public $threshold = null; public function __construct($vals = null) { @@ -100,6 +109,9 @@ class CompactionMetricsDataStruct if (isset($vals['version'])) { $this->version = $vals['version']; } + if (isset($vals['threshold'])) { + $this->threshold = $vals['threshold']; + } } } @@ -164,6 +176,13 @@ class CompactionMetricsDataStruct $xfer += $input->skip($ftype); } break; + case 7: + if ($ftype == TType::I32) { + $xfer += $input->readI32($this->threshold); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -208,6 +227,11 @@ class CompactionMetricsDataStruct $xfer += $output->writeI32($this->version); $xfer += $output->writeFieldEnd(); } + if ($this->threshold !== null) { + $xfer += $output->writeFieldBegin('threshold', TType::I32, 7); + $xfer += $output->writeI32($this->threshold); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 29d9403..f10a30f 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -15665,17 +15665,19 @@ class CompactionMetricsDataStruct(object): - type - metricvalue - version + - threshold """ - def __init__(self, dbname=None, tblname=None, partitionname=None, type=None, metricvalue=None, version=None,): + def __init__(self, dbname=None, tblname=None, partitionname=None, type=None, metricvalue=None, version=None, threshold=None,): self.dbname = dbname self.tblname = tblname self.partitionname = partitionname self.type = type self.metricvalue = metricvalue self.version = version + self.threshold = threshold def read(self, iprot): if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: @@ -15716,6 +15718,11 @@ class CompactionMetricsDataStruct(object): self.version = iprot.readI32() else: iprot.skip(ftype) + elif fid == 7: + if ftype == TType.I32: + self.threshold = iprot.readI32() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -15750,6 +15757,10 @@ class CompactionMetricsDataStruct(object): oprot.writeFieldBegin('version', TType.I32, 6) oprot.writeI32(self.version) oprot.writeFieldEnd() + if self.threshold is not None: + oprot.writeFieldBegin('threshold', TType.I32, 7) + oprot.writeI32(self.threshold) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -15764,6 +15775,8 @@ class CompactionMetricsDataStruct(object): raise TProtocolException(message='Required field metricvalue is unset!') if self.version is None: raise TProtocolException(message='Required field version is unset!') + if self.threshold is None: + raise TProtocolException(message='Required field threshold is unset!') return def __repr__(self): @@ -30754,6 +30767,7 @@ CompactionMetricsDataStruct.thrift_spec = ( (4, TType.I32, 'type', None, None, ), # 4 (5, TType.I32, 'metricvalue', None, None, ), # 5 (6, TType.I32, 'version', None, None, ), # 6 + (7, TType.I32, 'threshold', None, None, ), # 7 ) all_structs.append(CompactionMetricsDataResponse) CompactionMetricsDataResponse.thrift_spec = ( diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb index 7c41d36..dfd1afa 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -4547,6 +4547,7 @@ class CompactionMetricsDataStruct TYPE = 4 METRICVALUE = 5 VERSION = 6 + THRESHOLD = 7 FIELDS = { DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname'}, @@ -4554,7 +4555,8 @@ class CompactionMetricsDataStruct PARTITIONNAME => {:type => ::Thrift::Types::STRING, :name => 'partitionname', :optional => true}, TYPE => {:type => ::Thrift::Types::I32, :name => 'type', :enum_class => ::CompactionMetricsMetricType}, METRICVALUE => {:type => ::Thrift::Types::I32, :name => 'metricvalue'}, - VERSION => {:type => ::Thrift::Types::I32, :name => 'version'} + VERSION => {:type => ::Thrift::Types::I32, :name => 'version'}, + THRESHOLD => {:type => ::Thrift::Types::I32, :name => 'threshold'} } def struct_fields; FIELDS; end @@ -4565,6 +4567,7 @@ class CompactionMetricsDataStruct raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field type is unset!') unless @type raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field metricvalue is unset!') unless @metricvalue raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field version is unset!') unless @version + raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field threshold is unset!') unless @threshold unless @type.nil? || ::CompactionMetricsMetricType::VALID_VALUES.include?(@type) raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Invalid value of field type!') end diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java index 2749f15..f46b276 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/IMetaStoreClient.java @@ -4309,8 +4309,17 @@ public interface IMetaStoreClient { void markFailed(CompactionInfoStruct cr) throws MetaException, TException; /** - * Update or create one record in the compaction metrics cache. This operation uses an optimistic locking mechanism. - * If update fails, due to version mismatch, the operation won't be retried. + * Create, update or delete one record in the compaction metrics cache. + * <p> + * If the metric is not found in the metrics cache, it will be created. + * </p> + * <p> + * If the metric is found, it will be updated. This operation uses an optimistic locking mechanism, meaning if another + * operation changed the value of this metric, the update will abort and won't be retried. + * </p> + * <p> + * If the new metric value is below {@link CompactionMetricsDataStruct#getThreshold()}, it will be deleted. + * </p> * @param struct the object that is used for the update, always non-null * @return true, if update finished successfully * @throws MetaException diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift index d081246..3c33c3f 100644 --- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift +++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift @@ -1316,6 +1316,7 @@ struct CompactionMetricsDataStruct { 4: required CompactionMetricsMetricType type 5: required i32 metricvalue 6: required i32 version + 7: required i32 threshold } struct CompactionMetricsDataResponse { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java index 7782614..898efb2 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/metrics/AcidMetricService.java @@ -46,7 +46,6 @@ import org.slf4j.LoggerFactory; import javax.management.MBeanServer; import javax.management.ObjectName; import java.lang.management.ManagementFactory; -import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -102,7 +101,6 @@ public class AcidMetricService implements MetastoreTaskThread { private static boolean metricsEnabled; - private final List<ObjectName> registeredObjects = new ArrayList<>(); private MetricsMBeanImpl deltaObject, smallDeltaObject, obsoleteDeltaObject; private Configuration conf; @@ -169,25 +167,11 @@ public class AcidMetricService implements MetastoreTaskThread { int numObsoleteDeltas = filterOutBaseAndOriginalFiles(obsoleteDeltaPaths).size(); - if (numDeltas >= deltasThreshold) { - updateDeltaMetrics(dbName, tableName, partitionName, NUM_DELTAS, numDeltas, txnHandler); - } else { - removeDeltaMetrics(dbName, tableName, partitionName, NUM_DELTAS, txnHandler); - } - - if (numSmallDeltas >= deltasThreshold) { - updateDeltaMetrics(dbName, tableName, partitionName, NUM_SMALL_DELTAS, numSmallDeltas, txnHandler); - } else { - removeDeltaMetrics(dbName, tableName, partitionName, NUM_SMALL_DELTAS, txnHandler); - } - - if (numObsoleteDeltas >= obsoleteDeltasThreshold) { - updateDeltaMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS, - numObsoleteDeltas, txnHandler); - } else { - removeDeltaMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS, - txnHandler); - } + updateDeltaMetrics(dbName, tableName, partitionName, NUM_DELTAS, numDeltas, deltasThreshold, txnHandler); + updateDeltaMetrics(dbName, tableName, partitionName, NUM_SMALL_DELTAS, numSmallDeltas, deltasThreshold, + txnHandler); + updateDeltaMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS, + numObsoleteDeltas, obsoleteDeltasThreshold, txnHandler); LOG.debug("Finished updating delta file metrics from initiator.\n deltaPctThreshold = {}, deltasThreshold = {}, " + "obsoleteDeltasThreshold = {}, numDeltas = {}, numSmallDeltas = {}, numObsoleteDeltas = {}", @@ -216,13 +200,8 @@ public class AcidMetricService implements MetastoreTaskThread { // we have an instance of the AcidDirectory before the compaction worker was started // from this we can get how many delta directories existed // the previously active delta directories are now moved to obsolete - int numObsoleteDeltas = preWorkerActiveDeltaCount; - if (numObsoleteDeltas >= obsoleteDeltasThreshold) { - updateDeltaMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_OBSOLETE_DELTAS, - numObsoleteDeltas, client); - } else { - removeDeltaMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_OBSOLETE_DELTAS, client); - } + updateDeltaMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_OBSOLETE_DELTAS, + preWorkerActiveDeltaCount, obsoleteDeltasThreshold, client); // We don't know the size of the newly create delta directories, that would require a fresh AcidDirectory // Clear the small delta num counter from the cache for this key @@ -247,17 +226,13 @@ public class AcidMetricService implements MetastoreTaskThread { } // recalculate the delta count - if (numNewDeltas >= deltasThreshold) { - updateDeltaMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS, numNewDeltas, - client); - } else { - removeDeltaMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS, client); - } + updateDeltaMetrics(dbName, tableName, partitionName, CompactionMetricsMetricType.NUM_DELTAS, numNewDeltas, + deltasThreshold, client); } LOG.debug("Finished updating delta file metrics from worker.\n deltasThreshold = {}, " + "obsoleteDeltasThreshold = {}, numObsoleteDeltas = {}", deltasThreshold, obsoleteDeltasThreshold, - numObsoleteDeltas); + preWorkerActiveDeltaCount); } catch (Throwable t) { LOG.warn("Unknown throwable caught while updating delta metrics. Metrics will not be updated.", t); @@ -281,12 +256,8 @@ public class AcidMetricService implements MetastoreTaskThread { int numObsoleteDeltas = 0; if (prevObsoleteDelta != null) { numObsoleteDeltas = prevObsoleteDelta.getMetricValue() - filterOutBaseAndOriginalFiles(deletedFiles).size(); - if (numObsoleteDeltas >= obsoleteDeltasThreshold) { - updateDeltaMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS, - numObsoleteDeltas, txnHandler); - } else { - removeDeltaMetrics(dbName, tableName, partitionName, NUM_OBSOLETE_DELTAS, txnHandler); - } + updateDeltaMetrics(dbName, tableName, partitionName, CompactionMetricsData.MetricType.NUM_OBSOLETE_DELTAS, + numObsoleteDeltas, obsoleteDeltasThreshold, txnHandler); } LOG.debug("Finished updating delta file metrics from cleaner.\n obsoleteDeltasThreshold = {}, " @@ -519,22 +490,14 @@ public class AcidMetricService implements MetastoreTaskThread { MBeanServer mbs = ManagementFactory.getPlatformMBeanServer(); obsoleteDeltaObject = new MetricsMBeanImpl(); - registeredObjects.add( - mbs.registerMBean(obsoleteDeltaObject, - new ObjectName(OBJECT_NAME_PREFIX + COMPACTION_NUM_OBSOLETE_DELTAS)) - .getObjectName()); - + mbs.registerMBean(obsoleteDeltaObject, + new ObjectName(OBJECT_NAME_PREFIX + COMPACTION_NUM_OBSOLETE_DELTAS)); deltaObject = new MetricsMBeanImpl(); - registeredObjects.add( - mbs.registerMBean(deltaObject, - new ObjectName(OBJECT_NAME_PREFIX + COMPACTION_NUM_DELTAS)) - .getObjectName()); - + mbs.registerMBean(deltaObject, + new ObjectName(OBJECT_NAME_PREFIX + COMPACTION_NUM_DELTAS)); smallDeltaObject = new MetricsMBeanImpl(); - registeredObjects.add( - mbs.registerMBean(smallDeltaObject, - new ObjectName(OBJECT_NAME_PREFIX + COMPACTION_NUM_SMALL_DELTAS)) - .getObjectName()); + mbs.registerMBean(smallDeltaObject, + new ObjectName(OBJECT_NAME_PREFIX + COMPACTION_NUM_SMALL_DELTAS)); } static String getDeltaCountKey(String dbName, String tableName, String partitionName) { @@ -562,22 +525,17 @@ public class AcidMetricService implements MetastoreTaskThread { } private static void updateDeltaMetrics(String dbName, String tblName, String partitionName, - CompactionMetricsData.MetricType type, int numDeltas, TxnStore txnHandler) throws MetaException { + CompactionMetricsData.MetricType type, int numDeltas, int deltasThreshold, TxnStore txnHandler) throws MetaException { CompactionMetricsData data = new CompactionMetricsData.Builder() .dbName(dbName).tblName(tblName).partitionName(partitionName).metricType(type).metricValue(numDeltas).version(0) - .build(); + .threshold(deltasThreshold).build(); if (!txnHandler.updateCompactionMetricsData(data)) { LOG.warn("Compaction metric data cannot be updated because of version mismatch."); } } - private static void removeDeltaMetrics(String dbName, String tblName, String partitionName, - CompactionMetricsData.MetricType type, TxnStore txnHandler) throws MetaException { - txnHandler.removeCompactionMetricsData(dbName, tblName, partitionName, type); - } - private static void updateDeltaMetrics(String dbName, String tblName, String partitionName, - CompactionMetricsMetricType type, int numDeltas, IMetaStoreClient client) throws TException { + CompactionMetricsMetricType type, int numDeltas, int deltasThreshold, IMetaStoreClient client) throws TException { CompactionMetricsDataStruct struct = new CompactionMetricsDataStruct(); struct.setDbname(dbName); struct.setTblname(tblName); @@ -585,6 +543,7 @@ public class AcidMetricService implements MetastoreTaskThread { struct.setType(type); struct.setMetricvalue(numDeltas); struct.setVersion(0); + struct.setThreshold(deltasThreshold); if (!client.updateCompactionMetricsData(struct)) { LOG.warn("Compaction metric data cannot be updated because of version mismatch."); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionMetricsData.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionMetricsData.java index fae8472..b560095 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionMetricsData.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionMetricsData.java @@ -25,6 +25,7 @@ public class CompactionMetricsData { private final MetricType metricType; private final int metricValue; private final int version; + private final int threshold; public enum MetricType { NUM_OBSOLETE_DELTAS("HIVE_ACID_NUM_OBSOLETE_DELTAS"), @@ -50,6 +51,7 @@ public class CompactionMetricsData { this.metricType = builder.metricType; this.metricValue = builder.metricValue; this.version = builder.version; + this.threshold = builder.threshold; } public String getDbName() { @@ -76,6 +78,10 @@ public class CompactionMetricsData { return version; } + public int getThreshold() { + return threshold; + } + public boolean isEmpty() { return dbName == null && tblName == null && partitionName == null && metricType == null && metricValue == 0 && version == 0; @@ -95,6 +101,7 @@ public class CompactionMetricsData { private MetricType metricType; private int metricValue; private int version; + private int threshold; public CompactionMetricsData build() { return new CompactionMetricsData(this); @@ -129,5 +136,10 @@ public class CompactionMetricsData { this.version = version; return this; } + + public Builder threshold(int threshold) { + this.threshold = threshold; + return this; + } } } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionMetricsDataConverter.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionMetricsDataConverter.java index 748829b..e579311 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionMetricsDataConverter.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionMetricsDataConverter.java @@ -26,14 +26,13 @@ public class CompactionMetricsDataConverter { public static CompactionMetricsDataStruct dataToStruct(CompactionMetricsData data) throws MetaException { CompactionMetricsDataStruct struct = new CompactionMetricsDataStruct(); - if (!data.isEmpty()) { - struct.setDbname(data.getDbName()); - struct.setTblname(data.getTblName()); - struct.setPartitionname(data.getPartitionName()); - struct.setType(dbCompactionMetricType2ThriftType(data.getMetricType())); - struct.setMetricvalue(data.getMetricValue()); - struct.setVersion(data.getVersion()); - } + struct.setDbname(data.getDbName()); + struct.setTblname(data.getTblName()); + struct.setPartitionname(data.getPartitionName()); + struct.setType(dbCompactionMetricType2ThriftType(data.getMetricType())); + struct.setMetricvalue(data.getMetricValue()); + struct.setVersion(data.getVersion()); + struct.setThreshold(data.getThreshold()); return struct; } @@ -45,6 +44,7 @@ public class CompactionMetricsDataConverter { .metricType(thriftCompactionMetricType2DbType(struct.getType())) .metricValue(struct.getMetricvalue()) .version(struct.getVersion()) + .threshold(struct.getThreshold()) .build(); } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 2ade421..64679a0 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -1585,37 +1585,20 @@ class CompactionTxnHandler extends TxnHandler { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); boolean updateRes; CompactionMetricsData prevMetricsData = getCompactionMetricsData(data, dbConn); - if (prevMetricsData != null) { - String query = UPDATE_COMPACTION_METRICS_CACHE_QUERY; - if (data.getPartitionName() != null) { - query += " AND \"CMC_PARTITION\" = ?"; + if (data.getMetricValue() >= data.getThreshold()) { + if (prevMetricsData != null) { + updateRes = updateCompactionMetricsData(dbConn, data, prevMetricsData); } else { - query += " AND \"CMC_PARTITION\" IS NULL"; - } - try (PreparedStatement pstmt = dbConn.prepareStatement(query)) { - pstmt.setInt(1, data.getMetricValue()); - pstmt.setInt(2, prevMetricsData.getVersion() + 1); - pstmt.setString(3, data.getDbName()); - pstmt.setString(4, data.getTblName()); - pstmt.setString(5, data.getMetricType().toString()); - pstmt.setInt(6, prevMetricsData.getVersion()); - if (data.getPartitionName() != null) { - pstmt.setString(7, data.getPartitionName()); - } - updateRes = pstmt.executeUpdate() > 0; + updateRes = createCompactionMetricsData(dbConn, data); } } else { - try (PreparedStatement pstmt = dbConn.prepareStatement(INSERT_COMPACTION_METRICS_CACHE_QUERY)) { - pstmt.setString(1, data.getDbName()); - pstmt.setString(2, data.getTblName()); - pstmt.setString(3, data.getPartitionName()); - pstmt.setString(4, data.getMetricType().toString()); - pstmt.setInt(5, data.getMetricValue()); - pstmt.setInt(6, 1); - updateRes = pstmt.executeUpdate() > 0; + if (prevMetricsData != null) { + updateRes = + removeCompactionMetricsData(dbConn, data.getDbName(), data.getTblName(), data.getPartitionName(), data.getMetricType()); + } else { + return true; } } - dbConn.commit(); return updateRes; } catch (SQLException e) { rollbackDBConn(dbConn); @@ -1627,7 +1610,7 @@ class CompactionTxnHandler extends TxnHandler { } catch (RetryException e) { updateCompactionMetricsData(data); } - return true; + return false; } @Override @@ -1719,28 +1702,14 @@ class CompactionTxnHandler extends TxnHandler { } } + @Override public void removeCompactionMetricsData(String dbName, String tblName, String partitionName, CompactionMetricsData.MetricType type) throws MetaException { Connection dbConn = null; try { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); - String query = DELETE_COMPACTION_METRICS_CACHE_QUERY; - if (partitionName != null) { - query += " AND \"CMC_PARTITION\" = ?"; - } else { - query += " AND \"CMC_PARTITION\" IS NULL"; - } - try (PreparedStatement pstmt = dbConn.prepareStatement(query)) { - pstmt.setString(1, dbName); - pstmt.setString(2, tblName); - pstmt.setString(3, type.toString()); - if (partitionName != null) { - pstmt.setString(4, partitionName); - } - pstmt.executeUpdate(); - dbConn.commit(); - } + removeCompactionMetricsData(dbConn, dbName, tblName, partitionName, type); } catch (SQLException e) { rollbackDBConn(dbConn); checkRetryable(e, "removeCompactionMetricsData(" + dbName + ", " + tblName + ", " + partitionName + ", " + @@ -1753,6 +1722,69 @@ class CompactionTxnHandler extends TxnHandler { removeCompactionMetricsData(dbName, tblName, partitionName, type); } } + + private boolean removeCompactionMetricsData(Connection dbConn, String dbName, String tblName, String partitionName, + CompactionMetricsData.MetricType type) throws SQLException { + boolean removeRes; + String query = DELETE_COMPACTION_METRICS_CACHE_QUERY; + if (partitionName != null) { + query += " AND \"CMC_PARTITION\" = ?"; + } else { + query += " AND \"CMC_PARTITION\" IS NULL"; + } + try (PreparedStatement pstmt = dbConn.prepareStatement(query)) { + pstmt.setString(1, dbName); + pstmt.setString(2, tblName); + pstmt.setString(3, type.toString()); + if (partitionName != null) { + pstmt.setString(4, partitionName); + } + removeRes = pstmt.executeUpdate() > 0; + dbConn.commit(); + } + return removeRes; + } + + private boolean updateCompactionMetricsData(Connection dbConn, CompactionMetricsData data, + CompactionMetricsData prevData) throws SQLException { + boolean updateRes; + String query = UPDATE_COMPACTION_METRICS_CACHE_QUERY; + if (data.getPartitionName() != null) { + query += " AND \"CMC_PARTITION\" = ?"; + } else { + query += " AND \"CMC_PARTITION\" IS NULL"; + } + try (PreparedStatement pstmt = dbConn.prepareStatement(query)) { + pstmt.setInt(1, data.getMetricValue()); + pstmt.setInt(2, prevData.getVersion() + 1); + pstmt.setString(3, data.getDbName()); + pstmt.setString(4, data.getTblName()); + pstmt.setString(5, data.getMetricType().toString()); + pstmt.setInt(6, prevData.getVersion()); + if (data.getPartitionName() != null) { + pstmt.setString(7, data.getPartitionName()); + } + updateRes = pstmt.executeUpdate() > 0; + dbConn.commit(); + } + return updateRes; + } + + private boolean createCompactionMetricsData(Connection dbConn, CompactionMetricsData data) throws SQLException { + boolean createRes; + try (PreparedStatement pstmt = dbConn.prepareStatement(INSERT_COMPACTION_METRICS_CACHE_QUERY)) { + pstmt.setString(1, data.getDbName()); + pstmt.setString(2, data.getTblName()); + pstmt.setString(3, data.getPartitionName()); + pstmt.setString(4, data.getMetricType().toString()); + pstmt.setInt(5, data.getMetricValue()); + pstmt.setInt(6, 1); + createRes = pstmt.executeUpdate() > 0; + dbConn.commit(); + } + return createRes; + } + } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java index 86c50db..c59ff40 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java @@ -702,12 +702,12 @@ public interface TxnStore extends Configurable { /** * Returns ACID metrics related info for a specific resource and metric type. If no record is found matching the - * filter criteria, an empty object ({@link CompactionMetricsData#isEmpty()} == true) will be returned. + * filter criteria, null will be returned. * @param dbName name of database, non-null * @param tblName name of the table, non-null * @param partitionName name of the partition, can be null * @param type type of the delta metric, non-null - * @return instance of delta metrics info, always not null. + * @return instance of delta metrics info, can be null * @throws MetaException */ @RetrySemantics.ReadOnly @@ -737,9 +737,18 @@ public interface TxnStore extends Configurable { throws MetaException; /** - * Update or create one record in the compaction metrics cache. This operation uses an optimistic locking mechanism. - * If update fails, due to version mismatch, the operation won't be retried. - * @param data the object that is used for the update or create operation + * Create, update or delete one record in the compaction metrics cache. + * <p> + * If the metric is not found in the metrics cache, it will be created. + * </p> + * <p> + * If the metric is found, it will be updated. This operation uses an optimistic locking mechanism, meaning if another + * operation changed the value of this metric, the update will abort and won't be retried. + * </p> + * <p> + * If the new metric value is below {@link CompactionMetricsData#getThreshold()}, it will be deleted. + * </p> + * @param data the object that is used for the operation * @return true, if update finished successfully * @throws MetaException */