HIVE-18288 - merge/concat not supported on Acid table (Eugene Koifman, reviewed by Sergey Shelukhin)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4a2bfb8b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4a2bfb8b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4a2bfb8b

Branch: refs/heads/master
Commit: 4a2bfb8bed36b53b1bcc4eee0e4f916820f335f4
Parents: ab7affe
Author: Eugene Koifman <ekoif...@apache.org>
Authored: Mon May 7 11:55:16 2018 -0700
Committer: Eugene Koifman <ekoif...@apache.org>
Committed: Mon May 7 11:55:16 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   3 +
 .../apache/hadoop/hive/ql/metadata/Hive.java    |   4 +-
 .../hive/ql/parse/DDLSemanticAnalyzer.java      |  21 ++-
 .../hadoop/hive/ql/TestTxnConcatenate.java      | 159 +++++++++++++++++++
 4 files changed, 177 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/4a2bfb8b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index b872827..23a9c74 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2565,6 +2565,9 @@ public class HiveConf extends Configuration {
     COMPACTOR_JOB_QUEUE("hive.compactor.job.queue", "",
         "Used to specify name of Hadoop queue to which\n" +
         "Compaction jobs will be submitted. Set to empty string to let Hadoop choose the queue."),
+    TRANSACTIONAL_CONCATENATE_NOBLOCK("hive.transactional.concatenate.noblock", false,
+        "Will cause 'alter table T concatenate' to be non-blocking"),
+
     HIVE_COMPACTOR_COMPACT_MM("hive.compactor.compact.insert.only", true,
         "Whether the compactor should compact insert-only tables. A safety switch."),
     /**

http://git-wip-us.apache.org/repos/asf/hive/blob/4a2bfb8b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index 5dbc478..903470a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -4407,9 +4407,9 @@ private void constructOneLBLocationMap(FileStatus fSta,
       throws HiveException {
     try {
       CompactionType cr = null;
-      if ("major".equals(compactType)) {
+      if ("major".equalsIgnoreCase(compactType)) {
         cr = CompactionType.MAJOR;
-      } else if ("minor".equals(compactType)) {
+      } else if ("minor".equalsIgnoreCase(compactType)) {
         cr = CompactionType.MINOR;
       } else {
         throw new RuntimeException("Unknown compaction type " + compactType);

http://git-wip-us.apache.org/repos/asf/hive/blob/4a2bfb8b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index defb8be..f0b9eda 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -1964,9 +1964,19 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
     try {
       tblObj = getTable(tableName);
-      // TODO: we should probably block all ACID tables here.
-      if (AcidUtils.isInsertOnlyTable(tblObj.getParameters())) {
-        throw new SemanticException("Merge is not supported for MM tables");
+      if(AcidUtils.isTransactionalTable(tblObj)) {
+        LinkedHashMap<String, String> newPartSpec = null;
+        if (partSpec != null) {
+          newPartSpec = new LinkedHashMap<>(partSpec);
+        }
+
+        boolean isBlocking = !HiveConf.getBoolVar(conf,
+            ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, false);
+        AlterTableSimpleDesc desc = new AlterTableSimpleDesc(
+            tableName, newPartSpec, "MAJOR", isBlocking);
+
+        rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), desc)));
+        return;
       }
       mergeDesc.setTableDesc(Utilities.getTableDesc(tblObj));
@@ -2039,11 +2049,6 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
         throw new SemanticException(ErrorMsg.CONCATENATE_UNSUPPORTED_TABLE_NOT_MANAGED.getMsg());
       }
-      // transactional tables are compacted and no longer needs to be bucketed, so not safe for merge/concatenation
-      boolean isAcid = AcidUtils.isTransactionalTable(tblObj);
-      if (isAcid) {
-        throw new SemanticException(ErrorMsg.CONCATENATE_UNSUPPORTED_TABLE_TRANSACTIONAL.getMsg());
-      }
       inputDir.add(oldTblPartLoc);
       mergeDesc.setInputDir(inputDir);

http://git-wip-us.apache.org/repos/asf/hive/blob/4a2bfb8b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
new file mode 100644
index 0000000..92bcefe
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnConcatenate.java
@@ -0,0 +1,159 @@
+package org.apache.hadoop.hive.ql;
+
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
+import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+
+public class TestTxnConcatenate extends TxnCommandsBaseForTests {
+  static final private Logger LOG = LoggerFactory.getLogger(TestTxnConcatenate.class);
+  private static final String TEST_DATA_DIR =
+      new File(System.getProperty("java.io.tmpdir") +
+          File.separator + TestTxnLoadData.class.getCanonicalName()
+          + "-" + System.currentTimeMillis()
+      ).getPath().replaceAll("\\\\", "/");
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  @Override
+  String getTestDataDir() {
+    return TEST_DATA_DIR;
+  }
+
+  @Test
+  public void testConcatenate() throws Exception {
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " values(1,2),(4,5)");
+    runStatementOnDriver("update " + Table.ACIDTBL + " set b = 4");
+    runStatementOnDriver("insert into " + Table.ACIDTBL + " values(5,6),(8,8)");
+    String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDTBL + " order by a, b";
+    String[][] expected = new String[][] {
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t1\t4",
+            "acidtbl/delta_0000002_0000002_0000/bucket_00001"},
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t4",
+            "acidtbl/delta_0000002_0000002_0000/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t5\t6",
+            "acidtbl/delta_0000003_0000003_0000/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
+            "acidtbl/delta_0000003_0000003_0000/bucket_00001"}};
+    checkResult(expected, testQuery, false, "check data", LOG);
+
+    /*in UTs, there is no standalone HMS running to kick off compaction so it's done via runWorker()
+      but in normal usage 'concatenate' is blocking, */
+    hiveConf.setBoolVar(HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, true);
+    runStatementOnDriver("alter table " + Table.ACIDTBL + " concatenate");
+
+    TxnStore txnStore = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse rsp = txnStore.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.INITIATED_RESPONSE, rsp.getCompacts().get(0).getState());
+    runWorker(hiveConf);
+    rsp = txnStore.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+    String[][] expected2 = new String[][] {
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":1}\t1\t4",
+            "acidtbl/base_0000003/bucket_00001"},
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t4\t4",
+            "acidtbl/base_0000003/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":1}\t5\t6",
+            "acidtbl/base_0000003/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
+            "acidtbl/base_0000003/bucket_00001"}};
+    checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
+  }
+  @Test
+  public void testConcatenatePart() throws Exception {
+    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " values(1,2,'p1'),(4,5,'p2')");
+    runStatementOnDriver("update " + Table.ACIDTBLPART + " set b = 4 where p='p1'");
+    runStatementOnDriver("insert into " + Table.ACIDTBLPART + " values(5,6,'p1'),(8,8,'p2')");
+    String testQuery = "select ROW__ID, a, b, INPUT__FILE__NAME from " + Table.ACIDTBLPART + " order by a, b";
+    String[][] expected = new String[][] {
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t4",
+            "acidtblpart/p=p1/delta_0000002_0000002_0000/bucket_00001"},
+        {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t4\t5",
+            "acidtblpart/p=p2/delta_0000001_0000001_0000/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t5\t6",
+            "acidtblpart/p=p1/delta_0000003_0000003_0000/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
+            "acidtblpart/p=p2/delta_0000003_0000003_0000/bucket_00001"}};
+    checkResult(expected, testQuery, false, "check data", LOG);
+
+    /*in UTs, there is no standalone HMS running to kick off compaction so it's done via runWorker()
+      but in normal usage 'concatenate' is blocking, */
+    hiveConf.setBoolVar(HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, true);
+    runStatementOnDriver("alter table " + Table.ACIDTBLPART + " PARTITION(p='p1') concatenate");
+
+    TxnStore txnStore = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse rsp = txnStore.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.INITIATED_RESPONSE, rsp.getCompacts().get(0).getState());
+    runWorker(hiveConf);
+    rsp = txnStore.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+    String[][] expected2 = new String[][] {
+        {"{\"writeid\":2,\"bucketid\":536936448,\"rowid\":0}\t1\t4",
+            "acidtblpart/p=p1/base_0000003/bucket_00001"},
+        {"{\"writeid\":1,\"bucketid\":536936448,\"rowid\":0}\t4\t5",
+            "acidtblpart/p=p2/delta_0000001_0000001_0000/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t5\t6",
+            "acidtblpart/p=p1/base_0000003/bucket_00001"},
+        {"{\"writeid\":3,\"bucketid\":536936448,\"rowid\":0}\t8\t8",
+            "acidtblpart/p=p2/delta_0000003_0000003_0000/bucket_00001"}};
+
+    checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
+  }
+
+  @Test
+  public void testConcatenateMM() throws Exception {
+    HiveConf.setBoolVar(hiveConf, HiveConf.ConfVars.HIVE_CREATE_TABLES_AS_INSERT_ONLY, true);
+    runStatementOnDriver("drop table if exists T");
+    runStatementOnDriver("create table T(a int, b int)");
+    runStatementOnDriver("insert into T values(1,2),(4,5)");
+    runStatementOnDriver("insert into T values(5,6),(8,8)");
+    String testQuery = "select a, b, INPUT__FILE__NAME from T order by a, b";
+    String[][] expected = new String[][] {
+        {"1\t2",
+            "t/delta_0000001_0000001_0000/000000_0"},
+        {"4\t5",
+            "t/delta_0000001_0000001_0000/000000_0"},
+        {"5\t6",
+            "t/delta_0000002_0000002_0000/000000_0"},
+        {"8\t8",
+            "t/delta_0000002_0000002_0000/000000_0"}};
+    checkResult(expected, testQuery, false, "check data", LOG);
+
+    /*in UTs, there is no standalone HMS running to kick off compaction so it's done via runWorker()
+      but in normal usage 'concatenate' is blocking, */
+    hiveConf.setBoolVar(HiveConf.ConfVars.TRANSACTIONAL_CONCATENATE_NOBLOCK, true);
+    runStatementOnDriver("alter table T concatenate");
+
+    TxnStore txnStore = TxnUtils.getTxnStore(hiveConf);
+    ShowCompactResponse rsp = txnStore.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.INITIATED_RESPONSE, rsp.getCompacts().get(0).getState());
+    runWorker(hiveConf);
+    rsp = txnStore.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+    String[][] expected2 = new String[][] {
+        {"1\t2",
+            "t/base_0000002/000000_0"},
+        {"4\t5",
+            "t/base_0000002/000000_0"},
+        {"5\t6",
+            "t/base_0000002/000000_0"},
+        {"8\t8",
+            "t/base_0000002/000000_0"}};
+    checkResult(expected2, testQuery, false, "check data after concatenate", LOG);
+  }
+}
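
----------------------------------------------------------------------
Usage sketch (not part of the commit; table names below are illustrative). With this patch,
'alter table ... concatenate' on a transactional table is compiled into a MAJOR compaction
request instead of failing with a SemanticException, and the new
hive.transactional.concatenate.noblock setting (default false, i.e. blocking) controls whether
the statement waits for that compaction to finish:

  -- optional: enqueue the compaction and return immediately instead of waiting
  SET hive.transactional.concatenate.noblock=true;

  -- acid_tbl / acid_part_tbl are assumed ACID (or insert-only) tables
  ALTER TABLE acid_tbl CONCATENATE;
  ALTER TABLE acid_part_tbl PARTITION (p='p1') CONCATENATE;

  -- the request is visible like any other compaction
  SHOW COMPACTIONS;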