[ https://issues.apache.org/jira/browse/HIVE-26716?focusedWorklogId=827072&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-827072 ]

ASF GitHub Bot logged work on HIVE-26716:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Nov/22 08:08
            Start Date: 18/Nov/22 08:08
    Worklog Time Spent: 10m 
      Work Description: kasakrisz commented on code in PR #3746:
URL: https://github.com/apache/hive/pull/3746#discussion_r1026004356


##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -89,6 +89,270 @@
 @SuppressWarnings("deprecation")
 public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
+  @Test
+  public void testRebalanceCompactionNotPartitionedWithoutBuckets() throws Exception {
+    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
+    conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+
+    //set grouping size to have 3 buckets, and re-create driver with the new config
+    conf.set("tez.grouping.min-size", "1000");
+    conf.set("tez.grouping.max-size", "80000");
+    driver = new Driver(conf);
+
+    final String stageTableName = "stage_rebalance_test";
+    final String tableName = "rebalance_test";
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
+
+    TestDataProvider testDataProvider = new TestDataProvider();
+    testDataProvider.createFullAcidTable(stageTableName, true, false);
+    testDataProvider.insertTestDataPartitioned(stageTableName);
+
+    executeStatementOnDriver("drop table if exists " + tableName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int) " +
+        "STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
+    executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " select a, b from " + stageTableName, driver);
+
+    //do some single inserts to have more data in the first bucket.
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('12',12)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('13',13)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('14',14)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('15',15)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('16',16)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('17',17)", driver);
+
+    // Verify buckets and their content before rebalance
+    Table table = msClient.getTable("default", tableName);
+    FileSystem fs = FileSystem.get(conf);
+    Assert.assertEquals("Test setup does not match the expected: different buckets",
+        Arrays.asList("bucket_00000_0", "bucket_00001_0", "bucket_00002_0"),
+        CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000001"));
+    String[][] expectedBuckets = new String[][] {
+        {
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t5\t3",
+            "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12",
+            "{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13",
+            "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14",
+            "{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15",
+            "{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16",
+            "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17",
+        },
+        {
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t3",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t4\t4",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":3}\t4\t3",
+        },
+        {
+            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t2\t3",
+            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t3\t4",
+        },
+    };
+    for(int i = 0; i < 3; i++) {
+      Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]),

Review Comment:
   This is the non-balanced state, right?



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java:
##########
@@ -180,6 +188,9 @@ CompactionQueryBuilder setPartitioned(boolean partitioned) {
   * If true, Create operations for CRUD minor compaction will result in a bucketed table.
   */
  CompactionQueryBuilder setBucketed(boolean bucketed) {
+    if(rebalance && bucketed) {
+      throw new IllegalArgumentException("Rebalance compaction is supported only on non-bucketed tables!");

Review Comment:
   IIUC the rebalancing feature is about rebalancing buckets, so I guess `non-bucketed` means `implicitly bucketed` here. WDYT?
   Btw the variable `bucketed` is also confusing since it refers to explicit bucketing, but that is out of scope of this patch.
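   For illustration only, one possible rewording of the guard's message along the lines of this comment (the exact wording is a suggestion, not part of the patch):
   ```
   // Hypothetical message distinguishing implicit from explicit bucketing:
   throw new IllegalArgumentException(
       "Rebalance compaction is only supported on implicitly bucketed tables!");
   ```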



##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -89,6 +89,270 @@
 @SuppressWarnings("deprecation")
 public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
+  @Test
+  public void testRebalanceCompactionNotPartitionedWithoutBuckets() throws Exception {
    [... same test body as quoted above ...]
+
+    //Try to do a rebalancing compaction
+    executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance'", driver);
+    runWorker(conf);
+
+    //Check if the compaction succeed
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals("Expecting 1 rows and found " + compacts.size(), 1, compacts.size());
+    Assert.assertEquals("Expecting compaction state 'ready for cleaning' and found:" + compacts.get(0).getState(),
+        "ready for cleaning", compacts.get(0).getState());

Review Comment:
   nit. Could this constant be used here?
   https://github.com/apache/hive/blob/497553016d804576cdf2262e5869541c71e2efbd/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java#L101
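   A hedged sketch of what the assertion could look like, assuming the constant at the linked line is `TxnStore.CLEANING_RESPONSE` (defined as "ready for cleaning"):
   ```
   // Using the shared constant instead of the string literal:
   Assert.assertEquals("Expecting compaction state '" + TxnStore.CLEANING_RESPONSE
           + "' and found: " + compacts.get(0).getState(),
       TxnStore.CLEANING_RESPONSE, compacts.get(0).getState());
   ```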



##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -89,6 +89,270 @@
 @SuppressWarnings("deprecation")
 public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
+  @Test
+  public void testRebalanceCompactionNotPartitionedWithoutBuckets() throws Exception {
    [... same test body and compaction state check as quoted above ...]
+
+    // Verify buckets and their content after rebalance
+    Assert.assertEquals("Buckets does not match after compaction",
+        Arrays.asList("bucket_00000", "bucket_00001", "bucket_00002"),
+        CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000007_v0000020"));
+    expectedBuckets = new String[][] {
+        {
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t5\t3",
+        },
+        {
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":6}\t2\t4",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":7}\t3\t3",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":8}\t4\t4",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":9}\t4\t3",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":10}\t2\t3",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":11}\t3\t4",
+        },
+        {
+            "{\"writeid\":2,\"bucketid\":537001984,\"rowid\":12}\t12\t12",
+            "{\"writeid\":3,\"bucketid\":537001984,\"rowid\":13}\t13\t13",
+            "{\"writeid\":4,\"bucketid\":537001984,\"rowid\":14}\t14\t14",
+            "{\"writeid\":5,\"bucketid\":537001984,\"rowid\":15}\t15\t15",
+            "{\"writeid\":6,\"bucketid\":537001984,\"rowid\":16}\t16\t16",
+            "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":17}\t17\t17",
+        },
+    };
+    for(int i = 0; i < 3; i++) {
+      Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]),
+          testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + ""));
+    }
+  }
+
+  @Test
+  public void testRebalanceCompactionPartitionedWithoutBuckets() throws Exception {
+    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
+    conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+
+    //set grouping size to have 3 buckets, and re-create driver with the new config
+    conf.set("tez.grouping.min-size", "1");
+    driver = new Driver(conf);
+
+    final String stageTableName = "stage_rebalance_test";
+    final String tableName = "rebalance_test";
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
+
+    TestDataProvider testDataProvider = new TestDataProvider();
+    testDataProvider.createFullAcidTable(stageTableName, true, false);
+    executeStatementOnDriver("insert into " + stageTableName +" values " +
+            "('1',1,'yesterday'), ('1',2,'yesterday'), ('1',3, 'yesterday'), ('1',4, 'yesterday'), " +
+            "('2',1,'today'), ('2',2,'today'), ('2',3,'today'), ('2',4, 'today'), " +
+            "('3',1,'tomorrow'), ('3',2,'tomorrow'), ('3',3,'tomorrow'), ('3',4,'tomorrow')",
+        driver);
+
+    executeStatementOnDriver("drop table if exists " + tableName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int) " +
+        "PARTITIONED BY (ds string) STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
+    executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " partition (ds='tomorrow') select a, b from " + stageTableName, driver);
+
+    //do some single inserts to have more data in the first bucket.
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('12',12,'tomorrow')", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('13',13,'tomorrow')", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('14',14,'tomorrow')", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('15',15,'tomorrow')", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('16',16,'tomorrow')", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('17',17,'tomorrow')", driver);
+
+    // Verify buckets and their content before rebalance in partition ds=tomorrow
+    Table table = msClient.getTable("default", tableName);
+    FileSystem fs = FileSystem.get(conf);
+    Assert.assertEquals("Test setup does not match the expected: different buckets",
+        Arrays.asList("bucket_00000_0", "bucket_00001_0", "bucket_00002_0"),
+        CompactorTestUtil.getBucketFileNames(fs, table, "ds=tomorrow", "base_0000001"));
+    String[][] expectedBuckets = new String[][] {
+        {
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t1\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t2\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t3\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t2\t4\ttomorrow",
+            "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12\ttomorrow",
+            "{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13\ttomorrow",
+            "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14\ttomorrow",
+            "{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15\ttomorrow",
+            "{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16\ttomorrow",
+            "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17\ttomorrow",
+        },
+        {
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t3\t1\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t2\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t3\t3\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":3}\t3\t4\ttomorrow",
+        },
+        {
+            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t1\t1\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t1\t2\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":2}\t1\t3\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":3}\t1\t4\ttomorrow",
+        },
+    };
+    for(int i = 0; i < 3; i++) {
+      Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]),
+          testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + ""));
+    }
+
+    //Try to do a rebalancing compaction
+    executeStatementOnDriver("ALTER TABLE " + tableName + " PARTITION (ds='tomorrow') COMPACT 'rebalance'", driver);
+    runWorker(conf);
+
+    //Check if the compaction succeed
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals("Expecting 1 rows and found " + compacts.size(), 1, compacts.size());
+    Assert.assertEquals("Expecting compaction state 'ready for cleaning' and found:" + compacts.get(0).getState(),
+        "ready for cleaning", compacts.get(0).getState());
+
+    // Verify buckets and their content after rebalance in partition ds=tomorrow
+    Assert.assertEquals("Buckets does not match after compaction",
+        Arrays.asList("bucket_00000", "bucket_00001", "bucket_00002"),
+        CompactorTestUtil.getBucketFileNames(fs, table, "ds=tomorrow", "base_0000007_v0000016"));
+    expectedBuckets = new String[][] {
+        {
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t1\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t2\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t3\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t2\t4\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t3\t1\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t3\t2\ttomorrow",
+        },
+        {
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":6}\t3\t3\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":7}\t3\t4\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":8}\t1\t1\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":9}\t1\t2\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":10}\t1\t3\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":11}\t1\t4\ttomorrow",
+        },
+        {
+            "{\"writeid\":2,\"bucketid\":537001984,\"rowid\":12}\t12\t12\ttomorrow",
+            "{\"writeid\":3,\"bucketid\":537001984,\"rowid\":13}\t13\t13\ttomorrow",
+            "{\"writeid\":4,\"bucketid\":537001984,\"rowid\":14}\t14\t14\ttomorrow",
+            "{\"writeid\":5,\"bucketid\":537001984,\"rowid\":15}\t15\t15\ttomorrow",
+            "{\"writeid\":6,\"bucketid\":537001984,\"rowid\":16}\t16\t16\ttomorrow",
+            "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":17}\t17\t17\ttomorrow",
+        },
+    };
+    for(int i = 0; i < 3; i++) {
+      Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]),
+          testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + ""));
+    }
+  }
+
+  @Test
+  public void testRebalanceCompactionNotPartitionedWithBuckets() throws Exception {

Review Comment:
   nit. Could you please rename these tests to include the bucketing type of the table in the method name? ex.:
   ```
   testRebalanceCompactionOfNotPartitionedImplicitlyBucketedTable
   testRebalanceCompactionOfNotPartitionedExplicitlyBucketedTable
   ...
   ```
   



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MRCompactor.java:
##########
@@ -261,7 +285,7 @@ public void run(HiveConf conf, String jobName, Table t, Partition p, StorageDesc
 
     StringableList dirsToSearch = new StringableList();
     Path baseDir = null;
-    if (ci.isMajorCompaction()) {
+    if (ci.type.equals(CompactionType.MAJOR)) {

Review Comment:
   swap operands for null safety
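   A minimal sketch of the suggested swap; the same pattern applies to the later "swap operands" comments on `QueryCompactor` and `StatsUpdater` below:
   ```
   // Constant-first equals cannot throw an NPE even if ci.type is null:
   if (CompactionType.MAJOR.equals(ci.type)) {
   ```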



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java:
##########
@@ -203,19 +214,23 @@ CompactionQueryBuilder setIsDeleteDelta(boolean deleteDelta) {
   * @param resultTableName the name of the table we are running the operation on
   * @throws IllegalArgumentException if compactionType is null
   */
-  CompactionQueryBuilder(CompactionType compactionType, Operation operation,
+  CompactionQueryBuilder(CompactionType compactionType, Operation operation, boolean crud,
      String resultTableName) {
    if (compactionType == null) {
      throw new IllegalArgumentException("CompactionQueryBuilder.CompactionType cannot be null");
    }
+    this.compactionType = compactionType;
    this.operation = operation;
    this.resultTableName = resultTableName;
-    major = compactionType == CompactionType.MAJOR_CRUD
-        || compactionType == CompactionType.MAJOR_INSERT_ONLY;
-    crud =
-        compactionType == CompactionType.MAJOR_CRUD || compactionType == CompactionType.MINOR_CRUD;
-    minor = !major;
+    this.crud = crud;
     insertOnly = !crud;
+    major = compactionType.equals(CompactionType.MAJOR);
+    minor = compactionType.equals(CompactionType.MINOR);
+    rebalance = compactionType.equals(CompactionType.REBALANCE);

Review Comment:
   You can swap the operands in these expressions to be null safe. ex:
   ```
   CompactionType.MAJOR.equals(compactionType)
   ```
   or
   ```
   CompactionType.MAJOR == compactionType
   ```
   also works since these are enum constants.
   
   Btw why do we have three boolean fields for `major`, `minor` and `rebalance`? Would it be enough to store the `compactionType` in one `CompactionType` enum field instead?
   Going forward, the code blocks that depend on these values could be extracted into three classes sharing a common interface. This is just an idea, and such refactors are probably worth doing in a separate patch.
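   For illustration, a minimal sketch of the single-field idea; the accessor names are assumptions, not part of the patch:
   ```
   private final CompactionType compactionType;

   // Derived predicates instead of three separately stored booleans:
   boolean isMajor()     { return CompactionType.MAJOR == compactionType; }
   boolean isMinor()     { return CompactionType.MINOR == compactionType; }
   boolean isRebalance() { return CompactionType.REBALANCE == compactionType; }
   ```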



##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -89,6 +89,270 @@
 @SuppressWarnings("deprecation")
 public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
+  @Test
+  public void testRebalanceCompactionNotPartitionedWithoutBuckets() throws Exception {
    [... same test setup and pre-rebalance assertions as quoted above ...]
+
+    //Try to do a rebalancing compaction
+    executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance'", driver);
+    runWorker(conf);
+
+    //Check if the compaction succeed
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals("Expecting 1 rows and found " + compacts.size(), 1, compacts.size());
+    Assert.assertEquals("Expecting compaction state 'ready for cleaning' and found:" + compacts.get(0).getState(),
+        "ready for cleaning", compacts.get(0).getState());

Review Comment:
   Can [verifySuccessfulCompaction](https://github.com/apache/hive/blob/497553016d804576cdf2262e5869541c71e2efbd/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java#L164) replace this block?
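   If the linked helper's signature fits (assumed here to take the expected number of compactions; not verified against the source), the whole block might collapse to:
   ```
   // Assumed helper from CompactorOnTezTest; check the actual signature at the link:
   verifySuccessfulCompaction(1);
   ```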
   



##########
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java:
##########
@@ -52,6 +55,7 @@
 
 import static org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2.swapTxnManager;
 import static org.mockito.Matchers.any;
+import static org.powermock.api.mockito.PowerMockito.when;

Review Comment:
   Can the usage of `powermock` be avoided?
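   If the test only stubs instance methods (no statics or finals), plain Mockito should be enough — a sketch of the swap:
   ```
   // Replaces the PowerMock static import:
   import static org.mockito.Mockito.when;
   ```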



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java:
##########
@@ -402,20 +413,14 @@ protected Boolean findNextCompactionAndExecute(boolean collectGenericStats, bool
         todo Find a more generic approach to collecting files in the same logical bucket to compact within the same
         task (currently we're using Tez split grouping).
         */
-        QueryCompactor queryCompactor = QueryCompactorFactory.getQueryCompactor(t, conf, ci);
-        computeStats = (queryCompactor == null && collectMrStats) || collectGenericStats;
+        Compactor compactor = compactorFactory.getQueryCompactor(msc, t, conf, ci);
+        computeStats = (compactor == null && collectMrStats) || collectGenericStats;
 
         LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + ", id:" +
                 ci.id + " in " + compactionTxn + " with compute stats set to " + computeStats);
+        LOG.info("Will compact id: " + ci.id + " with compactor class: " + compactor.getClass().getName());
 
-        if (queryCompactor != null) {
-          LOG.info("Will compact id: " + ci.id + " with query-based compactor class: "
-              + queryCompactor.getClass().getName());
-          queryCompactor.runCompaction(conf, t, p, sd, tblValidWriteIds, ci, dir);
-        } else {
-          LOG.info("Will compact id: " + ci.id + " via MR job");
-          runCompactionViaMrJob(ci, t, p, sd, tblValidWriteIds, jobName, dir);
-        }
+        compactor.runCompaction(conf, t, p, sd, tblValidWriteIds, ci, dir);

Review Comment:
   Can `compactor` be null here?
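   If the factory can still return null (the removed code handled that case via the MR path, and `computeStats` above still checks for it), a guard along these lines might be needed — a sketch, with the exact failure handling left open:
   ```
   Compactor compactor = compactorFactory.getQueryCompactor(msc, t, conf, ci);
   if (compactor == null) {
     // Hypothetical handling; the real code would likely mark the compaction failed instead.
     throw new IllegalStateException("No compactor available for compaction id: " + ci.id);
   }
   ```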



##########
ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java:
##########
@@ -1040,10 +1042,13 @@ public void testDoesNotGatherStatsIfCompactionFails() throws Exception {
 
     txnHandler.compact(new CompactionRequest("default", "mtwb", CompactionType.MINOR));
 
+    CompactorFactory mockedFactory = Mockito.mock(CompactorFactory.class);
+    when(mockedFactory.getQueryCompactor(any(), any(), any(), any())).thenThrow(new RuntimeException());
+
     Worker worker = Mockito.spy(new Worker());
-    Mockito.when(worker.getMrCompactor()).thenThrow(RuntimeException.class);
     worker.setConf(conf);
     worker.init(new AtomicBoolean(true));
+    Whitebox.setInternalState(worker, "compactorFactory", mockedFactory);

Review Comment:
   How about adding a constructor which takes the `compactorFactory` as a parameter?
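   A sketch of the constructor-injection idea (visibility, annotation, and default construction are assumptions):
   ```
   public Worker() {
     this(new CompactorFactory());
   }

   @VisibleForTesting
   Worker(CompactorFactory compactorFactory) {
     this.compactorFactory = compactorFactory;
   }
   ```
   The test could then do `Worker worker = Mockito.spy(new Worker(mockedFactory));` and drop the `Whitebox` call.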



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java:
##########
@@ -128,8 +111,8 @@ void runCompactionQueries(HiveConf conf, String tmpTableName, StorageDescriptor
         }
       }
       for (String query : compactionQueries) {
-        LOG.info("Running {} compaction via query: {}", compactionInfo.isMajorCompaction() ? "major" : "minor", query);
-        if (!compactionInfo.isMajorCompaction()) {
+        LOG.info("Running {} compaction via query: {}", compactionInfo.type, query);
+        if (compactionInfo.type.equals(CompactionType.MINOR)) {

Review Comment:
   swap operands



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/StatsUpdater.java:
##########
@@ -52,7 +53,7 @@ public final class StatsUpdater {
      */
     public void gatherStats(CompactionInfo ci, HiveConf conf, String userName, String compactionQueueName) {
         try {
-            if (!ci.isMajorCompaction()) {
+            if (ci.type.equals(CompactionType.MINOR)) {

Review Comment:
   swap operands





Issue Time Tracking
-------------------

    Worklog Id:     (was: 827072)
    Time Spent: 2.5h  (was: 2h 20m)

> Query based Rebalance compaction on full acid tables
> ----------------------------------------------------
>
>                 Key: HIVE-26716
>                 URL: https://issues.apache.org/jira/browse/HIVE-26716
>             Project: Hive
>          Issue Type: Sub-task
>          Components: Hive
>            Reporter: László Végh
>            Assignee: László Végh
>            Priority: Major
>              Labels: ACID, compaction, pull-request-available
>          Time Spent: 2.5h
>  Remaining Estimate: 0h
>
> Support rebalancing compaction on fully ACID tables.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
