kasakrisz commented on code in PR #3746:
URL: https://github.com/apache/hive/pull/3746#discussion_r1026004356


##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -89,6 +89,270 @@
 @SuppressWarnings("deprecation")
 public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
+  @Test
+  public void testRebalanceCompactionNotPartitionedWithoutBuckets() throws Exception {
+    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
+    conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+
+    //set grouping size to have 3 buckets, and re-create driver with the new config
+    conf.set("tez.grouping.min-size", "1000");
+    conf.set("tez.grouping.max-size", "80000");
+    driver = new Driver(conf);
+
+    final String stageTableName = "stage_rebalance_test";
+    final String tableName = "rebalance_test";
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
+
+    TestDataProvider testDataProvider = new TestDataProvider();
+    testDataProvider.createFullAcidTable(stageTableName, true, false);
+    testDataProvider.insertTestDataPartitioned(stageTableName);
+
+    executeStatementOnDriver("drop table if exists " + tableName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int) " +
+        "STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
+    executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " select a, b from " + stageTableName, driver);
+
+    //do some single inserts to have more data in the first bucket.
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('12',12)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('13',13)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('14',14)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('15',15)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('16',16)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('17',17)", driver);
+
+    // Verify buckets and their content before rebalance
+    Table table = msClient.getTable("default", tableName);
+    FileSystem fs = FileSystem.get(conf);
+    Assert.assertEquals("Test setup does not match the expected: different buckets",
+        Arrays.asList("bucket_00000_0", "bucket_00001_0", "bucket_00002_0"),
+        CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000001"));
+    String[][] expectedBuckets = new String[][] {
+        {
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t5\t3",
+            "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12",
+            "{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13",
+            "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14",
+            "{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15",
+            "{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16",
+            "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17",
+        },
+        {
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t3",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t4\t4",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":3}\t4\t3",
+        },
+        {
+            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t2\t3",
+            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t3\t4",
+        },
+    };
+    for(int i = 0; i < 3; i++) {
+      Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]),

Review Comment:
   This is the non-balanced state, right?



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java:
##########
@@ -180,6 +188,9 @@ CompactionQueryBuilder setPartitioned(boolean partitioned) {
   * If true, Create operations for CRUD minor compaction will result in a bucketed table.
   */
  CompactionQueryBuilder setBucketed(boolean bucketed) {
+    if(rebalance && bucketed) {
+      throw new IllegalArgumentException("Rebalance compaction is supported only on non-bucketed tables!");

Review Comment:
   IIUC the rebalancing feature is about rebalancing buckets so I guess `non-bucketed` means `implicitly bucketed` here. WDYT?
   Btw the variable `bucketed` is also confusing since it refers to explicit bucketing but it is out of scope of this patch.



##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -89,6 +89,270 @@
 @SuppressWarnings("deprecation")
 public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
+  @Test
+  public void testRebalanceCompactionNotPartitionedWithoutBuckets() throws Exception {
+    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
+    conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+
+    //set grouping size to have 3 buckets, and re-create driver with the new config
+    conf.set("tez.grouping.min-size", "1000");
+    conf.set("tez.grouping.max-size", "80000");
+    driver = new Driver(conf);
+
+    final String stageTableName = "stage_rebalance_test";
+    final String tableName = "rebalance_test";
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
+
+    TestDataProvider testDataProvider = new TestDataProvider();
+    testDataProvider.createFullAcidTable(stageTableName, true, false);
+    testDataProvider.insertTestDataPartitioned(stageTableName);
+
+    executeStatementOnDriver("drop table if exists " + tableName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int) " +
+        "STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
+    executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " select a, b from " + stageTableName, driver);
+
+    //do some single inserts to have more data in the first bucket.
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('12',12)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('13',13)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('14',14)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('15',15)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('16',16)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('17',17)", driver);
+
+    // Verify buckets and their content before rebalance
+    Table table = msClient.getTable("default", tableName);
+    FileSystem fs = FileSystem.get(conf);
+    Assert.assertEquals("Test setup does not match the expected: different buckets",
+        Arrays.asList("bucket_00000_0", "bucket_00001_0", "bucket_00002_0"),
+        CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000001"));
+    String[][] expectedBuckets = new String[][] {
+        {
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t5\t3",
+            "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12",
+            "{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13",
+            "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14",
+            "{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15",
+            "{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16",
+            "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17",
+        },
+        {
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t3",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t4\t4",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":3}\t4\t3",
+        },
+        {
+            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t2\t3",
+            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t3\t4",
+        },
+    };
+    for(int i = 0; i < 3; i++) {
+      Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]),
+          testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + ""));
+    }
+
+    //Try to do a rebalancing compaction
+    executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance'", driver);
+    runWorker(conf);
+
+    //Check if the compaction succeed
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals("Expecting 1 rows and found " + compacts.size(), 1, compacts.size());
+    Assert.assertEquals("Expecting compaction state 'ready for cleaning' and found:" + compacts.get(0).getState(),
+        "ready for cleaning", compacts.get(0).getState());

Review Comment:
   nit. Could this constant be used here instead of the string literal?
   
https://github.com/apache/hive/blob/497553016d804576cdf2262e5869541c71e2efbd/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java#L101
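   
   For illustration, assuming the constant at the linked line is `TxnStore.CLEANING_RESPONSE` (holding `"ready for cleaning"`), the assertion could become:
   ```
   // hypothetical: reuse the metastore constant instead of the string literal
   Assert.assertEquals(TxnStore.CLEANING_RESPONSE, compacts.get(0).getState());
   ```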



##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -89,6 +89,270 @@
 @SuppressWarnings("deprecation")
 public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
+  @Test
+  public void testRebalanceCompactionNotPartitionedWithoutBuckets() throws Exception {
+    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
+    conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+
+    //set grouping size to have 3 buckets, and re-create driver with the new config
+    conf.set("tez.grouping.min-size", "1000");
+    conf.set("tez.grouping.max-size", "80000");
+    driver = new Driver(conf);
+
+    final String stageTableName = "stage_rebalance_test";
+    final String tableName = "rebalance_test";
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
+
+    TestDataProvider testDataProvider = new TestDataProvider();
+    testDataProvider.createFullAcidTable(stageTableName, true, false);
+    testDataProvider.insertTestDataPartitioned(stageTableName);
+
+    executeStatementOnDriver("drop table if exists " + tableName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int) " +
+        "STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
+    executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " select a, b from " + stageTableName, driver);
+
+    //do some single inserts to have more data in the first bucket.
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('12',12)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('13',13)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('14',14)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('15',15)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('16',16)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('17',17)", driver);
+
+    // Verify buckets and their content before rebalance
+    Table table = msClient.getTable("default", tableName);
+    FileSystem fs = FileSystem.get(conf);
+    Assert.assertEquals("Test setup does not match the expected: different buckets",
+        Arrays.asList("bucket_00000_0", "bucket_00001_0", "bucket_00002_0"),
+        CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000001"));
+    String[][] expectedBuckets = new String[][] {
+        {
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t5\t3",
+            "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12",
+            "{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13",
+            "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14",
+            "{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15",
+            "{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16",
+            "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17",
+        },
+        {
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t3",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t4\t4",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":3}\t4\t3",
+        },
+        {
+            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t2\t3",
+            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t3\t4",
+        },
+    };
+    for(int i = 0; i < 3; i++) {
+      Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]),
+          testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + ""));
+    }
+
+    //Try to do a rebalancing compaction
+    executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance'", driver);
+    runWorker(conf);
+
+    //Check if the compaction succeed
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals("Expecting 1 rows and found " + compacts.size(), 1, compacts.size());
+    Assert.assertEquals("Expecting compaction state 'ready for cleaning' and found:" + compacts.get(0).getState(),
+        "ready for cleaning", compacts.get(0).getState());
+
+    // Verify buckets and their content after rebalance
+    Assert.assertEquals("Buckets does not match after compaction",
+        Arrays.asList("bucket_00000", "bucket_00001", "bucket_00002"),
+        CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000007_v0000020"));
+    expectedBuckets = new String[][] {
+        {
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t5\t3",
+        },
+        {
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":6}\t2\t4",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":7}\t3\t3",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":8}\t4\t4",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":9}\t4\t3",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":10}\t2\t3",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":11}\t3\t4",
+        },
+        {
+            "{\"writeid\":2,\"bucketid\":537001984,\"rowid\":12}\t12\t12",
+            "{\"writeid\":3,\"bucketid\":537001984,\"rowid\":13}\t13\t13",
+            "{\"writeid\":4,\"bucketid\":537001984,\"rowid\":14}\t14\t14",
+            "{\"writeid\":5,\"bucketid\":537001984,\"rowid\":15}\t15\t15",
+            "{\"writeid\":6,\"bucketid\":537001984,\"rowid\":16}\t16\t16",
+            "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":17}\t17\t17",
+        },
+    };
+    for(int i = 0; i < 3; i++) {
+      Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]),
+          testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + ""));
+    }
+  }
+
+  @Test
+  public void testRebalanceCompactionPartitionedWithoutBuckets() throws Exception {
+    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
+    conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+
+    //set grouping size to have 3 buckets, and re-create driver with the new config
+    conf.set("tez.grouping.min-size", "1");
+    driver = new Driver(conf);
+
+    final String stageTableName = "stage_rebalance_test";
+    final String tableName = "rebalance_test";
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
+
+    TestDataProvider testDataProvider = new TestDataProvider();
+    testDataProvider.createFullAcidTable(stageTableName, true, false);
+    executeStatementOnDriver("insert into " + stageTableName +" values " +
+            "('1',1,'yesterday'), ('1',2,'yesterday'), ('1',3, 'yesterday'), ('1',4, 'yesterday'), " +
+            "('2',1,'today'), ('2',2,'today'), ('2',3,'today'), ('2',4, 'today'), " +
+            "('3',1,'tomorrow'), ('3',2,'tomorrow'), ('3',3,'tomorrow'), ('3',4,'tomorrow')",
+        driver);
+
+    executeStatementOnDriver("drop table if exists " + tableName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int) " +
+        "PARTITIONED BY (ds string) STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
+    executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " partition (ds='tomorrow') select a, b from " + stageTableName, driver);
+
+    //do some single inserts to have more data in the first bucket.
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('12',12,'tomorrow')", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('13',13,'tomorrow')", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('14',14,'tomorrow')", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('15',15,'tomorrow')", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('16',16,'tomorrow')", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('17',17,'tomorrow')", driver);
+
+    // Verify buckets and their content before rebalance in partition ds=tomorrow
+    Table table = msClient.getTable("default", tableName);
+    FileSystem fs = FileSystem.get(conf);
+    Assert.assertEquals("Test setup does not match the expected: different buckets",
+        Arrays.asList("bucket_00000_0", "bucket_00001_0", "bucket_00002_0"),
+        CompactorTestUtil.getBucketFileNames(fs, table, "ds=tomorrow", "base_0000001"));
+    String[][] expectedBuckets = new String[][] {
+        {
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t1\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t2\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t3\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t2\t4\ttomorrow",
+            "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12\ttomorrow",
+            "{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13\ttomorrow",
+            "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14\ttomorrow",
+            "{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15\ttomorrow",
+            "{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16\ttomorrow",
+            "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17\ttomorrow",
+        },
+        {
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t3\t1\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t2\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t3\t3\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":3}\t3\t4\ttomorrow",
+        },
+        {
+            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t1\t1\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t1\t2\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":2}\t1\t3\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":3}\t1\t4\ttomorrow",
+        },
+    };
+    for(int i = 0; i < 3; i++) {
+      Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]),
+          testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + ""));
+    }
+
+    //Try to do a rebalancing compaction
+    executeStatementOnDriver("ALTER TABLE " + tableName + " PARTITION (ds='tomorrow') COMPACT 'rebalance'", driver);
+    runWorker(conf);
+
+    //Check if the compaction succeed
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals("Expecting 1 rows and found " + compacts.size(), 1, compacts.size());
+    Assert.assertEquals("Expecting compaction state 'ready for cleaning' and found:" + compacts.get(0).getState(),
+        "ready for cleaning", compacts.get(0).getState());
+
+    // Verify buckets and their content after rebalance in partition ds=tomorrow
+    Assert.assertEquals("Buckets does not match after compaction",
+        Arrays.asList("bucket_00000", "bucket_00001", "bucket_00002"),
+        CompactorTestUtil.getBucketFileNames(fs, table, "ds=tomorrow", "base_0000007_v0000016"));
+    expectedBuckets = new String[][] {
+        {
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t1\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t2\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t3\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t2\t4\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t3\t1\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t3\t2\ttomorrow",
+        },
+        {
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":6}\t3\t3\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":7}\t3\t4\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":8}\t1\t1\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":9}\t1\t2\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":10}\t1\t3\ttomorrow",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":11}\t1\t4\ttomorrow",
+        },
+        {
+            "{\"writeid\":2,\"bucketid\":537001984,\"rowid\":12}\t12\t12\ttomorrow",
+            "{\"writeid\":3,\"bucketid\":537001984,\"rowid\":13}\t13\t13\ttomorrow",
+            "{\"writeid\":4,\"bucketid\":537001984,\"rowid\":14}\t14\t14\ttomorrow",
+            "{\"writeid\":5,\"bucketid\":537001984,\"rowid\":15}\t15\t15\ttomorrow",
+            "{\"writeid\":6,\"bucketid\":537001984,\"rowid\":16}\t16\t16\ttomorrow",
+            "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":17}\t17\t17\ttomorrow",
+        },
+    };
+    for(int i = 0; i < 3; i++) {
+      Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]),
+          testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + ""));
+    }
+  }
+
+  @Test
+  public void testRebalanceCompactionNotPartitionedWithBuckets() throws Exception {
Review Comment:
   nit. Could you please rename these tests to include the bucketing type of the table in the method name? ex.:
   ```
   testRebalanceCompactionOfNotPartitionedImplicitlyBucketedTable
   testRebalanceCompactionOfNotPartitionedExplicitlyBucketedTable
   ...
   ```
   



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MRCompactor.java:
##########
@@ -261,7 +285,7 @@ public void run(HiveConf conf, String jobName, Table t, Partition p, StorageDesc
 
     StringableList dirsToSearch = new StringableList();
     Path baseDir = null;
-    if (ci.isMajorCompaction()) {
+    if (ci.type.equals(CompactionType.MAJOR)) {

Review Comment:
   swap operands for null safety
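   
   For instance, a sketch of the suggested change on the line above:
   ```
   if (CompactionType.MAJOR.equals(ci.type)) {
   ```
   With the constant on the left, a null `ci.type` yields `false` instead of a NullPointerException.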



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java:
##########
@@ -203,19 +214,23 @@ CompactionQueryBuilder setIsDeleteDelta(boolean deleteDelta) {
   * @param resultTableName the name of the table we are running the operation on
   * @throws IllegalArgumentException if compactionType is null
   */
-  CompactionQueryBuilder(CompactionType compactionType, Operation operation,
+  CompactionQueryBuilder(CompactionType compactionType, Operation operation, boolean crud,
       String resultTableName) {
     if (compactionType == null) {
      throw new IllegalArgumentException("CompactionQueryBuilder.CompactionType cannot be null");
     }
+    this.compactionType = compactionType;
    this.operation = operation;
    this.resultTableName = resultTableName;
-    major = compactionType == CompactionType.MAJOR_CRUD
-        || compactionType == CompactionType.MAJOR_INSERT_ONLY;
-    crud =
-        compactionType == CompactionType.MAJOR_CRUD || compactionType == CompactionType.MINOR_CRUD;
-    minor = !major;
+    this.crud = crud;
    insertOnly = !crud;
+    major = compactionType.equals(CompactionType.MAJOR);
+    minor = compactionType.equals(CompactionType.MINOR);
+    rebalance = compactionType.equals(CompactionType.REBALANCE);

Review Comment:
   You can swap the operands in these expressions to be null safe. ex:
   ```
   CompactionType.MAJOR.equals(compactionType)
   ```
   or
   ```
   CompactionType.MAJOR == compactionType
   ```
   also works since these are enum constants.
   
   Btw why do we have three boolean fields for `major`, `minor` and 
`rebalance`? Would it be enough to store the `compactionType` in one 
`CompactionType` enum field instead?
   Going forward the code blocks depend on the values of these could be 
extracted to 3 classes having a common interface. This is just an idea and 
probably worth doing such refactors in a separate patch.
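   
   A rough sketch of the single-field idea (hypothetical, just to illustrate the direction):
   ```
   // one enum field could replace the major/minor/rebalance booleans
   private final CompactionType compactionType;

   private boolean isMajor()     { return CompactionType.MAJOR == compactionType; }
   private boolean isMinor()     { return CompactionType.MINOR == compactionType; }
   private boolean isRebalance() { return CompactionType.REBALANCE == compactionType; }
   ```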



##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -89,6 +89,270 @@
 @SuppressWarnings("deprecation")
 public class TestCrudCompactorOnTez extends CompactorOnTezTest {
 
+  @Test
+  public void testRebalanceCompactionNotPartitionedWithoutBuckets() throws Exception {
+    conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+    conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
+    conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+
+    //set grouping size to have 3 buckets, and re-create driver with the new config
+    conf.set("tez.grouping.min-size", "1000");
+    conf.set("tez.grouping.max-size", "80000");
+    driver = new Driver(conf);
+
+    final String stageTableName = "stage_rebalance_test";
+    final String tableName = "rebalance_test";
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
+
+    TestDataProvider testDataProvider = new TestDataProvider();
+    testDataProvider.createFullAcidTable(stageTableName, true, false);
+    testDataProvider.insertTestDataPartitioned(stageTableName);
+
+    executeStatementOnDriver("drop table if exists " + tableName, driver);
+    executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int) " +
+        "STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
+    executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " select a, b from " + stageTableName, driver);
+
+    //do some single inserts to have more data in the first bucket.
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('12',12)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('13',13)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('14',14)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('15',15)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('16',16)", driver);
+    executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('17',17)", driver);
+
+    // Verify buckets and their content before rebalance
+    Table table = msClient.getTable("default", tableName);
+    FileSystem fs = FileSystem.get(conf);
+    Assert.assertEquals("Test setup does not match the expected: different buckets",
+        Arrays.asList("bucket_00000_0", "bucket_00001_0", "bucket_00002_0"),
+        CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000001"));
+    String[][] expectedBuckets = new String[][] {
+        {
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2",
+            "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t5\t3",
+            "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12",
+            "{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13",
+            "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14",
+            "{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15",
+            "{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16",
+            "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17",
+        },
+        {
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t3",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t4\t4",
+            "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":3}\t4\t3",
+        },
+        {
+            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t2\t3",
+            "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t3\t4",
+        },
+    };
+    for(int i = 0; i < 3; i++) {
+      Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]),
+          testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + ""));
+    }
+
+    //Try to do a rebalancing compaction
+    executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance'", driver);
+    runWorker(conf);
+
+    //Check if the compaction succeed
+    TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+    Assert.assertEquals("Expecting 1 rows and found " + compacts.size(), 1, compacts.size());
+    Assert.assertEquals("Expecting compaction state 'ready for cleaning' and found:" + compacts.get(0).getState(),
+        "ready for cleaning", compacts.get(0).getState());

Review Comment:
   Can [verifySuccessfulCompaction](https://github.com/apache/hive/blob/497553016d804576cdf2262e5869541c71e2efbd/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java#L164) replace this block?
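   A sketch of what that could look like, assuming the helper takes the expected number of successful compactions:
   ```
   // hypothetical replacement for the showCompact/assert block above
   verifySuccessfulCompaction(1);
   ```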
   



##########
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java:
##########
@@ -52,6 +55,7 @@
 
 import static org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2.swapTxnManager;
 import static org.mockito.Matchers.any;
+import static org.powermock.api.mockito.PowerMockito.when;

Review Comment:
   Can the usage of `powermock` be avoided?
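   
   Since `mockedFactory` is a plain Mockito mock, the static import could probably come from Mockito itself; a sketch:
   ```
   import static org.mockito.Mockito.when;  // instead of PowerMockito.when
   ```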



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java:
##########
@@ -402,20 +413,14 @@ protected Boolean findNextCompactionAndExecute(boolean collectGenericStats, bool
        todo Find a more generic approach to collecting files in the same logical bucket to compact within the same
        task (currently we're using Tez split grouping).
        */
-        QueryCompactor queryCompactor = QueryCompactorFactory.getQueryCompactor(t, conf, ci);
-        computeStats = (queryCompactor == null && collectMrStats) || collectGenericStats;
+        Compactor compactor = compactorFactory.getQueryCompactor(msc, t, conf, ci);
+        computeStats = (compactor == null && collectMrStats) || collectGenericStats;
 
        LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + ", id:" +
                ci.id + " in " + compactionTxn + " with compute stats set to " + computeStats);
+        LOG.info("Will compact id: " + ci.id + " with compactor class: " + compactor.getClass().getName());
 
-        if (queryCompactor != null) {
-          LOG.info("Will compact id: " + ci.id + " with query-based compactor class: "
-              + queryCompactor.getClass().getName());
-          queryCompactor.runCompaction(conf, t, p, sd, tblValidWriteIds, ci, dir);
-        } else {
-          LOG.info("Will compact id: " + ci.id + " via MR job");
-          runCompactionViaMrJob(ci, t, p, sd, tblValidWriteIds, jobName, dir);
-        }
+        compactor.runCompaction(conf, t, p, sd, tblValidWriteIds, ci, dir);

Review Comment:
   Can `compactor` be null here?
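   
   If it can (the `computeStats` line above still checks for null), a guard along these lines might be needed; a sketch that assumes the MR path from the removed else-branch is still the fallback:
   ```
   if (compactor == null) {
     // hypothetical fallback, mirroring the removed else-branch
     runCompactionViaMrJob(ci, t, p, sd, tblValidWriteIds, jobName, dir);
   } else {
     compactor.runCompaction(conf, t, p, sd, tblValidWriteIds, ci, dir);
   }
   ```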



##########
ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java:
##########
@@ -1040,10 +1042,13 @@ public void testDoesNotGatherStatsIfCompactionFails() throws Exception {
 
    txnHandler.compact(new CompactionRequest("default", "mtwb", CompactionType.MINOR));
 
+    CompactorFactory mockedFactory = Mockito.mock(CompactorFactory.class);
+    when(mockedFactory.getQueryCompactor(any(), any(), any(), any())).thenThrow(new RuntimeException());
+
     Worker worker = Mockito.spy(new Worker());
-    Mockito.when(worker.getMrCompactor()).thenThrow(RuntimeException.class);
     worker.setConf(conf);
     worker.init(new AtomicBoolean(true));
+    Whitebox.setInternalState(worker, "compactorFactory", mockedFactory);

Review Comment:
   How about adding a constructor which takes the `compactorFactory` as a parameter?
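   For example (a sketch; the `@VisibleForTesting` annotation is an assumption):
   ```
   @VisibleForTesting
   Worker(CompactorFactory compactorFactory) {
     this.compactorFactory = compactorFactory;
   }
   ```
   The test could then use `Mockito.spy(new Worker(mockedFactory))` instead of `Whitebox.setInternalState`.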



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java:
##########
@@ -128,8 +111,8 @@ void runCompactionQueries(HiveConf conf, String tmpTableName, StorageDescriptor
        }
      }
      for (String query : compactionQueries) {
-        LOG.info("Running {} compaction via query: {}", compactionInfo.isMajorCompaction() ? "major" : "minor", query);
-        if (!compactionInfo.isMajorCompaction()) {
+        LOG.info("Running {} compaction via query: {}", compactionInfo.type, query);
+        if (compactionInfo.type.equals(CompactionType.MINOR)) {

Review Comment:
   swap operands



##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/StatsUpdater.java:
##########
@@ -52,7 +53,7 @@ public final class StatsUpdater {
      */
    public void gatherStats(CompactionInfo ci, HiveConf conf, String userName, String compactionQueueName) {
         try {
-            if (!ci.isMajorCompaction()) {
+            if (ci.type.equals(CompactionType.MINOR)) {

Review Comment:
   swap operands


