Repository: hive
Updated Branches:
  refs/heads/branch-1 684d0e5e1 -> 77aefd6c8
HIVE-11320 ACID enable predicate pushdown for insert-only delta file (Eugene Koifman, reviewed by Alan Gates)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/77aefd6c
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/77aefd6c
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/77aefd6c

Branch: refs/heads/branch-1
Commit: 77aefd6c8d0a59bdc20a3ba74ccec1e955888fcb
Parents: 684d0e5
Author: Eugene Koifman <ekoif...@hortonworks.com>
Authored: Tue Jul 21 11:57:03 2015 -0700
Committer: Eugene Koifman <ekoif...@hortonworks.com>
Committed: Tue Jul 21 11:57:03 2015 -0700

----------------------------------------------------------------------
 .../hive/ql/io/orc/OrcRawRecordMerger.java      | 20 ++++--
 .../apache/hadoop/hive/ql/TestTxnCommands2.java | 68 ++++++++++++++++++--
 2 files changed, 75 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/77aefd6c/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
index 2f11611..58b85ef 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRawRecordMerger.java
@@ -478,10 +478,6 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
 
     // we always want to read all of the deltas
     eventOptions.range(0, Long.MAX_VALUE);
-    // Turn off the sarg before pushing it to delta.  We never want to push a sarg to a delta as
-    // it can produce wrong results (if the latest valid version of the record is filtered out by
-    // the sarg) or ArrayOutOfBounds errors (when the sarg is applied to a delete record)
-    eventOptions.searchArgument(null, null);
     if (deltaDirectory != null) {
       for(Path delta: deltaDirectory) {
         ReaderKey key = new ReaderKey();
@@ -492,8 +488,20 @@ public class OrcRawRecordMerger implements AcidInputFormat.RawReader<OrcStruct>{
         if (length != -1 && fs.exists(deltaFile)) {
           Reader deltaReader = OrcFile.createReader(deltaFile,
               OrcFile.readerOptions(conf).maxLength(length));
-          ReaderPair deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
-            maxKey, eventOptions, deltaDir.getStatementId());
+          Reader.Options deltaEventOptions = null;
+          if (eventOptions.getSearchArgument() != null) {
+            // Turn off the sarg before pushing it to a delta.  We never want to push a sarg to a
+            // delta, as it can produce wrong results (if the latest valid version of the record
+            // is filtered out by the sarg) or ArrayOutOfBounds errors (when the sarg is applied
+            // to a delete record), unless the delta only has insert events.
+            OrcRecordUpdater.AcidStats acidStats = OrcRecordUpdater.parseAcidStats(deltaReader);
+            if (acidStats.deletes > 0 || acidStats.updates > 0) {
+              deltaEventOptions = eventOptions.clone().searchArgument(null, null);
+            }
+          }
+          ReaderPair deltaPair = new ReaderPair(key, deltaReader, bucket, minKey,
+              maxKey, deltaEventOptions != null ? deltaEventOptions : eventOptions,
+              deltaDir.getStatementId());
           if (deltaPair.nextRecord != null) {
             readers.put(key, deltaPair);
           }
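
The hunk above boils down to a simple rule: a search argument (sarg) may be
pushed into a delta file only when that delta contains nothing but insert
events. A minimal standalone sketch of that rule follows; DeltaPpdSketch, its
nested AcidStats, and canPushSargToDelta are hypothetical stand-ins for this
illustration only, not Hive classes (in Hive the real counts come from the
acid-event statistics that OrcRecordUpdater stores in ORC user metadata).

  // Standalone sketch of the pushdown rule in the hunk above (hypothetical types).
  public class DeltaPpdSketch {
    // Stand-in for the per-file acid event counts kept in ORC user metadata.
    static final class AcidStats {
      long inserts;
      long updates;
      long deletes;
    }

    // A sarg pushed into a delta holding update/delete events can filter out the
    // latest valid version of a row (wrong results) or be evaluated against a
    // delete record (ArrayOutOfBounds), so pushdown is safe only for
    // insert-only deltas.
    static boolean canPushSargToDelta(AcidStats stats) {
      return stats.deletes == 0 && stats.updates == 0;
    }

    public static void main(String[] args) {
      AcidStats insertOnly = new AcidStats();
      insertOnly.inserts = 4;       // like delta_0002_0002_0000 in the test below
      AcidStats withDelete = new AcidStats();
      withDelete.inserts = 4;
      withDelete.deletes = 1;       // like delta_0003_0003_0000 in the test below
      System.out.println(canPushSargToDelta(insertOnly));   // true
      System.out.println(canPushSargToDelta(withDelete));   // false
    }
  }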
http://git-wip-us.apache.org/repos/asf/hive/blob/77aefd6c/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 33ca998..57e4fb9 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -20,13 +20,11 @@ package org.apache.hadoop.hive.ql;
 
 import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.orc.FileDump;
 import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.hive.ql.txn.compactor.Worker;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -36,13 +34,11 @@ import org.junit.Test;
 import org.junit.rules.TestName;
 
 import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * TODO: this should be merged with TestTxnCommands once that is checked in
@@ -55,7 +51,7 @@ public class TestTxnCommands2 {
   ).getPath().replaceAll("\\\\", "/");
   private static final String TEST_WAREHOUSE_DIR = TEST_DATA_DIR + "/warehouse";
   //bucket count for test tables; set it to 1 for easier debugging
-  private static int BUCKET_COUNT = 2;
+  private static int BUCKET_COUNT = 1;
   @Rule
   public TestName testName = new TestName();
   private HiveConf hiveConf;
@@ -122,6 +118,64 @@ public class TestTxnCommands2 {
       FileUtils.deleteDirectory(new File(TEST_DATA_DIR));
     }
   }
+  @Test
+  public void testOrcPPD() throws Exception {
+    testOrcPPD(true);
+  }
+  @Test
+  public void testOrcNoPPD() throws Exception {
+    testOrcPPD(false);
+  }
+  private void testOrcPPD(boolean enablePPD) throws Exception {
+    boolean originalPpd = hiveConf.getBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, enablePPD); //enables ORC PPD
+    int[][] tableData = {{1,2},{3,4}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
+    runStatementOnDriver("alter table " + Table.ACIDTBL + " compact 'MAJOR'");
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(hiveConf);
+    AtomicBoolean stop = new AtomicBoolean();
+    AtomicBoolean looped = new AtomicBoolean();
+    stop.set(true);
+    t.init(stop, looped);
+    t.run();
+    //now we have a base_0001 file
+    int[][] tableData2 = {{1,7},{5,6},{7,8},{9,10}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2));
+    //now we have delta_0002_0002_0000 with inserts only (ok to push the predicate)
+    runStatementOnDriver("delete from " + Table.ACIDTBL + " where a=7 and b=8");
+    //now we have delta_0003_0003_0000 with delete events (can't push the predicate)
+    runStatementOnDriver("update " + Table.ACIDTBL + " set b = 11 where a = 9");
+    //and another delta with update events (can't push the predicate either)
+    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " where a > 1 order by a,b");
+    int[][] resultData = {{3,4},{5,6},{9,11}};
+    Assert.assertEquals("Update failed", stringifyValues(resultData), rs1);
+    hiveConf.setBoolVar(HiveConf.ConfVars.HIVEOPTINDEXFILTER, originalPpd);
+  }
+  @Ignore("alter table")
+  @Test
+  public void testAlterTable() throws Exception {
+    int[][] tableData = {{1,2}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData));
+    runStatementOnDriver("alter table " + Table.ACIDTBL + " compact 'MAJOR'");
+    Worker t = new Worker();
+    t.setThreadId((int) t.getId());
+    t.setHiveConf(hiveConf);
+    AtomicBoolean stop = new AtomicBoolean();
+    AtomicBoolean looped = new AtomicBoolean();
+    stop.set(true);
+    t.init(stop, looped);
+    t.run();
+    int[][] tableData2 = {{5,6}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b) " + makeValuesClause(tableData2));
+    List<String> rs1 = runStatementOnDriver("select a,b from " + Table.ACIDTBL + " where b > 0 order by a,b");
+
+    runStatementOnDriver("alter table " + Table.ACIDTBL + " add columns(c int)");
+    int[][] moreTableData = {{7,8,9}};
+    runStatementOnDriver("insert into " + Table.ACIDTBL + "(a,b,c) " + makeValuesClause(moreTableData));
+    List<String> rs0 = runStatementOnDriver("select a,b,c from " + Table.ACIDTBL + " where a > 0 order by a,b,c");
+  }
   @Ignore("not needed but useful for testing")
   @Test
   public void testNonAcidInsert() throws Exception {
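
A side note on the tests: both testOrcPPD() and testAlterTable() repeat the
same inline major-compaction sequence. A hypothetical helper (not part of this
commit) that condenses it, assuming only the Worker API and the test utilities
already visible in the diff above:

  // Hypothetical test helper: run one major compaction synchronously on the
  // calling thread, so the test can assert against the resulting base file.
  private void runMajorCompaction(String tableName) throws Exception {
    runStatementOnDriver("alter table " + tableName + " compact 'MAJOR'");
    Worker t = new Worker();
    t.setThreadId((int) t.getId());
    t.setHiveConf(hiveConf);
    AtomicBoolean stop = new AtomicBoolean(true);  // handle one request, then exit
    AtomicBoolean looped = new AtomicBoolean();
    t.init(stop, looped);
    t.run();  // call run() directly instead of start() so the test blocks here
  }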