Repository: hive Updated Branches: refs/heads/branch-1 b71c687f0 -> e26697daa
HIVE-11357 ACID enable predicate pushdown for insert-only delta file 2 (Eugene Koifman, reviewed by Alan Gates) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e26697da Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e26697da Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e26697da Branch: refs/heads/branch-1 Commit: e26697daaf16f56754386aec0ef1404eeea4936e Parents: b71c687 Author: Eugene Koifman <ekoif...@hortonworks.com> Authored: Fri Aug 28 12:29:29 2015 -0700 Committer: Eugene Koifman <ekoif...@hortonworks.com> Committed: Fri Aug 28 12:29:29 2015 -0700 ---------------------------------------------------------------------- .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 18 ++++- .../apache/hadoop/hive/ql/TestTxnCommands2.java | 85 ++++++++++++++++---- 2 files changed, 88 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/e26697da/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index e5bd640..68da26c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -127,7 +127,7 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, /** * When picking the hosts for a split that crosses block boundaries, - * any drop any host that has fewer than MIN_INCLUDED_LOCATION of the + * drop any host that has fewer than MIN_INCLUDED_LOCATION of the * number of bytes available on the host with the most. * If host1 has 10MB of the split, host2 has 20MB, and host3 has 18MB the * split will contain host2 (100% of host2) and host3 (90% of host2). Host1 @@ -1246,6 +1246,22 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } else { bucket = (int) split.getStart(); reader = null; + if(deltas != null && deltas.length > 0) { + Path bucketPath = AcidUtils.createBucketFile(deltas[0], bucket); + OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf); + FileSystem fs = readerOptions.getFilesystem(); + if(fs == null) { + fs = path.getFileSystem(options.getConfiguration()); + } + if(fs.exists(bucketPath)) { + /* w/o schema evolution (which ACID doesn't support yet) all delta + files have the same schema, so choosing the 1st one*/ + final List<OrcProto.Type> types = + OrcFile.createReader(bucketPath, readerOptions).getTypes(); + readOptions.include(genIncludedColumns(types, conf, split.isOriginal())); + setSearchArgument(readOptions, types, conf, split.isOriginal()); + } + } } String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY, Long.MAX_VALUE + ":"); http://git-wip-us.apache.org/repos/asf/hive/blob/e26697da/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java index 58c2fca..5aa2500 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java @@ -22,6 +22,7 @@ import org.apache.commons.io.FileUtils; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.txn.TxnDbUtil; +import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse; import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.hive.ql.txn.compactor.Worker; @@ -51,7 +52,7 @@ public class TestTxnCommands2 { ).getPath().replaceAll("\\\\", "/"); private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse"; //bucket count for test tables; set it to 1 for easier debugging - private static int BUCKET_COUNT = 1; + private static int BUCKET_COUNT = 2; @Rule public TestName testName = new TestName(); private HiveConf hiveConf; @@ -107,7 +108,6 @@ public class TestTxnCommands2 { public void tearDown() throws Exception { try { if (d != null) { - // runStatementOnDriver("set autocommit true"); dropTables(); d.destroy(); d.close(); @@ -126,13 +126,51 @@ public class TestTxnCommands2 { public void testOrcNoPPD() throws Exception { testOrcPPD(false); } - private void testOrcPPD(boolean enablePPD) throws Exception { + + /** + * this is run 2 times: 1 with PPD on, 1 with off + * Also, the queries are such that if we were to push predicate down to an update/delete delta, + * the test would produce wrong results + * @param enablePPD + * @throws Exception + */ + private void testOrcPPD(boolean enablePPD) throws Exception { boolean originalPpd = hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER); hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, enablePPD);//enables ORC PPD - int[][] tableData = {{1,2},{3,4}}; - runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData)); - List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " where a > 1 order by a,b"); - runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'"); + //create delta_0001_0001_0000 (should push predicate here) + runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(new int[][]{{1, 2}, {3, 4}})); + List<String> explain; + String query = "update " + Table.ACIDTBL + " set b = 5 where a = 3"; + if (enablePPD) { + explain = runStatementOnDriver("explain " + query); + /* + here is a portion of the above "explain". The "filterExpr:" in the TableScan is the pushed predicate + w/o PPD, the line is simply not there, otherwise the plan is the same + Map Operator Tree:, + TableScan, + alias: acidtbl, + filterExpr: (a = 3) (type: boolean), + Filter Operator, + predicate: (a = 3) (type: boolean), + Select Operator, + ... + */ + assertPredicateIsPushed("filterExpr: (a = 3)", explain); + } + //create delta_0002_0002_0000 (can't push predicate) + runStatementOnDriver(query); + query = "select a,b from " + Table.ACIDTBL + " where b = 4 order by a,b"; + if (enablePPD) { + /*at this point we have 2 delta files, 1 for insert 1 for update + * we should push predicate into 1st one but not 2nd. If the following 'select' were to + * push into the 'update' delta, we'd filter out {3,5} before doing merge and thus + * produce {3,4} as the value for 2nd row. The right result is 0-rows.*/ + explain = runStatementOnDriver("explain " + query); + assertPredicateIsPushed("filterExpr: (b = 4)", explain); + } + List<String> rs0 = runStatementOnDriver(query); + Assert.assertEquals("Read failed", 0, rs0.size()); + runStatementOnDriver("alter table " + Table.ACIDTBL + " compact 'MAJOR'"); Worker t = new Worker(); t.setThreadId((int) t.getId()); t.setHiveConf(hiveConf); @@ -142,18 +180,37 @@ public class TestTxnCommands2 { t.init(stop, looped); t.run(); //now we have base_0001 file - int[][] tableData2 = {{1,7},{5,6},{7,8},{9,10}}; + int[][] tableData2 = {{1, 7}, {5, 6}, {7, 8}, {9, 10}}; runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2)); - //now we have delta_0002_0002_0000 with inserts only (ok to push predicate) + //now we have delta_0003_0003_0000 with inserts only (ok to push predicate) + if (enablePPD) { + explain = runStatementOnDriver("explain delete from " + Table.ACIDTBL + " where a=7 and b=8"); + assertPredicateIsPushed("filterExpr: ((a = 7) and (b = 8))", explain); + } runStatementOnDriver("delete from " + Table.ACIDTBL + " where a=7 and b=8"); - //now we have delta_0003_0003_0000 with delete events (can't push predicate) - runStatementOnDriver("update " + Table.ACIDTBL + " set b = 11 where a = 9"); - //and another delta with update op - List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " where a > 1 order by a,b"); - int [][] resultData = {{3,4},{5,6},{9,11}}; + //now we have delta_0004_0004_0000 with delete events + + /*(can't push predicate to 'delete' delta) + * if we were to push to 'delete' delta, we'd filter out all rows since the 'row' is always NULL for + * delete events and we'd produce data as if the delete never happened*/ + query = "select a,b from " + Table.ACIDTBL + " where a > 1 order by a,b"; + if(enablePPD) { + explain = runStatementOnDriver("explain " + query); + assertPredicateIsPushed("filterExpr: (a > 1)", explain); + } + List<String> rs1 = runStatementOnDriver(query); + int [][] resultData = new int[][] {{3, 5}, {5, 6}, {9, 10}}; Assert.assertEquals("Update failed", stringifyValues(resultData), rs1); hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, originalPpd); } + private static void assertPredicateIsPushed(String ppd, List<String> queryPlan) { + for(String line : queryPlan) { + if(line != null && line.contains(ppd)) { + return; + } + } + Assert.assertFalse("PPD '" + ppd + "' wasn't pushed", true); + } @Ignore("alter table") @Test public void testAlterTable() throws Exception {