This is an automated email from the ASF dual-hosted git repository. hashutosh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 8b9fadb  HIVE-23169 : Probe runtime support for LLAP (Panagiotis Garefalakis via Ashutosh Chauhan)
8b9fadb is described below

commit 8b9fadb5515aace73db5068cc81317b6f10e0f32
Author: Ashutosh Chauhan <hashut...@apache.org>
AuthorDate: Tue Apr 21 16:54:58 2020 -0700

    HIVE-23169 : Probe runtime support for LLAP (Panagiotis Garefalakis via Ashutosh Chauhan)

    Signed-off-by: Ashutosh Chauhan <hashut...@apache.org>
---
 .../hive/llap/io/api/impl/LlapRecordReader.java    | 51 ++++++++++++++++++++++
 .../hive/llap/io/decode/ColumnVectorProducer.java  |  6 +++
 .../llap/io/decode/OrcEncodedDataConsumer.java     |  4 ++
 3 files changed, 61 insertions(+)

diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
index acb6b2d..417a42a 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapRecordReader.java
@@ -25,6 +25,8 @@ import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
 
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.Includes;
 import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer.SchemaEvolutionFactory;
 import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
 import org.apache.hadoop.hive.llap.tezplugins.LlapTezUtils;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
@@ -82,6 +85,7 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
   private final SearchArgument sarg;
   private final VectorizedRowBatchCtx rbCtx;
   private final boolean isVectorized;
+  private final boolean probeDecodeEnabled;
   private VectorizedOrcAcidRowBatchReader acidReader;
   private final Object[] partitionValues;
 
@@ -196,6 +200,12 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
     this.includes = new IncludesImpl(tableIncludedCols, isAcidFormat, rbCtx,
         schema, job, isAcidScan && acidReader.includeAcidColumns());
 
+    this.probeDecodeEnabled = HiveConf.getBoolVar(jobConf, ConfVars.HIVE_OPTIMIZE_SCAN_PROBEDECODE);
+    if (this.probeDecodeEnabled) {
+      includes.setProbeDecodeContext(mapWork.getProbeDecodeContext());
+      LOG.info("LlapRecordReader ProbeDecode is enabled");
+    }
+
     // Create the consumer of encoded data; it will coordinate decoding to CVBs.
     feedback = rp = cvp.createReadPipeline(this, split, includes, sarg, counters, includes,
         sourceInputFormat, sourceSerDe, reporter, job, mapWork.getPathToPartitionInfo());
@@ -629,6 +639,9 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
     private TypeDescription readerSchema;
     private JobConf jobConf;
 
+    // ProbeDecode Context for row-level filtering
+    private TableScanOperator.ProbeDecodeContext probeDecodeContext = null;
+
     public IncludesImpl(List<Integer> tableIncludedCols, boolean isAcidScan,
         VectorizedRowBatchCtx rbCtx, TypeDescription readerSchema,
         JobConf jobConf, boolean includeAcidColumns) {
@@ -710,6 +723,10 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
           fileSchema, filePhysicalColumnIds, acidStructColumnId);
     }
 
+    public void setProbeDecodeContext(TableScanOperator.ProbeDecodeContext currProbeDecodeContext) {
+      this.probeDecodeContext = currProbeDecodeContext;
+    }
+
     @Override
     public List<Integer> getPhysicalColumnIds() {
       return filePhysicalColumnIds;
@@ -725,5 +742,39 @@ class LlapRecordReader implements RecordReader<NullWritable, VectorizedRowBatch>
       return OrcInputFormat.genIncludedTypes(
           fileSchema, filePhysicalColumnIds, acidStructColumnId);
     }
+
+    @Override
+    public String getQueryId() {
+      return HiveConf.getVar(jobConf, HiveConf.ConfVars.HIVEQUERYID);
+    }
+
+    @Override
+    public boolean isProbeDecodeEnabled() {
+      return this.probeDecodeContext != null;
+    }
+
+    @Override
+    public byte getProbeMjSmallTablePos() {
+      return this.probeDecodeContext.getMjSmallTablePos();
+    }
+
+    @Override
+    public int getProbeColIdx() {
+      // TODO: is this the best way to get the ColId?
+      Pattern pattern = Pattern.compile("_col([0-9]+)");
+      Matcher matcher = pattern.matcher(this.probeDecodeContext.getMjBigTableKeyColName());
+      return matcher.find() ? Integer.parseInt(matcher.group(1)) : -1;
+    }
+
+    @Override
+    public String getProbeColName() {
+      return this.probeDecodeContext.getMjBigTableKeyColName();
+    }
+
+    @Override
+    public String getProbeCacheKey() {
+      return this.probeDecodeContext.getMjSmallTableCacheKey();
+    }
+
   }
 }
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
index a830c07..e37379b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/ColumnVectorProducer.java
@@ -49,6 +49,12 @@ public interface ColumnVectorProducer {
     List<Integer> getPhysicalColumnIds();
     List<Integer> getReaderLogicalColumnIds();
     TypeDescription[] getBatchReaderTypes(TypeDescription fileSchema);
+    String getQueryId();
+    boolean isProbeDecodeEnabled();
+    byte getProbeMjSmallTablePos();
+    String getProbeCacheKey();
+    String getProbeColName();
+    int getProbeColIdx();
   }
 
   ReadPipeline createReadPipeline(Consumer<ColumnVectorBatch> consumer, FileSplit split,
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
index 1b41d4e..b697a0d 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/decode/OrcEncodedDataConsumer.java
@@ -84,6 +84,10 @@ public class OrcEncodedDataConsumer
     this.includes = includes;
     // TODO: get rid of this
     this.skipCorrupt = skipCorrupt;
+    if (includes.isProbeDecodeEnabled()) {
+      LlapIoImpl.LOG.info("OrcEncodedDataConsumer probeDecode is enabled with cacheKey {} colIndex {} and colName {}",
+          this.includes.getProbeCacheKey(), this.includes.getProbeColIdx(), this.includes.getProbeColName());
+    }
   }
 
   public void setUseDecimal64ColumnVectors(final boolean useDecimal64ColumnVectors) {