Repository: hive
Updated Branches:
  refs/heads/branch-1 b71c687f0 -> e26697daa


HIVE-11357 ACID enable predicate pushdown for insert-only delta file 2 (Eugene 
Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e26697da
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e26697da
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e26697da

Branch: refs/heads/branch-1
Commit: e26697daaf16f56754386aec0ef1404eeea4936e
Parents: b71c687
Author: Eugene Koifman <ekoif...@hortonworks.com>
Authored: Fri Aug 28 12:29:29 2015 -0700
Committer: Eugene Koifman <ekoif...@hortonworks.com>
Committed: Fri Aug 28 12:29:29 2015 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/io/orc/OrcInputFormat.java   | 18 ++++-
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 85 ++++++++++++++++----
 2 files changed, 88 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/e26697da/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index e5bd640..68da26c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@ -127,7 +127,7 @@ public class OrcInputFormat  implements 
InputFormat<NullWritable, OrcStruct>,
 
   /**
    * When picking the hosts for a split that crosses block boundaries,
-   * any drop any host that has fewer than MIN_INCLUDED_LOCATION of the
+   * drop any host that has fewer than MIN_INCLUDED_LOCATION of the
    * number of bytes available on the host with the most.
    * If host1 has 10MB of the split, host2 has 20MB, and host3 has 18MB the
    * split will contain host2 (100% of host2) and host3 (90% of host2). Host1
@@ -1246,6 +1246,22 @@ public class OrcInputFormat  implements 
InputFormat<NullWritable, OrcStruct>,
     } else {
       bucket = (int) split.getStart();
       reader = null;
+      if(deltas != null && deltas.length > 0) {
+        Path bucketPath = AcidUtils.createBucketFile(deltas[0], bucket);
+        OrcFile.ReaderOptions readerOptions = OrcFile.readerOptions(conf);
+        FileSystem fs = readerOptions.getFilesystem();
+        if(fs == null) {
+          fs = path.getFileSystem(options.getConfiguration());
+        }
+        if(fs.exists(bucketPath)) {
+        /* w/o schema evolution (which ACID doesn't support yet) all delta
+        files have the same schema, so choosing the 1st one*/
+          final List<OrcProto.Type> types =
+            OrcFile.createReader(bucketPath, readerOptions).getTypes();
+          readOptions.include(genIncludedColumns(types, conf, 
split.isOriginal()));
+          setSearchArgument(readOptions, types, conf, split.isOriginal());
+        }
+      }
     }
     String txnString = conf.get(ValidTxnList.VALID_TXNS_KEY,
                                 Long.MAX_VALUE + ":");

http://git-wip-us.apache.org/repos/asf/hive/blob/e26697da/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java 
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 58c2fca..5aa2500 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -22,6 +22,7 @@ import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.txn.compactor.Worker;
@@ -51,7 +52,7 @@ public class TestTxnCommands2 {
   ).getPath().replaceAll("\\\\", "/");
   private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + 
"/warehouse";
   //bucket count for test tables; set it to 1 for easier debugging
-  private static int BUCKET_COUNT = 1;
+  private static int BUCKET_COUNT = 2;
   @Rule
   public TestName testName = new TestName();
   private HiveConf hiveConf;
@@ -107,7 +108,6 @@ public class TestTxnCommands2 {
   public void tearDown() throws Exception {
     try {
       if (d != null) {
-     //   runStatementOnDriver("set autocommit true");
         dropTables();
         d.destroy();
         d.close();
@@ -126,13 +126,51 @@ public class TestTxnCommands2 {
   public void testOrcNoPPD() throws Exception {
     testOrcPPD(false);
   }
-  private void testOrcPPD(boolean enablePPD) throws Exception  {
+
+  /**
+   * This is run twice: once with predicate pushdown (PPD) enabled, once with it 
disabled.
+   * Also, the queries are constructed so that if we were to push a predicate 
down to an update/delete delta,
+   * the test would produce wrong results.
+   * @param enablePPD whether ORC predicate pushdown is enabled
+   * @throws Exception
+   */
+  private void testOrcPPD(boolean enablePPD) throws Exception {
     boolean originalPpd = 
hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER);
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, 
enablePPD);//enables ORC PPD
-    int[][] tableData = {{1,2},{3,4}};
-    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + 
makeValuesClause(tableData));
-    List<String> rs2 = runStatementOnDriver("select a,b from " + Table.ACIDTBL 
+ " where a > 1 order by a,b");
-    runStatementOnDriver("alter table "+ Table.ACIDTBL + " compact 'MAJOR'");
+    //create delta_0001_0001_0000 (should push predicate here)
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + 
makeValuesClause(new int[][]{{1, 2}, {3, 4}}));
+    List<String> explain;
+    String query = "update " + Table.ACIDTBL + " set b = 5 where a = 3";
+    if (enablePPD) {
+      explain = runStatementOnDriver("explain " + query);
+      /*
+      here is a portion of the above "explain".  The "filterExpr:" in the 
TableScan is the pushed predicate
+      w/o PPD, the line is simply not there, otherwise the plan is the same
+       Map Operator Tree:,
+         TableScan,
+          alias: acidtbl,
+          filterExpr: (a = 3) (type: boolean),
+            Filter Operator,
+             predicate: (a = 3) (type: boolean),
+             Select Operator,
+             ...
+       */
+      assertPredicateIsPushed("filterExpr: (a = 3)", explain);
+    }
+    //create delta_0002_0002_0000 (can't push predicate)
+    runStatementOnDriver(query);
+    query = "select a,b from " + Table.ACIDTBL + " where b = 4 order by a,b";
+    if (enablePPD) {
+      /* At this point we have 2 delta files: 1 for the insert, 1 for the update.
+      * We should push the predicate into the 1st one but not the 2nd.  If the following 
'select' were to
+      * push into the 'update' delta, we'd filter out {3,5} before doing the merge 
and thus
+      * produce {3,4} as the value for the 2nd row.  The correct result is 0 rows. */
+      explain = runStatementOnDriver("explain " + query);
+      assertPredicateIsPushed("filterExpr: (b = 4)", explain);
+    }
+    List<String> rs0 = runStatementOnDriver(query);
+    Assert.assertEquals("Read failed", 0, rs0.size());
+    runStatementOnDriver("alter table " + Table.ACIDTBL + " compact 'MAJOR'");
     Worker t = new Worker();
     t.setThreadId((int) t.getId());
     t.setHiveConf(hiveConf);
@@ -142,18 +180,37 @@ public class TestTxnCommands2 {
     t.init(stop, looped);
     t.run();
     //now we have base_0001 file
-    int[][] tableData2 = {{1,7},{5,6},{7,8},{9,10}};
+    int[][] tableData2 = {{1, 7}, {5, 6}, {7, 8}, {9, 10}};
     runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + 
makeValuesClause(tableData2));
-    //now we have delta_0002_0002_0000 with inserts only (ok to push predicate)
+    //now we have delta_0003_0003_0000 with inserts only (ok to push predicate)
+    if (enablePPD) {
+      explain = runStatementOnDriver("explain delete from " + Table.ACIDTBL + 
" where a=7 and b=8");
+      assertPredicateIsPushed("filterExpr: ((a = 7) and (b = 8))", explain);
+    }
     runStatementOnDriver("delete from " + Table.ACIDTBL + " where a=7 and 
b=8");
-    //now we have delta_0003_0003_0000 with delete events (can't push 
predicate)
-    runStatementOnDriver("update " + Table.ACIDTBL + " set b = 11 where a = 
9");
-    //and another delta with update op
-    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL 
+ " where a > 1 order by a,b");
-    int [][] resultData = {{3,4},{5,6},{9,11}};
+    //now we have delta_0004_0004_0000 with delete events
+
+    /*(can't push predicate to 'delete' delta)
+    * if we were to push to 'delete' delta, we'd filter out all rows since the 
'row' is always NULL for
+    * delete events and we'd produce data as if the delete never happened*/
+    query = "select a,b from " + Table.ACIDTBL + " where a > 1 order by a,b";
+    if(enablePPD) {
+      explain = runStatementOnDriver("explain " + query);
+      assertPredicateIsPushed("filterExpr: (a > 1)", explain);
+    }
+    List<String> rs1 = runStatementOnDriver(query);
+    int [][] resultData = new int[][] {{3, 5}, {5, 6}, {9, 10}};
     Assert.assertEquals("Update failed", stringifyValues(resultData), rs1);
     hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, originalPpd);
   }
+  private static void assertPredicateIsPushed(String ppd, List<String> 
queryPlan) {
+    for(String line : queryPlan) {
+      if(line != null && line.contains(ppd)) {
+        return;
+      }
+    }
+    Assert.assertFalse("PPD '" + ppd + "' wasn't pushed", true);
+  }
   @Ignore("alter table")
   @Test
   public void testAlterTable() throws Exception {

Reply via email to