[
https://issues.apache.org/jira/browse/HIVE-26716?focusedWorklogId=827072&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-827072
]
ASF GitHub Bot logged work on HIVE-26716:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 18/Nov/22 08:08
Start Date: 18/Nov/22 08:08
Worklog Time Spent: 10m
Work Description: kasakrisz commented on code in PR #3746:
URL: https://github.com/apache/hive/pull/3746#discussion_r1026004356
##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -89,6 +89,270 @@
@SuppressWarnings("deprecation")
public class TestCrudCompactorOnTez extends CompactorOnTezTest {
+ @Test
+ public void testRebalanceCompactionNotPartitionedWithoutBuckets() throws Exception {
+ conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
+ conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+
+ //set grouping size to have 3 buckets, and re-create driver with the new config
+ conf.set("tez.grouping.min-size", "1000");
+ conf.set("tez.grouping.max-size", "80000");
+ driver = new Driver(conf);
+
+ final String stageTableName = "stage_rebalance_test";
+ final String tableName = "rebalance_test";
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
+
+ TestDataProvider testDataProvider = new TestDataProvider();
+ testDataProvider.createFullAcidTable(stageTableName, true, false);
+ testDataProvider.insertTestDataPartitioned(stageTableName);
+
+ executeStatementOnDriver("drop table if exists " + tableName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int) " +
+ "STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
+ executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " select a, b from " + stageTableName, driver);
+
+ //do some single inserts to have more data in the first bucket.
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('12',12)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('13',13)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('14',14)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('15',15)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('16',16)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('17',17)", driver);
+
+ // Verify buckets and their content before rebalance
+ Table table = msClient.getTable("default", tableName);
+ FileSystem fs = FileSystem.get(conf);
+ Assert.assertEquals("Test setup does not match the expected: different buckets",
+ Arrays.asList("bucket_00000_0", "bucket_00001_0", "bucket_00002_0"),
+ CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000001"));
+ String[][] expectedBuckets = new String[][] {
+ {
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t5\t3",
+ "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12",
+ "{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13",
+ "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14",
+ "{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15",
+ "{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16",
+ "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17",
+ },
+ {
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t3",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t4\t4",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":3}\t4\t3",
+ },
+ {
+ "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t2\t3",
+ "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t3\t4",
+ },
+ };
+ for(int i = 0; i < 3; i++) {
+ Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]),
Review Comment:
This is the non-balanced state, right?
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java:
##########
@@ -180,6 +188,9 @@ CompactionQueryBuilder setPartitioned(boolean partitioned) {
* If true, Create operations for CRUD minor compaction will result in a bucketed table.
*/
CompactionQueryBuilder setBucketed(boolean bucketed) {
+ if(rebalance && bucketed) {
+ throw new IllegalArgumentException("Rebalance compaction is supported only on non-bucketed tables!");
Review Comment:
IIUC the rebalancing feature is about rebalancing buckets, so I guess `non-bucketed` means `implicitly bucketed` here. WDYT?
Btw the variable `bucketed` is also confusing since it refers to explicit bucketing, but that is out of scope for this patch.
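For illustration only (the table names are made up, not from the patch): an explicitly bucketed table fixes its bucket count with a `CLUSTERED BY` clause in the DDL, while an "implicitly bucketed" full ACID table has no such clause yet its writers still emit `bucket_N` files, whose sizes can drift apart over time. That drift is what rebalance compaction targets.
```java
// Hypothetical DDL in the style of the tests above; table names are made up.
class BucketingKindsDemo {
    // Explicitly bucketed: the bucket count is fixed by CLUSTERED BY in the DDL,
    // so rows must stay in their hash-assigned bucket and cannot be rebalanced.
    static final String EXPLICITLY_BUCKETED =
        "CREATE TABLE t_explicit(a string, b int) CLUSTERED BY (b) INTO 4 BUCKETS " +
        "STORED AS ORC TBLPROPERTIES('transactional'='true')";

    // Implicitly bucketed: no CLUSTERED BY clause, yet full ACID writers still
    // produce bucket_N files whose sizes can skew; rebalance targets this case.
    static final String IMPLICITLY_BUCKETED =
        "CREATE TABLE t_implicit(a string, b int) " +
        "STORED AS ORC TBLPROPERTIES('transactional'='true')";
}
```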
##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -89,6 +89,270 @@
+ //Try to do a rebalancing compaction
+ executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT 'rebalance'", driver);
+ runWorker(conf);
+
+ //Check if the compaction succeed
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals("Expecting 1 rows and found " + compacts.size(), 1, compacts.size());
+ Assert.assertEquals("Expecting compaction state 'ready for cleaning' and found:" + compacts.get(0).getState(),
+ "ready for cleaning", compacts.get(0).getState());
Review Comment:
nit. Could this constant be used here instead of the string literal?
https://github.com/apache/hive/blob/497553016d804576cdf2262e5869541c71e2efbd/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java#L101
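A minimal sketch of the suggestion, assuming the constant at the linked line is `TxnStore.CLEANING_RESPONSE` with the value `"ready for cleaning"` (an assumption; check the linked source):
```java
// Assumption: TxnStore.CLEANING_RESPONSE == "ready for cleaning" (see link above).
Assert.assertEquals(
    "Expecting compaction state 'ready for cleaning' and found: " + compacts.get(0).getState(),
    TxnStore.CLEANING_RESPONSE, compacts.get(0).getState());
```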
##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -89,6 +89,270 @@
@SuppressWarnings("deprecation")
public class TestCrudCompactorOnTez extends CompactorOnTezTest {
+ @Test
+ public void testRebalanceCompactionNotPartitionedWithoutBuckets() throws Exception {
+
+ // Verify buckets and their content after rebalance
+ Assert.assertEquals("Buckets does not match after compaction",
+ Arrays.asList("bucket_00000", "bucket_00001", "bucket_00002"),
+ CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000007_v0000020"));
+ expectedBuckets = new String[][] {
+ {
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t5\t3",
+ },
+ {
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":6}\t2\t4",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":7}\t3\t3",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":8}\t4\t4",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":9}\t4\t3",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":10}\t2\t3",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":11}\t3\t4",
+ },
+ {
+ "{\"writeid\":2,\"bucketid\":537001984,\"rowid\":12}\t12\t12",
+ "{\"writeid\":3,\"bucketid\":537001984,\"rowid\":13}\t13\t13",
+ "{\"writeid\":4,\"bucketid\":537001984,\"rowid\":14}\t14\t14",
+ "{\"writeid\":5,\"bucketid\":537001984,\"rowid\":15}\t15\t15",
+ "{\"writeid\":6,\"bucketid\":537001984,\"rowid\":16}\t16\t16",
+ "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":17}\t17\t17",
+ },
+ };
+ for(int i = 0; i < 3; i++) {
+ Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]),
+ testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + ""));
+ }
+ }
+
+ @Test
+ public void testRebalanceCompactionPartitionedWithoutBuckets() throws Exception {
+ conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
+ conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+
+ //set grouping size to have 3 buckets, and re-create driver with the new config
+ conf.set("tez.grouping.min-size", "1");
+ driver = new Driver(conf);
+
+ final String stageTableName = "stage_rebalance_test";
+ final String tableName = "rebalance_test";
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
+
+ TestDataProvider testDataProvider = new TestDataProvider();
+ testDataProvider.createFullAcidTable(stageTableName, true, false);
+ executeStatementOnDriver("insert into " + stageTableName + " values " +
+ "('1',1,'yesterday'), ('1',2,'yesterday'), ('1',3, 'yesterday'), ('1',4, 'yesterday'), " +
+ "('2',1,'today'), ('2',2,'today'), ('2',3,'today'), ('2',4, 'today'), " +
+ "('3',1,'tomorrow'), ('3',2,'tomorrow'), ('3',3,'tomorrow'), ('3',4,'tomorrow')",
+ driver);
+
+ executeStatementOnDriver("drop table if exists " + tableName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int) " +
+ "PARTITIONED BY (ds string) STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
+ executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " partition (ds='tomorrow') select a, b from " + stageTableName, driver);
+
+ //do some single inserts to have more data in the first bucket.
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('12',12,'tomorrow')", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('13',13,'tomorrow')", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('14',14,'tomorrow')", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('15',15,'tomorrow')", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('16',16,'tomorrow')", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values ('17',17,'tomorrow')", driver);
+
+ // Verify buckets and their content before rebalance in partition ds=tomorrow
+ Table table = msClient.getTable("default", tableName);
+ FileSystem fs = FileSystem.get(conf);
+ Assert.assertEquals("Test setup does not match the expected: different buckets",
+ Arrays.asList("bucket_00000_0", "bucket_00001_0", "bucket_00002_0"),
+ CompactorTestUtil.getBucketFileNames(fs, table, "ds=tomorrow", "base_0000001"));
+ String[][] expectedBuckets = new String[][] {
+ {
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t1\ttomorrow",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t2\ttomorrow",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t3\ttomorrow",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t2\t4\ttomorrow",
+ "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12\ttomorrow",
+ "{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13\ttomorrow",
+ "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14\ttomorrow",
+ "{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15\ttomorrow",
+ "{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16\ttomorrow",
+ "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17\ttomorrow",
+ },
+ {
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t3\t1\ttomorrow",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t2\ttomorrow",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t3\t3\ttomorrow",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":3}\t3\t4\ttomorrow",
+ },
+ {
+ "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t1\t1\ttomorrow",
+ "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t1\t2\ttomorrow",
+ "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":2}\t1\t3\ttomorrow",
+ "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":3}\t1\t4\ttomorrow",
+ },
+ };
+ for(int i = 0; i < 3; i++) {
+ Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]),
+ testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + ""));
+ }
+
+ //Try to do a rebalancing compaction
+ executeStatementOnDriver("ALTER TABLE " + tableName + " PARTITION (ds='tomorrow') COMPACT 'rebalance'", driver);
+ runWorker(conf);
+
+ //Check if the compaction succeed
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals("Expecting 1 rows and found " + compacts.size(), 1, compacts.size());
+ Assert.assertEquals("Expecting compaction state 'ready for cleaning' and found:" + compacts.get(0).getState(),
+ "ready for cleaning", compacts.get(0).getState());
+
+ // Verify buckets and their content after rebalance in partition ds=tomorrow
+ Assert.assertEquals("Buckets does not match after compaction",
+ Arrays.asList("bucket_00000", "bucket_00001", "bucket_00002"),
+ CompactorTestUtil.getBucketFileNames(fs, table, "ds=tomorrow", "base_0000007_v0000016"));
+ expectedBuckets = new String[][] {
+ {
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t1\ttomorrow",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t2\ttomorrow",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t3\ttomorrow",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t2\t4\ttomorrow",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t3\t1\ttomorrow",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t3\t2\ttomorrow",
+ },
+ {
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":6}\t3\t3\ttomorrow",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":7}\t3\t4\ttomorrow",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":8}\t1\t1\ttomorrow",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":9}\t1\t2\ttomorrow",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":10}\t1\t3\ttomorrow",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":11}\t1\t4\ttomorrow",
+ },
+ {
+ "{\"writeid\":2,\"bucketid\":537001984,\"rowid\":12}\t12\t12\ttomorrow",
+ "{\"writeid\":3,\"bucketid\":537001984,\"rowid\":13}\t13\t13\ttomorrow",
+ "{\"writeid\":4,\"bucketid\":537001984,\"rowid\":14}\t14\t14\ttomorrow",
+ "{\"writeid\":5,\"bucketid\":537001984,\"rowid\":15}\t15\t15\ttomorrow",
+ "{\"writeid\":6,\"bucketid\":537001984,\"rowid\":16}\t16\t16\ttomorrow",
+ "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":17}\t17\t17\ttomorrow",
+ },
+ };
+ for(int i = 0; i < 3; i++) {
+ Assert.assertEquals("rebalanced bucket " + i, Arrays.asList(expectedBuckets[i]),
+ testDataProvider.getBucketData(tableName, BucketCodec.V1.encode(options.bucket(i)) + ""));
+ }
+ }
+
+ @Test
+ public void testRebalanceCompactionNotPartitionedWithBuckets() throws Exception {
Review Comment:
nit. Could you please rename these tests to include the table's bucketing type in the method name? e.g.:
```
testRebalanceCompactionOfNotPartitionedImplicitlyBucketedTable
testRebalanceCompactionOfNotPartitionedExplicitlyBucketedTable
...
```
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MRCompactor.java:
##########
@@ -261,7 +285,7 @@ public void run(HiveConf conf, String jobName, Table t, Partition p, StorageDesc
StringableList dirsToSearch = new StringableList();
Path baseDir = null;
- if (ci.isMajorCompaction()) {
+ if (ci.type.equals(CompactionType.MAJOR)) {
Review Comment:
swap operands for null safety
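For illustration only, with a stand-in enum (Hive's real `CompactionType` is a Thrift-generated enum), the operand orders compare as follows:
```java
// Stand-in enum for the illustration; not Hive's actual CompactionType.
enum CompactionType { MAJOR, MINOR, REBALANCE }

class NullSafetyDemo {
    // Throws NullPointerException when type is null.
    static boolean isMajorUnsafe(CompactionType type) {
        return type.equals(CompactionType.MAJOR);
    }

    // Constant-first equals() simply returns false for null.
    static boolean isMajorSafe(CompactionType type) {
        return CompactionType.MAJOR.equals(type);
    }

    // == is also null-safe and idiomatic for enums (each constant is a singleton).
    static boolean isMajorIdiomatic(CompactionType type) {
        return type == CompactionType.MAJOR;
    }

    public static void main(String[] args) {
        System.out.println(isMajorSafe(null));      // false
        System.out.println(isMajorIdiomatic(null)); // false
        // isMajorUnsafe(null) would throw a NullPointerException
    }
}
```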
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java:
##########
@@ -203,19 +214,23 @@ CompactionQueryBuilder setIsDeleteDelta(boolean deleteDelta) {
* @param resultTableName the name of the table we are running the operation on
* @throws IllegalArgumentException if compactionType is null
*/
- CompactionQueryBuilder(CompactionType compactionType, Operation operation,
+ CompactionQueryBuilder(CompactionType compactionType, Operation operation, boolean crud,
String resultTableName) {
if (compactionType == null) {
throw new IllegalArgumentException("CompactionQueryBuilder.CompactionType cannot be null");
}
+ this.compactionType = compactionType;
this.operation = operation;
this.resultTableName = resultTableName;
- major = compactionType == CompactionType.MAJOR_CRUD || compactionType == CompactionType.MAJOR_INSERT_ONLY;
- crud = compactionType == CompactionType.MAJOR_CRUD || compactionType == CompactionType.MINOR_CRUD;
- minor = !major;
+ this.crud = crud;
insertOnly = !crud;
+ major = compactionType.equals(CompactionType.MAJOR);
+ minor = compactionType.equals(CompactionType.MINOR);
+ rebalance = compactionType.equals(CompactionType.REBALANCE);
Review Comment:
You can swap the operands in these expressions to be null-safe, e.g.:
```
CompactionType.MAJOR.equals(compactionType)
```
or
```
CompactionType.MAJOR == compactionType
```
also works since these are enum constants.
Btw why do we have three boolean fields for `major`, `minor` and
`rebalance`? Would it be enough to store the `compactionType` in one
`CompactionType` enum field instead?
Going forward the code blocks depend on the values of these could be
extracted to 3 classes having a common interface. This is just an idea and
probably worth doing such refactors in a separate patch.
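A rough, non-authoritative sketch of that direction; every name below is an assumption, not the actual patch. One enum field replaces the three booleans, and type-specific behavior sits behind a common interface:
```java
// Sketch only: all names are illustrative assumptions, not Hive's real classes.
enum CompactionType { MAJOR, MINOR, REBALANCE }

interface CompactionQueryStrategy {
    String describe();
}

class QueryBuilderSketch {
    private final CompactionType compactionType; // single source of truth

    QueryBuilderSketch(CompactionType compactionType) {
        if (compactionType == null) {
            throw new IllegalArgumentException("compactionType cannot be null");
        }
        this.compactionType = compactionType;
    }

    // Each type maps to its own behavior; could later become three classes.
    CompactionQueryStrategy strategy() {
        switch (compactionType) {
            case MAJOR:     return () -> "major: rewrite base and deltas into a new base";
            case MINOR:     return () -> "minor: merge deltas into a compacted delta";
            case REBALANCE: return () -> "rebalance: redistribute rows across buckets";
            default: throw new IllegalStateException("unknown type " + compactionType);
        }
    }

    public static void main(String[] args) {
        System.out.println(new QueryBuilderSketch(CompactionType.REBALANCE).strategy().describe());
    }
}
```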
##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -89,6 +89,270 @@
+ //Check if the compaction succeed
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals("Expecting 1 rows and found " + compacts.size(), 1, compacts.size());
+ Assert.assertEquals("Expecting compaction state 'ready for cleaning' and found:" + compacts.get(0).getState(),
+ "ready for cleaning", compacts.get(0).getState());
Review Comment:
Can [verifySuccessfulCompaction](https://github.com/apache/hive/blob/497553016d804576cdf2262e5869541c71e2efbd/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java#L164) replace this block?
##########
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java:
##########
@@ -52,6 +55,7 @@
import static org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2.swapTxnManager;
import static org.mockito.Matchers.any;
+import static org.powermock.api.mockito.PowerMockito.when;
Review Comment:
Can the usage of `powermock` be avoided?
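For reference, plain Mockito already covers stubbing of ordinary instance methods; PowerMock is typically only needed for static, final, or constructor mocking. A minimal sketch, not tied to the Hive test:
```java
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.util.List;

class PlainMockitoDemo {
    public static void main(String[] args) {
        // Stubbing an instance method needs only org.mockito.Mockito.when,
        // not org.powermock.api.mockito.PowerMockito.when.
        @SuppressWarnings("unchecked")
        List<String> mocked = mock(List.class);
        when(mocked.size()).thenReturn(42);
        System.out.println(mocked.size()); // prints 42
    }
}
```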
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java:
##########
@@ -402,20 +413,14 @@ protected Boolean findNextCompactionAndExecute(boolean collectGenericStats, bool
todo Find a more generic approach to collecting files in the same logical bucket to compact within the same
task (currently we're using Tez split grouping).
*/
- QueryCompactor queryCompactor = QueryCompactorFactory.getQueryCompactor(t, conf, ci);
- computeStats = (queryCompactor == null && collectMrStats) || collectGenericStats;
+ Compactor compactor = compactorFactory.getQueryCompactor(msc, t, conf, ci);
+ computeStats = (compactor == null && collectMrStats) || collectGenericStats;
LOG.info("Starting " + ci.type.toString() + " compaction for " + ci.getFullPartitionName() + ", id:" +
ci.id + " in " + compactionTxn + " with compute stats set to " + computeStats);
+ LOG.info("Will compact id: " + ci.id + " with compactor class: " + compactor.getClass().getName());
- if (queryCompactor != null) {
- LOG.info("Will compact id: " + ci.id + " with query-based compactor class: " + queryCompactor.getClass().getName());
- queryCompactor.runCompaction(conf, t, p, sd, tblValidWriteIds, ci, dir);
- } else {
- LOG.info("Will compact id: " + ci.id + " via MR job");
- runCompactionViaMrJob(ci, t, p, sd, tblValidWriteIds, jobName, dir);
- }
+ compactor.runCompaction(conf, t, p, sd, tblValidWriteIds, ci, dir);
Review Comment:
Can `compactor` be null here?
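If it can be (the factory's contract is not shown in this hunk), a hedged sketch of a fail-fast guard; the names mirror the quoted diff, but the guard itself is purely illustrative:
```java
// Illustrative only: a fail-fast guard in the spirit of the quoted diff.
class NullCompactorGuardDemo {
    interface Compactor { void run(); }

    // Stand-in for compactorFactory.getQueryCompactor(...), which may return null.
    static Compactor getQueryCompactor(boolean available) {
        return available ? () -> System.out.println("compacting") : null;
    }

    public static void main(String[] args) {
        Compactor compactor = getQueryCompactor(false);
        if (compactor == null) {
            // Fail fast with a clear message instead of an NPE at the call site.
            throw new IllegalStateException("No compactor returned for this compaction request");
        }
        compactor.run();
    }
}
```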
##########
ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java:
##########
@@ -1040,10 +1042,13 @@ public void testDoesNotGatherStatsIfCompactionFails() throws Exception {
txnHandler.compact(new CompactionRequest("default", "mtwb", CompactionType.MINOR));
+ CompactorFactory mockedFactory = Mockito.mock(CompactorFactory.class);
+ when(mockedFactory.getQueryCompactor(any(), any(), any(), any())).thenThrow(new RuntimeException());
+
Worker worker = Mockito.spy(new Worker());
- Mockito.when(worker.getMrCompactor()).thenThrow(RuntimeException.class);
worker.setConf(conf);
worker.init(new AtomicBoolean(true));
+ Whitebox.setInternalState(worker, "compactorFactory", mockedFactory);
Review Comment:
How about adding a constructor which takes the `compactorFactory` as a parameter?
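A hedged sketch of the constructor-injection idea; the class shapes and the `getInstance()` default are assumptions, not the actual Hive code:
```java
// All names below are illustrative assumptions.
class CompactorFactory {
    private static final CompactorFactory INSTANCE = new CompactorFactory();
    static CompactorFactory getInstance() { return INSTANCE; }
}

class Worker {
    private final CompactorFactory compactorFactory;

    Worker() {
        this(CompactorFactory.getInstance()); // production path keeps a default
    }

    // Package-private, visible for testing: a test can hand in a Mockito mock
    // directly instead of patching the field with Whitebox.setInternalState.
    Worker(CompactorFactory compactorFactory) {
        this.compactorFactory = compactorFactory;
    }
}
```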
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java:
##########
@@ -128,8 +111,8 @@ void runCompactionQueries(HiveConf conf, String tmpTableName, StorageDescriptor
}
}
for (String query : compactionQueries) {
- LOG.info("Running {} compaction via query: {}", compactionInfo.isMajorCompaction() ? "major" : "minor", query);
- if (!compactionInfo.isMajorCompaction()) {
+ LOG.info("Running {} compaction via query: {}", compactionInfo.type, query);
+ if (compactionInfo.type.equals(CompactionType.MINOR)) {
Review Comment:
swap operands
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/StatsUpdater.java:
##########
@@ -52,7 +53,7 @@ public final class StatsUpdater {
*/
public void gatherStats(CompactionInfo ci, HiveConf conf, String userName, String compactionQueueName) {
try {
- if (!ci.isMajorCompaction()) {
+ if (ci.type.equals(CompactionType.MINOR)) {
Review Comment:
swap operands
Issue Time Tracking
-------------------
Worklog Id: (was: 827072)
Time Spent: 2.5h (was: 2h 20m)
> Query based Rebalance compaction on full acid tables
> ----------------------------------------------------
>
> Key: HIVE-26716
> URL: https://issues.apache.org/jira/browse/HIVE-26716
> Project: Hive
> Issue Type: Sub-task
> Components: Hive
> Reporter: László Végh
> Assignee: László Végh
> Priority: Major
> Labels: ACID, compaction, pull-request-available
> Time Spent: 2.5h
> Remaining Estimate: 0h
>
> Support rebalancing compaction on full ACID tables.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)