kasakrisz commented on code in PR #3746:
URL: https://github.com/apache/hive/pull/3746#discussion_r1026004356
##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -89,6 +89,270 @@
@SuppressWarnings("deprecation")
public class TestCrudCompactorOnTez extends CompactorOnTezTest {
+ @Test
+ public void testRebalanceCompactionNotPartitionedWithoutBuckets() throws
Exception {
+ conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
+ conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+
+ //set grouping size to have 3 buckets, and re-create driver with the new
config
+ conf.set("tez.grouping.min-size", "1000");
+ conf.set("tez.grouping.max-size", "80000");
+ driver = new Driver(conf);
+
+ final String stageTableName = "stage_rebalance_test";
+ final String tableName = "rebalance_test";
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
+
+ TestDataProvider testDataProvider = new TestDataProvider();
+ testDataProvider.createFullAcidTable(stageTableName, true, false);
+ testDataProvider.insertTestDataPartitioned(stageTableName);
+
+ executeStatementOnDriver("drop table if exists " + tableName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int)
" +
+ "STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
+ executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " select
a, b from " + stageTableName, driver);
+
+ //do some single inserts to have more data in the first bucket.
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('12',12)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('13',13)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('14',14)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('15',15)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('16',16)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('17',17)", driver);
+
+ // Verify buckets and their content before rebalance
+ Table table = msClient.getTable("default", tableName);
+ FileSystem fs = FileSystem.get(conf);
+ Assert.assertEquals("Test setup does not match the expected: different
buckets",
+ Arrays.asList("bucket_00000_0", "bucket_00001_0", "bucket_00002_0"),
+ CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000001"));
+ String[][] expectedBuckets = new String[][] {
+ {
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t5\t3",
+ "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12",
+ "{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13",
+ "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14",
+ "{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15",
+ "{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16",
+ "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17",
+ },
+ {
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t3",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t4\t4",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":3}\t4\t3",
+ },
+ {
+ "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t2\t3",
+ "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t3\t4",
+ },
+ };
+ for(int i = 0; i < 3; i++) {
+ Assert.assertEquals("rebalanced bucket " + i,
Arrays.asList(expectedBuckets[i]),
Review Comment:
This is the non-balanced state, right?
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java:
##########
@@ -180,6 +188,9 @@ CompactionQueryBuilder setPartitioned(boolean partitioned) {
* If true, Create operations for CRUD minor compaction will result in a
bucketed table.
*/
CompactionQueryBuilder setBucketed(boolean bucketed) {
+ if(rebalance && bucketed) {
+ throw new IllegalArgumentException("Rebalance compaction is supported
only on non-bucketed tables!");
Review Comment:
IIUC the rebalancing feature is about rebalancing buckets, so I guess
`non-bucketed` means `implicitly bucketed` here. WDYT?
Btw, the variable `bucketed` is also confusing since it refers to explicit
bucketing, but renaming it is out of scope for this patch.
##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -89,6 +89,270 @@
@SuppressWarnings("deprecation")
public class TestCrudCompactorOnTez extends CompactorOnTezTest {
+ @Test
+ public void testRebalanceCompactionNotPartitionedWithoutBuckets() throws
Exception {
+ conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
+ conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+
+ //set grouping size to have 3 buckets, and re-create driver with the new
config
+ conf.set("tez.grouping.min-size", "1000");
+ conf.set("tez.grouping.max-size", "80000");
+ driver = new Driver(conf);
+
+ final String stageTableName = "stage_rebalance_test";
+ final String tableName = "rebalance_test";
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
+
+ TestDataProvider testDataProvider = new TestDataProvider();
+ testDataProvider.createFullAcidTable(stageTableName, true, false);
+ testDataProvider.insertTestDataPartitioned(stageTableName);
+
+ executeStatementOnDriver("drop table if exists " + tableName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int)
" +
+ "STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
+ executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " select
a, b from " + stageTableName, driver);
+
+ //do some single inserts to have more data in the first bucket.
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('12',12)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('13',13)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('14',14)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('15',15)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('16',16)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('17',17)", driver);
+
+ // Verify buckets and their content before rebalance
+ Table table = msClient.getTable("default", tableName);
+ FileSystem fs = FileSystem.get(conf);
+ Assert.assertEquals("Test setup does not match the expected: different
buckets",
+ Arrays.asList("bucket_00000_0", "bucket_00001_0", "bucket_00002_0"),
+ CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000001"));
+ String[][] expectedBuckets = new String[][] {
+ {
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t5\t3",
+ "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12",
+ "{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13",
+ "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14",
+ "{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15",
+ "{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16",
+ "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17",
+ },
+ {
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t3",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t4\t4",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":3}\t4\t3",
+ },
+ {
+ "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t2\t3",
+ "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t3\t4",
+ },
+ };
+ for(int i = 0; i < 3; i++) {
+ Assert.assertEquals("rebalanced bucket " + i,
Arrays.asList(expectedBuckets[i]),
+ testDataProvider.getBucketData(tableName,
BucketCodec.V1.encode(options.bucket(i)) + ""));
+ }
+
+ //Try to do a rebalancing compaction
+ executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT
'rebalance'", driver);
+ runWorker(conf);
+
+ //Check if the compaction succeed
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals("Expecting 1 rows and found " + compacts.size(), 1,
compacts.size());
+ Assert.assertEquals("Expecting compaction state 'ready for cleaning' and
found:" + compacts.get(0).getState(),
+ "ready for cleaning", compacts.get(0).getState());
Review Comment:
nit: could this constant be used here instead of the string literal?
https://github.com/apache/hive/blob/497553016d804576cdf2262e5869541c71e2efbd/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java#L101
##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -89,6 +89,270 @@
@SuppressWarnings("deprecation")
public class TestCrudCompactorOnTez extends CompactorOnTezTest {
+ @Test
+ public void testRebalanceCompactionNotPartitionedWithoutBuckets() throws
Exception {
+ conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
+ conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+
+ //set grouping size to have 3 buckets, and re-create driver with the new
config
+ conf.set("tez.grouping.min-size", "1000");
+ conf.set("tez.grouping.max-size", "80000");
+ driver = new Driver(conf);
+
+ final String stageTableName = "stage_rebalance_test";
+ final String tableName = "rebalance_test";
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
+
+ TestDataProvider testDataProvider = new TestDataProvider();
+ testDataProvider.createFullAcidTable(stageTableName, true, false);
+ testDataProvider.insertTestDataPartitioned(stageTableName);
+
+ executeStatementOnDriver("drop table if exists " + tableName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int)
" +
+ "STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
+ executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " select
a, b from " + stageTableName, driver);
+
+ //do some single inserts to have more data in the first bucket.
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('12',12)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('13',13)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('14',14)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('15',15)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('16',16)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('17',17)", driver);
+
+ // Verify buckets and their content before rebalance
+ Table table = msClient.getTable("default", tableName);
+ FileSystem fs = FileSystem.get(conf);
+ Assert.assertEquals("Test setup does not match the expected: different
buckets",
+ Arrays.asList("bucket_00000_0", "bucket_00001_0", "bucket_00002_0"),
+ CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000001"));
+ String[][] expectedBuckets = new String[][] {
+ {
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t5\t3",
+ "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12",
+ "{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13",
+ "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14",
+ "{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15",
+ "{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16",
+ "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17",
+ },
+ {
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t3",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t4\t4",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":3}\t4\t3",
+ },
+ {
+ "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t2\t3",
+ "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t3\t4",
+ },
+ };
+ for(int i = 0; i < 3; i++) {
+ Assert.assertEquals("rebalanced bucket " + i,
Arrays.asList(expectedBuckets[i]),
+ testDataProvider.getBucketData(tableName,
BucketCodec.V1.encode(options.bucket(i)) + ""));
+ }
+
+ //Try to do a rebalancing compaction
+ executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT
'rebalance'", driver);
+ runWorker(conf);
+
+ //Check if the compaction succeed
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals("Expecting 1 rows and found " + compacts.size(), 1,
compacts.size());
+ Assert.assertEquals("Expecting compaction state 'ready for cleaning' and
found:" + compacts.get(0).getState(),
+ "ready for cleaning", compacts.get(0).getState());
+
+ // Verify buckets and their content after rebalance
+ Assert.assertEquals("Buckets does not match after compaction",
+ Arrays.asList("bucket_00000", "bucket_00001", "bucket_00002"),
+ CompactorTestUtil.getBucketFileNames(fs, table, null,
"base_0000007_v0000020"));
+ expectedBuckets = new String[][] {
+ {
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t5\t3",
+ },
+ {
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":6}\t2\t4",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":7}\t3\t3",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":8}\t4\t4",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":9}\t4\t3",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":10}\t2\t3",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":11}\t3\t4",
+ },
+ {
+ "{\"writeid\":2,\"bucketid\":537001984,\"rowid\":12}\t12\t12",
+ "{\"writeid\":3,\"bucketid\":537001984,\"rowid\":13}\t13\t13",
+ "{\"writeid\":4,\"bucketid\":537001984,\"rowid\":14}\t14\t14",
+ "{\"writeid\":5,\"bucketid\":537001984,\"rowid\":15}\t15\t15",
+ "{\"writeid\":6,\"bucketid\":537001984,\"rowid\":16}\t16\t16",
+ "{\"writeid\":7,\"bucketid\":537001984,\"rowid\":17}\t17\t17",
+ },
+ };
+ for(int i = 0; i < 3; i++) {
+ Assert.assertEquals("rebalanced bucket " + i,
Arrays.asList(expectedBuckets[i]),
+ testDataProvider.getBucketData(tableName,
BucketCodec.V1.encode(options.bucket(i)) + ""));
+ }
+ }
+
+ @Test
+ public void testRebalanceCompactionPartitionedWithoutBuckets() throws
Exception {
+ conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
+ conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+
+ //set grouping size to have 3 buckets, and re-create driver with the new
config
+ conf.set("tez.grouping.min-size", "1");
+ driver = new Driver(conf);
+
+ final String stageTableName = "stage_rebalance_test";
+ final String tableName = "rebalance_test";
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
+
+ TestDataProvider testDataProvider = new TestDataProvider();
+ testDataProvider.createFullAcidTable(stageTableName, true, false);
+ executeStatementOnDriver("insert into " + stageTableName +" values " +
+ "('1',1,'yesterday'), ('1',2,'yesterday'), ('1',3, 'yesterday'),
('1',4, 'yesterday'), " +
+ "('2',1,'today'), ('2',2,'today'), ('2',3,'today'), ('2',4,
'today'), " +
+ "('3',1,'tomorrow'), ('3',2,'tomorrow'), ('3',3,'tomorrow'),
('3',4,'tomorrow')",
+ driver);
+
+ executeStatementOnDriver("drop table if exists " + tableName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int)
" +
+ "PARTITIONED BY (ds string) STORED AS ORC
TBLPROPERTIES('transactional'='true')", driver);
+ executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + "
partition (ds='tomorrow') select a, b from " + stageTableName, driver);
+
+ //do some single inserts to have more data in the first bucket.
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('12',12,'tomorrow')", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('13',13,'tomorrow')", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('14',14,'tomorrow')", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('15',15,'tomorrow')", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('16',16,'tomorrow')", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('17',17,'tomorrow')", driver);
+
+ // Verify buckets and their content before rebalance in partition
ds=tomorrow
+ Table table = msClient.getTable("default", tableName);
+ FileSystem fs = FileSystem.get(conf);
+ Assert.assertEquals("Test setup does not match the expected: different
buckets",
+ Arrays.asList("bucket_00000_0", "bucket_00001_0", "bucket_00002_0"),
+ CompactorTestUtil.getBucketFileNames(fs, table, "ds=tomorrow",
"base_0000001"));
+ String[][] expectedBuckets = new String[][] {
+ {
+
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t1\ttomorrow",
+
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t2\ttomorrow",
+
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t3\ttomorrow",
+
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t2\t4\ttomorrow",
+
"{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12\ttomorrow",
+
"{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13\ttomorrow",
+
"{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14\ttomorrow",
+
"{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15\ttomorrow",
+
"{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16\ttomorrow",
+
"{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17\ttomorrow",
+ },
+ {
+
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t3\t1\ttomorrow",
+
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t2\ttomorrow",
+
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t3\t3\ttomorrow",
+
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":3}\t3\t4\ttomorrow",
+ },
+ {
+
"{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t1\t1\ttomorrow",
+
"{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t1\t2\ttomorrow",
+
"{\"writeid\":1,\"bucketid\":537001984,\"rowid\":2}\t1\t3\ttomorrow",
+
"{\"writeid\":1,\"bucketid\":537001984,\"rowid\":3}\t1\t4\ttomorrow",
+ },
+ };
+ for(int i = 0; i < 3; i++) {
+ Assert.assertEquals("rebalanced bucket " + i,
Arrays.asList(expectedBuckets[i]),
+ testDataProvider.getBucketData(tableName,
BucketCodec.V1.encode(options.bucket(i)) + ""));
+ }
+
+ //Try to do a rebalancing compaction
+ executeStatementOnDriver("ALTER TABLE " + tableName + " PARTITION
(ds='tomorrow') COMPACT 'rebalance'", driver);
+ runWorker(conf);
+
+ //Check if the compaction succeed
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals("Expecting 1 rows and found " + compacts.size(), 1,
compacts.size());
+ Assert.assertEquals("Expecting compaction state 'ready for cleaning' and
found:" + compacts.get(0).getState(),
+ "ready for cleaning", compacts.get(0).getState());
+
+ // Verify buckets and their content after rebalance in partition
ds=tomorrow
+ Assert.assertEquals("Buckets does not match after compaction",
+ Arrays.asList("bucket_00000", "bucket_00001", "bucket_00002"),
+ CompactorTestUtil.getBucketFileNames(fs, table, "ds=tomorrow",
"base_0000007_v0000016"));
+ expectedBuckets = new String[][] {
+ {
+
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t2\t1\ttomorrow",
+
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t2\t2\ttomorrow",
+
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t2\t3\ttomorrow",
+
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t2\t4\ttomorrow",
+
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t3\t1\ttomorrow",
+
"{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t3\t2\ttomorrow",
+ },
+ {
+
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":6}\t3\t3\ttomorrow",
+
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":7}\t3\t4\ttomorrow",
+
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":8}\t1\t1\ttomorrow",
+
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":9}\t1\t2\ttomorrow",
+
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":10}\t1\t3\ttomorrow",
+
"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":11}\t1\t4\ttomorrow",
+ },
+ {
+
"{\"writeid\":2,\"bucketid\":537001984,\"rowid\":12}\t12\t12\ttomorrow",
+
"{\"writeid\":3,\"bucketid\":537001984,\"rowid\":13}\t13\t13\ttomorrow",
+
"{\"writeid\":4,\"bucketid\":537001984,\"rowid\":14}\t14\t14\ttomorrow",
+
"{\"writeid\":5,\"bucketid\":537001984,\"rowid\":15}\t15\t15\ttomorrow",
+
"{\"writeid\":6,\"bucketid\":537001984,\"rowid\":16}\t16\t16\ttomorrow",
+
"{\"writeid\":7,\"bucketid\":537001984,\"rowid\":17}\t17\t17\ttomorrow",
+ },
+ };
+ for(int i = 0; i < 3; i++) {
+ Assert.assertEquals("rebalanced bucket " + i,
Arrays.asList(expectedBuckets[i]),
+ testDataProvider.getBucketData(tableName,
BucketCodec.V1.encode(options.bucket(i)) + ""));
+ }
+ }
+
+ @Test
+ public void testRebalanceCompactionNotPartitionedWithBuckets() throws
Exception {
Review Comment:
nit: could you please rename these tests to include the bucketing type of
the table in the method name? e.g.:
```
testRebalanceCompactionOfNotPartitionedImplicitlyBucketedTable
testRebalanceCompactionOfNotPartitionedExplicitlyBucketedTable
...
```
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MRCompactor.java:
##########
@@ -261,7 +285,7 @@ public void run(HiveConf conf, String jobName, Table t,
Partition p, StorageDesc
StringableList dirsToSearch = new StringableList();
Path baseDir = null;
- if (ci.isMajorCompaction()) {
+ if (ci.type.equals(CompactionType.MAJOR)) {
Review Comment:
swap operands for null safety
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java:
##########
@@ -203,19 +214,23 @@ CompactionQueryBuilder setIsDeleteDelta(boolean
deleteDelta) {
* @param resultTableName the name of the table we are running the operation
on
* @throws IllegalArgumentException if compactionType is null
*/
- CompactionQueryBuilder(CompactionType compactionType, Operation operation,
+ CompactionQueryBuilder(CompactionType compactionType, Operation operation,
boolean crud,
String resultTableName) {
if (compactionType == null) {
throw new
IllegalArgumentException("CompactionQueryBuilder.CompactionType cannot be
null");
}
+ this.compactionType = compactionType;
this.operation = operation;
this.resultTableName = resultTableName;
- major = compactionType == CompactionType.MAJOR_CRUD
- || compactionType == CompactionType.MAJOR_INSERT_ONLY;
- crud =
- compactionType == CompactionType.MAJOR_CRUD || compactionType ==
CompactionType.MINOR_CRUD;
- minor = !major;
+ this.crud = crud;
insertOnly = !crud;
+ major = compactionType.equals(CompactionType.MAJOR);
+ minor = compactionType.equals(CompactionType.MINOR);
+ rebalance = compactionType.equals(CompactionType.REBALANCE);
Review Comment:
You can swap the operands in these expressions to be null safe. ex:
```
CompactionType.MAJOR.equals(compactionType)
```
or
```
CompactionType.MAJOR == compactionType
```
also works since these are enum constants.
Btw why do we have three boolean fields for `major`, `minor` and
`rebalance`? Would it be enough to store the `compactionType` in one
`CompactionType` enum field instead?
Going forward, the code blocks that depend on the values of these could be
extracted into 3 classes sharing a common interface. This is just an idea,
and such refactors are probably worth doing in a separate patch.
##########
itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCrudCompactorOnTez.java:
##########
@@ -89,6 +89,270 @@
@SuppressWarnings("deprecation")
public class TestCrudCompactorOnTez extends CompactorOnTezTest {
+ @Test
+ public void testRebalanceCompactionNotPartitionedWithoutBuckets() throws
Exception {
+ conf.setBoolVar(HiveConf.ConfVars.COMPACTOR_CRUD_QUERY_BASED, true);
+ conf.setBoolVar(HiveConf.ConfVars.HIVE_COMPACTOR_GATHER_STATS, false);
+ conf.setBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER, false);
+
+ //set grouping size to have 3 buckets, and re-create driver with the new
config
+ conf.set("tez.grouping.min-size", "1000");
+ conf.set("tez.grouping.max-size", "80000");
+ driver = new Driver(conf);
+
+ final String stageTableName = "stage_rebalance_test";
+ final String tableName = "rebalance_test";
+ AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf);
+
+ TestDataProvider testDataProvider = new TestDataProvider();
+ testDataProvider.createFullAcidTable(stageTableName, true, false);
+ testDataProvider.insertTestDataPartitioned(stageTableName);
+
+ executeStatementOnDriver("drop table if exists " + tableName, driver);
+ executeStatementOnDriver("CREATE TABLE " + tableName + "(a string, b int)
" +
+ "STORED AS ORC TBLPROPERTIES('transactional'='true')", driver);
+ executeStatementOnDriver("INSERT OVERWRITE TABLE " + tableName + " select
a, b from " + stageTableName, driver);
+
+ //do some single inserts to have more data in the first bucket.
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('12',12)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('13',13)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('14',14)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('15',15)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('16',16)", driver);
+ executeStatementOnDriver("INSERT INTO TABLE " + tableName + " values
('17',17)", driver);
+
+ // Verify buckets and their content before rebalance
+ Table table = msClient.getTable("default", tableName);
+ FileSystem fs = FileSystem.get(conf);
+ Assert.assertEquals("Test setup does not match the expected: different
buckets",
+ Arrays.asList("bucket_00000_0", "bucket_00001_0", "bucket_00002_0"),
+ CompactorTestUtil.getBucketFileNames(fs, table, null, "base_0000001"));
+ String[][] expectedBuckets = new String[][] {
+ {
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":0}\t5\t4",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":1}\t6\t2",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":2}\t6\t3",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":3}\t6\t4",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":4}\t5\t2",
+ "{\"writeid\":1,\"bucketid\":536870912,\"rowid\":5}\t5\t3",
+ "{\"writeid\":2,\"bucketid\":536870912,\"rowid\":0}\t12\t12",
+ "{\"writeid\":3,\"bucketid\":536870912,\"rowid\":0}\t13\t13",
+ "{\"writeid\":4,\"bucketid\":536870912,\"rowid\":0}\t14\t14",
+ "{\"writeid\":5,\"bucketid\":536870912,\"rowid\":0}\t15\t15",
+ "{\"writeid\":6,\"bucketid\":536870912,\"rowid\":0}\t16\t16",
+ "{\"writeid\":7,\"bucketid\":536870912,\"rowid\":0}\t17\t17",
+ },
+ {
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t2\t4",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":1}\t3\t3",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":2}\t4\t4",
+ "{\"writeid\":1,\"bucketid\":536936448,\"rowid\":3}\t4\t3",
+ },
+ {
+ "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":0}\t2\t3",
+ "{\"writeid\":1,\"bucketid\":537001984,\"rowid\":1}\t3\t4",
+ },
+ };
+ for(int i = 0; i < 3; i++) {
+ Assert.assertEquals("rebalanced bucket " + i,
Arrays.asList(expectedBuckets[i]),
+ testDataProvider.getBucketData(tableName,
BucketCodec.V1.encode(options.bucket(i)) + ""));
+ }
+
+ //Try to do a rebalancing compaction
+ executeStatementOnDriver("ALTER TABLE " + tableName + " COMPACT
'rebalance'", driver);
+ runWorker(conf);
+
+ //Check if the compaction succeed
+ TxnStore txnHandler = TxnUtils.getTxnStore(conf);
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ List<ShowCompactResponseElement> compacts = rsp.getCompacts();
+ Assert.assertEquals("Expecting 1 rows and found " + compacts.size(), 1,
compacts.size());
+ Assert.assertEquals("Expecting compaction state 'ready for cleaning' and
found:" + compacts.get(0).getState(),
+ "ready for cleaning", compacts.get(0).getState());
Review Comment:
Can
[verifySuccessfulCompaction](https://github.com/apache/hive/blob/497553016d804576cdf2262e5869541c71e2efbd/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java#L164)
replace this block?
##########
ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java:
##########
@@ -52,6 +55,7 @@
import static
org.apache.hadoop.hive.ql.lockmgr.TestDbTxnManager2.swapTxnManager;
import static org.mockito.Matchers.any;
+import static org.powermock.api.mockito.PowerMockito.when;
Review Comment:
Can the usage of `powermock` be avoided?
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java:
##########
@@ -402,20 +413,14 @@ protected Boolean findNextCompactionAndExecute(boolean
collectGenericStats, bool
todo Find a more generic approach to collecting files in the same
logical bucket to compact within the same
task (currently we're using Tez split grouping).
*/
- QueryCompactor queryCompactor =
QueryCompactorFactory.getQueryCompactor(t, conf, ci);
- computeStats = (queryCompactor == null && collectMrStats) ||
collectGenericStats;
+ Compactor compactor = compactorFactory.getQueryCompactor(msc, t, conf,
ci);
+ computeStats = (compactor == null && collectMrStats) ||
collectGenericStats;
LOG.info("Starting " + ci.type.toString() + " compaction for " +
ci.getFullPartitionName() + ", id:" +
ci.id + " in " + compactionTxn + " with compute stats set to "
+ computeStats);
+ LOG.info("Will compact id: " + ci.id + " with compactor class: " +
compactor.getClass().getName());
- if (queryCompactor != null) {
- LOG.info("Will compact id: " + ci.id + " with query-based compactor
class: "
- + queryCompactor.getClass().getName());
- queryCompactor.runCompaction(conf, t, p, sd, tblValidWriteIds, ci,
dir);
- } else {
- LOG.info("Will compact id: " + ci.id + " via MR job");
- runCompactionViaMrJob(ci, t, p, sd, tblValidWriteIds, jobName, dir);
- }
+ compactor.runCompaction(conf, t, p, sd, tblValidWriteIds, ci, dir);
Review Comment:
Can `compactor` be null here?
##########
ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestWorker.java:
##########
@@ -1040,10 +1042,13 @@ public void testDoesNotGatherStatsIfCompactionFails()
throws Exception {
txnHandler.compact(new CompactionRequest("default", "mtwb",
CompactionType.MINOR));
+ CompactorFactory mockedFactory = Mockito.mock(CompactorFactory.class);
+ when(mockedFactory.getQueryCompactor(any(), any(), any(),
any())).thenThrow(new RuntimeException());
+
Worker worker = Mockito.spy(new Worker());
- Mockito.when(worker.getMrCompactor()).thenThrow(RuntimeException.class);
worker.setConf(conf);
worker.init(new AtomicBoolean(true));
+ Whitebox.setInternalState(worker, "compactorFactory", mockedFactory);
Review Comment:
How about adding a constructor which takes the `compactorFactory` as a
parameter?
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java:
##########
@@ -128,8 +111,8 @@ void runCompactionQueries(HiveConf conf, String
tmpTableName, StorageDescriptor
}
}
for (String query : compactionQueries) {
- LOG.info("Running {} compaction via query: {}",
compactionInfo.isMajorCompaction() ? "major" : "minor", query);
- if (!compactionInfo.isMajorCompaction()) {
+ LOG.info("Running {} compaction via query: {}", compactionInfo.type,
query);
+ if (compactionInfo.type.equals(CompactionType.MINOR)) {
Review Comment:
swap operands
##########
ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/StatsUpdater.java:
##########
@@ -52,7 +53,7 @@ public final class StatsUpdater {
*/
public void gatherStats(CompactionInfo ci, HiveConf conf, String userName,
String compactionQueueName) {
try {
- if (!ci.isMajorCompaction()) {
+ if (ci.type.equals(CompactionType.MINOR)) {
Review Comment:
swap operands
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]