Repository: hive Updated Branches: refs/heads/branch-1.2 0273771d6 -> 343486b65
HIVE-10151 - insert into A select from B is broken when both A and B are Acid tables and bucketed the same way (Eugene Koifman, reviewed by Alan Gates) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/343486b6 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/343486b6 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/343486b6 Branch: refs/heads/branch-1.2 Commit: 343486b6542ab75f4b6049b56973d9a5d4c5a495 Parents: 0273771 Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Fri May 1 09:34:37 2015 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Fri May 1 09:34:37 2015 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/ql/exec/Operator.java | 4 ++++ .../BucketingSortingReduceSinkOptimizer.java | 12 ++++++++++++ .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 2 +- .../apache/hadoop/hive/ql/TestTxnCommands2.java | 18 +++++++++++++++++- 4 files changed, 34 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/343486b6/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 5856cfd..d7f1b42 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@ -1181,6 +1181,10 @@ public abstract class Operator<T extends OperatorDesc> implements Serializable,C return useBucketizedHiveInputFormat; } + /** + * Before setting this to {@code true} make sure it's not reading ACID tables + * @param useBucketizedHiveInputFormat + */ public void setUseBucketizedHiveInputFormat(boolean useBucketizedHiveInputFormat) { this.useBucketizedHiveInputFormat = useBucketizedHiveInputFormat; } http://git-wip-us.apache.org/repos/asf/hive/blob/343486b6/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java index 76cc540..7cb0f15 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/BucketingSortingReduceSinkOptimizer.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.ParseContext; import org.apache.hadoop.hive.ql.parse.PrunedPartitionList; +import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer; import org.apache.hadoop.hive.ql.parse.SemanticException; import org.apache.hadoop.hive.ql.plan.ExprNodeColumnDesc; import org.apache.hadoop.hive.ql.plan.ExprNodeDesc; @@ -215,6 +216,9 @@ public class BucketingSortingReduceSinkOptimizer implements Transform { private void storeBucketPathMapping(TableScanOperator tsOp, FileStatus[] srcs) { Map<String, Integer> bucketFileNameMapping = new HashMap<String, Integer>(); for (int pos = 0; pos < srcs.length; pos++) { + if(!srcs[pos].isFile()) { + throw new RuntimeException("Was expecting '" + srcs[pos].getPath() + "' to be bucket file."); + } bucketFileNameMapping.put(srcs[pos].getPath().getName(), pos); } tsOp.getConf().setBucketFileNameMapping(bucketFileNameMapping); @@ -376,6 +380,14 @@ public class BucketingSortingReduceSinkOptimizer implements Transform { return null; } + if(stack.get(0) instanceof TableScanOperator) { + TableScanOperator tso = ((TableScanOperator)stack.get(0)); + if(SemanticAnalyzer.isAcidTable(tso.getConf().getTableMetadata())) { + /*ACID tables have complex directory layout and require merging of delta files + * on read thus we should not try to read bucket files directly*/ + return null; + } + } // Support for dynamic partitions can be added later if (fsOp.getConf().getDynPartCtx() != null) { return null; http://git-wip-us.apache.org/repos/asf/hive/blob/343486b6/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index 8e65b59..1d2c764 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -12036,7 +12036,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // Even if the table is of Acid type, if we aren't working with an Acid compliant TxnManager // then return false. - private boolean isAcidTable(Table tab) { + public static boolean isAcidTable(Table tab) { if (tab == null) return false; if (!SessionState.get().getTxnMgr().supportsAcid()) return false; String tableIsTransactional = http://git-wip-us.apache.org/repos/asf/hive/blob/343486b6/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 1431e19..3c987dd 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -98,8 +98,8 @@ public class TestTxnCommands2 { d.destroy(); d.close(); d = null; - TxnDbUtil.cleanDb(); } + TxnDbUtil.cleanDb(); } finally { FileUtils.deleteDirectory(new File(TEST_DATA_DIR)); } @@ -142,6 +142,22 @@ public class TestTxnCommands2 { Assert.assertEquals("Bulk update2 failed", stringifyValues(updatedData2), rs2); } + /** + * https://issues.apache.org/jira/browse/HIVE-10151 + */ + @Test + public void testBucketizedInputFormat() throws Exception { + int[][] tableData = {{1,2}}; + runStatementOnDriver("insert into " + Table.ACIDTBLPART + " partition(p=1) (a,b) " + makeValuesClause(tableData)); + + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) select a,b from " + Table.ACIDTBLPART + " where p = 1"); + List<String> rs = runStatementOnDriver("select a,b from " + Table.ACIDTBL);//no order by as it's just 1 row + Assert.assertEquals("Insert into " + Table.ACIDTBL + " didn't match:", stringifyValues(tableData), rs); + + runStatementOnDriver("insert into " + Table.NONACIDORCTBL + "(a,b) select a,b from " + Table.ACIDTBLPART + " where p = 1"); + List<String> rs2 = runStatementOnDriver("select a,b from " + Table.NONACIDORCTBL);//no order by as it's just 1 row + Assert.assertEquals("Insert into " + Table.NONACIDORCTBL + " didn't match:", stringifyValues(tableData), rs2); + } @Test public void testInsertOverwriteWithSelfJoin() throws Exception { int[][] part1Data = {{1,7}};