[ https://issues.apache.org/jira/browse/HIVE-26716?focusedWorklogId=827072&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-827072 ]
ASF GitHub Bot logged work on HIVE-26716:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Nov/22 08:08
            Start Date: 18/Nov/22 08:08
    Worklog Time Spent: 10m
      Work Description: kasakrisz commented on code in PR #3746:
URL: https://github.com/apache/hive/pull/3746#discussion_r1026004356


##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -89,6 +89,270 @@ @SuppressWarnings("deprecation")
 public class TestCrudCompactorOnTez extends CompactorOnTezTest {

+  @Test
+  public void testRebalanceCompactionNotPartitionedWithoutBuckets() throws Exception {
+    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
+    conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+
+    //set grouping size to have 3 buckets, and re-create driver with the new config
+    conf.set("tez.grouping.min-size", "1000");
+    conf.set("tez.grouping.max-size", "80000");
+    driver = new Driver(conf);
+
+    final String stageTableName = "stage_rebalance_test";
+    final String tableName = "rebalance_test";
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
+
+    TestDataProvider testDataProvider = new TestDataProvider();
+    testDataProvider.createFullAcidTable(stageTableName, true, false);
+    testDataProvider.insertTestDataPartitioned(stageTableName);
+
+    executeStatementOnDriver("drop table if exists " + tableName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int) " +
+        "STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
+    executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " select a, b from " + stageTableName, driver);
+
+    //do some single inserts to have more data in the first bucket.
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('12',12)", driver); + executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('13',13)", driver); + executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('14',14)", driver); + executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('15',15)", driver); + executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('16',16)", driver); + executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('17',17)", driver); + + // Verify buckets and their content before rebalance + Table table = msClient.getTable("default", tableName); + FileSystem fs = FileSystem.get(conf); + Assert.assertEquals("Test setup does not match the expected: different buckets", + Arrays.asList("bucket_00000_0", "bucket_00001_0", "bucket_00002_0"), + CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000001")); + String[][] expectedBuckets = new String[][] { + { + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t5\t3", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12", + "{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14", + "{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15", + "{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16", + "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17", + }, + { + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t3", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t4\t4", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":3}\t4\t3", + }, + { + "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t2\t3", + "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t3\t4", + }, + }; + for(int i = 0; i < 3; i++) { + Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]), Review Comment: This is the non-balanced state, right? ########## ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java: ########## @@ -180,6 +188,9 @@ CompactionQueryBuilder setPartitioned(boolean partitioned) { * If true, Create operations for CRUD minor compaction will result in a bucketed table. */ CompactionQueryBuilder setBucketed(boolean bucketed) { + if(rebalance && bucketed) { + throw new IllegalArgumentException("Rebalance compaction is supported only on non-bucketed tables!"); Review Comment: IIUC the rebalancing feature is about rebalancing buckets so I guess `non-bucketed` means `implicitly bucketed` here. WDYT? Btw the variable `bucketed` is also confusing since it refers to explicit bucketing but it is out of scope of this patch. 
##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -89,6 +89,270 @@ @SuppressWarnings("deprecation")
"{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t2\t3", + "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t3\t4", + }, + }; + for(int i = 0; i < 3; i++) { + Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]), + testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + "")); + } + + //Try to do a rebalancing compaction + executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance'", driver); + runWorker(conf); + + //Check if the compaction succeed + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List<ShowCompactResponseElement> compacts = rsp.getCompacts(); + Assert.assertEquals("Expecting 1 rows and found " + compacts.size(), 1, compacts.size()); + Assert.assertEquals("Expecting compaction state 'ready for cleaning' and found:" + compacts.get(0).getState(), + "ready for cleaning", compacts.get(0).getState()); Review Comment: nit. Is this constant feasible here? https://github.com/apache/hive/blob/497553016d804576cdf2262e5869541c71e2efbd/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java#L101 ########## itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java: ########## @@ -89,6 +89,270 @@ @SuppressWarnings("deprecation") public class TestCrudCompactorOnTez extends CompactorOnTezTest { + @Test + public void testRebalanceCompactionNotPartitionedWithoutBuckets() throws Exception { + conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true); + conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false); + conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false); + + //set grouping size to have 3 buckets, and re-create driver with the new config + conf.set("tez.grouping.min-size", "1000"); + conf.set("tez.grouping.max-size", "80000"); + driver = new Driver(conf); + + final String stageTableName = "stage_rebalance_test"; + final String tableName = "rebalance_test"; + AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf); + + TestDataProvider testDataProvider = new TestDataProvider(); + testDataProvider.createFullAcidTable(stageTableName, true, false); + testDataProvider.insertTestDataPartitioned(stageTableName); + + executeStatementOnDriver("drop table if exists " + tableName, driver); + executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int) " + + "STORED AS ORC TBLPROPERTIES('transactional'='true')", driver); + executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " select a, b from " + stageTableName, driver); + + //do some single inserts to have more data in the first bucket. 
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('12',12)", driver); + executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('13',13)", driver); + executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('14',14)", driver); + executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('15',15)", driver); + executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('16',16)", driver); + executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('17',17)", driver); + + // Verify buckets and their content before rebalance + Table table = msClient.getTable("default", tableName); + FileSystem fs = FileSystem.get(conf); + Assert.assertEquals("Test setup does not match the expected: different buckets", + Arrays.asList("bucket_00000_0", "bucket_00001_0", "bucket_00002_0"), + CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000001")); + String[][] expectedBuckets = new String[][] { + { + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t5\t3", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12", + "{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14", + "{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15", + "{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16", + "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17", + }, + { + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t3", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t4\t4", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":3}\t4\t3", + }, + { + "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t2\t3", + "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t3\t4", + }, + }; + for(int i = 0; i < 3; i++) { + Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]), + testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + "")); + } + + //Try to do a rebalancing compaction + executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance'", driver); + runWorker(conf); + + //Check if the compaction succeed + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List<ShowCompactResponseElement> compacts = rsp.getCompacts(); + Assert.assertEquals("Expecting 1 rows and found " + compacts.size(), 1, compacts.size()); + Assert.assertEquals("Expecting compaction state 'ready for cleaning' and found:" + compacts.get(0).getState(), + "ready for cleaning", compacts.get(0).getState()); + + // Verify buckets and their content after rebalance + Assert.assertEquals("Buckets does not match after compaction", + Arrays.asList("bucket_00000", "bucket_00001", "bucket_00002"), + CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000007_v0000020")); + expectedBuckets = new String[][] { + { + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4", + 
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t5\t3", + }, + { + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":6}\t2\t4", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":7}\t3\t3", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":8}\t4\t4", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":9}\t4\t3", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":10}\t2\t3", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":11}\t3\t4", + }, + { + "{\"writeid\":2,\"bucketid\":537001984,\"rowid\":12}\t12\t12", + "{\"writeid\":3,\"bucketid\":537001984,\"rowid\":13}\t13\t13", + "{\"writeid\":4,\"bucketid\":537001984,\"rowid\":14}\t14\t14", + "{\"writeid\":5,\"bucketid\":537001984,\"rowid\":15}\t15\t15", + "{\"writeid\":6,\"bucketid\":537001984,\"rowid\":16}\t16\t16", + "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":17}\t17\t17", + }, + }; + for(int i = 0; i < 3; i++) { + Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]), + testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + "")); + } + } + + @Test + public void testRebalanceCompactionPartitionedWithoutBuckets() throws Exception { + conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true); + conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false); + conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false); + + //set grouping size to have 3 buckets, and re-create driver with the new config + conf.set("tez.grouping.min-size", "1"); + driver = new Driver(conf); + + final String stageTableName = "stage_rebalance_test"; + final String tableName = "rebalance_test"; + AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf); + + TestDataProvider testDataProvider = new TestDataProvider(); + testDataProvider.createFullAcidTable(stageTableName, true, false); + executeStatementOnDriver("insert into " + stageTableName +" values " + + "('1',1,'yesterday'), ('1',2,'yesterday'), ('1',3, 'yesterday'), ('1',4, 'yesterday'), " + + "('2',1,'today'), ('2',2,'today'), ('2',3,'today'), ('2',4, 'today'), " + + "('3',1,'tomorrow'), ('3',2,'tomorrow'), ('3',3,'tomorrow'), ('3',4,'tomorrow')", + driver); + + executeStatementOnDriver("drop table if exists " + tableName, driver); + executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int) " + + "PARTITIONED BY (ds string) STORED AS ORC TBLPROPERTIES('transactional'='true')", driver); + executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " partition (ds='tomorrow') select a, b from " + stageTableName, driver); + + //do some single inserts to have more data in the first bucket. 
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('12',12,'tomorrow')", driver); + executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('13',13,'tomorrow')", driver); + executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('14',14,'tomorrow')", driver); + executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('15',15,'tomorrow')", driver); + executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('16',16,'tomorrow')", driver); + executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('17',17,'tomorrow')", driver); + + // Verify buckets and their content before rebalance in partition ds=tomorrow + Table table = msClient.getTable("default", tableName); + FileSystem fs = FileSystem.get(conf); + Assert.assertEquals("Test setup does not match the expected: different buckets", + Arrays.asList("bucket_00000_0", "bucket_00001_0", "bucket_00002_0"), + CompactorTestUtil.getBucketFileNames(fs, table, "ds=tomorrow", "base_0000001")); + String[][] expectedBuckets = new String[][] { + { + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t1\ttomorrow", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t2\ttomorrow", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t3\ttomorrow", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t2\t4\ttomorrow", + "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12\ttomorrow", + "{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13\ttomorrow", + "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14\ttomorrow", + "{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15\ttomorrow", + "{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16\ttomorrow", + "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17\ttomorrow", + }, + { + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t3\t1\ttomorrow", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t2\ttomorrow", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t3\t3\ttomorrow", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":3}\t3\t4\ttomorrow", + }, + { + "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t1\t1\ttomorrow", + "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t1\t2\ttomorrow", + "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":2}\t1\t3\ttomorrow", + "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":3}\t1\t4\ttomorrow", + }, + }; + for(int i = 0; i < 3; i++) { + Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]), + testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + "")); + } + + //Try to do a rebalancing compaction + executeStatementOnDriver("ALTER TABLE " + tableName + " PARTITION (ds='tomorrow') COMPACT 'rebalance'", driver); + runWorker(conf); + + //Check if the compaction succeed + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List<ShowCompactResponseElement> compacts = rsp.getCompacts(); + Assert.assertEquals("Expecting 1 rows and found " + compacts.size(), 1, compacts.size()); + Assert.assertEquals("Expecting compaction state 'ready for cleaning' and found:" + compacts.get(0).getState(), + "ready for cleaning", compacts.get(0).getState()); + + // Verify buckets and their content after rebalance in partition ds=tomorrow + Assert.assertEquals("Buckets does not match after compaction", + Arrays.asList("bucket_00000", "bucket_00001", "bucket_00002"), + CompactorTestUtil.getBucketFileNames(fs, 
table, "ds=tomorrow", "base_0000007_v0000016")); + expectedBuckets = new String[][] { + { + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t1\ttomorrow", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t2\ttomorrow", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t3\ttomorrow", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t2\t4\ttomorrow", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t3\t1\ttomorrow", + "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t3\t2\ttomorrow", + }, + { + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":6}\t3\t3\ttomorrow", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":7}\t3\t4\ttomorrow", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":8}\t1\t1\ttomorrow", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":9}\t1\t2\ttomorrow", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":10}\t1\t3\ttomorrow", + "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":11}\t1\t4\ttomorrow", + }, + { + "{\"writeid\":2,\"bucketid\":537001984,\"rowid\":12}\t12\t12\ttomorrow", + "{\"writeid\":3,\"bucketid\":537001984,\"rowid\":13}\t13\t13\ttomorrow", + "{\"writeid\":4,\"bucketid\":537001984,\"rowid\":14}\t14\t14\ttomorrow", + "{\"writeid\":5,\"bucketid\":537001984,\"rowid\":15}\t15\t15\ttomorrow", + "{\"writeid\":6,\"bucketid\":537001984,\"rowid\":16}\t16\t16\ttomorrow", + "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":17}\t17\t17\ttomorrow", + }, + }; + for(int i = 0; i < 3; i++) { + Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]), + testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + "")); + } + } + + @Test + public void testRebalanceCompactionNotPartitionedWithBuckets() throws Exception { Review Comment: nit. Could you please rename these tests to include the bucketing type of table in the method name? ex.: ``` testRebalanceCompactionOfNotPartitionedImplicitlyBucketedTable testRebalanceCompactionOfNotPartitionedExplicitlyBucketedTable ... 
   ```


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MRCompactor.java:
##########
@@ -261,7 +285,7 @@ public void run(HiveConf conf, String jobName, Table t, Partition p, StorageDesc
     StringableList dirsToSearch = new StringableList();
     Path baseDir = null;
-    if (ci.isMajorCompaction()) {
+    if (ci.type.equals(CompactionType.MAJOR)) {

Review Comment:
   swap operands for null safety


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java:
##########
@@ -203,19 +214,23 @@ CompactionQueryBuilder setIsDeleteDelta(boolean deleteDelta) {
    * @param resultTableName the name of the table we are running the operation on
    * @throws IllegalArgumentException if compactionType is null
    */
-  CompactionQueryBuilder(CompactionType compactionType, Operation operation,
+  CompactionQueryBuilder(CompactionType compactionType, Operation operation, boolean crud,
       String resultTableName) {
     if (compactionType == null) {
       throw new IllegalArgumentException("CompactionQueryBuilder.CompactionType cannot be null");
     }
+    this.compactionType = compactionType;
     this.operation = operation;
     this.resultTableName = resultTableName;
-    major = compactionType == CompactionType.MAJOR_CRUD
-        || compactionType == CompactionType.MAJOR_INSERT_ONLY;
-    crud =
-        compactionType == CompactionType.MAJOR_CRUD || compactionType == CompactionType.MINOR_CRUD;
-    minor = !major;
+    this.crud = crud;
     insertOnly = !crud;
+    major = compactionType.equals(CompactionType.MAJOR);
+    minor = compactionType.equals(CompactionType.MINOR);
+    rebalance = compactionType.equals(CompactionType.REBALANCE);

Review Comment:
   You can swap the operands in these expressions to make them null safe, e.g.:
   ```
   CompactionType.MAJOR.equals(compactionType)
   ```
   or
   ```
   CompactionType.MAJOR == compactionType
   ```
   also works, since these are enum constants.

   Btw why do we have three boolean fields for `major`, `minor` and `rebalance`? Would it be enough to store the `compactionType` in one `CompactionType` enum field instead? Going forward, the code blocks that depend on these values could be extracted into three classes sharing a common interface. This is just an idea, and such refactors are probably worth doing in a separate patch.
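   To make both suggestions concrete, here is a minimal sketch, assuming `CompactionType` is the metastore enum referenced in the diff; the class and method names are invented for the example:
   ```java
   import org.apache.hadoop.hive.metastore.api.CompactionType;

   // Illustrative only: one enum field replaces the three boolean fields, and the
   // comparisons are written so that a null compactionType cannot throw an NPE.
   class CompactionKind {
     private final CompactionType compactionType;

     CompactionKind(CompactionType compactionType) {
       this.compactionType = compactionType;
     }

     boolean isMajor() {
       // == is null safe for enum constants: a null field simply yields false
       return CompactionType.MAJOR == compactionType;
     }

     boolean isMinor() {
       // calling equals on the constant (not on the field) is likewise null safe
       return CompactionType.MINOR.equals(compactionType);
     }
   }
   ```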
##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -89,6 +89,270 @@ @SuppressWarnings("deprecation")
"{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t2\t3", + "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t3\t4", + }, + }; + for(int i = 0; i < 3; i++) { + Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]), + testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + "")); + } + + //Try to do a rebalancing compaction + executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance'", driver); + runWorker(conf); + + //Check if the compaction succeed + TxnStore txnHandler = TxnUtils.getTxnStore(conf); + ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest()); + List<ShowCompactResponseElement> compacts = rsp.getCompacts(); + Assert.assertEquals("Expecting 1 rows and found " + compacts.size(), 1, compacts.size()); + Assert.assertEquals("Expecting compaction state 'ready for cleaning' and found:" + compacts.get(0).getState(), + "ready for cleaning", compacts.get(0).getState()); Review Comment: Can [verifySuccessfulCompaction](https://github.com/apache/hive/blob/497553016d804576cdf2262e5869541c71e2efbd/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java#L164) replace this block? ########## ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java: ########## @@ -52,6 +55,7 @@ import static org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2.swapTxnManager; import static org.mockito.Matchers.any; +import static org.powermock.api.mockito.PowerMockito.when; Review Comment: Can the usage of `powermock` be avoided? ########## ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java: ########## @@ -402,20 +413,14 @@ protected Boolean findNextCompactionAndExecute(boolean collectGenericStats, bool todo Find a more generic approach to collecting files in the same logical bucket to compact within the same task (currently we're using Tez split grouping). */ - QueryCompactor queryCompactor = QueryCompactorFactory.getQueryCompactor(t, conf, ci); - computeStats = (queryCompactor == null && collectMrStats) || collectGenericStats; + Compactor compactor = compactorFactory.getQueryCompactor(msc, t, conf, ci); + computeStats = (compactor == null && collectMrStats) || collectGenericStats; LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + ", id:" + ci.id + " in " + compactionTxn + " with compute stats set to " + computeStats); + LOG.info("Will compact id: " + ci.id + " with compactor class: " + compactor.getClass().getName()); - if (queryCompactor != null) { - LOG.info("Will compact id: " + ci.id + " with query-based compactor class: " - + queryCompactor.getClass().getName()); - queryCompactor.runCompaction(conf, t, p, sd, tblValidWriteIds, ci, dir); - } else { - LOG.info("Will compact id: " + ci.id + " via MR job"); - runCompactionViaMrJob(ci, t, p, sd, tblValidWriteIds, jobName, dir); - } + compactor.runCompaction(conf, t, p, sd, tblValidWriteIds, ci, dir); Review Comment: Can `compactor` be null here? 
##########
ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java:
##########
@@ -1040,10 +1042,13 @@ public void testDoesNotGatherStatsIfCompactionFails() throws Exception {
     txnHandler.compact(new CompactionRequest("default", "mtwb", CompactionType.MINOR));

+    CompactorFactory mockedFactory = Mockito.mock(CompactorFactory.class);
+    when(mockedFactory.getQueryCompactor(any(), any(), any(), any())).thenThrow(new RuntimeException());
+
     Worker worker = Mockito.spy(new Worker());
-    Mockito.when(worker.getMrCompactor()).thenThrow(RuntimeException.class);
     worker.setConf(conf);
     worker.init(new AtomicBoolean(true));
+    Whitebox.setInternalState(worker, "compactorFactory", mockedFactory);

Review Comment:
   How about adding a constructor which takes the `compactorFactory` as a parameter?


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java:
##########
@@ -128,8 +111,8 @@ void runCompactionQueries(HiveConf conf, String tmpTableName, StorageDescriptor
       }
     }
     for (String query : compactionQueries) {
-      LOG.info("Running {} compaction via query: {}", compactionInfo.isMajorCompaction() ? "major" : "minor", query);
-      if (!compactionInfo.isMajorCompaction()) {
+      LOG.info("Running {} compaction via query: {}", compactionInfo.type, query);
+      if (compactionInfo.type.equals(CompactionType.MINOR)) {

Review Comment:
   swap operands


##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/StatsUpdater.java:
##########
@@ -52,7 +53,7 @@ public final class StatsUpdater {
    */
   public void gatherStats(CompactionInfo ci, HiveConf conf, String userName, String compactionQueueName) {
     try {
-      if (!ci.isMajorCompaction()) {
+      if (ci.type.equals(CompactionType.MINOR)) {

Review Comment:
   swap operands


Issue Time Tracking
-------------------

    Worklog Id:     (was: 827072)
    Time Spent: 2.5h  (was: 2h 20m)

> Query based Rebalance compaction on full acid tables
> ----------------------------------------------------
>
>                 Key: HIVE-26716
>                 URL: https://issues.apache.org/jira/browse/HIVE-26716
>             Project: Hive
>          Issue Type: Sub-task
>          Components: Hive
>            Reporter: László Végh
>            Assignee: László Végh
>            Priority: Major
>              Labels: ACID, compaction, pull-request-available
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Support rebalancing compaction on fully ACID tables.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)