Repository: hive Updated Branches: refs/heads/master 334c8cae4 -> 9fe65dae8
HIVE-19214: High throughput ingest ORC format (Prasanth Jayachandran reviewed by Gopal V) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/9fe65dae Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/9fe65dae Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/9fe65dae Branch: refs/heads/master Commit: 9fe65dae8938d497e463d2be061fb0591df6c7f7 Parents: 334c8ca Author: Prasanth Jayachandran <prasan...@apache.org> Authored: Mon Apr 23 09:52:08 2018 -0700 Committer: Prasanth Jayachandran <prasan...@apache.org> Committed: Mon Apr 23 09:52:08 2018 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 3 + .../llap/io/decode/OrcEncodedDataConsumer.java | 13 +++- .../llap/io/encoded/OrcEncodedDataReader.java | 10 +-- .../llap/io/metadata/OrcStripeMetadata.java | 17 +++++ .../hadoop/hive/ql/io/orc/OrcInputFormat.java | 5 ++ .../hadoop/hive/ql/io/orc/OrcRecordUpdater.java | 7 ++ .../ql/io/orc/encoded/EncodedReaderImpl.java | 36 +++++++--- .../queries/clientpositive/orc_ppd_exception.q | 13 ++++ .../test/queries/clientpositive/vector_acid3.q | 10 +++ .../clientpositive/llap/vector_acid3.q.out | 34 +++++++++ .../clientpositive/orc_ppd_exception.q.out | 57 ++++++++++++++++ .../results/clientpositive/vector_acid3.q.out | 34 +++++++++ .../apache/hive/streaming/TestStreaming.java | 72 ++++++++++++++++++++ 13 files changed, 293 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/9fe65dae/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 536c7b4..2403d7a 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ 
b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -1874,6 +1874,9 @@ public class HiveConf extends Configuration { HIVE_ORC_BASE_DELTA_RATIO("hive.exec.orc.base.delta.ratio", 8, "The ratio of base writer and\n" + "delta writer in terms of STRIPE_SIZE and BUFFER_SIZE."), + HIVE_ORC_DELTA_STREAMING_OPTIMIZATIONS_ENABLED("hive.exec.orc.delta.streaming.optimizations.enabled", false, + "Whether to enable streaming optimizations for ORC delta files. This will disable ORC's internal indexes,\n" + + "disable compression, enable fast encoding and disable dictionary encoding."), HIVE_ORC_SPLIT_STRATEGY("hive.exec.orc.split.strategy", "HYBRID", new StringSet("HYBRID", "BI", "ETL"), "This is not a user level config. BI strategy is used when the requirement is to spend less time in split generation" + " as opposed to query execution (split generation does not read or cache file footers)." + http://git-wip-us.apache.org/repos/asf/hive/blob/9fe65dae/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java index 9e8ae10..fc0c66a 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java @@ -113,18 +113,25 @@ public class OrcEncodedDataConsumer ConsumerStripeMetadata stripeMetadata = stripes.get(currentStripeIndex); // Get non null row count from root column, to get max vector batches int rgIdx = batch.getBatchKey().rgIx; - long nonNullRowCount = -1; + long nonNullRowCount; + boolean noIndex = false; if (rgIdx == OrcEncodedColumnBatch.ALL_RGS) { nonNullRowCount = stripeMetadata.getRowCount(); } else { OrcProto.RowIndexEntry rowIndex = 
stripeMetadata.getRowIndexEntry(0, rgIdx); - nonNullRowCount = getRowCount(rowIndex); + // index is disabled + if (rowIndex == null) { + nonNullRowCount = stripeMetadata.getRowCount(); + noIndex = true; + } else { + nonNullRowCount = getRowCount(rowIndex); + } } int maxBatchesRG = (int) ((nonNullRowCount / VectorizedRowBatch.DEFAULT_SIZE) + 1); int batchSize = VectorizedRowBatch.DEFAULT_SIZE; TypeDescription fileSchema = fileMetadata.getSchema(); - if (columnReaders == null || !sameStripe) { + if (columnReaders == null || !sameStripe || noIndex) { createColumnReaders(batch, stripeMetadata, fileSchema); } else { repositionInStreams(this.columnReaders, batch, sameStripe, stripeMetadata); http://git-wip-us.apache.org/repos/asf/hive/blob/9fe65dae/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java ---------------------------------------------------------------------- diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index afb8fc5..4033b37 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@ -692,10 +692,12 @@ public class OrcEncodedDataReader extends CallableWithNdc<Void> return result; } finally { try { - if (isPool && !isCodecError) { - OrcCodecPool.returnCodec(kind, codec); - } else { - codec.close(); + if (codec != null) { + if (isPool && !isCodecError) { + OrcCodecPool.returnCodec(kind, codec); + } else { + codec.close(); + } } } catch (Exception ex) { LOG.error("Ignoring codec cleanup error", ex); http://git-wip-us.apache.org/repos/asf/hive/blob/9fe65dae/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java ---------------------------------------------------------------------- diff --git 
a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java index 3d9e99c..382d398 100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/metadata/OrcStripeMetadata.java @@ -88,11 +88,28 @@ public class OrcStripeMetadata implements ConsumerStripeMetadata { @Override public RowIndexEntry getRowIndexEntry(int colIx, int rgIx) { + if (rowIndex == null || rowIndex.getRowGroupIndex()[colIx] == null) { + return null; + } return rowIndex.getRowGroupIndex()[colIx].getEntry(rgIx); } @Override public boolean supportsRowIndexes() { + if (rowIndex == null) { + return false; + } + // if all row indexes are null then indexes are disabled + boolean allNulls = true; + for (OrcProto.RowIndex rowIndex : rowIndex.getRowGroupIndex()) { + if (rowIndex != null) { + allNulls = false; + break; + } + } + if (allNulls) { + return false; + } return true; } http://git-wip-us.apache.org/repos/asf/hive/blob/9fe65dae/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index fe109d7..dc6cc62 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@ -2199,6 +2199,11 @@ public class OrcInputFormat implements InputFormat<NullWritable, OrcStruct>, } else { // column statistics at index 0 contains only the number of rows ColumnStatistics stats = stripeStatistics.getColumnStatistics()[filterColumns[pred]]; + // if row count is 0 and there are no nulls, it means the index is disabled and we don't have stats + if (stats.getNumberOfValues() == 0 && !stats.hasNull()) { + 
truthValues[pred] = TruthValue.YES_NO_NULL; + continue; + } PredicateLeaf leaf = predLeaves.get(pred); try { truthValues[pred] = RecordReaderImpl.evaluatePredicate(stats, leaf, null); http://git-wip-us.apache.org/repos/asf/hive/blob/9fe65dae/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java index d850062..09f8802 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcRecordUpdater.java @@ -294,6 +294,13 @@ public class OrcRecordUpdater implements RecordUpdater { writerOptions.bufferSize(baseBufferSizeValue / ratio); writerOptions.stripeSize(baseStripeSizeValue / ratio); writerOptions.blockPadding(false); + if (optionsCloneForDelta.getConfiguration().getBoolean( + HiveConf.ConfVars.HIVE_ORC_DELTA_STREAMING_OPTIMIZATIONS_ENABLED.varname, false)) { + writerOptions.compress(CompressionKind.NONE); + writerOptions.encodingStrategy(org.apache.orc.OrcFile.EncodingStrategy.SPEED); + writerOptions.rowIndexStride(0); + writerOptions.getConfiguration().set(OrcConf.DICTIONARY_KEY_SIZE_THRESHOLD.getAttribute(), "-1.0"); + } } writerOptions.fileSystem(fs).callback(indexBuilder); rowInspector = (StructObjectInspector)options.getInspector(); http://git-wip-us.apache.org/repos/asf/hive/blob/9fe65dae/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java index 4e17394..1d7eceb 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java @@ -395,7 +395,7 @@ class EncodedReaderImpl implements EncodedReader { // We go by RG and not by column because that is how data is processed. boolean hasError = true; try { - int rgCount = (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride); + int rgCount = rowIndexStride == 0 ? 1 : (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride); for (int rgIx = 0; rgIx < rgCount; ++rgIx) { if (rgs != null && !rgs[rgIx]) { continue; // RG filtered. @@ -409,19 +409,31 @@ class EncodedReaderImpl implements EncodedReader { ecb.init(fileKey, stripeIx, rgIx, physicalFileIncludes.length); for (int colIx = 0; colIx < colCtxs.length; ++colIx) { ColumnReadContext ctx = colCtxs[colIx]; - if (ctx == null) continue; // This column is not included. + if (ctx == null) continue; // This column is not included + + OrcProto.RowIndexEntry index; + OrcProto.RowIndexEntry nextIndex; + // index is disabled + if (ctx.rowIndex == null) { + if (isTracingEnabled) { + LOG.trace("Row index is null. Likely reading a file with indexes disabled."); + } + index = null; + nextIndex = null; + } else { + index = ctx.rowIndex.getEntry(rgIx); + nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1); + } if (isTracingEnabled) { LOG.trace("ctx: {} rgIx: {} isLastRg: {} rgCount: {}", ctx, rgIx, isLastRg, rgCount); } - OrcProto.RowIndexEntry index = ctx.rowIndex.getEntry(rgIx), - nextIndex = isLastRg ? null : ctx.rowIndex.getEntry(rgIx + 1); ecb.initOrcColumn(ctx.colIx); trace.logStartCol(ctx.colIx); for (int streamIx = 0; streamIx < ctx.streamCount; ++streamIx) { StreamContext sctx = ctx.streams[streamIx]; - ColumnStreamData cb = null; + ColumnStreamData cb; try { - if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding)) { + if (RecordReaderUtils.isDictionary(sctx.kind, ctx.encoding) || index == null) { // This stream is for entire stripe and needed for every RG; uncompress once and reuse. 
if (isTracingEnabled) { LOG.trace("Getting stripe-level stream [" + sctx.kind + ", " + ctx.encoding + "] for" @@ -685,12 +697,14 @@ class EncodedReaderImpl implements EncodedReader { @Override public void close() throws IOException { try { - if (isCodecFromPool && !isCodecFailure) { - OrcCodecPool.returnCodec(compressionKind, codec); - } else { - codec.close(); + if (codec != null) { + if (isCodecFromPool && !isCodecFailure) { + OrcCodecPool.returnCodec(compressionKind, codec); + } else { + codec.close(); + } + codec = null; } - codec = null; } catch (Exception ex) { LOG.error("Ignoring error from codec", ex); } finally { http://git-wip-us.apache.org/repos/asf/hive/blob/9fe65dae/ql/src/test/queries/clientpositive/orc_ppd_exception.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/orc_ppd_exception.q b/ql/src/test/queries/clientpositive/orc_ppd_exception.q index 1513d91..a9db8f0 100644 --- a/ql/src/test/queries/clientpositive/orc_ppd_exception.q +++ b/ql/src/test/queries/clientpositive/orc_ppd_exception.q @@ -12,3 +12,16 @@ insert into table test_acid values (1, '2014-09-14 12:34:30'); delete from test_acid where ts = '2014-15-16 17:18:19.20'; select i,ts from test_acid where ts = '2014-15-16 17:18:19.20'; select i,ts from test_acid where ts <= '2014-09-14 12:34:30'; + +drop table test_acid; +set hive.exec.orc.delta.streaming.optimizations.enabled=true; + +create table test_acid( i int, ts timestamp) + clustered by (i) into 2 buckets + stored as orc + tblproperties ('transactional'='true'); +insert into table test_acid values (1, '2014-09-14 12:34:30'); +delete from test_acid where ts = '2014-15-16 17:18:19.20'; +select i,ts from test_acid where ts = '2014-15-16 17:18:19.20'; +select i,ts from test_acid where ts <= '2014-09-14 12:34:30'; + http://git-wip-us.apache.org/repos/asf/hive/blob/9fe65dae/ql/src/test/queries/clientpositive/vector_acid3.q 
---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/vector_acid3.q b/ql/src/test/queries/clientpositive/vector_acid3.q index d4313f4..d284e52 100644 --- a/ql/src/test/queries/clientpositive/vector_acid3.q +++ b/ql/src/test/queries/clientpositive/vector_acid3.q @@ -15,3 +15,13 @@ set hive.compute.query.using.stats=false; set hive.vectorized.execution.enabled; select count(1) from testacid1; + +drop table testacid1; +set hive.exec.orc.delta.streaming.optimizations.enabled=true; + +create table testacid1(id int) clustered by (id) into 2 buckets stored as orc tblproperties("transactional"="true"); + +insert into table testacid1 values (1),(2),(3),(4); + +select count(1) from testacid1; + http://git-wip-us.apache.org/repos/asf/hive/blob/9fe65dae/ql/src/test/results/clientpositive/llap/vector_acid3.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/llap/vector_acid3.q.out b/ql/src/test/results/clientpositive/llap/vector_acid3.q.out index 46c82fc..396f76e 100644 --- a/ql/src/test/results/clientpositive/llap/vector_acid3.q.out +++ b/ql/src/test/results/clientpositive/llap/vector_acid3.q.out @@ -29,3 +29,37 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@testacid1 #### A masked pattern was here #### 4 +PREHOOK: query: drop table testacid1 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@testacid1 +PREHOOK: Output: default@testacid1 +POSTHOOK: query: drop table testacid1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@testacid1 +POSTHOOK: Output: default@testacid1 +PREHOOK: query: create table testacid1(id int) clustered by (id) into 2 buckets stored as orc tblproperties("transactional"="true") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@testacid1 +POSTHOOK: query: create table testacid1(id int) clustered by (id) into 2 buckets stored as orc tblproperties("transactional"="true") 
+POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@testacid1 +PREHOOK: query: insert into table testacid1 values (1),(2),(3),(4) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@testacid1 +POSTHOOK: query: insert into table testacid1 values (1),(2),(3),(4) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@testacid1 +POSTHOOK: Lineage: testacid1.id SCRIPT [] +PREHOOK: query: select count(1) from testacid1 +PREHOOK: type: QUERY +PREHOOK: Input: default@testacid1 +#### A masked pattern was here #### +POSTHOOK: query: select count(1) from testacid1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@testacid1 +#### A masked pattern was here #### +4 http://git-wip-us.apache.org/repos/asf/hive/blob/9fe65dae/ql/src/test/results/clientpositive/orc_ppd_exception.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/orc_ppd_exception.q.out b/ql/src/test/results/clientpositive/orc_ppd_exception.q.out index e03eb71..eee9176 100644 --- a/ql/src/test/results/clientpositive/orc_ppd_exception.q.out +++ b/ql/src/test/results/clientpositive/orc_ppd_exception.q.out @@ -47,3 +47,60 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@test_acid #### A masked pattern was here #### 1 2014-09-14 12:34:30 +PREHOOK: query: drop table test_acid +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@test_acid +PREHOOK: Output: default@test_acid +POSTHOOK: query: drop table test_acid +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@test_acid +POSTHOOK: Output: default@test_acid +PREHOOK: query: create table test_acid( i int, ts timestamp) + clustered by (i) into 2 buckets + stored as orc + tblproperties ('transactional'='true') +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@test_acid +POSTHOOK: query: create table test_acid( i int, ts timestamp) + clustered 
by (i) into 2 buckets + stored as orc + tblproperties ('transactional'='true') +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@test_acid +PREHOOK: query: insert into table test_acid values (1, '2014-09-14 12:34:30') +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@test_acid +POSTHOOK: query: insert into table test_acid values (1, '2014-09-14 12:34:30') +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@test_acid +POSTHOOK: Lineage: test_acid.i SCRIPT [] +POSTHOOK: Lineage: test_acid.ts SCRIPT [] +PREHOOK: query: delete from test_acid where ts = '2014-15-16 17:18:19.20' +PREHOOK: type: QUERY +PREHOOK: Input: default@test_acid +PREHOOK: Output: default@test_acid +POSTHOOK: query: delete from test_acid where ts = '2014-15-16 17:18:19.20' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_acid +POSTHOOK: Output: default@test_acid +PREHOOK: query: select i,ts from test_acid where ts = '2014-15-16 17:18:19.20' +PREHOOK: type: QUERY +PREHOOK: Input: default@test_acid +#### A masked pattern was here #### +POSTHOOK: query: select i,ts from test_acid where ts = '2014-15-16 17:18:19.20' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_acid +#### A masked pattern was here #### +PREHOOK: query: select i,ts from test_acid where ts <= '2014-09-14 12:34:30' +PREHOOK: type: QUERY +PREHOOK: Input: default@test_acid +#### A masked pattern was here #### +POSTHOOK: query: select i,ts from test_acid where ts <= '2014-09-14 12:34:30' +POSTHOOK: type: QUERY +POSTHOOK: Input: default@test_acid +#### A masked pattern was here #### +1 2014-09-14 12:34:30 http://git-wip-us.apache.org/repos/asf/hive/blob/9fe65dae/ql/src/test/results/clientpositive/vector_acid3.q.out ---------------------------------------------------------------------- diff --git a/ql/src/test/results/clientpositive/vector_acid3.q.out 
b/ql/src/test/results/clientpositive/vector_acid3.q.out index 46c82fc..396f76e 100644 --- a/ql/src/test/results/clientpositive/vector_acid3.q.out +++ b/ql/src/test/results/clientpositive/vector_acid3.q.out @@ -29,3 +29,37 @@ POSTHOOK: type: QUERY POSTHOOK: Input: default@testacid1 #### A masked pattern was here #### 4 +PREHOOK: query: drop table testacid1 +PREHOOK: type: DROPTABLE +PREHOOK: Input: default@testacid1 +PREHOOK: Output: default@testacid1 +POSTHOOK: query: drop table testacid1 +POSTHOOK: type: DROPTABLE +POSTHOOK: Input: default@testacid1 +POSTHOOK: Output: default@testacid1 +PREHOOK: query: create table testacid1(id int) clustered by (id) into 2 buckets stored as orc tblproperties("transactional"="true") +PREHOOK: type: CREATETABLE +PREHOOK: Output: database:default +PREHOOK: Output: default@testacid1 +POSTHOOK: query: create table testacid1(id int) clustered by (id) into 2 buckets stored as orc tblproperties("transactional"="true") +POSTHOOK: type: CREATETABLE +POSTHOOK: Output: database:default +POSTHOOK: Output: default@testacid1 +PREHOOK: query: insert into table testacid1 values (1),(2),(3),(4) +PREHOOK: type: QUERY +PREHOOK: Input: _dummy_database@_dummy_table +PREHOOK: Output: default@testacid1 +POSTHOOK: query: insert into table testacid1 values (1),(2),(3),(4) +POSTHOOK: type: QUERY +POSTHOOK: Input: _dummy_database@_dummy_table +POSTHOOK: Output: default@testacid1 +POSTHOOK: Lineage: testacid1.id SCRIPT [] +PREHOOK: query: select count(1) from testacid1 +PREHOOK: type: QUERY +PREHOOK: Input: default@testacid1 +#### A masked pattern was here #### +POSTHOOK: query: select count(1) from testacid1 +POSTHOOK: type: QUERY +POSTHOOK: Input: default@testacid1 +#### A masked pattern was here #### +4 http://git-wip-us.apache.org/repos/asf/hive/blob/9fe65dae/streaming/src/test/org/apache/hive/streaming/TestStreaming.java ---------------------------------------------------------------------- diff --git 
a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java index 6f63bfb..e5dd3b3 100644 --- a/streaming/src/test/org/apache/hive/streaming/TestStreaming.java +++ b/streaming/src/test/org/apache/hive/streaming/TestStreaming.java @@ -1579,6 +1579,78 @@ public class TestStreaming { } @Test + public void testFileDumpDeltaFiles() throws Exception { + String agentInfo = "UT_" + Thread.currentThread().getName(); + conf.setBoolVar(HiveConf.ConfVars.HIVE_ORC_DELTA_STREAMING_OPTIMIZATIONS_ENABLED, true); + try { + dropDB(msClient, dbName3); + dropDB(msClient, dbName4); + + // 1) Create a bucketed table + String dbLocation = dbFolder.newFolder(dbName3).getCanonicalPath() + ".db"; + dbLocation = dbLocation.replaceAll("\\\\", "/"); // for windows paths + String[] colNames = "key1,key2,data".split(","); + String[] colTypes = "string,int,string".split(","); + String[] bucketNames = "key1,key2".split(","); + int bucketCount = 4; + createDbAndTable(driver, dbName3, tblName3, null, colNames, colTypes, bucketNames + , null, dbLocation, bucketCount); + + // 2) Insert data into the table + HiveEndPoint endPt = new HiveEndPoint(metaStoreURI, dbName3, tblName3, null); + StreamingConnection connection = endPt.newConnection(false, agentInfo); + DelimitedInputWriter writer = new DelimitedInputWriter(colNames, ",", endPt, conf, connection); + + TransactionBatch txnBatch = connection.fetchTransactionBatch(2, writer); + txnBatch.beginNextTransaction(); + txnBatch.write("name0,1,streaming".getBytes()); + txnBatch.write("name2,2,streaming".getBytes()); + txnBatch.write("name4,2,unlimited".getBytes()); + txnBatch.write("name5,2,unlimited".getBytes()); + for (int i = 0; i < 6000; i++) { + if (i % 2 == 0) { + txnBatch.write(("name" + i + "," + i + "," + "streaming").getBytes()); + } else { + txnBatch.write(("name" + i + "," + i + "," + "unlimited").getBytes()); + } + } + txnBatch.commit(); + txnBatch.close(); + 
connection.close(); + + PrintStream origOut = System.out; + ByteArrayOutputStream myOut = new ByteArrayOutputStream(); + + // replace stdout and run command + System.setOut(new PrintStream(myOut)); + FileDump.main(new String[]{dbLocation}); + System.out.flush(); + System.setOut(origOut); + + String outDump = new String(myOut.toByteArray()); + // make sure delta files are written with no indexes, no compression and no dictionary + // no compression + Assert.assertEquals(true, outDump.contains("Compression: NONE")); + // no stats/indexes + Assert.assertEquals(true, outDump.contains("Column 0: count: 0 hasNull: false")); + Assert.assertEquals(true, outDump.contains("Column 1: count: 0 hasNull: false sum: 0")); + Assert.assertEquals(true, outDump.contains("Column 2: count: 0 hasNull: false sum: 0")); + Assert.assertEquals(true, outDump.contains("Column 3: count: 0 hasNull: false sum: 0")); + Assert.assertEquals(true, outDump.contains("Column 4: count: 0 hasNull: false sum: 0")); + Assert.assertEquals(true, outDump.contains("Column 5: count: 0 hasNull: false sum: 0")); + Assert.assertEquals(true, outDump.contains("Column 6: count: 0 hasNull: false")); + Assert.assertEquals(true, outDump.contains("Column 7: count: 0 hasNull: false")); + Assert.assertEquals(true, outDump.contains("Column 8: count: 0 hasNull: false sum: 0")); + Assert.assertEquals(true, outDump.contains("Column 9: count: 0 hasNull: false")); + // no dictionary + Assert.assertEquals(true, outDump.contains("Encoding column 7: DIRECT_V2")); + Assert.assertEquals(true, outDump.contains("Encoding column 9: DIRECT_V2")); + } finally { + conf.unset(HiveConf.ConfVars.HIVE_ORC_DELTA_STREAMING_OPTIMIZATIONS_ENABLED.varname); + } + } + + @Test public void testFileDumpCorruptDataFiles() throws Exception { dropDB(msClient, dbName3);