This is an automated email from the ASF dual-hosted git repository. lpinter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new 28f0512 HIVE-25650 Make workerId and workerVersionId optional in the FindNextCompactRequest (#2749) (Viktor Csomor, reviewed by Laszlo Pinter) 28f0512 is described below commit 28f05124c50f9f89452f4ffa7910786a8ab1c706 Author: Viktor Csomor <csomor.vik...@gmail.com> AuthorDate: Thu Oct 28 10:58:11 2021 +0200 HIVE-25650 Make workerId and workerVersionId optional in the FindNextCompactRequest (#2749) (Viktor Csomor, reviewed by Laszlo Pinter) --- .../hadoop/hive/ql/txn/compactor/Worker.java | 9 +- .../metastore/txn/TestCompactionTxnHandler.java | 67 +++++++------ .../hive/ql/txn/compactor/CompactorTest.java | 5 +- .../hadoop/hive/ql/txn/compactor/TestCleaner.java | 6 +- .../hive/ql/txn/compactor/TestInitiator.java | 13 ++- .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 37 ++++---- .../src/gen/thrift/gen-cpp/hive_metastore_types.h | 15 ++- .../hive/metastore/api/FindNextCompactRequest.java | 105 +++++++++++---------- .../gen-php/metastore/FindNextCompactRequest.php | 4 +- .../src/gen/thrift/gen-py/hive_metastore/ttypes.py | 4 - .../src/gen/thrift/gen-rb/hive_metastore_types.rb | 6 +- .../src/main/thrift/hive_metastore.thrift | 4 +- .../hive/metastore/txn/CompactionTxnHandler.java | 41 +++++--- .../hive/metastore/TestHiveMetaStoreTxns.java | 4 +- 14 files changed, 188 insertions(+), 132 deletions(-) diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 1b8a13f..defa9b4 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -371,8 +371,11 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { return false; } } - ci = CompactionInfo.optionalCompactionInfoStructToInfo( - msc.findNextCompact(new FindNextCompactRequest(workerName, runtimeVersion))); + + FindNextCompactRequest findNextCompactRequest = new FindNextCompactRequest(); + findNextCompactRequest.setWorkerId(workerName); + findNextCompactRequest.setWorkerVersion(runtimeVersion); + ci = CompactionInfo.optionalCompactionInfoStructToInfo(msc.findNextCompact(findNextCompactRequest)); LOG.debug("Processing compaction request " + ci); if (ci == null) { @@ -803,4 +806,4 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { } } } -} \ No newline at end of file +} diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java index 66ee3d6..ea1abc6 100644 --- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java +++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java @@ -92,14 +92,14 @@ public class TestCompactionTxnHandler { rqst.setPartitionname("ds=today"); txnHandler.compact(rqst); long now = System.currentTimeMillis(); - CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)); + CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)); assertNotNull(ci); assertEquals("foo", ci.dbname); assertEquals("bar", ci.tableName); assertEquals("ds=today", ci.partName); assertEquals(CompactionType.MINOR, ci.type); assertNull(ci.runAs); - assertNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION))); + assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION))); ci.runAs = "bob"; txnHandler.updateCompactorState(ci, openTxn()); @@ -129,7 +129,7 @@ public class TestCompactionTxnHandler { long now = System.currentTimeMillis(); boolean expectToday = false; - CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)); + CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)); assertNotNull(ci); assertEquals("foo", ci.dbname); assertEquals("bar", ci.tableName); @@ -138,7 +138,7 @@ public class TestCompactionTxnHandler { else fail("partition name should have been today or yesterday but was " + ci.partName); assertEquals(CompactionType.MINOR, ci.type); - ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)); + ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)); assertNotNull(ci); assertEquals("foo", ci.dbname); assertEquals("bar", ci.tableName); @@ -146,7 +146,7 @@ public class TestCompactionTxnHandler { else assertEquals("ds=yesterday", ci.partName); assertEquals(CompactionType.MINOR, ci.type); - assertNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION))); + assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION))); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List<ShowCompactResponseElement> compacts = rsp.getCompacts(); @@ -160,7 +160,7 @@ public class TestCompactionTxnHandler { @Test public void testFindNextToCompactNothingToCompact() throws Exception { - assertNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION))); + assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION))); } @Test @@ -168,11 +168,11 @@ public class TestCompactionTxnHandler { CompactionRequest rqst = new CompactionRequest("foo", "bar", CompactionType.MINOR); rqst.setPartitionname("ds=today"); txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)); + CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)); assertNotNull(ci); txnHandler.markCompacted(ci); - assertNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION))); + assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION))); @@ -194,16 +194,16 @@ public class TestCompactionTxnHandler { rqst.setPartitionname("ds=today"); txnHandler.compact(rqst); assertEquals(0, txnHandler.findReadyToClean(0, 0).size()); - CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)); + CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)); assertNotNull(ci); assertEquals(0, txnHandler.findReadyToClean(0, 0).size()); txnHandler.markCompacted(ci); - assertNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION))); + assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION))); List<CompactionInfo> toClean = txnHandler.findReadyToClean(0, 0); assertEquals(1, toClean.size()); - assertNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION))); + assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION))); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); List<ShowCompactResponseElement> compacts = rsp.getCompacts(); @@ -223,18 +223,18 @@ public class TestCompactionTxnHandler { rqst.setPartitionname("ds=today"); txnHandler.compact(rqst); assertEquals(0, txnHandler.findReadyToClean(0, 0).size()); - CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)); + CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)); assertNotNull(ci); assertEquals(0, txnHandler.findReadyToClean(0, 0).size()); txnHandler.markCompacted(ci); - assertNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION))); + assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION))); List<CompactionInfo> toClean = txnHandler.findReadyToClean(0, 0); assertEquals(1, toClean.size()); - assertNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION))); + assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION))); txnHandler.markCleaned(ci); - assertNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION))); + assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION))); assertEquals(0, txnHandler.findReadyToClean(0, 0).size()); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); @@ -392,13 +392,13 @@ public class TestCompactionTxnHandler { rqst.setPartitionname(partitionName); txnHandler.compact(rqst); assertEquals(0, txnHandler.findReadyToClean(0, 0).size()); - CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest(workerId, WORKER_VERSION)); + CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest(workerId, WORKER_VERSION)); assertNotNull(ci); assertEquals(0, txnHandler.findReadyToClean(0, 0).size()); ci.errorMessage = errorMessage; txnHandler.markFailed(ci); - assertNull(txnHandler.findNextToCompact(new FindNextCompactRequest(workerId, WORKER_VERSION))); + assertNull(txnHandler.findNextToCompact(aFindNextCompactRequest(workerId, WORKER_VERSION))); boolean failedCheck = txnHandler.checkFailedCompactions(ci); assertFalse(failedCheck); try { @@ -460,7 +460,7 @@ public class TestCompactionTxnHandler { rqst.setPartitionname(partitionName); } txnHandler.compact(rqst); - ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)); + ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)); assertNotNull(ci); ci.errorMessage = errorMessage; txnHandler.markFailed(ci); @@ -473,7 +473,7 @@ public class TestCompactionTxnHandler { rqst.setPartitionname(partitionName); } txnHandler.compact(rqst); - ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)); + ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)); assertNotNull(ci); txnHandler.markCleaned(ci); } @@ -486,7 +486,7 @@ public class TestCompactionTxnHandler { rqst.setPartitionname(partitionName); } txnHandler.compact(rqst); - ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)); + ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)); assertNotNull(ci); ci.errorMessage = errorMessage; txnHandler.markCompacted(ci); @@ -612,9 +612,9 @@ public class TestCompactionTxnHandler { txnHandler.compact(rqst); rqst = new CompactionRequest("foo", "bazzoo", CompactionType.MINOR); txnHandler.compact(rqst); - assertNotNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred-193892", WORKER_VERSION))); - assertNotNull(txnHandler.findNextToCompact(new FindNextCompactRequest("bob-193892", WORKER_VERSION))); - assertNotNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred-193893", WORKER_VERSION))); + assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred-193892", WORKER_VERSION))); + assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("bob-193892", WORKER_VERSION))); + assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred-193893", WORKER_VERSION))); txnHandler.revokeFromLocalWorkers("fred"); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); @@ -643,9 +643,9 @@ public class TestCompactionTxnHandler { rqst = new CompactionRequest("foo", "baz", CompactionType.MINOR); txnHandler.compact(rqst); - assertNotNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred-193892", WORKER_VERSION))); + assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred-193892", WORKER_VERSION))); Thread.sleep(200); - assertNotNull(txnHandler.findNextToCompact(new FindNextCompactRequest("fred-193892", WORKER_VERSION))); + assertNotNull(txnHandler.findNextToCompact(aFindNextCompactRequest("fred-193892", WORKER_VERSION))); txnHandler.revokeTimedoutWorkers(100); ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); @@ -708,7 +708,7 @@ public class TestCompactionTxnHandler { //simulate prev failed compaction CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MINOR); txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)); + CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)); txnHandler.markFailed(ci); potentials = txnHandler.findPotentialCompactions(100, -1, 1); @@ -775,7 +775,7 @@ public class TestCompactionTxnHandler { CompactionRequest rqst = new CompactionRequest("mydb", "mytable", CompactionType.MAJOR); txnHandler.compact(rqst); assertEquals(0, txnHandler.findReadyToClean(0, 0).size()); - ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)); + ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)); assertNotNull(ci); txnHandler.markCompacted(ci); @@ -799,7 +799,7 @@ public class TestCompactionTxnHandler { rqst.setPartitionname("bar"); txnHandler.compact(rqst); assertEquals(0, txnHandler.findReadyToClean(0, 0).size()); - ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)); + ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)); assertNotNull(ci); txnHandler.markCompacted(ci); @@ -881,7 +881,7 @@ public class TestCompactionTxnHandler { assertTrue(enqueueTime <= after); assertTrue(enqueueTime >= before); - CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)); + CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)); txnHandler.markFailed(ci); checkEnqueueTime(enqueueTime); @@ -904,7 +904,7 @@ public class TestCompactionTxnHandler { assertTrue(enqueueTime <= after); assertTrue(enqueueTime >= before); - CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)); + CompactionInfo ci = txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION)); ci.runAs = "bob"; txnHandler.updateCompactorState(ci, openTxn()); checkEnqueueTime(enqueueTime); @@ -916,6 +916,13 @@ public class TestCompactionTxnHandler { checkEnqueueTime(enqueueTime); } + private static FindNextCompactRequest aFindNextCompactRequest(String workerId, String workerVersion) { + FindNextCompactRequest request = new FindNextCompactRequest(); + request.setWorkerId(workerId); + request.setWorkerVersion(workerVersion); + return request; + } + private void checkEnqueueTime(long enqueueTime) throws MetaException { ShowCompactResponse showCompactResponse = txnHandler.showCompact(new ShowCompactRequest()); ShowCompactResponseElement element = showCompactResponse.getCompacts().get(0); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java index b3788e4..5dc01f9 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java @@ -638,7 +638,10 @@ public abstract class CompactorTest { protected long compactInTxn(CompactionRequest rqst) throws Exception { txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", WORKER_VERSION)); + FindNextCompactRequest findNextCompactRequest = new FindNextCompactRequest(); + findNextCompactRequest.setWorkerId("fred"); + findNextCompactRequest.setWorkerVersion(WORKER_VERSION); + CompactionInfo ci = txnHandler.findNextToCompact(findNextCompactRequest); ci.runAs = System.getProperty("user.name"); long compactorTxnId = openTxn(TxnType.COMPACTION); // Need to create a valid writeIdList to set the highestWriteId in ci diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java index 665d47c..a1205f4 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java @@ -106,7 +106,11 @@ public class TestCleaner extends CompactorTest { CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR); txnHandler.compact(rqst); - CompactionInfo ci = txnHandler.findNextToCompact(new FindNextCompactRequest("fred", "4.0.0")); + + FindNextCompactRequest findNextCompactRequest = new FindNextCompactRequest(); + findNextCompactRequest.setWorkerId("fred"); + findNextCompactRequest.setWorkerVersion("4.0.0"); + CompactionInfo ci = txnHandler.findNextToCompact(findNextCompactRequest); ci.runAs = System.getProperty("user.name"); long compactTxn = openTxn(TxnType.COMPACTION); diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java index ba5b485..ca8e1e0 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestInitiator.java @@ -79,8 +79,8 @@ public class TestInitiator extends CompactorTest { rqst = new CompactionRequest("default", "rflw2", CompactionType.MINOR); txnHandler.compact(rqst); - txnHandler.findNextToCompact(new FindNextCompactRequest(ServerUtils.hostname() + "-193892", "4.0.0")); - txnHandler.findNextToCompact(new FindNextCompactRequest("nosuchhost-193892", "4.0.0")); + txnHandler.findNextToCompact(aFindNextCompactRequest(ServerUtils.hostname() + "-193892", "4.0.0")); + txnHandler.findNextToCompact(aFindNextCompactRequest("nosuchhost-193892", "4.0.0")); startInitiator(); @@ -106,7 +106,7 @@ public class TestInitiator extends CompactorTest { CompactionRequest rqst = new CompactionRequest("default", "rfrw1", CompactionType.MINOR); txnHandler.compact(rqst); - txnHandler.findNextToCompact(new FindNextCompactRequest("nosuchhost-193892", "4.0.0")); + txnHandler.findNextToCompact(aFindNextCompactRequest("nosuchhost-193892", "4.0.0")); conf.setTimeVar(HiveConf.ConfVars.HIVE_COMPACTOR_WORKER_TIMEOUT, 1L, TimeUnit.MILLISECONDS); @@ -1068,6 +1068,13 @@ public class TestInitiator extends CompactorTest { Assert.assertEquals(ServerUtils.hostname(), String.join("-", Arrays.copyOfRange(parts, 0, parts.length - 1))); } + private static FindNextCompactRequest aFindNextCompactRequest(String workerId, String workerVersion) { + FindNextCompactRequest request = new FindNextCompactRequest(); + request.setWorkerId(workerId); + request.setWorkerVersion(workerVersion); + return request; + } + @Override boolean useHive130DeltaDirName() { return false; diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp index b19b42a..2ff5912 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp @@ -28000,10 +28000,12 @@ FindNextCompactRequest::~FindNextCompactRequest() noexcept { void FindNextCompactRequest::__set_workerId(const std::string& val) { this->workerId = val; +__isset.workerId = true; } void FindNextCompactRequest::__set_workerVersion(const std::string& val) { this->workerVersion = val; +__isset.workerVersion = true; } std::ostream& operator<<(std::ostream& out, const FindNextCompactRequest& obj) { @@ -28024,8 +28026,6 @@ uint32_t FindNextCompactRequest::read(::apache::thrift::protocol::TProtocol* ipr using ::apache::thrift::protocol::TProtocolException; - bool isset_workerId = false; - bool isset_workerVersion = false; while (true) { @@ -28038,7 +28038,7 @@ uint32_t FindNextCompactRequest::read(::apache::thrift::protocol::TProtocol* ipr case 1: if (ftype == ::apache::thrift::protocol::T_STRING) { xfer += iprot->readString(this->workerId); - isset_workerId = true; + this->__isset.workerId = true; } else { xfer += iprot->skip(ftype); } @@ -28046,7 +28046,7 @@ uint32_t FindNextCompactRequest::read(::apache::thrift::protocol::TProtocol* ipr case 2: if (ftype == ::apache::thrift::protocol::T_STRING) { xfer += iprot->readString(this->workerVersion); - isset_workerVersion = true; + this->__isset.workerVersion = true; } else { xfer += iprot->skip(ftype); } @@ -28060,10 +28060,6 @@ uint32_t FindNextCompactRequest::read(::apache::thrift::protocol::TProtocol* ipr xfer += iprot->readStructEnd(); - if (!isset_workerId) - throw TProtocolException(TProtocolException::INVALID_DATA); - if (!isset_workerVersion) - throw TProtocolException(TProtocolException::INVALID_DATA); return xfer; } @@ -28072,14 +28068,16 @@ uint32_t FindNextCompactRequest::write(::apache::thrift::protocol::TProtocol* op ::apache::thrift::protocol::TOutputRecursionTracker tracker(*oprot); xfer += oprot->writeStructBegin("FindNextCompactRequest"); - xfer += oprot->writeFieldBegin("workerId", ::apache::thrift::protocol::T_STRING, 1); - xfer += oprot->writeString(this->workerId); - xfer += oprot->writeFieldEnd(); - - xfer += oprot->writeFieldBegin("workerVersion", ::apache::thrift::protocol::T_STRING, 2); - xfer += oprot->writeString(this->workerVersion); - xfer += oprot->writeFieldEnd(); - + if (this->__isset.workerId) { + xfer += oprot->writeFieldBegin("workerId", ::apache::thrift::protocol::T_STRING, 1); + xfer += oprot->writeString(this->workerId); + xfer += oprot->writeFieldEnd(); + } + if (this->__isset.workerVersion) { + xfer += oprot->writeFieldBegin("workerVersion", ::apache::thrift::protocol::T_STRING, 2); + xfer += oprot->writeString(this->workerVersion); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -28089,22 +28087,25 @@ void swap(FindNextCompactRequest &a, FindNextCompactRequest &b) { using ::std::swap; swap(a.workerId, b.workerId); swap(a.workerVersion, b.workerVersion); + swap(a.__isset, b.__isset); } FindNextCompactRequest::FindNextCompactRequest(const FindNextCompactRequest& other1006) { workerId = other1006.workerId; workerVersion = other1006.workerVersion; + __isset = other1006.__isset; } FindNextCompactRequest& FindNextCompactRequest::operator=(const FindNextCompactRequest& other1007) { workerId = other1007.workerId; workerVersion = other1007.workerVersion; + __isset = other1007.__isset; return *this; } void FindNextCompactRequest::printTo(std::ostream& out) const { using ::apache::thrift::to_string; out << "FindNextCompactRequest("; - out << "workerId=" << to_string(workerId); - out << ", " << "workerVersion=" << to_string(workerVersion); + out << "workerId="; (__isset.workerId ? (out << to_string(workerId)) : (out << "<null>")); + out << ", " << "workerVersion="; (__isset.workerVersion ? (out << to_string(workerVersion)) : (out << "<null>")); out << ")"; } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h index 6787844..025e733 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h @@ -10598,6 +10598,11 @@ void swap(GetLatestCommittedCompactionInfoResponse &a, GetLatestCommittedCompact std::ostream& operator<<(std::ostream& out, const GetLatestCommittedCompactionInfoResponse& obj); +typedef struct _FindNextCompactRequest__isset { + _FindNextCompactRequest__isset() : workerId(false), workerVersion(false) {} + bool workerId :1; + bool workerVersion :1; +} _FindNextCompactRequest__isset; class FindNextCompactRequest : public virtual ::apache::thrift::TBase { public: @@ -10611,15 +10616,21 @@ class FindNextCompactRequest : public virtual ::apache::thrift::TBase { std::string workerId; std::string workerVersion; + _FindNextCompactRequest__isset __isset; + void __set_workerId(const std::string& val); void __set_workerVersion(const std::string& val); bool operator == (const FindNextCompactRequest & rhs) const { - if (!(workerId == rhs.workerId)) + if (__isset.workerId != rhs.__isset.workerId) return false; - if (!(workerVersion == rhs.workerVersion)) + else if (__isset.workerId && !(workerId == rhs.workerId)) + return false; + if (__isset.workerVersion != rhs.__isset.workerVersion) + return false; + else if (__isset.workerVersion && !(workerVersion == rhs.workerVersion)) return false; return true; } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FindNextCompactRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FindNextCompactRequest.java index 3b20e8a..568887a 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FindNextCompactRequest.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/FindNextCompactRequest.java @@ -17,8 +17,8 @@ package org.apache.hadoop.hive.metastore.api; private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new FindNextCompactRequestStandardSchemeFactory(); private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new FindNextCompactRequestTupleSchemeFactory(); - private @org.apache.thrift.annotation.Nullable java.lang.String workerId; // required - private @org.apache.thrift.annotation.Nullable java.lang.String workerVersion; // required + private @org.apache.thrift.annotation.Nullable java.lang.String workerId; // optional + private @org.apache.thrift.annotation.Nullable java.lang.String workerVersion; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -84,12 +84,13 @@ package org.apache.hadoop.hive.metastore.api; } // isset id assignments + private static final _Fields optionals[] = {_Fields.WORKER_ID,_Fields.WORKER_VERSION}; public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); - tmpMap.put(_Fields.WORKER_ID, new org.apache.thrift.meta_data.FieldMetaData("workerId", org.apache.thrift.TFieldRequirementType.REQUIRED, + tmpMap.put(_Fields.WORKER_ID, new org.apache.thrift.meta_data.FieldMetaData("workerId", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); - tmpMap.put(_Fields.WORKER_VERSION, new org.apache.thrift.meta_data.FieldMetaData("workerVersion", org.apache.thrift.TFieldRequirementType.REQUIRED, + tmpMap.put(_Fields.WORKER_VERSION, new org.apache.thrift.meta_data.FieldMetaData("workerVersion", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(FindNextCompactRequest.class, metaDataMap); @@ -98,15 +99,6 @@ package org.apache.hadoop.hive.metastore.api; public FindNextCompactRequest() { } - public FindNextCompactRequest( - java.lang.String workerId, - java.lang.String workerVersion) - { - this(); - this.workerId = workerId; - this.workerVersion = workerVersion; - } - /** * Performs a deep copy on <i>other</i>. */ @@ -324,35 +316,31 @@ package org.apache.hadoop.hive.metastore.api; java.lang.StringBuilder sb = new java.lang.StringBuilder("FindNextCompactRequest("); boolean first = true; - sb.append("workerId:"); - if (this.workerId == null) { - sb.append("null"); - } else { - sb.append(this.workerId); - } - first = false; - if (!first) sb.append(", "); - sb.append("workerVersion:"); - if (this.workerVersion == null) { - sb.append("null"); - } else { - sb.append(this.workerVersion); - } - first = false; + if (isSetWorkerId()) { + sb.append("workerId:"); + if (this.workerId == null) { + sb.append("null"); + } else { + sb.append(this.workerId); + } + first = false; + } + if (isSetWorkerVersion()) { + if (!first) sb.append(", "); + sb.append("workerVersion:"); + if (this.workerVersion == null) { + sb.append("null"); + } else { + sb.append(this.workerVersion); + } + first = false; + } sb.append(")"); return sb.toString(); } public void validate() throws org.apache.thrift.TException { // check for required fields - if (!isSetWorkerId()) { - throw new org.apache.thrift.protocol.TProtocolException("Required field 'workerId' is unset! Struct:" + toString()); - } - - if (!isSetWorkerVersion()) { - throw new org.apache.thrift.protocol.TProtocolException("Required field 'workerVersion' is unset! Struct:" + toString()); - } - // check for sub-struct validity } @@ -420,14 +408,18 @@ package org.apache.hadoop.hive.metastore.api; oprot.writeStructBegin(STRUCT_DESC); if (struct.workerId != null) { - oprot.writeFieldBegin(WORKER_ID_FIELD_DESC); - oprot.writeString(struct.workerId); - oprot.writeFieldEnd(); + if (struct.isSetWorkerId()) { + oprot.writeFieldBegin(WORKER_ID_FIELD_DESC); + oprot.writeString(struct.workerId); + oprot.writeFieldEnd(); + } } if (struct.workerVersion != null) { - oprot.writeFieldBegin(WORKER_VERSION_FIELD_DESC); - oprot.writeString(struct.workerVersion); - oprot.writeFieldEnd(); + if (struct.isSetWorkerVersion()) { + oprot.writeFieldBegin(WORKER_VERSION_FIELD_DESC); + oprot.writeString(struct.workerVersion); + oprot.writeFieldEnd(); + } } oprot.writeFieldStop(); oprot.writeStructEnd(); @@ -446,17 +438,34 @@ package org.apache.hadoop.hive.metastore.api; @Override public void write(org.apache.thrift.protocol.TProtocol prot, FindNextCompactRequest struct) throws org.apache.thrift.TException { org.apache.thrift.protocol.TTupleProtocol oprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - oprot.writeString(struct.workerId); - oprot.writeString(struct.workerVersion); + java.util.BitSet optionals = new java.util.BitSet(); + if (struct.isSetWorkerId()) { + optionals.set(0); + } + if (struct.isSetWorkerVersion()) { + optionals.set(1); + } + oprot.writeBitSet(optionals, 2); + if (struct.isSetWorkerId()) { + oprot.writeString(struct.workerId); + } + if (struct.isSetWorkerVersion()) { + oprot.writeString(struct.workerVersion); + } } @Override public void read(org.apache.thrift.protocol.TProtocol prot, FindNextCompactRequest struct) throws org.apache.thrift.TException { org.apache.thrift.protocol.TTupleProtocol iprot = (org.apache.thrift.protocol.TTupleProtocol) prot; - struct.workerId = iprot.readString(); - struct.setWorkerIdIsSet(true); - struct.workerVersion = iprot.readString(); - struct.setWorkerVersionIsSet(true); + java.util.BitSet incoming = iprot.readBitSet(2); + if (incoming.get(0)) { + struct.workerId = iprot.readString(); + struct.setWorkerIdIsSet(true); + } + if (incoming.get(1)) { + struct.workerVersion = iprot.readString(); + struct.setWorkerVersionIsSet(true); + } } } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/FindNextCompactRequest.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/FindNextCompactRequest.php index 53e2cee..9950693 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/FindNextCompactRequest.php +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/FindNextCompactRequest.php @@ -23,12 +23,12 @@ class FindNextCompactRequest static public $_TSPEC = array( 1 => array( 'var' => 'workerId', - 'isRequired' => true, + 'isRequired' => false, 'type' => TType::STRING, ), 2 => array( 'var' => 'workerVersion', - 'isRequired' => true, + 'isRequired' => false, 'type' => TType::STRING, ), ); diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 02a1ba9..756e31e 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -16065,10 +16065,6 @@ class FindNextCompactRequest(object): oprot.writeStructEnd() def validate(self): - if self.workerId is None: - raise TProtocolException(message='Required field workerId is unset!') - if self.workerVersion is None: - raise TProtocolException(message='Required field workerVersion is unset!') return def __repr__(self): diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb index 3d92e9c..57749a9 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -4621,15 +4621,13 @@ class FindNextCompactRequest WORKERVERSION = 2 FIELDS = { - WORKERID => {:type => ::Thrift::Types::STRING, :name => 'workerId'}, - WORKERVERSION => {:type => ::Thrift::Types::STRING, :name => 'workerVersion'} + WORKERID => {:type => ::Thrift::Types::STRING, :name => 'workerId', :optional => true}, + WORKERVERSION => {:type => ::Thrift::Types::STRING, :name => 'workerVersion', :optional => true} } def struct_fields; FIELDS; end def validate - raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field workerId is unset!') unless @workerId - raise ::Thrift::ProtocolException.new(::Thrift::ProtocolException::UNKNOWN, 'Required field workerVersion is unset!') unless @workerVersion end ::Thrift::Struct.generate_accessors self diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift index 941fe58..e1d7006 100644 --- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift +++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift @@ -1331,8 +1331,8 @@ struct GetLatestCommittedCompactionInfoResponse { } struct FindNextCompactRequest { - 1: required string workerId, - 2: required string workerVersion + 1: optional string workerId, + 2: optional string workerVersion } struct AddDynamicPartitions { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index d37e6ad..79c06be 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -175,7 +175,9 @@ class CompactionTxnHandler extends TxnHandler { @Override @RetrySemantics.SafeToRetry public CompactionInfo findNextToCompact(String workerId) throws MetaException { - return findNextToCompact(new FindNextCompactRequest(workerId, null)); + FindNextCompactRequest findNextCompactRequest = new FindNextCompactRequest(); + findNextCompactRequest.setWorkerId(workerId); + return findNextToCompact(findNextCompactRequest); } /** @@ -190,6 +192,7 @@ class CompactionTxnHandler extends TxnHandler { if (rqst == null) { throw new MetaException("FindNextCompactRequest is null"); } + Connection dbConn = null; Statement stmt = null; //need a separate stmt for executeUpdate() otherwise it will close the ResultSet(HIVE-12725) @@ -198,10 +201,10 @@ class CompactionTxnHandler extends TxnHandler { try { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED); stmt = dbConn.createStatement(); - String s = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " + + String query = "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " + "\"CQ_TYPE\", \"CQ_TBLPROPERTIES\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = '" + INITIATED_STATE + "'"; - LOG.debug("Going to execute query <" + s + ">"); - rs = stmt.executeQuery(s); + LOG.debug("Going to execute query <" + query + ">"); + rs = stmt.executeQuery(query); if (!rs.next()) { LOG.debug("No compactions found ready to compact"); dbConn.rollback(); @@ -216,25 +219,37 @@ class CompactionTxnHandler extends TxnHandler { info.partName = rs.getString(4); info.type = dbCompactionType2ThriftType(rs.getString(5).charAt(0)); info.properties = rs.getString(6); + + String workerId = rqst.getWorkerId(); + String workerVersion = rqst.getWorkerVersion(); + String workerIdSqlValue = (workerId == null) ? "NULL" : ("'" + workerId + "'"); + // Now, update this record as being worked on by this worker. long now = getDbTime(dbConn); - s = "UPDATE \"COMPACTION_QUEUE\" SET \"CQ_WORKER_ID\" = '" + rqst.getWorkerId() + "', " + - "\"CQ_WORKER_VERSION\" = '" + rqst.getWorkerVersion() + "', " + - "\"CQ_START\" = " + now + ", \"CQ_STATE\" = '" + WORKING_STATE + "' WHERE \"CQ_ID\" = " + info.id + - " AND \"CQ_STATE\"='" + INITIATED_STATE + "'"; - LOG.debug("Going to execute update <" + s + ">"); - int updCount = updStmt.executeUpdate(s); + query = "" + + "UPDATE " + + " \"COMPACTION_QUEUE\" " + + "SET " + + " \"CQ_WORKER_ID\" = " + workerIdSqlValue + ", " + + " \"CQ_WORKER_VERSION\" = '" + workerVersion + "', " + + " \"CQ_START\" = " + now + ", " + + " \"CQ_STATE\" = '" + WORKING_STATE + "' " + + "WHERE \"CQ_ID\" = " + info.id + + " AND \"CQ_STATE\"='" + INITIATED_STATE + "'"; + + LOG.debug("Going to execute update <" + query + ">"); + int updCount = updStmt.executeUpdate(query); if(updCount == 1) { dbConn.commit(); return info; } if(updCount == 0) { - LOG.debug("Worker {} (version: {}) picked up {}", rqst.getWorkerId(), rqst.getWorkerVersion(), info); + LOG.debug("Worker {} (version: {}) picked up {}", workerId, workerVersion, info); continue; } LOG.error("Unable to set to cq_state=" + WORKING_STATE + " for compaction record: " + - info + ". updCnt=" + updCount + ". workerId=" + rqst.getWorkerId() + - ". workerVersion=" + rqst.getWorkerVersion()); + info + ". updCnt=" + updCount + ". workerId=" + workerId + + ". workerVersion=" + workerVersion); dbConn.rollback(); return null; } while( rs.next()); diff --git a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java index 10f5f40..ff7e2d1 100644 --- a/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java +++ b/standalone-metastore/metastore-server/src/test/java/org/apache/hadoop/hive/metastore/TestHiveMetaStoreTxns.java @@ -419,7 +419,9 @@ public class TestHiveMetaStoreTxns { tbl = client.getTable(dbName, tblName); client.compact2(tbl.getDbName(), tbl.getTableName(), null, CompactionType.MINOR, new HashMap<>()); - OptionalCompactionInfoStruct optionalCi = client.findNextCompact(new FindNextCompactRequest("myworker", null)); + FindNextCompactRequest compactRequest = new FindNextCompactRequest(); + compactRequest.setWorkerId("myworker"); + OptionalCompactionInfoStruct optionalCi = client.findNextCompact(compactRequest); client.markCleaned(optionalCi.getCi()); GetLatestCommittedCompactionInfoRequest rqst = new GetLatestCommittedCompactionInfoRequest();