This is an automated email from the ASF dual-hosted git repository. veghlaci05 pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push: new abbabdd86b3 HIVE-26735: Ability to sort the data during rebalancing compaction (Laszlo Vegh, reviewed by Krisztian Kasa, Denys Kuzmenko, Sourabh Badhya) abbabdd86b3 is described below commit abbabdd86b39e499a16105787028a9acb8baaef4 Author: veghlaci05 <veghlac...@gmail.com> AuthorDate: Wed Mar 8 19:14:51 2023 +0100 HIVE-26735: Ability to sort the data during rebalancing compaction (Laszlo Vegh, reviewed by Krisztian Kasa, Denys Kuzmenko, Sourabh Badhya) * HIVE-26735: Ability to sort the data during rebalancing compaction * fix failing tests * address review comments * address review comments - 2 * remove empty test methods --- .../org/apache/hadoop/hive/conf/Constants.java | 1 + .../ql/txn/compactor/CompactionPoolOnTezTest.java | 13 +- .../hive/ql/txn/compactor/CompactorOnTezTest.java | 3 +- .../ql/txn/compactor/TestCrudCompactorOnTez.java | 322 ++++++++++++++------- .../hadoop/hive/ql/parse/AlterClauseParser.g | 4 +- .../java/org/apache/hadoop/hive/ql/Compiler.java | 3 +- ql/src/java/org/apache/hadoop/hive/ql/Driver.java | 1 + .../storage/compact/AlterTableCompactAnalyzer.java | 7 +- .../storage/compact/AlterTableCompactDesc.java | 9 +- .../compact/AlterTableCompactOperation.java | 1 + .../concatenate/AlterTableConcatenateAnalyzer.java | 2 +- .../ql/txn/compactor/CompactionQueryBuilder.java | 53 +++- .../ql/txn/compactor/RebalanceQueryCompactor.java | 6 +- .../hadoop/hive/ql/txn/compactor/Worker.java | 9 +- .../hadoop/hive/ql/txn/compactor/TestWorker.java | 6 +- .../gen/thrift/gen-cpp/hive_metastore_types.cpp | 52 +++- .../src/gen/thrift/gen-cpp/hive_metastore_types.h | 29 +- .../hive/metastore/api/CompactionInfoStruct.java | 114 +++++++- .../hive/metastore/api/CompactionRequest.java | 114 +++++++- .../apache/hadoop/hive/metastore/api/TxnType.java | 5 +- .../gen-php/metastore/CompactionInfoStruct.php | 24 ++ .../thrift/gen-php/metastore/CompactionRequest.php | 24 ++ .../src/gen/thrift/gen-php/metastore/TxnType.php | 3 + .../src/gen/thrift/gen-py/hive_metastore/ttypes.py | 31 +- .../src/gen/thrift/gen-rb/hive_metastore_types.rb | 13 +- .../hadoop/hive/metastore/txn/TxnQueries.java | 9 +- .../src/main/thrift/hive_metastore.thrift | 5 +- .../hadoop/hive/metastore/txn/CompactionInfo.java | 61 ++-- .../hive/metastore/txn/CompactionTxnHandler.java | 17 +- .../hadoop/hive/metastore/txn/TxnHandler.java | 9 +- .../hive/metastore/utils/MetaStoreServerUtils.java | 5 + .../src/main/sql/derby/hive-schema-4.0.0.derby.sql | 6 +- .../derby/upgrade-4.0.0-alpha-2-to-4.0.0.derby.sql | 6 +- .../src/main/sql/mssql/hive-schema-4.0.0.mssql.sql | 2 + .../mssql/upgrade-4.0.0-alpha-2-to-4.0.0.mssql.sql | 6 +- .../src/main/sql/mysql/hive-schema-4.0.0.mysql.sql | 6 +- .../mysql/upgrade-4.0.0-alpha-2-to-4.0.0.mysql.sql | 6 +- .../main/sql/oracle/hive-schema-4.0.0.oracle.sql | 6 +- .../upgrade-4.0.0-alpha-2-to-4.0.0.oracle.sql | 6 +- .../sql/postgres/hive-schema-4.0.0.postgres.sql | 6 +- .../upgrade-4.0.0-alpha-2-to-4.0.0.postgres.sql | 6 +- .../upgrade-3.1.3000-to-4.0.0.postgres.sql | 6 +- 42 files changed, 800 insertions(+), 217 deletions(-) diff --git a/common/src/java/org/apache/hadoop/hive/conf/Constants.java b/common/src/java/org/apache/hadoop/hive/conf/Constants.java index d39c671cac4..ef05123560f 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/Constants.java +++ b/common/src/java/org/apache/hadoop/hive/conf/Constants.java @@ -102,6 +102,7 @@ public class Constants { public static final Pattern COMPACTION_POOLS_PATTERN = Pattern.compile("hive\\.compactor\\.worker\\.(.*)\\.threads"); public static final String HIVE_COMPACTOR_WORKER_POOL = "hive.compactor.worker.pool"; + public static final String HIVE_COMPACTOR_REBALANCE_ORDERBY = "hive.compactor.rebalance.orderby"; public static final String HTTP_HEADER_REQUEST_TRACK = "X-Request-ID"; public static final String TIME_POSTFIX_REQUEST_TRACK = "_TIME"; diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionPoolOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionPoolOnTezTest.java index f7e6985cbc7..ee552208bbe 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionPoolOnTezTest.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionPoolOnTezTest.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.txn.compactor; import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.ShowCompactRequest; @@ -68,7 +69,7 @@ public class CompactionPoolOnTezTest extends CompactorOnTezTest { @Test public void testAlterTableCompactCommandRespectsPoolName() throws Exception { Map<String, String> properties = new HashMap<>(); - properties.put("hive.compactor.worker.pool", "pool1"); + properties.put(Constants.HIVE_COMPACTOR_WORKER_POOL, "pool1"); provider.createFullAcidTable(null, DEFAULT_TABLE_NAME, false, false, properties); provider.insertTestData(DEFAULT_TABLE_NAME, false); @@ -91,7 +92,7 @@ public class CompactionPoolOnTezTest extends CompactorOnTezTest { @Test public void testInitiatorRespectsTableLevelPoolName() throws Exception { Map<String, String> properties = new HashMap<>(); - properties.put("hive.compactor.worker.pool", "pool1"); + properties.put(Constants.HIVE_COMPACTOR_WORKER_POOL, "pool1"); provider.createFullAcidTable(null, DEFAULT_TABLE_NAME, false, false, properties); provider.insertTestData(DEFAULT_TABLE_NAME, false); @@ -114,7 +115,7 @@ public class CompactionPoolOnTezTest extends CompactorOnTezTest { public void testInitiatorRespectsTableLevelPoolNameOverDbLevel() throws Exception { provider.createDb(NON_DEFAULT_DB_NAME, "db_pool"); Map<String, String> properties = new HashMap<>(); - properties.put("hive.compactor.worker.pool", "table_pool"); + properties.put(Constants.HIVE_COMPACTOR_WORKER_POOL, "table_pool"); provider.createFullAcidTable(NON_DEFAULT_DB_NAME, DEFAULT_TABLE_NAME, false, false, properties); provider.insertTestData(NON_DEFAULT_DB_NAME, DEFAULT_TABLE_NAME); @@ -126,7 +127,7 @@ public class CompactionPoolOnTezTest extends CompactorOnTezTest { @Test public void testShowCompactionsContainsPoolName() throws Exception { Map<String, String> properties = new HashMap<>(); - properties.put("hive.compactor.worker.pool", "pool1"); + properties.put(Constants.HIVE_COMPACTOR_WORKER_POOL, "pool1"); provider.createFullAcidTable(null, DEFAULT_TABLE_NAME, false, false, properties); provider.insertTestData(DEFAULT_TABLE_NAME, false); provider.createFullAcidTable(null, "table2", false, false); @@ -150,10 +151,10 @@ public class CompactionPoolOnTezTest extends CompactorOnTezTest { @Test public void testShowCompactionsRespectPoolName() throws Exception { Map<String, String> properties = new HashMap<>(); - properties.put("hive.compactor.worker.pool", "pool1"); + properties.put(Constants.HIVE_COMPACTOR_WORKER_POOL, "pool1"); provider.createFullAcidTable(null, DEFAULT_TABLE_NAME, false, false, properties); provider.insertTestData(DEFAULT_TABLE_NAME, false); - properties.put("hive.compactor.worker.pool", "pool2"); + properties.put(Constants.HIVE_COMPACTOR_WORKER_POOL, "pool2"); provider.createFullAcidTable(null, "table2", false, false, properties); provider.insertTestData("table2", false); diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java index 4c6209d55c7..56ca350b739 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.txn.compactor; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.Constants; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; import org.apache.hadoop.hive.metastore.IMetaStoreClient; @@ -286,7 +287,7 @@ public abstract class CompactorOnTezTest { void createDb(String dbName, String poolName) throws Exception { executeStatementOnDriver("drop database if exists " + dbName + " cascade", driver); - executeStatementOnDriver("create database " + dbName + " WITH DBPROPERTIES('hive.compactor.worker.pool'='" + poolName + "')", driver); + executeStatementOnDriver("create database " + dbName + " WITH DBPROPERTIES('" + Constants.HIVE_COMPACTOR_WORKER_POOL + "'='" + poolName + "')", driver); } /** diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java index dd5686ac84c..62d4a008027 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java @@ -25,6 +25,8 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -61,6 +63,7 @@ import org.apache.hadoop.hive.ql.hooks.proto.HiveHookEvents; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.BucketCodec; +import org.apache.hadoop.hive.ql.lockmgr.LockException; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.processors.CommandProcessorException; import org.apache.hive.streaming.HiveStreamingConnection; @@ -76,6 +79,7 @@ import org.apache.orc.impl.RecordReaderImpl; import org.junit.Assert; import org.junit.Test; import org.mockito.ArgumentCaptor; +import org.mockito.Mockito; import org.mockito.internal.util.reflection.FieldSetter; import static org.apache.hadoop.hive.ql.TxnCommandsBaseForTests.runWorker; @@ -88,73 +92,178 @@ import static org.mockito.Mockito.*; public class TestCrudCompactorOnTez extends CompactorOnTezTest { @Test - public void testRebalanceCompactionOfNotPartitionedImplicitlyBucketedTable() throws Exception { + public void testRebalanceCompactionWithParallelDeleteAsSecondOptimisticLock() throws Exception { + testRebalanceCompactionWithParallelDeleteAsSecond(true); + } + + @Test + public void testRebalanceCompactionWithParallelDeleteAsSecondPessimisticLock() throws Exception { + testRebalanceCompactionWithParallelDeleteAsSecond(false); + } + + private void testRebalanceCompactionWithParallelDeleteAsSecond(boolean optimisticLock) throws Exception { conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true); conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false); conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false); + conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, optimisticLock); //set grouping size to have 3 buckets, and re-create driver with the new config conf.set("tez.grouping.min-size", "1000"); conf.set("tez.grouping.max-size", "80000"); driver = new Driver(conf); - final String stageTableName = "stage_rebalance_test"; final String tableName = "rebalance_test"; - AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf); + TestDataProvider testDataProvider = prepareRebalanceTestData(tableName); - TestDataProvider testDataProvider = new TestDataProvider(); - testDataProvider.createFullAcidTable(stageTableName, true, false); - testDataProvider.insertTestData(stageTableName, true); + //Try to do a rebalancing compaction + executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance' ORDER BY b DESC", driver); + + CountDownLatch startDelete = new CountDownLatch(1); + CountDownLatch endDelete = new CountDownLatch(1); + CompactorFactory factory = Mockito.spy(CompactorFactory.getInstance()); + doAnswer(invocation -> { + Object result = invocation.callRealMethod(); + startDelete.countDown(); + Thread.sleep(1000); + return result; + }).when(factory).getCompactorPipeline(any(), any(), any(), any()); - executeStatementOnDriver("drop table if exists " + tableName, driver); - executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int) " + - "STORED AS ORC TBLPROPERTIES('transactional'='true')", driver); - executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " select a, b from " + stageTableName, driver); + Worker worker = new Worker(factory); + worker.setConf(conf); + worker.init(new AtomicBoolean(true)); + worker.start(); - //do some single inserts to have more data in the first bucket. - executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('12',12)", driver); - executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('13',13)", driver); - executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('14',14)", driver); - executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('15',15)", driver); - executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('16',16)", driver); - executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('17',17)", driver); + if (!startDelete.await(10, TimeUnit.SECONDS)) { + throw new RuntimeException("Waiting for the compaction to start timed out!"); + } + + boolean aborted = false; + try { + executeStatementOnDriver("DELETE FROM " + tableName + " WHERE b = 12", driver); + } catch (CommandProcessorException e) { + if (optimisticLock) { + Assert.fail("In case of TXN_WRITE_X_LOCK = true, the transaction must be retried instead of being aborted."); + } + aborted = true; + Assert.assertEquals(e.getCause().getClass(), LockException.class); + Assert.assertEquals(e.getCauseMessage(), "Transaction manager has aborted the transaction txnid:21. Reason: Aborting [txnid:21,24] due to a write conflict on default/rebalance_test committed by [txnid:20,24] d/u"); + // Delete the record, so the rest of the test can be the same in both cases + executeStatementOnDriver("DELETE FROM " + tableName + " WHERE b = 12", driver); + } finally { + if(!optimisticLock && !aborted) { + Assert.fail("In case of TXN_WRITE_X_LOCK = false, the transaction must be aborted instead of being retried."); + } + } + endDelete.countDown(); + + worker.join(); + + driver.close(); + driver = new Driver(conf); + + List<String> result = execSelectAndDumpData("select * from " + tableName + " WHERE b = 12", driver, + "Dumping data for " + tableName + " after load:"); + Assert.assertEquals(0, result.size()); + + //Check if the compaction succeed + verifyCompaction(1, TxnStore.CLEANING_RESPONSE); - // Verify buckets and their content before rebalance - Table table = msClient.getTable("default", tableName); - FileSystem fs = FileSystem.get(conf); - Assert.assertEquals("Test setup does not match the expected: different buckets", - Arrays.asList("bucket_00000_0", "bucket_00001_0", "bucket_00002_0"), - CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000001")); String[][] expectedBuckets = new String[][] { { - "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4", - "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2", - "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3", - "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4", - "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2", - "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t5\t3", - "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12", - "{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13", - "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14", - "{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15", - "{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16", "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17", + "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":1}\t16\t16", + "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":2}\t15\t15", + "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":3}\t14\t14", + "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":4}\t13\t13", }, { - "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4", - "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t3", - "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t4\t4", - "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":3}\t4\t3", + "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":6}\t3\t4", + "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":7}\t4\t4", + "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":8}\t2\t4", + "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":9}\t5\t4", + "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":10}\t6\t4", + "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":11}\t4\t3", }, { - "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t2\t3", - "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t3\t4", + "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":12}\t2\t3", + "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":13}\t3\t3", + "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":14}\t6\t3", + "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":15}\t5\t3", + "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":16}\t6\t2", + "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":17}\t5\t2", }, }; - for(int i = 0; i < 3; i++) { - Assert.assertEquals("unbalanced bucket " + i, Arrays.asList(expectedBuckets[i]), - testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + "")); - } + verifyRebalance(testDataProvider, tableName, null, expectedBuckets, + new String[] {"bucket_00000", "bucket_00001", "bucket_00002"}, "base_0000007_v0000020"); + } + + @Test + public void testRebalanceCompactionOfNotPartitionedImplicitlyBucketedTableWithOrder() throws Exception { + conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true); + conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false); + conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false); + + //set grouping size to have 3 buckets, and re-create driver with the new config + conf.set("tez.grouping.min-size", "1000"); + conf.set("tez.grouping.max-size", "80000"); + driver = new Driver(conf); + + final String tableName = "rebalance_test"; + TestDataProvider testDataProvider = prepareRebalanceTestData(tableName); + + //Try to do a rebalancing compaction + executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance' ORDER BY b DESC", driver); + runWorker(conf); + + driver.close(); + driver = new Driver(conf); + + //Check if the compaction succeed + verifyCompaction(1, TxnStore.CLEANING_RESPONSE); + + String[][] expectedBuckets = new String[][] { + { + "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17", + "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":1}\t16\t16", + "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":2}\t15\t15", + "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":3}\t14\t14", + "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":4}\t13\t13", + "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":5}\t12\t12", + }, + { + "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":6}\t3\t4", + "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":7}\t4\t4", + "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":8}\t2\t4", + "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":9}\t5\t4", + "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":10}\t6\t4", + "{\"writeid\":7,\"bucketid\":536936448,\"rowid\":11}\t4\t3", + }, + { + "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":12}\t2\t3", + "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":13}\t3\t3", + "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":14}\t6\t3", + "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":15}\t5\t3", + "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":16}\t6\t2", + "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":17}\t5\t2", + }, + }; + verifyRebalance(testDataProvider, tableName, null, expectedBuckets, + new String[] {"bucket_00000", "bucket_00001", "bucket_00002"}, "base_0000007_v0000020"); + } + + @Test + public void testRebalanceCompactionOfNotPartitionedImplicitlyBucketedTable() throws Exception { + conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true); + conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false); + conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false); + + //set grouping size to have 3 buckets, and re-create driver with the new config + conf.set("tez.grouping.min-size", "1000"); + conf.set("tez.grouping.max-size", "80000"); + driver = new Driver(conf); + + final String tableName = "rebalance_test"; + TestDataProvider testDataProvider = prepareRebalanceTestData(tableName); //Try to do a rebalancing compaction executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance'", driver); @@ -163,11 +272,7 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { //Check if the compaction succeed verifyCompaction(1, TxnStore.CLEANING_RESPONSE); - // Verify buckets and their content after rebalance - Assert.assertEquals("Buckets does not match after compaction", - Arrays.asList("bucket_00000", "bucket_00001", "bucket_00002"), - CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000007_v0000020")); - expectedBuckets = new String[][] { + String[][] expectedBuckets = new String[][] { { "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4", "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2", @@ -193,10 +298,8 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":17}\t17\t17", }, }; - for(int i = 0; i < 3; i++) { - Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]), - testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + "")); - } + verifyRebalance(testDataProvider, tableName, null, expectedBuckets, + new String[] {"bucket_00000", "bucket_00001", "bucket_00002"}, "base_0000007_v0000020"); } @Test @@ -278,10 +381,6 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { //Check if the compaction succeed verifyCompaction(1, TxnStore.CLEANING_RESPONSE); - // Verify buckets and their content after rebalance in partition ds=tomorrow - Assert.assertEquals("Buckets does not match after compaction", - Arrays.asList("bucket_00000", "bucket_00001", "bucket_00002"), - CompactorTestUtil.getBucketFileNames(fs, table, "ds=tomorrow", "base_0000007_v0000016")); expectedBuckets = new String[][] { { "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t1\ttomorrow", @@ -308,10 +407,8 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":17}\t17\t17\ttomorrow", }, }; - for(int i = 0; i < 3; i++) { - Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]), - testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + "")); - } + verifyRebalance(testDataProvider, tableName, "ds=tomorrow", expectedBuckets, + new String[] {"bucket_00000", "bucket_00001", "bucket_00002"}, "base_0000007_v0000016"); } @Test @@ -347,9 +444,49 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { conf.set("tez.grouping.max-size", "80000"); driver = new Driver(conf); - final String stageTableName = "stage_rebalance_test"; final String tableName = "rebalance_test"; - AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf); + TestDataProvider testDataProvider = prepareRebalanceTestData(tableName); + + //Try to do a rebalancing compaction + executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance' CLUSTERED INTO 4 BUCKETS", driver); + runWorker(conf); + + verifyCompaction(1, TxnStore.CLEANING_RESPONSE); + + String[][] expectedBuckets = new String[][] { + { + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2", + }, + { + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":5}\t5\t3", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":6}\t2\t4", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":7}\t3\t3", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":8}\t4\t4", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":9}\t4\t3", + }, + { + "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":10}\t2\t3", + "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":11}\t3\t4", + "{\"writeid\":2,\"bucketid\":537001984,\"rowid\":12}\t12\t12", + "{\"writeid\":3,\"bucketid\":537001984,\"rowid\":13}\t13\t13", + "{\"writeid\":4,\"bucketid\":537001984,\"rowid\":14}\t14\t14", + }, + { + "{\"writeid\":5,\"bucketid\":537067520,\"rowid\":15}\t15\t15", + "{\"writeid\":6,\"bucketid\":537067520,\"rowid\":16}\t16\t16", + "{\"writeid\":7,\"bucketid\":537067520,\"rowid\":17}\t17\t17", + }, + }; + verifyRebalance(testDataProvider, tableName, null, expectedBuckets, + new String[] {"bucket_00000", "bucket_00001", "bucket_00002", "bucket_00003"}, "base_0000007_v0000020"); + } + + private TestDataProvider prepareRebalanceTestData(String tableName) throws Exception { + final String stageTableName = "stage_" + tableName; TestDataProvider testDataProvider = new TestDataProvider(); testDataProvider.createFullAcidTable(stageTableName, true, false); @@ -400,57 +537,24 @@ public class TestCrudCompactorOnTez extends CompactorOnTezTest { "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t3\t4", }, }; + AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf); for(int i = 0; i < 3; i++) { - Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]), + Assert.assertEquals("unbalanced bucket " + i, Arrays.asList(expectedBuckets[i]), testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + "")); } + return testDataProvider; + } - //Try to do a rebalancing compaction - executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance' CLUSTERED INTO 4 BUCKETS", driver); - runWorker(conf); - - //Check if the compaction succeed - TxnStore txnHandler = TxnUtils.getTxnStore(conf); - ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); - List<ShowCompactResponseElement> compacts = rsp.getCompacts(); - Assert.assertEquals("Expecting 1 rows and found " + compacts.size(), 1, compacts.size()); - Assert.assertEquals("Expecting compaction state 'ready for cleaning' and found:" + compacts.get(0).getState(), - "ready for cleaning", compacts.get(0).getState()); - + private void verifyRebalance(TestDataProvider testDataProvider, String tableName, String partitionName, + String[][] expectedBucketContent, String[] bucketNames, String folderName) throws Exception { // Verify buckets and their content after rebalance - Assert.assertEquals("Buckets does not match after compaction", - Arrays.asList("bucket_00000", "bucket_00001", "bucket_00002", "bucket_00003"), - CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000007_v0000020")); - expectedBuckets = new String[][] { - { - "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4", - "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2", - "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3", - "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4", - "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2", - }, - { - "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":5}\t5\t3", - "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":6}\t2\t4", - "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":7}\t3\t3", - "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":8}\t4\t4", - "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":9}\t4\t3", - }, - { - "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":10}\t2\t3", - "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":11}\t3\t4", - "{\"writeid\":2,\"bucketid\":537001984,\"rowid\":12}\t12\t12", - "{\"writeid\":3,\"bucketid\":537001984,\"rowid\":13}\t13\t13", - "{\"writeid\":4,\"bucketid\":537001984,\"rowid\":14}\t14\t14", - }, - { - "{\"writeid\":5,\"bucketid\":537067520,\"rowid\":15}\t15\t15", - "{\"writeid\":6,\"bucketid\":537067520,\"rowid\":16}\t16\t16", - "{\"writeid\":7,\"bucketid\":537067520,\"rowid\":17}\t17\t17", - }, - }; - for(int i = 0; i < expectedBuckets.length; i++) { - Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]), + Table table = msClient.getTable("default", tableName); + FileSystem fs = FileSystem.get(conf); + Assert.assertEquals("Buckets does not match after compaction", Arrays.asList(bucketNames), + CompactorTestUtil.getBucketFileNames(fs, table, partitionName, folderName)); + AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf); + for(int i = 0; i < expectedBucketContent.length; i++) { + Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBucketContent[i]), testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + "")); } } diff --git a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g index 2542909cb80..befca385ac0 100644 --- a/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g +++ b/parser/src/java/org/apache/hadoop/hive/ql/parse/AlterClauseParser.g @@ -440,8 +440,8 @@ compactPool alterStatementSuffixCompact @init { gParent.msgs.push("compaction request"); } @after { gParent.msgs.pop(); } - : KW_COMPACT compactType=StringLiteral tableImplBuckets? blocking? compactPool? (KW_WITH KW_OVERWRITE KW_TBLPROPERTIES tableProperties)? - -> ^(TOK_ALTERTABLE_COMPACT $compactType tableImplBuckets? blocking? compactPool? tableProperties?) + : KW_COMPACT compactType=StringLiteral tableImplBuckets? blocking? compactPool? (KW_WITH KW_OVERWRITE KW_TBLPROPERTIES tableProperties)? orderByClause? + -> ^(TOK_ALTERTABLE_COMPACT $compactType tableImplBuckets? blocking? compactPool? tableProperties? orderByClause?) ; alterStatementSuffixSetOwner diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java b/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java index 26377ed9374..9d677cd6d24 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Compiler.java @@ -32,6 +32,7 @@ import org.apache.hadoop.hive.metastore.HiveMetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Schema; import org.apache.hadoop.hive.metastore.api.TxnType; +import org.apache.hadoop.hive.metastore.utils.MetaStoreServerUtils; import org.apache.hadoop.hive.ql.exec.ExplainTask; import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.repl.util.ReplUtils; @@ -263,7 +264,7 @@ public class Compiler { private void openTransaction(TxnType txnType) throws LockException, CommandProcessorException { if (DriverUtils.checkConcurrency(driverContext) && startImplicitTxn(driverContext.getTxnManager()) && - !driverContext.getTxnManager().isTxnOpen() && txnType != TxnType.COMPACTION) { + !driverContext.getTxnManager().isTxnOpen() && !MetaStoreServerUtils.isCompactionTxn(txnType)) { String userFromUGI = DriverUtils.getUserFromUGI(driverContext); if (HiveOperation.REPLDUMP.equals(driverContext.getQueryState().getHiveOperation()) || HiveOperation.REPLLOAD.equals(driverContext.getQueryState().getHiveOperation())) { diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java index d52c661872c..663585ef9c6 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java @@ -289,6 +289,7 @@ public class Driver implements IDriver { } // Since we're reusing the compiled plan, we need to update its start time for current run driverContext.getPlan().setQueryStartTime(driverContext.getQueryDisplay().getQueryStartTime()); + driverContext.setRetrial(false); } // Re-check snapshot only in case we had to release locks and open a new transaction, // otherwise exclusive locks should protect output tables/partitions in snapshot from concurrent writes. diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactAnalyzer.java index f0662f28713..7c80a92ac61 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactAnalyzer.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.ddl.table.storage.compact; import java.util.Map; +import org.antlr.runtime.TokenRewriteStream; import org.antlr.runtime.tree.Tree; import org.apache.hadoop.hive.common.TableName; import org.apache.hadoop.hive.metastore.api.CompactionType; @@ -56,6 +57,7 @@ public class AlterTableCompactAnalyzer extends AbstractAlterTableAnalyzer { Map<String, String> mapProp = null; boolean isBlocking = false; String poolName = null; + String orderBy = null; for (int i = 0; i < command.getChildCount(); i++) { Tree node = command.getChild(i); switch (node.getType()) { @@ -75,13 +77,16 @@ public class AlterTableCompactAnalyzer extends AbstractAlterTableAnalyzer { throw new SemanticException("Could not parse bucket number: " + node.getChild(0).getText()); } break; + case HiveParser.TOK_ORDERBY: + orderBy = this.ctx.getTokenRewriteStream().toOriginalString(node.getTokenStartIndex(), node.getTokenStopIndex()); + break; default: break; } } AlterTableCompactDesc desc = new AlterTableCompactDesc(tableName, partitionSpec, type, isBlocking, poolName, - numberOfBuckets, mapProp); + numberOfBuckets, mapProp, orderBy); addInputsOutputsAlterTable(tableName, partitionSpec, desc, desc.getType(), false); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc))); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactDesc.java index 48876498756..eb0d0c2d2ef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactDesc.java @@ -41,10 +41,11 @@ public class AlterTableCompactDesc extends AbstractAlterTableDesc implements DDL private final String poolName; private final int numberOfBuckets; private final Map<String, String> properties; + private final String orderByClause; private Long writeId; public AlterTableCompactDesc(TableName tableName, Map<String, String> partitionSpec, String compactionType, - boolean isBlocking, String poolName, int numberOfBuckets, Map<String, String> properties) + boolean isBlocking, String poolName, int numberOfBuckets, Map<String, String> properties, String orderByClause) throws SemanticException{ super(AlterTableType.COMPACT, tableName, partitionSpec, null, false, false, properties); this.tableName = tableName.getNotEmptyDbTable(); @@ -54,6 +55,7 @@ public class AlterTableCompactDesc extends AbstractAlterTableDesc implements DDL this.poolName = poolName; this.numberOfBuckets = numberOfBuckets; this.properties = properties; + this.orderByClause = orderByClause; } @Explain(displayName = "table name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) @@ -92,6 +94,11 @@ public class AlterTableCompactDesc extends AbstractAlterTableDesc implements DDL return properties; } + @Explain(displayName = "order by", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED }) + public String getOrderByClause() { + return orderByClause; + } + @Override public void setWriteId(long writeId) { this.writeId = writeId; diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java index e1bcd9a92c8..9187101367b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java @@ -103,6 +103,7 @@ public class AlterTableCompactOperation extends DDLOperation<AlterTableCompactDe req.setProperties(desc.getProperties()); req.setInitiatorId(JavaUtils.hostname() + "-" + HiveMetaStoreClient.MANUALLY_INITIATED_COMPACTION); req.setInitiatorVersion(HiveMetaStoreClient.class.getPackage().getImplementationVersion()); + req.setOrderByClause(desc.getOrderByClause()); if (desc.getNumberOfBuckets() > 0) { req.setNumberOfBuckets(desc.getNumberOfBuckets()); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/concatenate/AlterTableConcatenateAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/concatenate/AlterTableConcatenateAnalyzer.java index c81bd2b72ef..8e4a9d1b6fd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/concatenate/AlterTableConcatenateAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/concatenate/AlterTableConcatenateAnalyzer.java @@ -104,7 +104,7 @@ public class AlterTableConcatenateAnalyzer extends AbstractAlterTableAnalyzer { boolean isBlocking = !HiveConf.getBoolVar(conf, ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, false); AlterTableCompactDesc desc = new AlterTableCompactDesc(tableName, partitionSpec, CompactionType.MAJOR.name(), isBlocking, - poolName, 0, null); + poolName, 0, null, null); addInputsOutputsAlterTable(tableName, partitionSpec, desc, desc.getType(), false); rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc))); setAcidDdlDesc(getTable(tableName), desc); diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java index 50c616749a3..3ca930d5cf5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.ql.txn.compactor; import org.apache.commons.lang3.ArrayUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.ColumnType; @@ -37,7 +38,6 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.util.DirectionUtils; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.util.StringUtils; import org.apache.hive.common.util.HiveStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -73,6 +73,7 @@ class CompactionQueryBuilder { private Partition sourcePartition; // for Insert in major and insert-only minor private String sourceTabForInsert; // for Insert private int numberOfBuckets; //for rebalance + private String orderByClause; //for rebalance // settable booleans private boolean isPartitioned; // for Create @@ -172,6 +173,16 @@ class CompactionQueryBuilder { return this; } + /** + * Sets the order by clause for a rebalancing compaction. It will be used to re-order the data in the table during + * the compaction. + * @param orderByClause The ORDER BY clause to use for data reordering. + */ + public CompactionQueryBuilder setOrderByClause(String orderByClause) { + this.orderByClause = orderByClause; + return this; + } + /** * If true, Create operations will result in a table with partition column `file_name`. */ @@ -237,7 +248,7 @@ class CompactionQueryBuilder { query.append(" temporary external"); } if (operation == Operation.INSERT) { - query.append(" into"); + query.append(CompactionType.REBALANCE.equals(compactionType) ? " overwrite" : " into"); } query.append(" table "); @@ -306,12 +317,18 @@ class CompactionQueryBuilder { } switch (compactionType) { case REBALANCE: { - query.append("0, t2.writeId, t2.rowId / CEIL(numRows / "); - query.append(numberOfBuckets); - query.append("), t2.rowId, t2.writeId, t2.data from (select "); - query.append("count(ROW__ID.writeId) over() as numRows, ROW__ID.writeId as writeId, " + - "(row_number() OVER (order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC)) -1 AS rowId, " + - "NAMED_STRUCT("); + query.append("0, t2.writeId, t2.rowId DIV CEIL(numRows / ") + .append(numberOfBuckets) + .append("), t2.rowId, t2.writeId, t2.data from (select ") + .append("count(ROW__ID.writeId) over() as numRows, "); + if (StringUtils.isNotBlank(orderByClause)) { + // in case of reordering the data the writeids cannot be kept. + query.append("MAX(ROW__ID.writeId) over() as writeId, row_number() OVER (") + .append(orderByClause); + } else { + query.append("ROW__ID.writeId as writeId, row_number() OVER (order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC"); + } + query.append(") - 1 AS rowId, NAMED_STRUCT("); for (int i = 0; i < cols.size(); ++i) { query.append(i == 0 ? "'" : ", '").append(cols.get(i).getName()).append("', `") .append(cols.get(i).getName()).append("`"); @@ -368,8 +385,16 @@ class CompactionQueryBuilder { } else { query.append(sourceTab.getDbName()).append(".").append(sourceTab.getTableName()); } + query.append(" "); if (CompactionType.REBALANCE.equals(compactionType)) { - query.append(" order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC) t2"); + if (StringUtils.isNotBlank(orderByClause)) { + query.append(orderByClause); + } else { + query.append("order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC"); + } + query.append(") t2"); + } else if (CompactionType.MAJOR.equals(compactionType) && insertOnly && StringUtils.isNotBlank(orderByClause)) { + query.append(orderByClause); } } @@ -399,7 +424,7 @@ class CompactionQueryBuilder { long[] invalidWriteIds = validWriteIdList.getInvalidWriteIds(); if (invalidWriteIds.length > 0) { query.append(" where `originalTransaction` not in (").append( - org.apache.commons.lang3.StringUtils.join(ArrayUtils.toObject(invalidWriteIds), ",")) + StringUtils.join(ArrayUtils.toObject(invalidWriteIds), ",")) .append(")"); } } @@ -462,7 +487,7 @@ class CompactionQueryBuilder { String columnDesc = "`" + col.getName() + "` " + (!insertOnly ? ":" : "") + columnType; columnDescs.add(columnDesc); } - query.append(StringUtils.join(',',columnDescs)); + query.append(StringUtils.join(columnDescs, ',')); query.append(!insertOnly ? ">" : ""); query.append(") "); } @@ -477,7 +502,7 @@ class CompactionQueryBuilder { boolean isFirst; List<String> buckCols = sourceTab.getSd().getBucketCols(); if (buckCols.size() > 0) { - query.append("CLUSTERED BY (").append(StringUtils.join(",", buckCols)).append(") "); + query.append("CLUSTERED BY (").append(StringUtils.join(buckCols, ",")).append(") "); List<Order> sortCols = sourceTab.getSd().getSortCols(); if (sortCols.size() > 0) { query.append("SORTED BY ("); @@ -531,14 +556,14 @@ class CompactionQueryBuilder { SkewedInfo skewedInfo = sourceTab.getSd().getSkewedInfo(); if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) { query.append(" SKEWED BY (") - .append(StringUtils.join(", ", skewedInfo.getSkewedColNames())).append(") ON "); + .append(StringUtils.join(skewedInfo.getSkewedColNames(), ", ")).append(") ON "); isFirst = true; for (List<String> colValues : skewedInfo.getSkewedColValues()) { if (!isFirst) { query.append(", "); } isFirst = false; - query.append("('").append(StringUtils.join("','", colValues)).append("')"); + query.append("('").append(StringUtils.join(colValues, "','")).append("')"); } query.append(") STORED AS DIRECTORIES"); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java index 591a4b4dfb3..a9849404c7a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java @@ -63,7 +63,8 @@ final class RebalanceQueryCompactor extends QueryCompactor { List<String> createQueries = getCreateQueries(tmpTableName, table, tmpTablePath.toString()); - List<String> compactionQueries = getCompactionQueries(table, context.getPartition(), tmpTableName, numBuckets); + List<String> compactionQueries = getCompactionQueries(table, context.getPartition(), tmpTableName, numBuckets, + context.getCompactionInfo().orderByClause); List<String> dropQueries = getDropQueries(tmpTableName); runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, context.getCompactionInfo(), Lists.newArrayList(tmpTablePath), createQueries, compactionQueries, dropQueries, @@ -82,7 +83,7 @@ final class RebalanceQueryCompactor extends QueryCompactor { .build()); } - private List<String> getCompactionQueries(Table t, Partition p, String tmpName, int numberOfBuckets) { + private List<String> getCompactionQueries(Table t, Partition p, String tmpName, int numberOfBuckets, String orderByClause) { return Lists.newArrayList( new CompactionQueryBuilder( CompactionType.REBALANCE, @@ -92,6 +93,7 @@ final class RebalanceQueryCompactor extends QueryCompactor { .setSourceTab(t) .setSourcePartition(p) .setNumberOfBuckets(numberOfBuckets) + .setOrderByClause(orderByClause) .build()); } diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java index 247164cc8a1..459b7d2bbd5 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java @@ -605,10 +605,15 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread { * @throws TException */ void open(CompactionInfo ci) throws TException { - this.txnId = msc.openTxn(ci.runAs, TxnType.COMPACTION); + this.txnId = msc.openTxn(ci.runAs, ci.type == CompactionType.REBALANCE ? TxnType.REBALANCE_COMPACTION : TxnType.COMPACTION); status = TxnStatus.OPEN; - LockRequest lockRequest = createLockRequest(ci, txnId, LockType.SHARED_READ, DataOperationType.SELECT); + LockRequest lockRequest; + if (CompactionType.REBALANCE.equals(ci.type)) { + lockRequest = createLockRequest(ci, txnId, LockType.EXCL_WRITE, DataOperationType.UPDATE); + } else { + lockRequest = createLockRequest(ci, txnId, LockType.SHARED_READ, DataOperationType.SELECT); + } LockResponse res = msc.lock(lockRequest); if (res.getState() != LockState.ACQUIRED) { throw new TException("Unable to acquire lock(s) on {" + ci.getFullPartitionName() diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java index 905d82a0659..8e1f3c52991 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java @@ -1180,19 +1180,19 @@ public class TestWorker extends CompactorTest { } // With high timeout, but fast run we should finish without a problem - @Test(timeout=1000) + @Test(timeout=2000) public void testNormalRun() throws Exception { runTimeoutTest(10000, false, true); } // With low timeout, but slow run we should finish without a problem - @Test(timeout=1000) + @Test(timeout=2000) public void testTimeoutWithInterrupt() throws Exception { runTimeoutTest(1, true, false); } // With low timeout, but slow run we should finish without a problem, even if the interrupt is swallowed - @Test(timeout=1000) + @Test(timeout=2000) public void testTimeoutWithoutInterrupt() throws Exception { runTimeoutTest(1, true, true); } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp index 00343d9054a..8de1ff8573b 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.cpp @@ -618,7 +618,8 @@ int _kTxnTypeValues[] = { TxnType::READ_ONLY, TxnType::COMPACTION, TxnType::MATER_VIEW_REBUILD, - TxnType::SOFT_DELETE + TxnType::SOFT_DELETE, + TxnType::REBALANCE_COMPACTION }; const char* _kTxnTypeNames[] = { "DEFAULT", @@ -626,9 +627,10 @@ const char* _kTxnTypeNames[] = { "READ_ONLY", "COMPACTION", "MATER_VIEW_REBUILD", - "SOFT_DELETE" + "SOFT_DELETE", + "REBALANCE_COMPACTION" }; -const std::map<int, const char*> _TxnType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(6, _kTxnTypeValues, _kTxnTypeNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr)); +const std::map<int, const char*> _TxnType_VALUES_TO_NAMES(::apache::thrift::TEnumIterator(7, _kTxnTypeValues, _kTxnTypeNames), ::apache::thrift::TEnumIterator(-1, nullptr, nullptr)); std::ostream& operator<<(std::ostream& out, const TxnType::type& val) { std::map<int, const char*>::const_iterator it = _TxnType_VALUES_TO_NAMES.find(val); @@ -26845,6 +26847,11 @@ void CompactionRequest::__set_numberOfBuckets(const int32_t val) { this->numberOfBuckets = val; __isset.numberOfBuckets = true; } + +void CompactionRequest::__set_orderByClause(const std::string& val) { + this->orderByClause = val; +__isset.orderByClause = true; +} std::ostream& operator<<(std::ostream& out, const CompactionRequest& obj) { obj.printTo(out); @@ -26973,6 +26980,14 @@ uint32_t CompactionRequest::read(::apache::thrift::protocol::TProtocol* iprot) { xfer += iprot->skip(ftype); } break; + case 11: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->orderByClause); + this->__isset.orderByClause = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -27052,6 +27067,11 @@ uint32_t CompactionRequest::write(::apache::thrift::protocol::TProtocol* oprot) xfer += oprot->writeI32(this->numberOfBuckets); xfer += oprot->writeFieldEnd(); } + if (this->__isset.orderByClause) { + xfer += oprot->writeFieldBegin("orderByClause", ::apache::thrift::protocol::T_STRING, 11); + xfer += oprot->writeString(this->orderByClause); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -27069,6 +27089,7 @@ void swap(CompactionRequest &a, CompactionRequest &b) { swap(a.initiatorVersion, b.initiatorVersion); swap(a.poolName, b.poolName); swap(a.numberOfBuckets, b.numberOfBuckets); + swap(a.orderByClause, b.orderByClause); swap(a.__isset, b.__isset); } @@ -27083,6 +27104,7 @@ CompactionRequest::CompactionRequest(const CompactionRequest& other978) { initiatorVersion = other978.initiatorVersion; poolName = other978.poolName; numberOfBuckets = other978.numberOfBuckets; + orderByClause = other978.orderByClause; __isset = other978.__isset; } CompactionRequest& CompactionRequest::operator=(const CompactionRequest& other979) { @@ -27096,6 +27118,7 @@ CompactionRequest& CompactionRequest::operator=(const CompactionRequest& other97 initiatorVersion = other979.initiatorVersion; poolName = other979.poolName; numberOfBuckets = other979.numberOfBuckets; + orderByClause = other979.orderByClause; __isset = other979.__isset; return *this; } @@ -27112,6 +27135,7 @@ void CompactionRequest::printTo(std::ostream& out) const { out << ", " << "initiatorVersion="; (__isset.initiatorVersion ? (out << to_string(initiatorVersion)) : (out << "<null>")); out << ", " << "poolName="; (__isset.poolName ? (out << to_string(poolName)) : (out << "<null>")); out << ", " << "numberOfBuckets="; (__isset.numberOfBuckets ? (out << to_string(numberOfBuckets)) : (out << "<null>")); + out << ", " << "orderByClause="; (__isset.orderByClause ? (out << to_string(orderByClause)) : (out << "<null>")); out << ")"; } @@ -27205,6 +27229,11 @@ void CompactionInfoStruct::__set_numberOfBuckets(const int32_t val) { this->numberOfBuckets = val; __isset.numberOfBuckets = true; } + +void CompactionInfoStruct::__set_orderByClause(const std::string& val) { + this->orderByClause = val; +__isset.orderByClause = true; +} std::ostream& operator<<(std::ostream& out, const CompactionInfoStruct& obj) { obj.printTo(out); @@ -27383,6 +27412,14 @@ uint32_t CompactionInfoStruct::read(::apache::thrift::protocol::TProtocol* iprot xfer += iprot->skip(ftype); } break; + case 19: + if (ftype == ::apache::thrift::protocol::T_STRING) { + xfer += iprot->readString(this->orderByClause); + this->__isset.orderByClause = true; + } else { + xfer += iprot->skip(ftype); + } + break; default: xfer += iprot->skip(ftype); break; @@ -27494,6 +27531,11 @@ uint32_t CompactionInfoStruct::write(::apache::thrift::protocol::TProtocol* opro xfer += oprot->writeI32(this->numberOfBuckets); xfer += oprot->writeFieldEnd(); } + if (this->__isset.orderByClause) { + xfer += oprot->writeFieldBegin("orderByClause", ::apache::thrift::protocol::T_STRING, 19); + xfer += oprot->writeString(this->orderByClause); + xfer += oprot->writeFieldEnd(); + } xfer += oprot->writeFieldStop(); xfer += oprot->writeStructEnd(); return xfer; @@ -27519,6 +27561,7 @@ void swap(CompactionInfoStruct &a, CompactionInfoStruct &b) { swap(a.retryRetention, b.retryRetention); swap(a.poolname, b.poolname); swap(a.numberOfBuckets, b.numberOfBuckets); + swap(a.orderByClause, b.orderByClause); swap(a.__isset, b.__isset); } @@ -27541,6 +27584,7 @@ CompactionInfoStruct::CompactionInfoStruct(const CompactionInfoStruct& other981) retryRetention = other981.retryRetention; poolname = other981.poolname; numberOfBuckets = other981.numberOfBuckets; + orderByClause = other981.orderByClause; __isset = other981.__isset; } CompactionInfoStruct& CompactionInfoStruct::operator=(const CompactionInfoStruct& other982) { @@ -27562,6 +27606,7 @@ CompactionInfoStruct& CompactionInfoStruct::operator=(const CompactionInfoStruct retryRetention = other982.retryRetention; poolname = other982.poolname; numberOfBuckets = other982.numberOfBuckets; + orderByClause = other982.orderByClause; __isset = other982.__isset; return *this; } @@ -27586,6 +27631,7 @@ void CompactionInfoStruct::printTo(std::ostream& out) const { out << ", " << "retryRetention="; (__isset.retryRetention ? (out << to_string(retryRetention)) : (out << "<null>")); out << ", " << "poolname="; (__isset.poolname ? (out << to_string(poolname)) : (out << "<null>")); out << ", " << "numberOfBuckets="; (__isset.numberOfBuckets ? (out << to_string(numberOfBuckets)) : (out << "<null>")); + out << ", " << "orderByClause="; (__isset.orderByClause ? (out << to_string(orderByClause)) : (out << "<null>")); out << ")"; } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h index ab8c1c03322..a580adfbe30 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-cpp/hive_metastore_types.h @@ -300,7 +300,8 @@ struct TxnType { READ_ONLY = 2, COMPACTION = 3, MATER_VIEW_REBUILD = 4, - SOFT_DELETE = 5 + SOFT_DELETE = 5, + REBALANCE_COMPACTION = 6 }; }; @@ -10648,7 +10649,7 @@ void swap(HeartbeatTxnRangeResponse &a, HeartbeatTxnRangeResponse &b); std::ostream& operator<<(std::ostream& out, const HeartbeatTxnRangeResponse& obj); typedef struct _CompactionRequest__isset { - _CompactionRequest__isset() : partitionname(false), runas(false), properties(false), initiatorId(false), initiatorVersion(false), poolName(false), numberOfBuckets(false) {} + _CompactionRequest__isset() : partitionname(false), runas(false), properties(false), initiatorId(false), initiatorVersion(false), poolName(false), numberOfBuckets(false), orderByClause(false) {} bool partitionname :1; bool runas :1; bool properties :1; @@ -10656,6 +10657,7 @@ typedef struct _CompactionRequest__isset { bool initiatorVersion :1; bool poolName :1; bool numberOfBuckets :1; + bool orderByClause :1; } _CompactionRequest__isset; class CompactionRequest : public virtual ::apache::thrift::TBase { @@ -10672,7 +10674,8 @@ class CompactionRequest : public virtual ::apache::thrift::TBase { initiatorId(), initiatorVersion(), poolName(), - numberOfBuckets(0) { + numberOfBuckets(0), + orderByClause() { } virtual ~CompactionRequest() noexcept; @@ -10690,6 +10693,7 @@ class CompactionRequest : public virtual ::apache::thrift::TBase { std::string initiatorVersion; std::string poolName; int32_t numberOfBuckets; + std::string orderByClause; _CompactionRequest__isset __isset; @@ -10713,6 +10717,8 @@ class CompactionRequest : public virtual ::apache::thrift::TBase { void __set_numberOfBuckets(const int32_t val); + void __set_orderByClause(const std::string& val); + bool operator == (const CompactionRequest & rhs) const { if (!(dbname == rhs.dbname)) @@ -10749,6 +10755,10 @@ class CompactionRequest : public virtual ::apache::thrift::TBase { return false; else if (__isset.numberOfBuckets && !(numberOfBuckets == rhs.numberOfBuckets)) return false; + if (__isset.orderByClause != rhs.__isset.orderByClause) + return false; + else if (__isset.orderByClause && !(orderByClause == rhs.orderByClause)) + return false; return true; } bool operator != (const CompactionRequest &rhs) const { @@ -10768,7 +10778,7 @@ void swap(CompactionRequest &a, CompactionRequest &b); std::ostream& operator<<(std::ostream& out, const CompactionRequest& obj); typedef struct _CompactionInfoStruct__isset { - _CompactionInfoStruct__isset() : partitionname(false), runas(false), properties(false), toomanyaborts(false), state(false), workerId(false), start(false), highestWriteId(false), errorMessage(false), hasoldabort(false), enqueueTime(false), retryRetention(false), poolname(false), numberOfBuckets(false) {} + _CompactionInfoStruct__isset() : partitionname(false), runas(false), properties(false), toomanyaborts(false), state(false), workerId(false), start(false), highestWriteId(false), errorMessage(false), hasoldabort(false), enqueueTime(false), retryRetention(false), poolname(false), numberOfBuckets(false), orderByClause(false) {} bool partitionname :1; bool runas :1; bool properties :1; @@ -10783,6 +10793,7 @@ typedef struct _CompactionInfoStruct__isset { bool retryRetention :1; bool poolname :1; bool numberOfBuckets :1; + bool orderByClause :1; } _CompactionInfoStruct__isset; class CompactionInfoStruct : public virtual ::apache::thrift::TBase { @@ -10808,7 +10819,8 @@ class CompactionInfoStruct : public virtual ::apache::thrift::TBase { enqueueTime(0), retryRetention(0), poolname(), - numberOfBuckets(0) { + numberOfBuckets(0), + orderByClause() { } virtual ~CompactionInfoStruct() noexcept; @@ -10834,6 +10846,7 @@ class CompactionInfoStruct : public virtual ::apache::thrift::TBase { int64_t retryRetention; std::string poolname; int32_t numberOfBuckets; + std::string orderByClause; _CompactionInfoStruct__isset __isset; @@ -10873,6 +10886,8 @@ class CompactionInfoStruct : public virtual ::apache::thrift::TBase { void __set_numberOfBuckets(const int32_t val); + void __set_orderByClause(const std::string& val); + bool operator == (const CompactionInfoStruct & rhs) const { if (!(id == rhs.id)) @@ -10939,6 +10954,10 @@ class CompactionInfoStruct : public virtual ::apache::thrift::TBase { return false; else if (__isset.numberOfBuckets && !(numberOfBuckets == rhs.numberOfBuckets)) return false; + if (__isset.orderByClause != rhs.__isset.orderByClause) + return false; + else if (__isset.orderByClause && !(orderByClause == rhs.orderByClause)) + return false; return true; } bool operator != (const CompactionInfoStruct &rhs) const { diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java index 04865221775..cd6b544580b 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionInfoStruct.java @@ -29,6 +29,7 @@ package org.apache.hadoop.hive.metastore.api; private static final org.apache.thrift.protocol.TField RETRY_RETENTION_FIELD_DESC = new org.apache.thrift.protocol.TField("retryRetention", org.apache.thrift.protocol.TType.I64, (short)16); private static final org.apache.thrift.protocol.TField POOLNAME_FIELD_DESC = new org.apache.thrift.protocol.TField("poolname", org.apache.thrift.protocol.TType.STRING, (short)17); private static final org.apache.thrift.protocol.TField NUMBER_OF_BUCKETS_FIELD_DESC = new org.apache.thrift.protocol.TField("numberOfBuckets", org.apache.thrift.protocol.TType.I32, (short)18); + private static final org.apache.thrift.protocol.TField ORDER_BY_CLAUSE_FIELD_DESC = new org.apache.thrift.protocol.TField("orderByClause", org.apache.thrift.protocol.TType.STRING, (short)19); private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new CompactionInfoStructStandardSchemeFactory(); private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new CompactionInfoStructTupleSchemeFactory(); @@ -51,6 +52,7 @@ package org.apache.hadoop.hive.metastore.api; private long retryRetention; // optional private @org.apache.thrift.annotation.Nullable java.lang.String poolname; // optional private int numberOfBuckets; // optional + private @org.apache.thrift.annotation.Nullable java.lang.String orderByClause; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -75,7 +77,8 @@ package org.apache.hadoop.hive.metastore.api; ENQUEUE_TIME((short)15, "enqueueTime"), RETRY_RETENTION((short)16, "retryRetention"), POOLNAME((short)17, "poolname"), - NUMBER_OF_BUCKETS((short)18, "numberOfBuckets"); + NUMBER_OF_BUCKETS((short)18, "numberOfBuckets"), + ORDER_BY_CLAUSE((short)19, "orderByClause"); private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); @@ -127,6 +130,8 @@ package org.apache.hadoop.hive.metastore.api; return POOLNAME; case 18: // NUMBER_OF_BUCKETS return NUMBER_OF_BUCKETS; + case 19: // ORDER_BY_CLAUSE + return ORDER_BY_CLAUSE; default: return null; } @@ -177,7 +182,7 @@ package org.apache.hadoop.hive.metastore.api; private static final int __RETRYRETENTION_ISSET_ID = 6; private static final int __NUMBEROFBUCKETS_ISSET_ID = 7; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.PARTITIONNAME,_Fields.RUNAS,_Fields.PROPERTIES,_Fields.TOOMANYABORTS,_Fields.STATE,_Fields.WORKER_ID,_Fields.START,_Fields.HIGHEST_WRITE_ID,_Fields.ERROR_MESSAGE,_Fields.HASOLDABORT,_Fields.ENQUEUE_TIME,_Fields.RETRY_RETENTION,_Fields.POOLNAME,_Fields.NUMBER_OF_BUCKETS}; + private static final _Fields optionals[] = {_Fields.PARTITIONNAME,_Fields.RUNAS,_Fields.PROPERTIES,_Fields.TOOMANYABORTS,_Fields.STATE,_Fields.WORKER_ID,_Fields.START,_Fields.HIGHEST_WRITE_ID,_Fields.ERROR_MESSAGE,_Fields.HASOLDABORT,_Fields.ENQUEUE_TIME,_Fields.RETRY_RETENTION,_Fields.POOLNAME,_Fields.NUMBER_OF_BUCKETS,_Fields.ORDER_BY_CLAUSE}; public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -217,6 +222,8 @@ package org.apache.hadoop.hive.metastore.api; new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.NUMBER_OF_BUCKETS, new org.apache.thrift.meta_data.FieldMetaData("numberOfBuckets", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); + tmpMap.put(_Fields.ORDER_BY_CLAUSE, new org.apache.thrift.meta_data.FieldMetaData("orderByClause", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CompactionInfoStruct.class, metaDataMap); } @@ -281,6 +288,9 @@ package org.apache.hadoop.hive.metastore.api; this.poolname = other.poolname; } this.numberOfBuckets = other.numberOfBuckets; + if (other.isSetOrderByClause()) { + this.orderByClause = other.orderByClause; + } } public CompactionInfoStruct deepCopy() { @@ -315,6 +325,7 @@ package org.apache.hadoop.hive.metastore.api; this.poolname = null; setNumberOfBucketsIsSet(false); this.numberOfBuckets = 0; + this.orderByClause = null; } public long getId() { @@ -741,6 +752,30 @@ package org.apache.hadoop.hive.metastore.api; __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __NUMBEROFBUCKETS_ISSET_ID, value); } + @org.apache.thrift.annotation.Nullable + public java.lang.String getOrderByClause() { + return this.orderByClause; + } + + public void setOrderByClause(@org.apache.thrift.annotation.Nullable java.lang.String orderByClause) { + this.orderByClause = orderByClause; + } + + public void unsetOrderByClause() { + this.orderByClause = null; + } + + /** Returns true if field orderByClause is set (has been assigned a value) and false otherwise */ + public boolean isSetOrderByClause() { + return this.orderByClause != null; + } + + public void setOrderByClauseIsSet(boolean value) { + if (!value) { + this.orderByClause = null; + } + } + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { case ID: @@ -887,6 +922,14 @@ package org.apache.hadoop.hive.metastore.api; } break; + case ORDER_BY_CLAUSE: + if (value == null) { + unsetOrderByClause(); + } else { + setOrderByClause((java.lang.String)value); + } + break; + } } @@ -947,6 +990,9 @@ package org.apache.hadoop.hive.metastore.api; case NUMBER_OF_BUCKETS: return getNumberOfBuckets(); + case ORDER_BY_CLAUSE: + return getOrderByClause(); + } throw new java.lang.IllegalStateException(); } @@ -994,6 +1040,8 @@ package org.apache.hadoop.hive.metastore.api; return isSetPoolname(); case NUMBER_OF_BUCKETS: return isSetNumberOfBuckets(); + case ORDER_BY_CLAUSE: + return isSetOrderByClause(); } throw new java.lang.IllegalStateException(); } @@ -1173,6 +1221,15 @@ package org.apache.hadoop.hive.metastore.api; return false; } + boolean this_present_orderByClause = true && this.isSetOrderByClause(); + boolean that_present_orderByClause = true && that.isSetOrderByClause(); + if (this_present_orderByClause || that_present_orderByClause) { + if (!(this_present_orderByClause && that_present_orderByClause)) + return false; + if (!this.orderByClause.equals(that.orderByClause)) + return false; + } + return true; } @@ -1250,6 +1307,10 @@ package org.apache.hadoop.hive.metastore.api; if (isSetNumberOfBuckets()) hashCode = hashCode * 8191 + numberOfBuckets; + hashCode = hashCode * 8191 + ((isSetOrderByClause()) ? 131071 : 524287); + if (isSetOrderByClause()) + hashCode = hashCode * 8191 + orderByClause.hashCode(); + return hashCode; } @@ -1441,6 +1502,16 @@ package org.apache.hadoop.hive.metastore.api; return lastComparison; } } + lastComparison = java.lang.Boolean.compare(isSetOrderByClause(), other.isSetOrderByClause()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetOrderByClause()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.orderByClause, other.orderByClause); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -1601,6 +1672,16 @@ package org.apache.hadoop.hive.metastore.api; sb.append(this.numberOfBuckets); first = false; } + if (isSetOrderByClause()) { + if (!first) sb.append(", "); + sb.append("orderByClause:"); + if (this.orderByClause == null) { + sb.append("null"); + } else { + sb.append(this.orderByClause); + } + first = false; + } sb.append(")"); return sb.toString(); } @@ -1806,6 +1887,14 @@ package org.apache.hadoop.hive.metastore.api; org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 19: // ORDER_BY_CLAUSE + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.orderByClause = iprot.readString(); + struct.setOrderByClauseIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -1921,6 +2010,13 @@ package org.apache.hadoop.hive.metastore.api; oprot.writeI32(struct.numberOfBuckets); oprot.writeFieldEnd(); } + if (struct.orderByClause != null) { + if (struct.isSetOrderByClause()) { + oprot.writeFieldBegin(ORDER_BY_CLAUSE_FIELD_DESC); + oprot.writeString(struct.orderByClause); + oprot.writeFieldEnd(); + } + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -1985,7 +2081,10 @@ package org.apache.hadoop.hive.metastore.api; if (struct.isSetNumberOfBuckets()) { optionals.set(13); } - oprot.writeBitSet(optionals, 14); + if (struct.isSetOrderByClause()) { + optionals.set(14); + } + oprot.writeBitSet(optionals, 15); if (struct.isSetPartitionname()) { oprot.writeString(struct.partitionname); } @@ -2028,6 +2127,9 @@ package org.apache.hadoop.hive.metastore.api; if (struct.isSetNumberOfBuckets()) { oprot.writeI32(struct.numberOfBuckets); } + if (struct.isSetOrderByClause()) { + oprot.writeString(struct.orderByClause); + } } @Override @@ -2041,7 +2143,7 @@ package org.apache.hadoop.hive.metastore.api; struct.setTablenameIsSet(true); struct.type = org.apache.hadoop.hive.metastore.api.CompactionType.findByValue(iprot.readI32()); struct.setTypeIsSet(true); - java.util.BitSet incoming = iprot.readBitSet(14); + java.util.BitSet incoming = iprot.readBitSet(15); if (incoming.get(0)) { struct.partitionname = iprot.readString(); struct.setPartitionnameIsSet(true); @@ -2098,6 +2200,10 @@ package org.apache.hadoop.hive.metastore.api; struct.numberOfBuckets = iprot.readI32(); struct.setNumberOfBucketsIsSet(true); } + if (incoming.get(14)) { + struct.orderByClause = iprot.readString(); + struct.setOrderByClauseIsSet(true); + } } } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionRequest.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionRequest.java index fee7d81d95c..07fb642b7c4 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionRequest.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/CompactionRequest.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.metastore.api; private static final org.apache.thrift.protocol.TField INITIATOR_VERSION_FIELD_DESC = new org.apache.thrift.protocol.TField("initiatorVersion", org.apache.thrift.protocol.TType.STRING, (short)8); private static final org.apache.thrift.protocol.TField POOL_NAME_FIELD_DESC = new org.apache.thrift.protocol.TField("poolName", org.apache.thrift.protocol.TType.STRING, (short)9); private static final org.apache.thrift.protocol.TField NUMBER_OF_BUCKETS_FIELD_DESC = new org.apache.thrift.protocol.TField("numberOfBuckets", org.apache.thrift.protocol.TType.I32, (short)10); + private static final org.apache.thrift.protocol.TField ORDER_BY_CLAUSE_FIELD_DESC = new org.apache.thrift.protocol.TField("orderByClause", org.apache.thrift.protocol.TType.STRING, (short)11); private static final org.apache.thrift.scheme.SchemeFactory STANDARD_SCHEME_FACTORY = new CompactionRequestStandardSchemeFactory(); private static final org.apache.thrift.scheme.SchemeFactory TUPLE_SCHEME_FACTORY = new CompactionRequestTupleSchemeFactory(); @@ -35,6 +36,7 @@ package org.apache.hadoop.hive.metastore.api; private @org.apache.thrift.annotation.Nullable java.lang.String initiatorVersion; // optional private @org.apache.thrift.annotation.Nullable java.lang.String poolName; // optional private int numberOfBuckets; // optional + private @org.apache.thrift.annotation.Nullable java.lang.String orderByClause; // optional /** The set of fields this struct contains, along with convenience methods for finding and manipulating them. */ public enum _Fields implements org.apache.thrift.TFieldIdEnum { @@ -51,7 +53,8 @@ package org.apache.hadoop.hive.metastore.api; INITIATOR_ID((short)7, "initiatorId"), INITIATOR_VERSION((short)8, "initiatorVersion"), POOL_NAME((short)9, "poolName"), - NUMBER_OF_BUCKETS((short)10, "numberOfBuckets"); + NUMBER_OF_BUCKETS((short)10, "numberOfBuckets"), + ORDER_BY_CLAUSE((short)11, "orderByClause"); private static final java.util.Map<java.lang.String, _Fields> byName = new java.util.HashMap<java.lang.String, _Fields>(); @@ -87,6 +90,8 @@ package org.apache.hadoop.hive.metastore.api; return POOL_NAME; case 10: // NUMBER_OF_BUCKETS return NUMBER_OF_BUCKETS; + case 11: // ORDER_BY_CLAUSE + return ORDER_BY_CLAUSE; default: return null; } @@ -130,7 +135,7 @@ package org.apache.hadoop.hive.metastore.api; // isset id assignments private static final int __NUMBEROFBUCKETS_ISSET_ID = 0; private byte __isset_bitfield = 0; - private static final _Fields optionals[] = {_Fields.PARTITIONNAME,_Fields.RUNAS,_Fields.PROPERTIES,_Fields.INITIATOR_ID,_Fields.INITIATOR_VERSION,_Fields.POOL_NAME,_Fields.NUMBER_OF_BUCKETS}; + private static final _Fields optionals[] = {_Fields.PARTITIONNAME,_Fields.RUNAS,_Fields.PROPERTIES,_Fields.INITIATOR_ID,_Fields.INITIATOR_VERSION,_Fields.POOL_NAME,_Fields.NUMBER_OF_BUCKETS,_Fields.ORDER_BY_CLAUSE}; public static final java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> metaDataMap; static { java.util.Map<_Fields, org.apache.thrift.meta_data.FieldMetaData> tmpMap = new java.util.EnumMap<_Fields, org.apache.thrift.meta_data.FieldMetaData>(_Fields.class); @@ -156,6 +161,8 @@ package org.apache.hadoop.hive.metastore.api; new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); tmpMap.put(_Fields.NUMBER_OF_BUCKETS, new org.apache.thrift.meta_data.FieldMetaData("numberOfBuckets", org.apache.thrift.TFieldRequirementType.OPTIONAL, new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.I32))); + tmpMap.put(_Fields.ORDER_BY_CLAUSE, new org.apache.thrift.meta_data.FieldMetaData("orderByClause", org.apache.thrift.TFieldRequirementType.OPTIONAL, + new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))); metaDataMap = java.util.Collections.unmodifiableMap(tmpMap); org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(CompactionRequest.class, metaDataMap); } @@ -208,6 +215,9 @@ package org.apache.hadoop.hive.metastore.api; this.poolName = other.poolName; } this.numberOfBuckets = other.numberOfBuckets; + if (other.isSetOrderByClause()) { + this.orderByClause = other.orderByClause; + } } public CompactionRequest deepCopy() { @@ -227,6 +237,7 @@ package org.apache.hadoop.hive.metastore.api; this.poolName = null; setNumberOfBucketsIsSet(false); this.numberOfBuckets = 0; + this.orderByClause = null; } @org.apache.thrift.annotation.Nullable @@ -486,6 +497,30 @@ package org.apache.hadoop.hive.metastore.api; __isset_bitfield = org.apache.thrift.EncodingUtils.setBit(__isset_bitfield, __NUMBEROFBUCKETS_ISSET_ID, value); } + @org.apache.thrift.annotation.Nullable + public java.lang.String getOrderByClause() { + return this.orderByClause; + } + + public void setOrderByClause(@org.apache.thrift.annotation.Nullable java.lang.String orderByClause) { + this.orderByClause = orderByClause; + } + + public void unsetOrderByClause() { + this.orderByClause = null; + } + + /** Returns true if field orderByClause is set (has been assigned a value) and false otherwise */ + public boolean isSetOrderByClause() { + return this.orderByClause != null; + } + + public void setOrderByClauseIsSet(boolean value) { + if (!value) { + this.orderByClause = null; + } + } + public void setFieldValue(_Fields field, @org.apache.thrift.annotation.Nullable java.lang.Object value) { switch (field) { case DBNAME: @@ -568,6 +603,14 @@ package org.apache.hadoop.hive.metastore.api; } break; + case ORDER_BY_CLAUSE: + if (value == null) { + unsetOrderByClause(); + } else { + setOrderByClause((java.lang.String)value); + } + break; + } } @@ -604,6 +647,9 @@ package org.apache.hadoop.hive.metastore.api; case NUMBER_OF_BUCKETS: return getNumberOfBuckets(); + case ORDER_BY_CLAUSE: + return getOrderByClause(); + } throw new java.lang.IllegalStateException(); } @@ -635,6 +681,8 @@ package org.apache.hadoop.hive.metastore.api; return isSetPoolName(); case NUMBER_OF_BUCKETS: return isSetNumberOfBuckets(); + case ORDER_BY_CLAUSE: + return isSetOrderByClause(); } throw new java.lang.IllegalStateException(); } @@ -742,6 +790,15 @@ package org.apache.hadoop.hive.metastore.api; return false; } + boolean this_present_orderByClause = true && this.isSetOrderByClause(); + boolean that_present_orderByClause = true && that.isSetOrderByClause(); + if (this_present_orderByClause || that_present_orderByClause) { + if (!(this_present_orderByClause && that_present_orderByClause)) + return false; + if (!this.orderByClause.equals(that.orderByClause)) + return false; + } + return true; } @@ -789,6 +846,10 @@ package org.apache.hadoop.hive.metastore.api; if (isSetNumberOfBuckets()) hashCode = hashCode * 8191 + numberOfBuckets; + hashCode = hashCode * 8191 + ((isSetOrderByClause()) ? 131071 : 524287); + if (isSetOrderByClause()) + hashCode = hashCode * 8191 + orderByClause.hashCode(); + return hashCode; } @@ -900,6 +961,16 @@ package org.apache.hadoop.hive.metastore.api; return lastComparison; } } + lastComparison = java.lang.Boolean.compare(isSetOrderByClause(), other.isSetOrderByClause()); + if (lastComparison != 0) { + return lastComparison; + } + if (isSetOrderByClause()) { + lastComparison = org.apache.thrift.TBaseHelper.compareTo(this.orderByClause, other.orderByClause); + if (lastComparison != 0) { + return lastComparison; + } + } return 0; } @@ -1010,6 +1081,16 @@ package org.apache.hadoop.hive.metastore.api; sb.append(this.numberOfBuckets); first = false; } + if (isSetOrderByClause()) { + if (!first) sb.append(", "); + sb.append("orderByClause:"); + if (this.orderByClause == null) { + sb.append("null"); + } else { + sb.append(this.orderByClause); + } + first = false; + } sb.append(")"); return sb.toString(); } @@ -1159,6 +1240,14 @@ package org.apache.hadoop.hive.metastore.api; org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } break; + case 11: // ORDER_BY_CLAUSE + if (schemeField.type == org.apache.thrift.protocol.TType.STRING) { + struct.orderByClause = iprot.readString(); + struct.setOrderByClauseIsSet(true); + } else { + org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); + } + break; default: org.apache.thrift.protocol.TProtocolUtil.skip(iprot, schemeField.type); } @@ -1242,6 +1331,13 @@ package org.apache.hadoop.hive.metastore.api; oprot.writeI32(struct.numberOfBuckets); oprot.writeFieldEnd(); } + if (struct.orderByClause != null) { + if (struct.isSetOrderByClause()) { + oprot.writeFieldBegin(ORDER_BY_CLAUSE_FIELD_DESC); + oprot.writeString(struct.orderByClause); + oprot.writeFieldEnd(); + } + } oprot.writeFieldStop(); oprot.writeStructEnd(); } @@ -1284,7 +1380,10 @@ package org.apache.hadoop.hive.metastore.api; if (struct.isSetNumberOfBuckets()) { optionals.set(6); } - oprot.writeBitSet(optionals, 7); + if (struct.isSetOrderByClause()) { + optionals.set(7); + } + oprot.writeBitSet(optionals, 8); if (struct.isSetPartitionname()) { oprot.writeString(struct.partitionname); } @@ -1313,6 +1412,9 @@ package org.apache.hadoop.hive.metastore.api; if (struct.isSetNumberOfBuckets()) { oprot.writeI32(struct.numberOfBuckets); } + if (struct.isSetOrderByClause()) { + oprot.writeString(struct.orderByClause); + } } @Override @@ -1324,7 +1426,7 @@ package org.apache.hadoop.hive.metastore.api; struct.setTablenameIsSet(true); struct.type = org.apache.hadoop.hive.metastore.api.CompactionType.findByValue(iprot.readI32()); struct.setTypeIsSet(true); - java.util.BitSet incoming = iprot.readBitSet(7); + java.util.BitSet incoming = iprot.readBitSet(8); if (incoming.get(0)) { struct.partitionname = iprot.readString(); struct.setPartitionnameIsSet(true); @@ -1364,6 +1466,10 @@ package org.apache.hadoop.hive.metastore.api; struct.numberOfBuckets = iprot.readI32(); struct.setNumberOfBucketsIsSet(true); } + if (incoming.get(7)) { + struct.orderByClause = iprot.readString(); + struct.setOrderByClauseIsSet(true); + } } } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/TxnType.java b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/TxnType.java index a785fd4fb4e..8b2aed925c3 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/TxnType.java +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/TxnType.java @@ -14,7 +14,8 @@ public enum TxnType implements org.apache.thrift.TEnum { READ_ONLY(2), COMPACTION(3), MATER_VIEW_REBUILD(4), - SOFT_DELETE(5); + SOFT_DELETE(5), + REBALANCE_COMPACTION(6); private final int value; @@ -48,6 +49,8 @@ public enum TxnType implements org.apache.thrift.TEnum { return MATER_VIEW_REBUILD; case 5: return SOFT_DELETE; + case 6: + return REBALANCE_COMPACTION; default: return null; } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionInfoStruct.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionInfoStruct.php index efbf2543bb4..338e1cb962b 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionInfoStruct.php +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionInfoStruct.php @@ -112,6 +112,11 @@ class CompactionInfoStruct 'isRequired' => false, 'type' => TType::I32, ), + 19 => array( + 'var' => 'orderByClause', + 'isRequired' => false, + 'type' => TType::STRING, + ), ); /** @@ -186,6 +191,10 @@ class CompactionInfoStruct * @var int */ public $numberOfBuckets = null; + /** + * @var string + */ + public $orderByClause = null; public function __construct($vals = null) { @@ -244,6 +253,9 @@ class CompactionInfoStruct if (isset($vals['numberOfBuckets'])) { $this->numberOfBuckets = $vals['numberOfBuckets']; } + if (isset($vals['orderByClause'])) { + $this->orderByClause = $vals['orderByClause']; + } } } @@ -392,6 +404,13 @@ class CompactionInfoStruct $xfer += $input->skip($ftype); } break; + case 19: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->orderByClause); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -496,6 +515,11 @@ class CompactionInfoStruct $xfer += $output->writeI32($this->numberOfBuckets); $xfer += $output->writeFieldEnd(); } + if ($this->orderByClause !== null) { + $xfer += $output->writeFieldBegin('orderByClause', TType::STRING, 19); + $xfer += $output->writeString($this->orderByClause); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionRequest.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionRequest.php index 8672354e151..a087f829c3b 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionRequest.php +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/CompactionRequest.php @@ -80,6 +80,11 @@ class CompactionRequest 'isRequired' => false, 'type' => TType::I32, ), + 11 => array( + 'var' => 'orderByClause', + 'isRequired' => false, + 'type' => TType::STRING, + ), ); /** @@ -122,6 +127,10 @@ class CompactionRequest * @var int */ public $numberOfBuckets = null; + /** + * @var string + */ + public $orderByClause = null; public function __construct($vals = null) { @@ -156,6 +165,9 @@ class CompactionRequest if (isset($vals['numberOfBuckets'])) { $this->numberOfBuckets = $vals['numberOfBuckets']; } + if (isset($vals['orderByClause'])) { + $this->orderByClause = $vals['orderByClause']; + } } } @@ -260,6 +272,13 @@ class CompactionRequest $xfer += $input->skip($ftype); } break; + case 11: + if ($ftype == TType::STRING) { + $xfer += $input->readString($this->orderByClause); + } else { + $xfer += $input->skip($ftype); + } + break; default: $xfer += $input->skip($ftype); break; @@ -332,6 +351,11 @@ class CompactionRequest $xfer += $output->writeI32($this->numberOfBuckets); $xfer += $output->writeFieldEnd(); } + if ($this->orderByClause !== null) { + $xfer += $output->writeFieldBegin('orderByClause', TType::STRING, 11); + $xfer += $output->writeString($this->orderByClause); + $xfer += $output->writeFieldEnd(); + } $xfer += $output->writeFieldStop(); $xfer += $output->writeStructEnd(); return $xfer; diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/TxnType.php b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/TxnType.php index d9200832e4a..6183e577df2 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/TxnType.php +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-php/metastore/TxnType.php @@ -30,6 +30,8 @@ final class TxnType const SOFT_DELETE = 5; + const REBALANCE_COMPACTION = 6; + static public $__names = array( 0 => 'DEFAULT', 1 => 'REPL_CREATED', @@ -37,6 +39,7 @@ final class TxnType 3 => 'COMPACTION', 4 => 'MATER_VIEW_REBUILD', 5 => 'SOFT_DELETE', + 6 => 'REBALANCE_COMPACTION', ); } diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py index 6f66497ca19..e95e0285f6c 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-py/hive_metastore/ttypes.py @@ -381,6 +381,7 @@ class TxnType(object): COMPACTION = 3 MATER_VIEW_REBUILD = 4 SOFT_DELETE = 5 + REBALANCE_COMPACTION = 6 _VALUES_TO_NAMES = { 0: "DEFAULT", @@ -389,6 +390,7 @@ class TxnType(object): 3: "COMPACTION", 4: "MATER_VIEW_REBUILD", 5: "SOFT_DELETE", + 6: "REBALANCE_COMPACTION", } _NAMES_TO_VALUES = { @@ -398,6 +400,7 @@ class TxnType(object): "COMPACTION": 3, "MATER_VIEW_REBUILD": 4, "SOFT_DELETE": 5, + "REBALANCE_COMPACTION": 6, } @@ -15355,11 +15358,12 @@ class CompactionRequest(object): - initiatorVersion - poolName - numberOfBuckets + - orderByClause """ - def __init__(self, dbname=None, tablename=None, partitionname=None, type=None, runas=None, properties=None, initiatorId=None, initiatorVersion=None, poolName=None, numberOfBuckets=None,): + def __init__(self, dbname=None, tablename=None, partitionname=None, type=None, runas=None, properties=None, initiatorId=None, initiatorVersion=None, poolName=None, numberOfBuckets=None, orderByClause=None,): self.dbname = dbname self.tablename = tablename self.partitionname = partitionname @@ -15370,6 +15374,7 @@ class CompactionRequest(object): self.initiatorVersion = initiatorVersion self.poolName = poolName self.numberOfBuckets = numberOfBuckets + self.orderByClause = orderByClause def read(self, iprot): if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: @@ -15436,6 +15441,11 @@ class CompactionRequest(object): self.numberOfBuckets = iprot.readI32() else: iprot.skip(ftype) + elif fid == 11: + if ftype == TType.STRING: + self.orderByClause = iprot.readString().decode('utf-8', errors='replace') if sys.version_info[0] == 2 else iprot.readString() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -15490,6 +15500,10 @@ class CompactionRequest(object): oprot.writeFieldBegin('numberOfBuckets', TType.I32, 10) oprot.writeI32(self.numberOfBuckets) oprot.writeFieldEnd() + if self.orderByClause is not None: + oprot.writeFieldBegin('orderByClause', TType.STRING, 11) + oprot.writeString(self.orderByClause.encode('utf-8') if sys.version_info[0] == 2 else self.orderByClause) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -15535,11 +15549,12 @@ class CompactionInfoStruct(object): - retryRetention - poolname - numberOfBuckets + - orderByClause """ - def __init__(self, id=None, dbname=None, tablename=None, partitionname=None, type=None, runas=None, properties=None, toomanyaborts=None, state=None, workerId=None, start=None, highestWriteId=None, errorMessage=None, hasoldabort=None, enqueueTime=None, retryRetention=None, poolname=None, numberOfBuckets=None,): + def __init__(self, id=None, dbname=None, tablename=None, partitionname=None, type=None, runas=None, properties=None, toomanyaborts=None, state=None, workerId=None, start=None, highestWriteId=None, errorMessage=None, hasoldabort=None, enqueueTime=None, retryRetention=None, poolname=None, numberOfBuckets=None, orderByClause=None,): self.id = id self.dbname = dbname self.tablename = tablename @@ -15558,6 +15573,7 @@ class CompactionInfoStruct(object): self.retryRetention = retryRetention self.poolname = poolname self.numberOfBuckets = numberOfBuckets + self.orderByClause = orderByClause def read(self, iprot): if iprot._fast_decode is not None and isinstance(iprot.trans, TTransport.CReadableTransport) and self.thrift_spec is not None: @@ -15658,6 +15674,11 @@ class CompactionInfoStruct(object): self.numberOfBuckets = iprot.readI32() else: iprot.skip(ftype) + elif fid == 19: + if ftype == TType.STRING: + self.orderByClause = iprot.readString().decode('utf-8', errors='replace') if sys.version_info[0] == 2 else iprot.readString() + else: + iprot.skip(ftype) else: iprot.skip(ftype) iprot.readFieldEnd() @@ -15740,6 +15761,10 @@ class CompactionInfoStruct(object): oprot.writeFieldBegin('numberOfBuckets', TType.I32, 18) oprot.writeI32(self.numberOfBuckets) oprot.writeFieldEnd() + if self.orderByClause is not None: + oprot.writeFieldBegin('orderByClause', TType.STRING, 19) + oprot.writeString(self.orderByClause.encode('utf-8') if sys.version_info[0] == 2 else self.orderByClause) + oprot.writeFieldEnd() oprot.writeFieldStop() oprot.writeStructEnd() @@ -31667,6 +31692,7 @@ CompactionRequest.thrift_spec = ( (8, TType.STRING, 'initiatorVersion', 'UTF8', None, ), # 8 (9, TType.STRING, 'poolName', 'UTF8', None, ), # 9 (10, TType.I32, 'numberOfBuckets', None, None, ), # 10 + (11, TType.STRING, 'orderByClause', 'UTF8', None, ), # 11 ) all_structs.append(CompactionInfoStruct) CompactionInfoStruct.thrift_spec = ( @@ -31689,6 +31715,7 @@ CompactionInfoStruct.thrift_spec = ( (16, TType.I64, 'retryRetention', None, None, ), # 16 (17, TType.STRING, 'poolname', 'UTF8', None, ), # 17 (18, TType.I32, 'numberOfBuckets', None, None, ), # 18 + (19, TType.STRING, 'orderByClause', 'UTF8', None, ), # 19 ) all_structs.append(OptionalCompactionInfoStruct) OptionalCompactionInfoStruct.thrift_spec = ( diff --git a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb index 98ba4f54dd5..b65733171e5 100644 --- a/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb +++ b/standalone-metastore/metastore-common/src/gen/thrift/gen-rb/hive_metastore_types.rb @@ -172,8 +172,9 @@ module TxnType COMPACTION = 3 MATER_VIEW_REBUILD = 4 SOFT_DELETE = 5 - VALUE_MAP = {0 => "DEFAULT", 1 => "REPL_CREATED", 2 => "READ_ONLY", 3 => "COMPACTION", 4 => "MATER_VIEW_REBUILD", 5 => "SOFT_DELETE"} - VALID_VALUES = Set.new([DEFAULT, REPL_CREATED, READ_ONLY, COMPACTION, MATER_VIEW_REBUILD, SOFT_DELETE]).freeze + REBALANCE_COMPACTION = 6 + VALUE_MAP = {0 => "DEFAULT", 1 => "REPL_CREATED", 2 => "READ_ONLY", 3 => "COMPACTION", 4 => "MATER_VIEW_REBUILD", 5 => "SOFT_DELETE", 6 => "REBALANCE_COMPACTION"} + VALID_VALUES = Set.new([DEFAULT, REPL_CREATED, READ_ONLY, COMPACTION, MATER_VIEW_REBUILD, SOFT_DELETE, REBALANCE_COMPACTION]).freeze end module GetTablesExtRequestFields @@ -4481,6 +4482,7 @@ class CompactionRequest INITIATORVERSION = 8 POOLNAME = 9 NUMBEROFBUCKETS = 10 + ORDERBYCLAUSE = 11 FIELDS = { DBNAME => {:type => ::Thrift::Types::STRING, :name => 'dbname'}, @@ -4492,7 +4494,8 @@ class CompactionRequest INITIATORID => {:type => ::Thrift::Types::STRING, :name => 'initiatorId', :optional => true}, INITIATORVERSION => {:type => ::Thrift::Types::STRING, :name => 'initiatorVersion', :optional => true}, POOLNAME => {:type => ::Thrift::Types::STRING, :name => 'poolName', :optional => true}, - NUMBEROFBUCKETS => {:type => ::Thrift::Types::I32, :name => 'numberOfBuckets', :optional => true} + NUMBEROFBUCKETS => {:type => ::Thrift::Types::I32, :name => 'numberOfBuckets', :optional => true}, + ORDERBYCLAUSE => {:type => ::Thrift::Types::STRING, :name => 'orderByClause', :optional => true} } def struct_fields; FIELDS; end @@ -4529,6 +4532,7 @@ class CompactionInfoStruct RETRYRETENTION = 16 POOLNAME = 17 NUMBEROFBUCKETS = 18 + ORDERBYCLAUSE = 19 FIELDS = { ID => {:type => ::Thrift::Types::I64, :name => 'id'}, @@ -4548,7 +4552,8 @@ class CompactionInfoStruct ENQUEUETIME => {:type => ::Thrift::Types::I64, :name => 'enqueueTime', :optional => true}, RETRYRETENTION => {:type => ::Thrift::Types::I64, :name => 'retryRetention', :optional => true}, POOLNAME => {:type => ::Thrift::Types::STRING, :name => 'poolname', :optional => true}, - NUMBEROFBUCKETS => {:type => ::Thrift::Types::I32, :name => 'numberOfBuckets', :optional => true} + NUMBEROFBUCKETS => {:type => ::Thrift::Types::I32, :name => 'numberOfBuckets', :optional => true}, + ORDERBYCLAUSE => {:type => ::Thrift::Types::STRING, :name => 'orderByClause', :optional => true} } def struct_fields; FIELDS; end diff --git a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnQueries.java b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnQueries.java index 8d3f060e8d0..f3f0e5d939b 100644 --- a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnQueries.java +++ b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnQueries.java @@ -61,7 +61,7 @@ public class TxnQueries { " \"CQ_WORKER_VERSION\" AS \"CC_WORKER_VERSION\", \"CQ_INITIATOR_ID\" AS \"CC_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\" AS \"CC_INITIATOR_VERSION\", " + " \"CQ_RETRY_RETENTION\" AS \"CC_RETRY_RETENTION\", \"CQ_NEXT_TXN_ID\" AS \"CC_NEXT_TXN_ID\", \"CQ_TXN_ID\" AS \"CC_TXN_ID\", " + " \"CQ_COMMIT_TIME\" AS \"CC_COMMIT_TIME\", \"CQ_POOL_NAME\" AS \"CC_POOL_NAME\", " + - " \"CQ_NUMBER_OF_BUCKETS\" AS \"CC_NUMBER_OF_BUCKETS\" " + + " \"CQ_NUMBER_OF_BUCKETS\" AS \"CC_NUMBER_OF_BUCKETS\", \"CQ_ORDER_BY\" AS \"CC_ORDER_BY\" " + " FROM " + " \"COMPACTION_QUEUE\" " + " UNION ALL " + @@ -71,7 +71,7 @@ public class TxnQueries { " \"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", \"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\", " + " \"CC_ENQUEUE_TIME\", \"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\", \"CC_INITIATOR_VERSION\", " + " -1 , \"CC_NEXT_TXN_ID\", \"CC_TXN_ID\", \"CC_NEXT_TXN_ID\", \"CC_POOL_NAME\", " + - " \"CC_NUMBER_OF_BUCKETS\" " + + " \"CC_NUMBER_OF_BUCKETS\", \"CC_ORDER_BY\" " + " FROM " + " \"COMPLETED_COMPACTIONS\") XX "; @@ -82,6 +82,7 @@ public class TxnQueries { " \"CC_TBLPROPERTIES\", \"CC_WORKER_ID\", \"CC_START\", \"CC_END\", \"CC_RUN_AS\", " + " \"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", \"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\", " + " \"CC_ENQUEUE_TIME\", \"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\", \"CC_INITIATOR_VERSION\"," + - " \"CC_NEXT_TXN_ID\", \"CC_TXN_ID\", \"CC_COMMIT_TIME\", \"CC_POOL_NAME\", \"CC_NUMBER_OF_BUCKETS\") " + - " VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; + " \"CC_NEXT_TXN_ID\", \"CC_TXN_ID\", \"CC_COMMIT_TIME\", \"CC_POOL_NAME\", \"CC_NUMBER_OF_BUCKETS\", " + + " \"CC_ORDER_BY\") " + + " VALUES(?,?,?,?,?, ?,?,?,?,?, ?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)"; } diff --git a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift index af20b469d48..bca23c8111c 100644 --- a/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift +++ b/standalone-metastore/metastore-common/src/main/thrift/hive_metastore.thrift @@ -1005,7 +1005,8 @@ enum TxnType { READ_ONLY = 2, COMPACTION = 3, MATER_VIEW_REBUILD = 4, - SOFT_DELETE = 5 + SOFT_DELETE = 5, + REBALANCE_COMPACTION = 6 } // specifies which info to return with GetTablesExtRequest @@ -1294,6 +1295,7 @@ struct CompactionRequest { 8: optional string initiatorVersion 9: optional string poolName 10: optional i32 numberOfBuckets + 11: optional string orderByClause; } struct CompactionInfoStruct { @@ -1315,6 +1317,7 @@ struct CompactionInfoStruct { 16: optional i64 retryRetention, 17: optional string poolname 18: optional i32 numberOfBuckets + 19: optional string orderByClause; } struct OptionalCompactionInfoStruct { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java index 46e88a3d6b0..5e5db301d63 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionInfo.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.hive.metastore.txn; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.ToStringBuilder; import org.apache.hadoop.hive.common.ValidCompactorWriteIdList; import org.apache.hadoop.hive.metastore.api.CompactionInfoStruct; import org.apache.hadoop.hive.metastore.api.CompactionType; @@ -65,6 +67,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> { public long commitTime = 0; public String poolName; public int numberOfBuckets = 0; + public String orderByClause; /** * The highest write id that the compaction job will pay attention to. @@ -148,31 +151,35 @@ public class CompactionInfo implements Comparable<CompactionInfo> { public int compareTo(CompactionInfo o) { return getFullPartitionName().compareTo(o.getFullPartitionName()); } + public String toString() { - return "id:" + id + "," + - "dbname:" + dbname + "," + - "tableName:" + tableName + "," + - "partName:" + partName + "," + - "state:" + state + "," + - "type:" + type + "," + - "enqueueTime:" + enqueueTime + "," + - "commitTime:" + commitTime + "," + - "start:" + start + "," + - "properties:" + properties + "," + - "runAs:" + runAs + "," + - "tooManyAborts:" + tooManyAborts + "," + - "hasOldAbort:" + hasOldAbort + "," + - "highestWriteId:" + highestWriteId + "," + - "errorMessage:" + errorMessage + "," + - "workerId: " + workerId + "," + - "workerVersion: " + workerVersion + "," + - "initiatorId: " + initiatorId + "," + - "initiatorVersion: " + initiatorVersion + "," + - "retryRetention" + retryRetention + "," + - "txnId" + txnId + "," + - "nextTxnId" + nextTxnId + "," + - "poolname" + poolName + "," + - "numberOfBuckets" + numberOfBuckets; + return new ToStringBuilder(this) + .append("id", id) + .append("dbname", dbname) + .append("tableName", tableName) + .append("partName", partName) + .append("state", state) + .append("type", type) + .append("enqueueTime", enqueueTime) + .append("commitTime", commitTime) + .append("start", start) + .append("properties", properties) + .append("runAs", runAs) + .append("tooManyAborts", tooManyAborts) + .append("hasOldAbort", hasOldAbort) + .append("highestWriteId", highestWriteId) + .append("errorMessage", errorMessage) + .append("workerId", workerId) + .append("workerVersion", workerVersion) + .append("initiatorId", initiatorId) + .append("initiatorVersion", initiatorVersion) + .append("retryRetention", retryRetention) + .append("txnId", txnId) + .append("nextTxnId", nextTxnId) + .append("poolName", poolName) + .append("numberOfBuckets", numberOfBuckets) + .append("orderByClause", orderByClause) + .build(); } @Override @@ -225,6 +232,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> { fullCi.commitTime = rs.getLong(22); fullCi.poolName = rs.getString(23); fullCi.numberOfBuckets = rs.getInt(24); + fullCi.orderByClause = rs.getString(25); return fullCi; } static void insertIntoCompletedCompactions(PreparedStatement pStmt, CompactionInfo ci, long endTime) throws SQLException, MetaException { @@ -252,6 +260,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> { pStmt.setLong(22, ci.commitTime); pStmt.setString(23, ci.poolName); pStmt.setInt(24, ci.numberOfBuckets); + pStmt.setString(25, ci.orderByClause); } public static CompactionInfo compactionStructToInfo(CompactionInfoStruct cr) { @@ -295,6 +304,9 @@ public class CompactionInfo implements Comparable<CompactionInfo> { if (cr.isSetNumberOfBuckets()) { ci.numberOfBuckets = cr.getNumberOfBuckets(); } + if (cr.isSetOrderByClause()) { + ci.orderByClause = cr.getOrderByClause(); + } return ci; } @@ -317,6 +329,7 @@ public class CompactionInfo implements Comparable<CompactionInfo> { cr.setRetryRetention(ci.retryRetention); cr.setPoolname(ci.poolName); cr.setNumberOfBuckets(ci.numberOfBuckets); + cr.setOrderByClause(ci.orderByClause); return cr; } diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java index 927833f8d48..12851547693 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java @@ -67,7 +67,7 @@ class CompactionTxnHandler extends TxnHandler { + "\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", \"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\", " + "\"CQ_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\", " + "\"CQ_RETRY_RETENTION\", \"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\", \"CQ_COMMIT_TIME\", \"CQ_POOL_NAME\", " - + "\"CQ_NUMBER_OF_BUCKETS\" " + + "\"CQ_NUMBER_OF_BUCKETS\", \"CQ_ORDER_BY\" " + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_TXN_ID\" = ?"; private static final String SELECT_COMPACTION_METRICS_CACHE_QUERY = "SELECT \"CMC_METRIC_VALUE\", \"CMC_VERSION\" FROM \"COMPACTION_METRICS_CACHE\" " + @@ -247,8 +247,8 @@ class CompactionTxnHandler extends TxnHandler { dbConn = getDbConn(Connection.TRANSACTION_READ_COMMITTED, connPoolCompaction); StringBuilder sb = new StringBuilder(); sb.append("SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " + - "\"CQ_TYPE\", \"CQ_POOL_NAME\", \"CQ_NUMBER_OF_BUCKETS\", \"CQ_TBLPROPERTIES\" FROM \"COMPACTION_QUEUE\" " + - "WHERE \"CQ_STATE\" = '" + INITIATED_STATE + "' AND "); + "\"CQ_TYPE\", \"CQ_POOL_NAME\", \"CQ_NUMBER_OF_BUCKETS\", \"CQ_ORDER_BY\", " + + "\"CQ_TBLPROPERTIES\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_STATE\" = '" + INITIATED_STATE + "' AND "); boolean hasPoolName = StringUtils.isNotBlank(rqst.getPoolName()); if(hasPoolName) { sb.append("\"CQ_POOL_NAME\"=?"); @@ -279,7 +279,8 @@ class CompactionTxnHandler extends TxnHandler { info.type = TxnUtils.dbCompactionType2ThriftType(rs.getString(5).charAt(0)); info.poolName = rs.getString(6); info.numberOfBuckets = rs.getInt(7); - info.properties = rs.getString(8); + info.orderByClause = rs.getString(8); + info.properties = rs.getString(9); info.workerId = rqst.getWorkerId(); String workerId = rqst.getWorkerId(); @@ -577,13 +578,15 @@ class CompactionTxnHandler extends TxnHandler { + "\"CC_START\", \"CC_END\", \"CC_RUN_AS\", \"CC_HIGHEST_WRITE_ID\", \"CC_META_INFO\", " + "\"CC_HADOOP_JOB_ID\", \"CC_ERROR_MESSAGE\", \"CC_ENQUEUE_TIME\", " + "\"CC_WORKER_VERSION\", \"CC_INITIATOR_ID\", \"CC_INITIATOR_VERSION\", " - + "\"CC_NEXT_TXN_ID\", \"CC_TXN_ID\", \"CC_COMMIT_TIME\", \"CC_POOL_NAME\", \"CC_NUMBER_OF_BUCKETS\") " + + "\"CC_NEXT_TXN_ID\", \"CC_TXN_ID\", \"CC_COMMIT_TIME\", \"CC_POOL_NAME\", \"CC_NUMBER_OF_BUCKETS\"," + + "\"CC_ORDER_BY\") " + "SELECT \"CQ_ID\", \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\", " + quoteChar(SUCCEEDED_STATE) + ", \"CQ_TYPE\", \"CQ_TBLPROPERTIES\", \"CQ_WORKER_ID\", \"CQ_START\", " + getEpochFn(dbProduct) + ", \"CQ_RUN_AS\", \"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", " + "\"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\", \"CQ_ENQUEUE_TIME\", " + "\"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\", " - + "\"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\", \"CQ_COMMIT_TIME\", \"CQ_POOL_NAME\", \"CQ_NUMBER_OF_BUCKETS\" " + + "\"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\", \"CQ_COMMIT_TIME\", \"CQ_POOL_NAME\", \"CQ_NUMBER_OF_BUCKETS\", " + + "\"CQ_ORDER_BY\" " + "FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?"; pStmt = dbConn.prepareStatement(s); pStmt.setLong(1, info.id); @@ -1404,7 +1407,7 @@ class CompactionTxnHandler extends TxnHandler { + "\"CQ_HIGHEST_WRITE_ID\", \"CQ_META_INFO\", \"CQ_HADOOP_JOB_ID\", \"CQ_ERROR_MESSAGE\", " + "\"CQ_ENQUEUE_TIME\", \"CQ_WORKER_VERSION\", \"CQ_INITIATOR_ID\", \"CQ_INITIATOR_VERSION\", " + "\"CQ_RETRY_RETENTION\", \"CQ_NEXT_TXN_ID\", \"CQ_TXN_ID\", \"CQ_COMMIT_TIME\", \"CQ_POOL_NAME\", " - + "\"CQ_NUMBER_OF_BUCKETS\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?"); + + "\"CQ_NUMBER_OF_BUCKETS\", \"CQ_ORDER_BY\" FROM \"COMPACTION_QUEUE\" WHERE \"CQ_ID\" = ?"); pStmt.setLong(1, ci.id); rs = pStmt.executeQuery(); if (rs.next()) { diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java index ab1db0f8201..bf071327434 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java @@ -1567,7 +1567,7 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { assert true; } - if (txnType != TxnType.READ_ONLY && !isReplayedReplTxn) { + if (txnType != TxnType.READ_ONLY && !isReplayedReplTxn && !MetaStoreServerUtils.isCompactionTxn(txnType)) { moveTxnComponentsToCompleted(stmt, txnid, isUpdateDelete); } else if (isReplayedReplTxn) { if (rqst.isSetWriteEventInfos()) { @@ -3767,6 +3767,9 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { if (rqst.isSetNumberOfBuckets()) { buf.append(", \"CQ_NUMBER_OF_BUCKETS\""); } + if (rqst.isSetOrderByClause()) { + buf.append(", \"CQ_ORDER_BY\""); + } if (rqst.getProperties() != null) { buf.append(", \"CQ_TBLPROPERTIES\""); } @@ -3802,6 +3805,10 @@ abstract class TxnHandler implements TxnStore, TxnStore.MutexAPI { if (rqst.isSetNumberOfBuckets()) { buf.append(", ").append(rqst.getNumberOfBuckets()); } + if (rqst.isSetOrderByClause()) { + buf.append(", ?"); + params.add(rqst.getOrderByClause()); + } if (rqst.getProperties() != null) { buf.append(", ?"); params.add(new StringableMap(rqst.getProperties()).toString()); diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java index d487752f1a3..5c417893590 100644 --- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java +++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/utils/MetaStoreServerUtils.java @@ -89,6 +89,7 @@ import org.apache.hadoop.hive.metastore.api.SerDeInfo; import org.apache.hadoop.hive.metastore.api.SkewedInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.TxnType; import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregator; import org.apache.hadoop.hive.metastore.columnstats.aggr.ColumnStatsAggregatorFactory; @@ -1617,4 +1618,8 @@ public class MetaStoreServerUtils { } return null; } + + public static boolean isCompactionTxn(TxnType txnType) { + return TxnType.COMPACTION.equals(txnType) || TxnType.REBALANCE_COMPACTION.equals(txnType); + } } diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql index cfac2d4a717..f68b1400ba9 100644 --- a/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql +++ b/standalone-metastore/metastore-server/src/main/sql/derby/hive-schema-4.0.0.derby.sql @@ -636,7 +636,8 @@ CREATE TABLE COMPACTION_QUEUE ( CQ_CLEANER_START bigint, CQ_RETRY_RETENTION bigint NOT NULL DEFAULT 0, CQ_POOL_NAME varchar(128), - CQ_NUMBER_OF_BUCKETS integer + CQ_NUMBER_OF_BUCKETS integer, + CQ_ORDER_BY varchar(4000) ); CREATE TABLE NEXT_COMPACTION_QUEUE_ID ( @@ -668,7 +669,8 @@ CREATE TABLE COMPLETED_COMPACTIONS ( CC_INITIATOR_VERSION varchar(128), CC_WORKER_VERSION varchar(128), CC_POOL_NAME varchar(128), - CC_NUMBER_OF_BUCKETS integer + CC_NUMBER_OF_BUCKETS integer, + CC_ORDER_BY varchar(4000) ); CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION); diff --git a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-4.0.0-alpha-2-to-4.0.0.derby.sql b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-4.0.0-alpha-2-to-4.0.0.derby.sql index 8e11eefec12..498988f7fab 100644 --- a/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-4.0.0-alpha-2-to-4.0.0.derby.sql +++ b/standalone-metastore/metastore-server/src/main/sql/derby/upgrade-4.0.0-alpha-2-to-4.0.0.derby.sql @@ -6,7 +6,11 @@ ALTER TABLE "APP"."PART_COL_STATS" ADD HISTOGRAM BLOB; -- HIVE-26719 ALTER TABLE COMPACTION_QUEUE ADD CQ_NUMBER_OF_BUCKETS INTEGER; -ALTER TABLE COMPLETED_COMPACTIONS ADD CQ_NUMBER_OF_BUCKETS INTEGER; +ALTER TABLE COMPLETED_COMPACTIONS ADD CC_NUMBER_OF_BUCKETS INTEGER; + +-- HIVE-26735 +ALTER TABLE COMPACTION_QUEUE ADD CQ_ORDER_BY VARCHAR(4000); +ALTER TABLE COMPLETED_COMPACTIONS ADD CC_ORDER_BY VARCHAR(4000); -- HIVE-26704 CREATE TABLE MIN_HISTORY_WRITE_ID ( diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql index a3270b35ad2..426a5b961ba 100644 --- a/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql +++ b/standalone-metastore/metastore-server/src/main/sql/mssql/hive-schema-4.0.0.mssql.sql @@ -1056,6 +1056,7 @@ CREATE TABLE COMPACTION_QUEUE( CQ_RETRY_RETENTION bigint NOT NULL DEFAULT 0, CQ_POOL_NAME nvarchar(128) NULL, CQ_NUMBER_OF_BUCKETS integer, + CQ_ORDER_BY varchar(4000), PRIMARY KEY CLUSTERED ( CQ_ID ASC @@ -1087,6 +1088,7 @@ CREATE TABLE COMPLETED_COMPACTIONS ( CC_WORKER_VERSION nvarchar(128) NULL, CC_POOL_NAME nvarchar(128) NULL, CC_NUMBER_OF_BUCKETS integer, + CC_ORDER_BY varchar(4000), PRIMARY KEY CLUSTERED ( CC_ID ASC diff --git a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-4.0.0-alpha-2-to-4.0.0.mssql.sql b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-4.0.0-alpha-2-to-4.0.0.mssql.sql index f3a611cc2e9..85c9a85c846 100644 --- a/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-4.0.0-alpha-2-to-4.0.0.mssql.sql +++ b/standalone-metastore/metastore-server/src/main/sql/mssql/upgrade-4.0.0-alpha-2-to-4.0.0.mssql.sql @@ -6,7 +6,11 @@ ALTER TABLE PART_COL_STATS ADD HISTOGRAM varbinary(max); -- HIVE-26719 ALTER TABLE COMPACTION_QUEUE ADD CQ_NUMBER_OF_BUCKETS INTEGER NULL; -ALTER TABLE COMPLETED_COMPACTIONS ADD CQ_NUMBER_OF_BUCKETS INTEGER NULL; +ALTER TABLE COMPLETED_COMPACTIONS ADD CC_NUMBER_OF_BUCKETS INTEGER NULL; + +-- HIVE-26735 +ALTER TABLE COMPACTION_QUEUE ADD CQ_ORDER_BY VARCHAR(4000); +ALTER TABLE COMPLETED_COMPACTIONS ADD CC_ORDER_BY VARCHAR(4000); -- HIVE-26704 CREATE TABLE MIN_HISTORY_WRITE_ID ( diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql index 89ae964f9aa..5983f9cd2a3 100644 --- a/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql +++ b/standalone-metastore/metastore-server/src/main/sql/mysql/hive-schema-4.0.0.mysql.sql @@ -1097,7 +1097,8 @@ CREATE TABLE COMPACTION_QUEUE ( CQ_CLEANER_START bigint, CQ_RETRY_RETENTION bigint NOT NULL DEFAULT 0, CQ_POOL_NAME varchar(128), - CQ_NUMBER_OF_BUCKETS integer + CQ_NUMBER_OF_BUCKETS integer, + CQ_ORDER_BY varchar(4000) ) ENGINE=InnoDB DEFAULT CHARSET=latin1; CREATE TABLE COMPLETED_COMPACTIONS ( @@ -1124,7 +1125,8 @@ CREATE TABLE COMPLETED_COMPACTIONS ( CC_INITIATOR_VERSION varchar(128), CC_WORKER_VERSION varchar(128), CC_POOL_NAME varchar(128), - CC_NUMBER_OF_BUCKETS integer + CC_NUMBER_OF_BUCKETS integer, + CC_ORDER_BY varchar(4000) ) ENGINE=InnoDB DEFAULT CHARSET=latin1; CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION); diff --git a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-4.0.0-alpha-2-to-4.0.0.mysql.sql b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-4.0.0-alpha-2-to-4.0.0.mysql.sql index 6f2c3ca2002..db6ee535290 100644 --- a/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-4.0.0-alpha-2-to-4.0.0.mysql.sql +++ b/standalone-metastore/metastore-server/src/main/sql/mysql/upgrade-4.0.0-alpha-2-to-4.0.0.mysql.sql @@ -6,7 +6,11 @@ ALTER TABLE PART_COL_STATS ADD HISTOGRAM blob; -- HIVE-26719 ALTER TABLE `COMPACTION_QUEUE` ADD COLUMN `CQ_NUMBER_OF_BUCKETS` INTEGER; -ALTER TABLE `COMPLETED_COMPACTIONS` ADD COLUMN `CQ_NUMBER_OF_BUCKETS` INTEGER; +ALTER TABLE `COMPLETED_COMPACTIONS` ADD COLUMN `CC_NUMBER_OF_BUCKETS` INTEGER; + +-- HIVE-26735 +ALTER TABLE `COMPACTION_QUEUE` ADD COLUMN `CQ_ORDER_BY` VARCHAR(4000); +ALTER TABLE `COMPLETED_COMPACTIONS` ADD COLUMN `CC_ORDER_BY` VARCHAR(4000); -- HIVE-26704 CREATE TABLE MIN_HISTORY_WRITE_ID ( diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql index bee4717c959..d31431538df 100644 --- a/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql +++ b/standalone-metastore/metastore-server/src/main/sql/oracle/hive-schema-4.0.0.oracle.sql @@ -1099,7 +1099,8 @@ CREATE TABLE COMPACTION_QUEUE ( CQ_CLEANER_START NUMBER(19), CQ_RETRY_RETENTION NUMBER(19) DEFAULT 0 NOT NULL, CQ_POOL_NAME varchar(128), - CQ_NUMBER_OF_BUCKETS integer + CQ_NUMBER_OF_BUCKETS integer, + CQ_ORDER_BY varchar(4000) ) ROWDEPENDENCIES; CREATE TABLE NEXT_COMPACTION_QUEUE_ID ( @@ -1131,7 +1132,8 @@ CREATE TABLE COMPLETED_COMPACTIONS ( CC_INITIATOR_VERSION varchar(128), CC_WORKER_VERSION varchar(128), CC_POOL_NAME varchar(128), - CC_NUMBER_OF_BUCKETS integer + CC_NUMBER_OF_BUCKETS integer, + CC_ORDER_BY varchar(4000) ) ROWDEPENDENCIES; CREATE INDEX COMPLETED_COMPACTIONS_RES ON COMPLETED_COMPACTIONS (CC_DATABASE,CC_TABLE,CC_PARTITION); diff --git a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-4.0.0-alpha-2-to-4.0.0.oracle.sql b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-4.0.0-alpha-2-to-4.0.0.oracle.sql index 90c96a56b73..3c57e9f912a 100644 --- a/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-4.0.0-alpha-2-to-4.0.0.oracle.sql +++ b/standalone-metastore/metastore-server/src/main/sql/oracle/upgrade-4.0.0-alpha-2-to-4.0.0.oracle.sql @@ -6,7 +6,11 @@ ALTER TABLE PART_COL_STATS ADD HISTOGRAM BLOB; -- HIVE-26719 ALTER TABLE COMPACTION_QUEUE ADD CQ_NUMBER_OF_BUCKETS INTEGER; -ALTER TABLE COMPLETED_COMPACTIONS ADD CQ_NUMBER_OF_BUCKETS INTEGER; +ALTER TABLE COMPLETED_COMPACTIONS ADD CC_NUMBER_OF_BUCKETS INTEGER; + +-- HIVE-26735 +ALTER TABLE COMPACTION_QUEUE ADD CQ_ORDER_BY VARCHAR(4000); +ALTER TABLE COMPLETED_COMPACTIONS ADD CC_ORDER_BY VARCHAR(4000); -- HIVE-26704 CREATE TABLE MIN_HISTORY_WRITE_ID ( diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql index d52060f3418..502125222da 100644 --- a/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql +++ b/standalone-metastore/metastore-server/src/main/sql/postgres/hive-schema-4.0.0.postgres.sql @@ -1814,7 +1814,8 @@ CREATE TABLE "COMPACTION_QUEUE" ( "CQ_CLEANER_START" bigint, "CQ_RETRY_RETENTION" bigint not null default 0, "CQ_POOL_NAME" varchar(128), - "CQ_NUMBER_OF_BUCKETS" integer + "CQ_NUMBER_OF_BUCKETS" integer, + "CQ_ORDER_BY" varchar(4000) ); CREATE TABLE "NEXT_COMPACTION_QUEUE_ID" ( @@ -1846,7 +1847,8 @@ CREATE TABLE "COMPLETED_COMPACTIONS" ( "CC_INITIATOR_VERSION" varchar(128), "CC_WORKER_VERSION" varchar(128), "CC_POOL_NAME" varchar(128), - "CC_NUMBER_OF_BUCKETS" integer + "CC_NUMBER_OF_BUCKETS" integer, + "CC_ORDER_BY" varchar(4000) ); CREATE INDEX "COMPLETED_COMPACTIONS_RES" ON "COMPLETED_COMPACTIONS" ("CC_DATABASE","CC_TABLE","CC_PARTITION"); diff --git a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-4.0.0-alpha-2-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-4.0.0-alpha-2-to-4.0.0.postgres.sql index ff384c681b9..870b5bb4b26 100644 --- a/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-4.0.0-alpha-2-to-4.0.0.postgres.sql +++ b/standalone-metastore/metastore-server/src/main/sql/postgres/upgrade-4.0.0-alpha-2-to-4.0.0.postgres.sql @@ -6,7 +6,11 @@ ALTER TABLE "PART_COL_STATS" ADD "HISTOGRAM" bytea; -- HIVE-26719 ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_NUMBER_OF_BUCKETS" INTEGER; -ALTER TABLE "COMPLETED_COMPACTIONS" ADD "CQ_NUMBER_OF_BUCKETS" INTEGER; +ALTER TABLE "COMPLETED_COMPACTIONS" ADD "CC_NUMBER_OF_BUCKETS" INTEGER; + +-- HIVE-26735 +ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_ORDER_BY" VARCHAR(4000); +ALTER TABLE "COMPLETED_COMPACTIONS" ADD "CC_ORDER_BY" VARCHAR(4000); -- HIVE-26704 CREATE TABLE "MIN_HISTORY_WRITE_ID" ( diff --git a/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql b/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql index 5cf7fadd3a4..b70634aefb3 100644 --- a/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql +++ b/standalone-metastore/metastore-server/src/test/resources/sql/postgres/upgrade-3.1.3000-to-4.0.0.postgres.sql @@ -145,7 +145,11 @@ ALTER TABLE "COMPLETED_COMPACTIONS" ADD "CC_POOL_NAME" VARCHAR(128); -- HIVE-26719 ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_NUMBER_OF_BUCKETS" INTEGER; -ALTER TABLE "COMPLETED_COMPACTIONS" ADD "CQ_NUMBER_OF_BUCKETS" INTEGER; +ALTER TABLE "COMPLETED_COMPACTIONS" ADD "CC_NUMBER_OF_BUCKETS" INTEGER; + +-- HIVE-26735 +ALTER TABLE "COMPACTION_QUEUE" ADD "CQ_ORDER_BY" VARCHAR(4000); +ALTER TABLE "COMPLETED_COMPACTIONS" ADD "CC_ORDER_BY" VARCHAR(4000); -- HIVE-26704 CREATE TABLE "MIN_HISTORY_WRITE_ID" (