http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java ---------------------------------------------------------------------- diff --cc llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java index 152230c,0000000..333772c mode 100644,000000..100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java @@@ -1,323 -1,0 +1,322 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +package org.apache.hadoop.hive.llap.io.api.impl; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; + +import org.apache.hadoop.hive.llap.ConsumerFeedback; +import org.apache.hadoop.hive.llap.DebugUtils; +import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; +import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters.Counter; +import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer; +import org.apache.hadoop.hive.llap.io.decode.ReadPipeline; +import org.apache.hadoop.hive.ql.exec.Utilities; - import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.llap.Consumer; ++import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; - import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; +import org.apache.hive.common.util.HiveStringUtils; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; + +public class LlapInputFormat + implements InputFormat<NullWritable, VectorizedRowBatch>, VectorizedInputFormatInterface { + @SuppressWarnings("rawtypes") + private final InputFormat sourceInputFormat; + private final ColumnVectorProducer cvp; + private final ListeningExecutorService executor; + private 
final String hostName; + + @SuppressWarnings("rawtypes") + LlapInputFormat(InputFormat sourceInputFormat, ColumnVectorProducer cvp, + ListeningExecutorService executor) { + // TODO: right now, we do nothing with source input format, ORC-only in the first cut. + // We'd need to plumb it thru and use it to get data to cache/etc. + assert sourceInputFormat instanceof OrcInputFormat; + this.executor = executor; + this.cvp = cvp; + this.sourceInputFormat = sourceInputFormat; + this.hostName = HiveStringUtils.getHostname(); + } + + @Override + public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader( + InputSplit split, JobConf job, Reporter reporter) throws IOException { + boolean isVectorMode = Utilities.isVectorMode(job); + if (!isVectorMode) { + LlapIoImpl.LOG.error("No llap in non-vectorized mode"); + throw new UnsupportedOperationException("No llap in non-vectorized mode"); + } + FileSplit fileSplit = (FileSplit)split; + reporter.setStatus(fileSplit.toString()); + try { + List<Integer> includedCols = ColumnProjectionUtils.isReadAllColumns(job) + ? null : ColumnProjectionUtils.getReadColumnIDs(job); + return new LlapRecordReader(job, fileSplit, includedCols, hostName); + } catch (Exception ex) { + throw new IOException(ex); + } + } + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + return sourceInputFormat.getSplits(job, numSplits); + } + + private class LlapRecordReader + implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> { + private final InputSplit split; + private final List<Integer> columnIds; + private final SearchArgument sarg; + private final String[] columnNames; + private final VectorizedRowBatchCtx rbCtx; + + private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>(); + private ColumnVectorBatch lastCvb = null; + private boolean isFirst = true; + + private Throwable pendingError = null; + /** Vector that is currently being processed by our user. */ + private boolean isDone = false, isClosed = false; + private ConsumerFeedback<ColumnVectorBatch> feedback; + private final QueryFragmentCounters counters; + private long firstReturnTime; + + public LlapRecordReader( + JobConf job, FileSplit split, List<Integer> includedCols, String hostName) { + this.split = split; + this.columnIds = includedCols; - this.sarg = SearchArgumentFactory.createFromConf(job); ++ this.sarg = ConvertAstToSearchArg.createFromConf(job); + this.columnNames = ColumnProjectionUtils.getReadColumnNames(job); + this.counters = new QueryFragmentCounters(job); + this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName); + try { + rbCtx = new VectorizedRowBatchCtx(); + rbCtx.init(job, split); + } catch (Exception e) { + throw new RuntimeException(e); + } + startRead(); + } + + @Override + public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException { + assert value != null; + if (isClosed) { + throw new AssertionError("next called after close"); + } + // Add partition cols if necessary (see VectorizedOrcInputFormat for details). + boolean wasFirst = isFirst; + if (isFirst) { + try { + rbCtx.addPartitionColsToBatch(value); + } catch (HiveException e) { + throw new IOException(e); + } + isFirst = false; + } + ColumnVectorBatch cvb = null; + try { + cvb = nextCvb(); + } catch (InterruptedException e) { + // Query might have been canceled. Stop the background processing. 
+ feedback.stop(); + throw new IOException(e); + } + if (cvb == null) { + if (wasFirst) { + firstReturnTime = counters.startTimeCounter(); + } + counters.incrTimeCounter(Counter.CONSUMER_TIME_US, firstReturnTime); + return false; + } + if (columnIds.size() != cvb.cols.length) { + throw new RuntimeException("Unexpected number of columns, VRB has " + columnIds.size() + + " included, but the reader returned " + cvb.cols.length); + } + // VRB was created from VrbCtx, so we already have pre-allocated column vectors + for (int i = 0; i < cvb.cols.length; ++i) { + // Return old CVs (if any) to caller. We assume these things all have the same schema. + cvb.swapColumnVector(i, value.cols, columnIds.get(i)); + } + value.selectedInUse = false; + value.size = cvb.size; + if (wasFirst) { + firstReturnTime = counters.startTimeCounter(); + } + return true; + } + + + private final class UncaughtErrorHandler implements FutureCallback<Void> { + @Override + public void onSuccess(Void result) { + // Successful execution of reader is supposed to call setDone. + } + + @Override + public void onFailure(Throwable t) { + // Reader is not supposed to throw AFTER calling setError. + LlapIoImpl.LOG.error("Unhandled error from reader thread " + t.getMessage()); + setError(t); + } + } + + private void startRead() { + // Create the consumer of encoded data; it will coordinate decoding to CVBs. + ReadPipeline rp = cvp.createReadPipeline( + this, split, columnIds, sarg, columnNames, counters); + feedback = rp; + ListenableFuture<Void> future = executor.submit(rp.getReadCallable()); + // TODO: we should NOT do this thing with handler. Reader needs to do cleanup in most cases. + Futures.addCallback(future, new UncaughtErrorHandler()); + } + + ColumnVectorBatch nextCvb() throws InterruptedException, IOException { + boolean isFirst = (lastCvb == null); + if (!isFirst) { + feedback.returnData(lastCvb); + } + synchronized (pendingData) { + // We are waiting for next block. Either we will get it, or be told we are done. + boolean doLogBlocking = DebugUtils.isTraceMttEnabled() && isNothingToReport(); + if (doLogBlocking) { + LlapIoImpl.LOG.info("next will block"); + } + while (isNothingToReport()) { + pendingData.wait(100); + } + if (doLogBlocking) { + LlapIoImpl.LOG.info("next is unblocked"); + } + rethrowErrorIfAny(); + lastCvb = pendingData.poll(); + } + if (DebugUtils.isTraceMttEnabled() && lastCvb != null) { + LlapIoImpl.LOG.info("Processing will receive vector " + lastCvb); + } + return lastCvb; + } + + private boolean isNothingToReport() { + return !isDone && pendingData.isEmpty() && pendingError == null; + } + + @Override + public NullWritable createKey() { + return NullWritable.get(); + } + + @Override + public VectorizedRowBatch createValue() { + try { + return rbCtx.createVectorizedRowBatch(); + } catch (HiveException e) { + throw new RuntimeException("Error creating a batch", e); + } + } + + @Override + public long getPos() throws IOException { + return -1; // Position doesn't make sense for async reader, chunk order is arbitrary. + } + + @Override + public void close() throws IOException { + if (DebugUtils.isTraceMttEnabled()) { + LlapIoImpl.LOG.info("close called; closed " + isClosed + ", done " + isDone + + ", err " + pendingError + ", pending " + pendingData.size()); + } + LlapIoImpl.LOG.info(counters); // This is where counters are logged! 
+ feedback.stop(); + rethrowErrorIfAny(); + } + + private void rethrowErrorIfAny() throws IOException { + if (pendingError == null) return; + if (pendingError instanceof IOException) { + throw (IOException)pendingError; + } + throw new IOException(pendingError); + } + + @Override + public void setDone() { + if (DebugUtils.isTraceMttEnabled()) { + LlapIoImpl.LOG.info("setDone called; closed " + isClosed + + ", done " + isDone + ", err " + pendingError + ", pending " + pendingData.size()); + } + synchronized (pendingData) { + isDone = true; + pendingData.notifyAll(); + } + } + + @Override + public void consumeData(ColumnVectorBatch data) { + if (DebugUtils.isTraceMttEnabled()) { + LlapIoImpl.LOG.info("consume called; closed " + isClosed + ", done " + isDone + + ", err " + pendingError + ", pending " + pendingData.size()); + } + synchronized (pendingData) { + if (isClosed) { + return; + } + pendingData.add(data); + pendingData.notifyAll(); + } + } + + @Override + public void setError(Throwable t) { + counters.incrCounter(QueryFragmentCounters.Counter.NUM_ERRORS); + LlapIoImpl.LOG.info("setError called; closed " + isClosed + + ", done " + isDone + ", err " + pendingError + ", pending " + pendingData.size()); + assert t != null; + synchronized (pendingData) { + pendingError = t; + pendingData.notifyAll(); + } + } + + @Override + public float getProgress() throws IOException { + // TODO: plumb progress info thru the reader if we can get metadata from loader first. + return 0.0f; + } + } +}
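[Editor's note] The hand-off between the read pipeline thread and LlapRecordReader.next() above is a plain synchronized queue with wait/notify: the producer calls consumeData/setDone/setError, and the consumer blocks until one of them fires. A minimal, self-contained sketch of that pattern follows; BlockingHandoff and its members are simplified hypothetical stand-ins, not the Hive classes.

import java.util.LinkedList;
import java.util.Queue;

public class BlockingHandoff<T> {
  private final Queue<T> pending = new LinkedList<T>();
  private Throwable error = null;
  private boolean done = false;

  // Producer side: hand over one batch and wake the consumer.
  public void consumeData(T data) {
    synchronized (pending) {
      pending.add(data);
      pending.notifyAll();
    }
  }

  // Producer side: signal normal completion.
  public void setDone() {
    synchronized (pending) {
      done = true;
      pending.notifyAll();
    }
  }

  // Producer side: signal failure; the consumer rethrows it.
  public void setError(Throwable t) {
    synchronized (pending) {
      error = t;
      pending.notifyAll();
    }
  }

  /** Consumer side: returns the next batch, or null once the producer is done and drained. */
  public T next() throws InterruptedException {
    synchronized (pending) {
      while (pending.isEmpty() && !done && error == null) {
        pending.wait(100); // Periodic wakeup, mirroring the 100ms wait in the reader above.
      }
      if (error != null) {
        throw new RuntimeException(error);
      }
      return pending.poll();
    }
  }
}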
http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java ---------------------------------------------------------------------- diff --cc llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java index f88a943,0000000..f310fd5 mode 100644,000000..100644 --- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java +++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java @@@ -1,906 -1,0 +1,906 @@@ +package org.apache.hadoop.hive.llap.io.encoded; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.CallableWithNdc; +import org.apache.hadoop.hive.common.DiskRange; +import org.apache.hadoop.hive.common.DiskRangeList; +import org.apache.hadoop.hive.common.io.storage_api.Allocator; +import org.apache.hadoop.hive.common.io.storage_api.DataCache; +import org.apache.hadoop.hive.common.io.storage_api.DataReader; +import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer; +import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.llap.ConsumerFeedback; +import org.apache.hadoop.hive.llap.DebugUtils; +import org.apache.hadoop.hive.llap.cache.Cache; +import org.apache.hadoop.hive.llap.cache.LowLevelCache; +import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority; +import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters; +import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters.Counter; +import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl; +import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer; +import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata; +import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache; +import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata; +import org.apache.hadoop.hive.ql.exec.DDLTask; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.HdfsUtils; +import org.apache.hadoop.hive.ql.io.orc.CompressionKind; +import org.apache.hadoop.hive.ql.io.orc.EncodedReader; +import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl; +import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl.OrcEncodedColumnBatch; +import org.apache.hadoop.hive.ql.io.orc.MetadataReader; +import org.apache.hadoop.hive.ql.io.orc.OrcFile; +import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions; ++import org.apache.hadoop.hive.ql.io.orc.OrcConf; +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat; +import org.apache.hadoop.hive.ql.io.orc.OrcProto; +import org.apache.hadoop.hive.ql.io.orc.OrcSplit; +import org.apache.hadoop.hive.ql.io.orc.Reader; +import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl; +import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier; +import org.apache.hadoop.hive.ql.io.orc.llap.Consumer; +import org.apache.hadoop.hive.ql.io.orc.llap.OrcBatchKey; +import org.apache.hadoop.hive.ql.io.orc.llap.OrcCacheKey; +import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils; +import 
org.apache.hadoop.hive.ql.io.orc.StripeInformation; +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; +import org.apache.hadoop.hive.shims.HadoopShims; +import org.apache.hadoop.hive.shims.ShimLoader; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; + +/** + * This produces EncodedColumnBatch via ORC EncodedDataImpl. + * It serves as Consumer for EncodedColumnBatch too, for the high-level cache scenario where + * it inserts itself into the pipeline to put the data in cache, before passing it to the real + * consumer. It also serves as ConsumerFeedback that receives processed EncodedColumnBatch-es. + */ +public class OrcEncodedDataReader extends CallableWithNdc<Void> + implements ConsumerFeedback<OrcEncodedColumnBatch>, Consumer<OrcEncodedColumnBatch> { + private static final Log LOG = LogFactory.getLog(OrcEncodedDataReader.class); + + private final OrcMetadataCache metadataCache; + private final LowLevelCache lowLevelCache; + private final Configuration conf; + private final Cache<OrcCacheKey> cache; + private final FileSplit split; + private List<Integer> columnIds; + private final SearchArgument sarg; + private final String[] columnNames; + private final OrcEncodedDataConsumer consumer; + private final QueryFragmentCounters counters; + + // Read state. + private int stripeIxFrom; + private OrcFileMetadata fileMetadata; + private Reader orcReader; + private MetadataReader metadataReader; + private EncodedReader stripeReader; + private long fileId; + private FileSystem fs; + /** + * readState[stripeIx'][colIx'] => boolean array (could be a bitmask) of rg-s that need to be + * read. Contains only stripes that are read, and only columns included. null => read all RGs. + */ + private boolean[][][] readState; + private volatile boolean isStopped = false; + @SuppressWarnings("unused") + private volatile boolean isPaused = false; + + public OrcEncodedDataReader(LowLevelCache lowLevelCache, Cache<OrcCacheKey> cache, + OrcMetadataCache metadataCache, Configuration conf, InputSplit split, + List<Integer> columnIds, SearchArgument sarg, String[] columnNames, + OrcEncodedDataConsumer consumer, QueryFragmentCounters counters) { + this.lowLevelCache = lowLevelCache; + this.metadataCache = metadataCache; + this.cache = cache; + this.conf = conf; + this.split = (FileSplit)split; + this.columnIds = columnIds; + if (this.columnIds != null) { + Collections.sort(this.columnIds); + } + this.sarg = sarg; + this.columnNames = columnNames; + this.consumer = consumer; + this.counters = counters; + } + + @Override + public void stop() { + if (LOG.isInfoEnabled()) { + LOG.info("Encoded reader is being stopped"); + } + isStopped = true; + } + + @Override + public void pause() { + isPaused = true; + // TODO: pause fetching + } + + @Override + public void unpause() { + isPaused = false; + // TODO: unpause fetching + } + + @Override + protected Void callInternal() throws IOException { + long startTime = counters.startTimeCounter(); + if (LlapIoImpl.LOGL.isInfoEnabled()) { + LlapIoImpl.LOG.info("Processing data for " + split.getPath()); + } + if (processStop()) { + recordReaderTime(startTime); + return null; + } + counters.setDesc(QueryFragmentCounters.Desc.TABLE, getDbAndTableName(split.getPath())); + orcReader = null; + // 1. Get file metadata from cache, or create the reader and read it. 
+ // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that + fs = split.getPath().getFileSystem(conf); + fileId = determineFileId(fs, split); + counters.setDesc(QueryFragmentCounters.Desc.FILE, fileId); + + try { + fileMetadata = getOrReadFileMetadata(); + consumer.setFileMetadata(fileMetadata); + validateFileMetadata(); + if (columnIds == null) { + columnIds = createColumnIds(fileMetadata); + } + + // 2. Determine which stripes to read based on the split. + determineStripesToRead(); + } catch (Throwable t) { + recordReaderTime(startTime); + consumer.setError(t); + return null; + } + + if (readState.length == 0) { + consumer.setDone(); + recordReaderTime(startTime); + return null; // No data to read. + } + counters.setDesc(QueryFragmentCounters.Desc.STRIPES, stripeIxFrom + "," + readState.length); + + // 3. Apply SARG if needed, and otherwise determine what RGs to read. + int stride = fileMetadata.getRowIndexStride(); + ArrayList<OrcStripeMetadata> stripeMetadatas = null; + boolean[] globalIncludes = null; + boolean[] sargColumns = null; + try { + globalIncludes = OrcInputFormat.genIncludedColumns(fileMetadata.getTypes(), columnIds, true); + if (sarg != null && stride != 0) { + // TODO: move this to a common method + int[] filterColumns = RecordReaderImpl.mapSargColumns(sarg.getLeaves(), columnNames, 0); + // included will not be null; row options will fill the array with trues if null + sargColumns = new boolean[globalIncludes.length]; + for (int i : filterColumns) { + // filter columns may have -1 as the index, which could be a partition column in the SARG. + if (i > 0) { + sargColumns[i] = true; + } + } + + // If SARG is present, get relevant stripe metadata from cache or readers. + stripeMetadatas = readStripesMetadata(globalIncludes, sargColumns); + } + + // Now, apply SARG if any; w/o sarg, this will just initialize readState. + boolean hasData = determineRgsToRead(globalIncludes, stride, stripeMetadatas); + if (!hasData) { + consumer.setDone(); + recordReaderTime(startTime); + return null; // No data to read. + } + } catch (Throwable t) { + cleanupReaders(); + consumer.setError(t); + recordReaderTime(startTime); + return null; + } + + if (processStop()) { + cleanupReaders(); + recordReaderTime(startTime); + return null; + } + + // 4. Get data from high-level cache. + // If some cols are fully in cache, this will also give us the modified list of columns to + // read for every stripe (null means read all of them - the usual path). In any case, + // readState will be modified for column x rgs that were fetched from high-level cache. + List<Integer>[] stripeColsToRead = null; + if (cache != null) { + try { + stripeColsToRead = produceDataFromCache(stride); + } catch (Throwable t) { + // produceDataFromCache handles its own cleanup. + consumer.setError(t); + cleanupReaders(); + recordReaderTime(startTime); + return null; + } + } + + // 5. Create encoded data reader. + // If we have a high-level cache, we will intercept the data and add it there; + // otherwise just pass the data directly to the consumer. + Consumer<OrcEncodedColumnBatch> dataConsumer = (cache == null) ? this.consumer : this; + try { + ensureOrcReader(); + // Reader creation updates HDFS counters; don't do it again here. 
+ DataWrapperForOrc dw = new DataWrapperForOrc(); + stripeReader = orcReader.encodedReader(fileId, dw, dw); + stripeReader.setDebugTracing(DebugUtils.isTraceOrcEnabled()); + } catch (Throwable t) { + consumer.setError(t); + recordReaderTime(startTime); + cleanupReaders(); + return null; + } + + // 6. Read data. + // TODO: I/O threadpool could be here - one thread per stripe; for now, linear. + OrcBatchKey stripeKey = new OrcBatchKey(fileId, -1, 0); + for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) { + if (processStop()) { + cleanupReaders(); + recordReaderTime(startTime); + return null; + } + int stripeIx = stripeIxFrom + stripeIxMod; + boolean[][] colRgs = null; + boolean[] stripeIncludes = null; + OrcStripeMetadata stripeMetadata = null; + StripeInformation stripe; + try { + List<Integer> cols = stripeColsToRead == null ? null : stripeColsToRead[stripeIxMod]; + if (cols != null && cols.isEmpty()) continue; // No need to read this stripe. + stripe = fileMetadata.getStripes().get(stripeIx); + + if (DebugUtils.isTraceOrcEnabled()) { + LlapIoImpl.LOG.info("Reading stripe " + stripeIx + ": " + + stripe.getOffset() + ", " + stripe.getLength()); + } + colRgs = readState[stripeIxMod]; + // We assume that NO_RGS value is only set from SARG filter and for all columns; + // intermediate changes for individual columns will unset values in the array. + // Skip this case for 0-column read. We could probably special-case it just like we do + // in EncodedReaderImpl, but for now it's not that important. + if (colRgs.length > 0 && colRgs[0] == SargApplier.READ_NO_RGS) continue; + + // 6.1. Determine the columns to read (usually the same as requested). + if (cache == null || cols == null || cols.size() == colRgs.length) { + cols = columnIds; + stripeIncludes = globalIncludes; + } else { + // We are reading subset of the original columns, remove unnecessary bitmasks/etc. + // This will never happen w/o high-level cache. + stripeIncludes = OrcInputFormat.genIncludedColumns(fileMetadata.getTypes(), cols, true); + colRgs = genStripeColRgs(cols, colRgs); + } + + // 6.2. Ensure we have stripe metadata. We might have read it before for RG filtering. 
+ boolean isFoundInCache = false; + if (stripeMetadatas != null) { + stripeMetadata = stripeMetadatas.get(stripeIxMod); + } else { + stripeKey.stripeIx = stripeIx; + stripeMetadata = metadataCache.getStripeMetadata(stripeKey); + isFoundInCache = (stripeMetadata != null); + if (!isFoundInCache) { + counters.incrCounter(Counter.METADATA_CACHE_MISS); + ensureMetadataReader(); + long startTimeHdfs = counters.startTimeCounter(); + stripeMetadata = new OrcStripeMetadata( + stripeKey, metadataReader, stripe, stripeIncludes, sargColumns); + counters.incrTimeCounter(Counter.HDFS_TIME_US, startTimeHdfs); + stripeMetadata = metadataCache.putStripeMetadata(stripeMetadata); + if (DebugUtils.isTraceOrcEnabled()) { + LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx + + " metadata with includes: " + DebugUtils.toString(stripeIncludes)); + } + stripeKey = new OrcBatchKey(fileId, -1, 0); + } + consumer.setStripeMetadata(stripeMetadata); + } + if (!stripeMetadata.hasAllIndexes(stripeIncludes)) { + if (DebugUtils.isTraceOrcEnabled()) { + LlapIoImpl.LOG.info("Updating indexes in stripe " + stripeKey.stripeIx + + " metadata for includes: " + DebugUtils.toString(stripeIncludes)); + } + assert isFoundInCache; + counters.incrCounter(Counter.METADATA_CACHE_MISS); + ensureMetadataReader(); + updateLoadedIndexes(stripeMetadata, stripe, stripeIncludes, sargColumns); + } else if (isFoundInCache) { + counters.incrCounter(Counter.METADATA_CACHE_HIT); + } + } catch (Throwable t) { + consumer.setError(t); + cleanupReaders(); + recordReaderTime(startTime); + return null; + } + if (processStop()) { + cleanupReaders(); + recordReaderTime(startTime); + return null; + } + + // 6.3. Finally, hand off to the stripe reader to produce the data. + // This is a sync call that will feed data to the consumer. + try { + // TODO: readEncodedColumns is not supposed to throw; errors should be propagated thru + // consumer. It is potentially holding locked buffers, and must perform its own cleanup. + // Also, currently readEncodedColumns is not stoppable. The consumer will discard the + // data it receives for one stripe. We could probably interrupt it, if it checked that. + stripeReader.readEncodedColumns(stripeIx, stripe, stripeMetadata.getRowIndexes(), + stripeMetadata.getEncodings(), stripeMetadata.getStreams(), stripeIncludes, + colRgs, dataConsumer); + } catch (Throwable t) { + consumer.setError(t); + cleanupReaders(); + recordReaderTime(startTime); + return null; + } + } + + // Done with all the things. + recordReaderTime(startTime); + dataConsumer.setDone(); + if (DebugUtils.isTraceMttEnabled()) { + LlapIoImpl.LOG.info("done processing " + split); + } + + // Close the stripe reader, we are done reading. + cleanupReaders(); + return null; + } + + private void recordReaderTime(long startTime) { + counters.incrTimeCounter(Counter.TOTAL_IO_TIME_US, startTime); + } + + private static String getDbAndTableName(Path path) { + // Ideally, we'd get this from split; however, split doesn't contain any such thing and it's + // actually pretty hard to get cause even split generator only uses paths. We only need this + // for metrics; therefore, brace for BLACK MAGIC! + String[] parts = path.toUri().getPath().toString().split(Path.SEPARATOR); + int dbIx = -1; + // Try to find the default db postfix; don't check two last components - at least there + // should be a table and file (we could also try to throw away partition/bucket/acid stuff). 
+ for (int i = 0; i < parts.length - 2; ++i) { + if (!parts[i].endsWith(DDLTask.DATABASE_PATH_SUFFIX)) continue; + if (dbIx >= 0) { + dbIx = -1; // Let's not guess. + break; + } + dbIx = i; + } + if (dbIx >= 0) { + return parts[dbIx].substring(0, parts[dbIx].length() - 3) + "." + parts[dbIx + 1]; + } + + // Just go from the back and throw away everything we think is wrong; skip last item, the file. + boolean isInPartFields = false; + for (int i = parts.length - 2; i >= 0; --i) { + String p = parts[i]; + boolean isPartField = p.contains("="); + if ((isInPartFields && !isPartField) || (!isPartField && !p.startsWith(AcidUtils.BASE_PREFIX) + && !p.startsWith(AcidUtils.DELTA_PREFIX) && !p.startsWith(AcidUtils.BUCKET_PREFIX))) { + dbIx = i - 1; + break; + } + isInPartFields = isPartField; + } + // If we found something before we ran out of components, use it. + if (dbIx >= 0) { + String dbName = parts[dbIx]; + if (dbName.endsWith(DDLTask.DATABASE_PATH_SUFFIX)) { + dbName = dbName.substring(0, dbName.length() - 3); + } + return dbName + "." + parts[dbIx + 1]; + } + return "unknown"; + } + + private void validateFileMetadata() throws IOException { + if (fileMetadata.getCompressionKind() == CompressionKind.NONE) return; + int bufferSize = fileMetadata.getCompressionBufferSize(); + int minAllocSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC); + if (bufferSize < minAllocSize) { + LOG.warn("ORC compression buffer size (" + bufferSize + ") is smaller than LLAP low-level " + + "cache minimum allocation size (" + minAllocSize + "). Decrease the value for " + + HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.toString() + " to avoid wasting memory"); + } + } + + private boolean processStop() { + if (!isStopped) return false; + LOG.info("Encoded data reader is stopping"); + cleanupReaders(); + return true; + } + + private static long determineFileId(FileSystem fs, FileSplit split) throws IOException { + if (split instanceof OrcSplit) { + Long fileId = ((OrcSplit)split).getFileId(); + if (fileId != null) { + return fileId; + } + } + LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID"); + return HdfsUtils.getFileId(fs, split.getPath()); + } + + private boolean[][] genStripeColRgs(List<Integer> stripeCols, boolean[][] globalColRgs) { + boolean[][] stripeColRgs = new boolean[stripeCols.size()][]; + for (int i = 0, i2 = -1; i < globalColRgs.length; ++i) { + if (globalColRgs[i] == null) continue; + ++i2; // Increment before assigning; i2 starts at -1, so assigning first would index out of bounds. + stripeColRgs[i2] = globalColRgs[i]; + } + return stripeColRgs; + } + + /** + * Puts all column indexes from the metadata into a column list, to read all columns. + */ + private static List<Integer> createColumnIds(OrcFileMetadata metadata) { + List<Integer> columnIds = new ArrayList<Integer>(metadata.getTypes().size()); + for (int i = 1; i < metadata.getTypes().size(); ++i) { + columnIds.add(i); + } + return columnIds; + } + + /** + * If the stripe metadata in the cache does not have all the indexes for the current query, load + * the missing ones. This is a temporary kludge until a real metadata cache becomes available. + */ + private void updateLoadedIndexes(OrcStripeMetadata stripeMetadata, + StripeInformation stripe, boolean[] stripeIncludes, boolean[] sargColumns) throws IOException { + // We only synchronize on write for now - design of metadata cache is very temporary; + // we pre-allocate the array and never remove entries; so readers should be safe. 
+ synchronized (stripeMetadata) { + if (stripeMetadata.hasAllIndexes(stripeIncludes)) return; + long startTime = counters.startTimeCounter(); + stripeMetadata.loadMissingIndexes(metadataReader, stripe, stripeIncludes, sargColumns); + counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime); + } + } + + /** + * Closes the stripe readers (on error). + */ + private void cleanupReaders() { + if (metadataReader != null) { + try { + metadataReader.close(); + } catch (IOException ex) { + // Ignore. + } + } + if (stripeReader != null) { + try { + stripeReader.close(); + } catch (IOException ex) { + // Ignore. + } + } + } + + /** + * Ensures orcReader is initialized for the split. + */ + private void ensureOrcReader() throws IOException { + if (orcReader != null) return; + Path path = HdfsUtils.getFileIdPath(fs, split.getPath(), fileId); + if (DebugUtils.isTraceOrcEnabled()) { + LOG.info("Creating reader for " + path + " (" + split.getPath() + ")"); + } + long startTime = counters.startTimeCounter(); + ReaderOptions opts = OrcFile.readerOptions(conf).filesystem(fs).fileMetadata(fileMetadata); + orcReader = OrcFile.createReader(path, opts); + counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime); + } + + /** + * Gets file metadata for the split from cache, or reads it from the file. + */ + private OrcFileMetadata getOrReadFileMetadata() throws IOException { + OrcFileMetadata metadata = metadataCache.getFileMetadata(fileId); + if (metadata != null) { + counters.incrCounter(Counter.METADATA_CACHE_HIT); + return metadata; + } + counters.incrCounter(Counter.METADATA_CACHE_MISS); + ensureOrcReader(); + // We assume this call doesn't touch HDFS because everything is already read; don't add time. + metadata = new OrcFileMetadata(fileId, orcReader); + return metadataCache.putFileMetadata(metadata); + } + + /** + * Reads the metadata for all stripes in the file. + */ + private ArrayList<OrcStripeMetadata> readStripesMetadata( + boolean[] globalInc, boolean[] sargColumns) throws IOException { + ArrayList<OrcStripeMetadata> result = new ArrayList<OrcStripeMetadata>(readState.length); + OrcBatchKey stripeKey = new OrcBatchKey(fileId, 0, 0); + for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) { + stripeKey.stripeIx = stripeIxMod + stripeIxFrom; + OrcStripeMetadata value = metadataCache.getStripeMetadata(stripeKey); + if (value == null || !value.hasAllIndexes(globalInc)) { + counters.incrCounter(Counter.METADATA_CACHE_MISS); + ensureMetadataReader(); + StripeInformation si = fileMetadata.getStripes().get(stripeKey.stripeIx); + if (value == null) { + long startTime = counters.startTimeCounter(); + value = new OrcStripeMetadata(stripeKey, metadataReader, si, globalInc, sargColumns); + counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime); + value = metadataCache.putStripeMetadata(value); + if (DebugUtils.isTraceOrcEnabled()) { + LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx + + " metadata with includes: " + DebugUtils.toString(globalInc)); + } + // Create new key object to reuse for gets; we've used the old one to put in cache. + stripeKey = new OrcBatchKey(fileId, 0, 0); + } + // We might have got an old value from cache; recheck it has indexes. 
+ if (!value.hasAllIndexes(globalInc)) { + if (DebugUtils.isTraceOrcEnabled()) { + LlapIoImpl.LOG.info("Updating indexes in stripe " + stripeKey.stripeIx + + " metadata for includes: " + DebugUtils.toString(globalInc)); + } + updateLoadedIndexes(value, si, globalInc, sargColumns); + } + } else { + counters.incrCounter(Counter.METADATA_CACHE_HIT); + } + result.add(value); + consumer.setStripeMetadata(value); + } + return result; + } + + private void ensureMetadataReader() throws IOException { + ensureOrcReader(); + if (metadataReader != null) return; + long startTime = counters.startTimeCounter(); + metadataReader = orcReader.metadata(); + counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime); + } + + @Override + public void returnData(OrcEncodedColumnBatch ecb) { + for (ColumnStreamData[] datas : ecb.getColumnData()) { + for (ColumnStreamData data : datas) { + if (data.decRef() != 0) continue; + if (DebugUtils.isTraceLockingEnabled()) { + for (MemoryBuffer buf : data.getCacheBuffers()) { + LlapIoImpl.LOG.info("Unlocking " + buf + " at the end of processing"); + } + } + lowLevelCache.releaseBuffers(data.getCacheBuffers()); + EncodedReaderImpl.SB_POOL.offer(data); + } + } + // We can offer ECB even with some streams not discarded; reset() will clear the arrays. + EncodedReaderImpl.ECB_POOL.offer(ecb); + } + + /** + * Determines which RGs need to be read, after stripes have been determined. + * SARG is applied, and readState is populated for each stripe accordingly. + * @param stripes All stripes in the file (field state is used to determine stripes to read). + */ + private boolean determineRgsToRead(boolean[] globalIncludes, int rowIndexStride, + ArrayList<OrcStripeMetadata> metadata) throws IOException { + SargApplier sargApp = null; + if (sarg != null && rowIndexStride != 0) { + List<OrcProto.Type> types = fileMetadata.getTypes(); + String[] colNamesForSarg = OrcInputFormat.getSargColumnNames( + columnNames, types, globalIncludes, fileMetadata.isOriginalFormat()); + sargApp = new SargApplier(sarg, colNamesForSarg, rowIndexStride, types, globalIncludes.length); + } + boolean hasAnyData = false; + // readState should have been initialized by this time with an empty array. + for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) { + int stripeIx = stripeIxMod + stripeIxFrom; + StripeInformation stripe = fileMetadata.getStripes().get(stripeIx); + int rgCount = getRgCount(stripe, rowIndexStride); + boolean[] rgsToRead = null; + if (sargApp != null) { + OrcStripeMetadata stripeMetadata = metadata.get(stripeIxMod); + rgsToRead = sargApp.pickRowGroups(stripe, stripeMetadata.getRowIndexes(), + stripeMetadata.getBloomFilterIndexes(), true); + } + boolean isNone = rgsToRead == SargApplier.READ_NO_RGS, + isAll = rgsToRead == SargApplier.READ_ALL_RGS; + hasAnyData = hasAnyData || !isNone; + if (DebugUtils.isTraceOrcEnabled()) { + if (isNone) { + LlapIoImpl.LOG.info("SARG eliminated all RGs for stripe " + stripeIx); + } else if (!isAll) { + LlapIoImpl.LOG.info("SARG picked RGs for stripe " + stripeIx + ": " + + DebugUtils.toString(rgsToRead)); + } else { + LlapIoImpl.LOG.info("Will read all " + rgCount + " RGs for stripe " + stripeIx); + } + } + assert isAll || isNone || rgsToRead.length == rgCount; + readState[stripeIxMod] = new boolean[columnIds.size()][]; + for (int j = 0; j < columnIds.size(); ++j) { + readState[stripeIxMod][j] = (isAll || isNone) ? 
rgsToRead : + Arrays.copyOf(rgsToRead, rgsToRead.length); + } + + adjustRgMetric(rgCount, rgsToRead, isNone, isAll); + } + return hasAnyData; + } + + private void adjustRgMetric(int rgCount, boolean[] rgsToRead, boolean isNone, + boolean isAll) { + int count = 0; + if (!isAll) { + for (boolean b : rgsToRead) { + if (b) + count++; + } + } else if (!isNone) { + count = rgCount; + } + counters.setCounter(QueryFragmentCounters.Counter.SELECTED_ROWGROUPS, count); + } + + + private int getRgCount(StripeInformation stripe, int rowIndexStride) { + return (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride); + } + + /** + * Determine which stripes to read for a split. Populates stripeIxFrom and readState. + */ + public void determineStripesToRead() { + // The unit of caching for ORC is (rg x column) (see OrcBatchKey). + List<StripeInformation> stripes = fileMetadata.getStripes(); + long offset = split.getStart(), maxOffset = offset + split.getLength(); + stripeIxFrom = -1; + int stripeIxTo = -1; + if (LlapIoImpl.LOGL.isDebugEnabled()) { + String tmp = "FileSplit {" + split.getStart() + ", " + split.getLength() + "}; stripes "; + for (StripeInformation stripe : stripes) { + tmp += "{" + stripe.getOffset() + ", " + stripe.getLength() + "}, "; + } + LlapIoImpl.LOG.debug(tmp); + } + + int stripeIx = 0; + for (StripeInformation stripe : stripes) { + long stripeStart = stripe.getOffset(); + if (offset > stripeStart) { + // We assume splits will never start in the middle of the stripe. + ++stripeIx; + continue; + } + if (stripeIxFrom == -1) { + if (DebugUtils.isTraceOrcEnabled()) { + LlapIoImpl.LOG.info("Including stripes from " + stripeIx + + " (" + stripeStart + " >= " + offset + ")"); + } + stripeIxFrom = stripeIx; + } + if (stripeStart >= maxOffset) { + stripeIxTo = stripeIx; + if (DebugUtils.isTraceOrcEnabled()) { + LlapIoImpl.LOG.info("Including stripes until " + stripeIxTo + " (" + stripeStart + + " >= " + maxOffset + "); " + (stripeIxTo - stripeIxFrom) + " stripes"); + } + break; + } + ++stripeIx; + } + if (stripeIxTo == -1) { + stripeIxTo = stripeIx; + if (DebugUtils.isTraceOrcEnabled()) { + LlapIoImpl.LOG.info("Including stripes until " + stripeIx + " (end of file); " + + (stripeIxTo - stripeIxFrom) + " stripes"); + } + } + readState = new boolean[stripeIxTo - stripeIxFrom][][]; + } + + // TODO: split by stripe? we do everything by stripe, and it might be faster + /** + * Takes the data from high-level cache for all stripes and returns to consumer. + * @return List of columns to read per stripe, if any columns were fully eliminated by cache. + */ + private List<Integer>[] produceDataFromCache(int rowIndexStride) throws IOException { + OrcCacheKey key = new OrcCacheKey(fileId, -1, -1, -1); + // For each stripe, keep a list of columns that are not fully in cache (null => all of them). 
+ @SuppressWarnings("unchecked") + List<Integer>[] stripeColsNotInCache = new List[readState.length]; + for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) { + key.stripeIx = stripeIxFrom + stripeIxMod; + boolean[][] cols = readState[stripeIxMod]; + boolean[] isMissingAnyRgs = new boolean[cols.length]; + int totalRgCount = getRgCount(fileMetadata.getStripes().get(key.stripeIx), rowIndexStride); + for (int rgIx = 0; rgIx < totalRgCount; ++rgIx) { + OrcEncodedColumnBatch col = EncodedReaderImpl.ECB_POOL.take(); + col.init(fileId, key.stripeIx, rgIx, cols.length); + boolean hasAnyCached = false; + try { + key.rgIx = rgIx; + for (int colIxMod = 0; colIxMod < cols.length; ++colIxMod) { + boolean[] readMask = cols[colIxMod]; + // Check if RG is eliminated by SARG + if ((readMask == SargApplier.READ_NO_RGS) || (readMask != SargApplier.READ_ALL_RGS + && (readMask.length <= rgIx || !readMask[rgIx]))) continue; + key.colIx = columnIds.get(colIxMod); + ColumnStreamData[] cached = cache.get(key); + if (cached == null) { + isMissingAnyRgs[colIxMod] = true; + continue; + } + col.setAllStreamsData(colIxMod, key.colIx, cached); + hasAnyCached = true; + if (readMask == SargApplier.READ_ALL_RGS) { + // We were going to read all RGs, but some were in cache, allocate the mask. + cols[colIxMod] = readMask = new boolean[totalRgCount]; + Arrays.fill(readMask, true); + } + readMask[rgIx] = false; // Got from cache, don't read from disk. + } + } catch (Throwable t) { + // TODO: Any cleanup needed to release data in col back to cache should be here. + throw (t instanceof IOException) ? (IOException)t : new IOException(t); + } + if (hasAnyCached) { + consumer.consumeData(col); + } + } + boolean makeStripeColList = false; // By default assume we'll fetch all original columns. + for (int colIxMod = 0; colIxMod < cols.length; ++colIxMod) { + if (isMissingAnyRgs[colIxMod]) { + if (makeStripeColList) { + stripeColsNotInCache[stripeIxMod].add(columnIds.get(colIxMod)); + } + } else if (!makeStripeColList) { + // Some columns were fully in cache. Make a per-stripe col list, add previous columns. + makeStripeColList = true; + stripeColsNotInCache[stripeIxMod] = new ArrayList<Integer>(cols.length - 1); + for (int i = 0; i < colIxMod; ++i) { + stripeColsNotInCache[stripeIxMod].add(columnIds.get(i)); + } + } + } + } + return stripeColsNotInCache; + } + + @Override + public void setDone() { + consumer.setDone(); + } + + @Override + public void consumeData(OrcEncodedColumnBatch data) { + // Store object in cache; create new key object - cannot be reused. 
+ assert cache != null; + for (int i = 0; i < data.getColumnData().length; ++i) { + OrcCacheKey key = new OrcCacheKey(data.getBatchKey(), data.getColumnIxs()[i]); + ColumnStreamData[] toCache = data.getColumnData()[i]; + ColumnStreamData[] cached = cache.cacheOrGet(key, toCache); + if (toCache != cached) { + for (ColumnStreamData sb : toCache) { + if (sb.decRef() != 0) continue; + lowLevelCache.releaseBuffers(sb.getCacheBuffers()); + } + data.getColumnData()[i] = cached; + } + } + consumer.consumeData(data); + } + + @Override + public void setError(Throwable t) { + consumer.setError(t); + } + + private class DataWrapperForOrc implements DataReader, DataCache { + private DataReader orcDataReader; + + public DataWrapperForOrc() { - boolean useZeroCopy = (conf != null) - && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_ZEROCOPY); ++ boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf); + if (useZeroCopy && !lowLevelCache.getAllocator().isDirectAlloc()) { + throw new UnsupportedOperationException("Cannot use zero-copy reader with non-direct cache " + + "buffers; either disable zero-copy or enable direct cache allocation"); + } + this.orcDataReader = orcReader.createDefaultDataReader(useZeroCopy); + } + + @Override + public DiskRangeList getFileData(long fileId, DiskRangeList range, + long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) { + return lowLevelCache.getFileData(fileId, range, baseOffset, factory, counters, gotAllData); + } + + @Override + public long[] putFileData(long fileId, DiskRange[] ranges, + MemoryBuffer[] data, long baseOffset) { + return lowLevelCache.putFileData( + fileId, ranges, data, baseOffset, Priority.NORMAL, counters); + } + + @Override + public void releaseBuffer(MemoryBuffer buffer) { + lowLevelCache.releaseBuffer(buffer); + } + + @Override + public void reuseBuffer(MemoryBuffer buffer) { + boolean isReused = lowLevelCache.reuseBuffer(buffer); + assert isReused; + } + + @Override + public Allocator getAllocator() { + return lowLevelCache.getAllocator(); + } + + @Override + public void close() throws IOException { + orcDataReader.close(); + } + + @Override + public DiskRangeList readFileData(DiskRangeList range, long baseOffset, + boolean doForceDirect) throws IOException { + long startTime = counters.startTimeCounter(); + DiskRangeList result = orcDataReader.readFileData(range, baseOffset, doForceDirect); + counters.recordHdfsTime(startTime); + if (DebugUtils.isTraceOrcEnabled() && LOG.isInfoEnabled()) { + LOG.info("Disk ranges after disk read (file " + fileId + ", base offset " + baseOffset + + "): " + RecordReaderUtils.stringifyDiskRanges(result)); + } + return result; + } + + @Override + public boolean isTrackingDiskRanges() { + return orcDataReader.isTrackingDiskRanges(); + } + + @Override + public void releaseBuffer(ByteBuffer buffer) { + orcDataReader.releaseBuffer(buffer); + } + + @Override + public void open() throws IOException { + long startTime = counters.startTimeCounter(); + orcDataReader.open(); + counters.recordHdfsTime(startTime); + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/pom.xml ---------------------------------------------------------------------- diff --cc pom.xml index 24b8b49,d2a5d52..9ff08a8 --- a/pom.xml +++ b/pom.xml @@@ -47,9 -47,9 +47,10 @@@ <module>ql</module> <module>serde</module> <module>service</module> + <module>llap-client</module> <module>shims</module> <module>spark-client</module> + <module>storage-api</module> <module>testutils</module> 
<module>packaging</module> </modules> @@@ -160,9 -160,9 +161,9 @@@ <stax.version>1.0.1</stax.version> <slf4j.version>1.7.5</slf4j.version> <ST4.version>4.0.4</ST4.version> - <tez.version>0.5.2</tez.version> + <tez.version>0.8.0-TEZ-2003-SNAPSHOT</tez.version> <super-csv.version>2.2.0</super-csv.version> - <spark.version>1.3.1</spark.version> + <spark.version>1.4.0</spark.version> <scala.binary.version>2.10</scala.binary.version> <scala.version>2.10.4</scala.version> <tempus-fugit.version>1.1</tempus-fugit.version> http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/pom.xml ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java index ed75639,1b9d7ef..d4bb38c --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java @@@ -171,9 -177,8 +173,8 @@@ public class MapJoinOperator extends Ab return loadHashTable(mapContext, mrContext); } }); - result.add(future); + asyncInitOperations.add(future); - } else if (mapContext == null || mapContext.getLocalWork() == null - || mapContext.getLocalWork().getInputFileChangeSensitive() == false) { + } else if (!isInputFileChangeSensitive(mapContext)) { loadHashTable(mapContext, mrContext); hashTblInitedOnce = true; } http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java index 861f536,0f02737..9f1b20a --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java @@@ -1385,7 -1350,14 +1385,13 @@@ public abstract class Operator<T extend } @Override - protected Collection<Future<?>> initializeOp(Configuration conf) { - return childOperators; + protected void initializeOp(Configuration conf) { } } + + public void removeParents() { + for (Operator<?> parent : new ArrayList<Operator<?>>(getParentOperators())) { + removeParent(parent); + } + } } http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java index 4a5f0b9,aa8808a..8dbe45c --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java @@@ -61,9 -58,10 +58,10 @@@ public class SparkHashTableSinkOperato } @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { - Collection<Future<?>> result = super.initializeOp(hconf); + protected void initializeOp(Configuration hconf) throws HiveException { + super.initializeOp(hconf); ObjectInspector[] inputOIs = new ObjectInspector[conf.getTagLength()]; + byte tag = conf.getTag(); inputOIs[tag] = inputObjInspectors[0]; conf.setTagOrder(new Byte[]{ tag }); htsOperator.setConf(conf); http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java index 77677a0,d649672..f177f0d --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java @@@ -52,10 -50,9 +52,11 @@@ import org.apache.tez.runtime.api.Input import org.apache.tez.runtime.api.LogicalInput; import org.apache.tez.runtime.api.LogicalOutput; import org.apache.tez.runtime.api.ProcessorContext; + import org.apache.tez.runtime.api.Reader; import org.apache.tez.runtime.library.api.KeyValuesReader; +import com.google.common.collect.Lists; + /** * Process input from tez LogicalInput and write output - for a map plan * Just pump the records through the query plan. 
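[Editor's note] Several hunks here (MapJoinOperator, Operator, SparkHashTableSinkOperator, and the vectorized sink operators below) change initializeOp() from returning Collection<Future<?>> to void: operators now register background work in an asyncInitOperations list instead of returning futures up the tree. A minimal sketch of that pattern follows; AsyncInitOperatorSketch, loadHashTable, and completeInitialization are illustrative names, not Hive APIs — only the initializeOp signature change comes from the diff.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

abstract class AsyncInitOperatorSketch {
  // Shared registry of pending initializations, replacing the per-call return value.
  protected final List<Future<?>> asyncInitOperations = new ArrayList<Future<?>>();
  private final ExecutorService pool;

  AsyncInitOperatorSketch(ExecutorService pool) {
    this.pool = pool;
  }

  // New-style hook: returns void; background work is registered, not returned.
  protected void initializeOp(Object conf) {
    asyncInitOperations.add(pool.submit(new Callable<Void>() {
      @Override
      public Void call() throws Exception {
        loadHashTable(); // e.g. MapJoinOperator's background hash-table load
        return null;
      }
    }));
  }

  protected abstract void loadHashTable() throws Exception;

  // Drain all registered initializations before processing the first row.
  protected void completeInitialization() throws Exception {
    for (Future<?> f : asyncInitOperations) {
      f.get();
    }
  }
}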
http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java index 7f50bea,73263ee..8d35855 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java @@@ -173,9 -171,15 +173,15 @@@ public class TezTask extends Task<TezWo } // fetch the counters - Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); - counters = client.getDAGStatus(statusGetOpts).getDAGCounters(); + try { + Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS); + counters = client.getDAGStatus(statusGetOpts).getDAGCounters(); + } catch (Exception err) { + // Don't fail execution due to counters - just don't print summary info + LOG.error("Failed to get counters: " + err, err); + counters = null; + } - TezSessionPoolManager.getInstance().returnSession(session); + TezSessionPoolManager.getInstance().returnSession(session, getWork().getLlapMode()); if (LOG.isInfoEnabled() && counters != null && (conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) || http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java index 007782d,9bd811c..243017a --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java @@@ -99,8 -99,7 +99,7 @@@ public class VectorMapJoinOperator exte } @Override - public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { + public void initializeOp(Configuration hconf) throws HiveException { - // Use a final variable to properly parameterize the processVectorInspector closure. // Using a member variable in the closure will not do the right thing... final int parameterizePosBigTable = conf.getPosBigTable(); http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java index 0000000,6b9ac26..8486d12 mode 000000,100644..100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java @@@ -1,0 -1,104 +1,101 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.hadoop.hive.ql.exec.vector; + + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.hive.ql.exec.SparkHashTableSinkOperator; + import org.apache.hadoop.hive.ql.metadata.HiveException; + import org.apache.hadoop.hive.ql.plan.OperatorDesc; + import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc; + import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; + + import java.util.Collection; + import java.util.concurrent.Future; + + /** + * Vectorized version of SparkHashTableSinkOperator + * Currently the implementation just delegates all the work to super class + * + * Copied from VectorFileSinkOperator + */ + public class VectorSparkHashTableSinkOperator extends SparkHashTableSinkOperator { + + private static final long serialVersionUID = 1L; + + private VectorizationContext vContext; + + // The above members are initialized by the constructor and must not be + // transient. + //--------------------------------------------------------------------------- + + private transient boolean firstBatch; + + private transient VectorExtractRowDynBatch vectorExtractRowDynBatch; + + protected transient Object[] singleRow; + + public VectorSparkHashTableSinkOperator() { + } + + public VectorSparkHashTableSinkOperator(VectorizationContext vContext, OperatorDesc conf) { + super(); + this.vContext = vContext; + this.conf = (SparkHashTableSinkDesc) conf; + } + + @Override - protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { ++ protected void initializeOp(Configuration hconf) throws HiveException { + inputObjInspectors[0] = + VectorizedBatchUtil.convertToStandardStructObjectInspector((StructObjectInspector) inputObjInspectors[0]); + - Collection<Future<?>> result = super.initializeOp(hconf); - assert result.isEmpty(); ++ super.initializeOp(hconf); + + firstBatch = true; - - return result; + } + + @Override + public void process(Object row, int tag) throws HiveException { + VectorizedRowBatch batch = (VectorizedRowBatch) row; + + if (firstBatch) { + vectorExtractRowDynBatch = new VectorExtractRowDynBatch(); + vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns()); + + singleRow = new Object[vectorExtractRowDynBatch.getCount()]; + + firstBatch = false; + } + vectorExtractRowDynBatch.setBatchOnEntry(batch); + if (batch.selectedInUse) { + int selected[] = batch.selected; + for (int logical = 0 ; logical < batch.size; logical++) { + int batchIndex = selected[logical]; + vectorExtractRowDynBatch.extractRow(batchIndex, singleRow); + super.process(singleRow, tag); + } + } else { + for (int batchIndex = 0 ; batchIndex < batch.size; batchIndex++) { + vectorExtractRowDynBatch.extractRow(batchIndex, singleRow); + super.process(singleRow, tag); + } + } + + vectorExtractRowDynBatch.forgetBatchOnExit(); + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java ---------------------------------------------------------------------- diff --cc 
ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java index 0000000,3bce49d..eb0b408 mode 000000,100644..100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java @@@ -1,0 -1,99 +1,96 @@@ + /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.hadoop.hive.ql.exec.vector; + + import java.util.Collection; + import java.util.concurrent.Future; + + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.hive.ql.metadata.HiveException; + import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc; + import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator; + import org.apache.hadoop.hive.ql.plan.OperatorDesc; + import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; + import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; + import org.apache.hadoop.io.Writable; + + /** + * Vectorized version for SparkPartitionPruningSinkOperator. + * Forked from VectorAppMasterEventOperator. 
+ **/ + public class VectorSparkPartitionPruningSinkOperator extends SparkPartitionPruningSinkOperator { + + private static final long serialVersionUID = 1L; + + private VectorizationContext vContext; + + protected transient boolean firstBatch; + + protected transient VectorExtractRowDynBatch vectorExtractRowDynBatch; + + protected transient Object[] singleRow; + + public VectorSparkPartitionPruningSinkOperator(VectorizationContext context, + OperatorDesc conf) { + super(); + this.conf = (SparkPartitionPruningSinkDesc) conf; + this.vContext = context; + } + + public VectorSparkPartitionPruningSinkOperator() { + } + + @Override - public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException { ++ public void initializeOp(Configuration hconf) throws HiveException { + inputObjInspectors[0] = + VectorizedBatchUtil.convertToStandardStructObjectInspector( + (StructObjectInspector) inputObjInspectors[0]); - Collection<Future<?>> result = super.initializeOp(hconf); - assert result.isEmpty(); ++ super.initializeOp(hconf); + + firstBatch = true; - - return result; + } + + @Override + public void process(Object data, int tag) throws HiveException { + VectorizedRowBatch batch = (VectorizedRowBatch) data; + if (firstBatch) { + vectorExtractRowDynBatch = new VectorExtractRowDynBatch(); + vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], + vContext.getProjectedColumns()); + singleRow = new Object[vectorExtractRowDynBatch.getCount()]; + firstBatch = false; + } + + vectorExtractRowDynBatch.setBatchOnEntry(batch); + ObjectInspector rowInspector = inputObjInspectors[0]; + try { + Writable writableRow; + for (int logical = 0; logical < batch.size; logical++) { + int batchIndex = batch.selectedInUse ? batch.selected[logical] : logical; + vectorExtractRowDynBatch.extractRow(batchIndex, singleRow); + writableRow = serializer.serialize(singleRow, rowInspector); + writableRow.write(buffer); + } + } catch (Exception e) { + throw new HiveException(e); + } + + vectorExtractRowDynBatch.forgetBatchOnExit(); + } + } http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java index 0600deb,c7e0780..30db513 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java @@@ -412,7 -493,8 +499,8 @@@ public class AcidUtils } Collections.sort(working); - long current = bestBaseTxn; + long current = bestBase.txn; + int lastStmtId = -1; for(ParsedDelta next: working) { if (next.maxTransaction > current) { // are any of the new transactions ones that we care about? 
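A note on the two vectorized sink operators above (VectorSparkHashTableSinkOperator and VectorSparkPartitionPruningSinkOperator): both follow the same delegation idiom. The operator keeps a batch-oriented interface, but process() unpacks each VectorizedRowBatch into single rows (through VectorExtractRowDynBatch and a reusable singleRow array, initialized lazily on the first batch, once the input object inspectors are final) and feeds them to the row-mode logic inherited from the super class. The extraction loop must honor selectedInUse: when an upstream filter has run, only the indices listed in selected[0..size) are live rows. Below is a minimal, self-contained sketch of that iteration; the Batch type and processRow() hook are stand-ins invented for the example, not Hive API.

    /** Minimal model of the selected[]-aware batch iteration used by the
     *  vectorized sink operators above. All types here are illustrative. */
    public class BatchDrainSketch {
      /** Stand-in for VectorizedRowBatch: one column plus an optional selection vector. */
      static final class Batch {
        long[] col;            // a single flat column, for the sketch
        int size;              // number of live rows in the batch
        boolean selectedInUse; // if true, selected[] maps logical -> physical rows
        int[] selected;
      }

      /** Row-mode hook; in Hive this is where super.process(singleRow, tag) is called. */
      static void processRow(long value) {
        System.out.println("row: " + value);
      }

      static void drain(Batch batch) {
        if (batch.selectedInUse) {
          // Only the first `size` entries of selected[] point at live rows.
          for (int logical = 0; logical < batch.size; logical++) {
            processRow(batch.col[batch.selected[logical]]);
          }
        } else {
          // Rows 0..size-1 are live in physical order.
          for (int physical = 0; physical < batch.size; physical++) {
            processRow(batch.col[physical]);
          }
        }
      }

      public static void main(String[] args) {
        Batch b = new Batch();
        b.col = new long[] {10, 20, 30, 40};
        b.size = 2;
        b.selectedInUse = true;
        b.selected = new int[] {1, 3}; // rows 1 and 3 survived an upstream filter
        drain(b);                      // prints 20, then 40
      }
    }

The same hunks also show the signature change that runs through this commit: initializeOp() now returns void, so the subclasses simply call super.initializeOp(hconf) instead of collecting, asserting empty, and returning a Collection<Future<?>>.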
http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java index 0d9b644,fd16b35..3501d19 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java @@@ -35,10 -36,9 +36,12 @@@ import org.apache.hadoop.conf.Configura import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil; +import org.apache.hadoop.hive.llap.io.api.LlapIo; +import org.apache.hadoop.hive.llap.io.api.LlapIoProxy; + import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; + import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner; import org.apache.hadoop.hive.ql.plan.TableDesc; import org.apache.hadoop.hive.ql.exec.Operator; import org.apache.hadoop.hive.ql.exec.TableScanOperator; http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java index 8bb2fc9,a60ebb4..bdfe26c --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java @@@ -120,75 -110,17 +110,37 @@@ public final class OrcFile return id; } - private WriterVersion(int id) { + WriterVersion(int id) { this.id = id; } + + private static final WriterVersion[] values; + static { + // Assumes few non-negative values close to zero. + int max = Integer.MIN_VALUE; + for (WriterVersion v : WriterVersion.values()) { + if (v.id < 0) throw new AssertionError(); + if (v.id > max) { + max = v.id; + } + } + values = new WriterVersion[max + 1]; + for (WriterVersion v : WriterVersion.values()) { + values[v.id] = v; + } + } + + public static WriterVersion from(int val) { + return values[val]; + } } - public static enum EncodingStrategy { - SPEED, COMPRESSION; - } - - public static enum CompressionStrategy { - SPEED, COMPRESSION; + public enum EncodingStrategy { + SPEED, COMPRESSION } - // Note : these string definitions for table properties are deprecated, - // and retained only for backward compatibility, please do not add to - // them, add to OrcTableProperties below instead - @Deprecated public static final String COMPRESSION = "orc.compress"; - @Deprecated public static final String COMPRESSION_BLOCK_SIZE = "orc.compress.size"; - @Deprecated public static final String STRIPE_SIZE = "orc.stripe.size"; - @Deprecated public static final String ROW_INDEX_STRIDE = "orc.row.index.stride"; - @Deprecated public static final String ENABLE_INDEXES = "orc.create.index"; - @Deprecated public static final String BLOCK_PADDING = "orc.block.padding"; - - /** - * Enum container for all orc table properties. - * If introducing a new orc-specific table property, - * add it here. 
- */ - public static enum OrcTableProperties { - COMPRESSION("orc.compress"), - COMPRESSION_BLOCK_SIZE("orc.compress.size"), - STRIPE_SIZE("orc.stripe.size"), - BLOCK_SIZE("orc.block.size"), - ROW_INDEX_STRIDE("orc.row.index.stride"), - ENABLE_INDEXES("orc.create.index"), - BLOCK_PADDING("orc.block.padding"), - ENCODING_STRATEGY("orc.encoding.strategy"), - BLOOM_FILTER_COLUMNS("orc.bloom.filter.columns"), - BLOOM_FILTER_FPP("orc.bloom.filter.fpp"); - - private final String propName; - - OrcTableProperties(String propName) { - this.propName = propName; - } - - public String getPropName(){ - return this.propName; - } + public enum CompressionStrategy { + SPEED, COMPRESSION } // unused http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java index 10c19f1,4e6dd7a..eed7000 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java @@@ -52,9 -52,9 +52,10 @@@ import org.apache.hadoop.hive.ql.io.Aci import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; import org.apache.hadoop.hive.ql.io.InputFormatChecker; +import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader; + import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue; @@@ -443,16 -436,16 +439,16 @@@ public class OrcInputFormat implement static final class SplitInfo extends ACIDSplitStrategy { private final Context context; private final FileSystem fs; - private final FileStatus file; + private final HdfsFileStatusWithId file; private final FileInfo fileInfo; private final boolean isOriginal; - private final List<Long> deltas; + private final List<DeltaMetaData> deltas; private final boolean hasBase; SplitInfo(Context context, FileSystem fs, - FileStatus file, FileInfo fileInfo, + HdfsFileStatusWithId file, FileInfo fileInfo, boolean isOriginal, - List<Long> deltas, + List<DeltaMetaData> deltas, boolean hasBase, Path dir, boolean[] covered) throws IOException { super(dir, context.numBuckets, deltas, covered); this.context = context; @@@ -472,14 -465,14 +468,14 @@@ static final class ETLSplitStrategy implements SplitStrategy<SplitInfo> { Context context; FileSystem fs; - List<FileStatus> files; + List<HdfsFileStatusWithId> files; boolean isOriginal; - List<Long> deltas; + List<DeltaMetaData> deltas; Path dir; boolean[] covered; - public ETLSplitStrategy(Context context, FileSystem fs, Path dir, List<FileStatus> children, + public ETLSplitStrategy(Context context, FileSystem fs, Path dir, List<HdfsFileStatusWithId> children, - boolean isOriginal, List<Long> deltas, boolean[] covered) { + boolean isOriginal, List<DeltaMetaData> deltas, boolean[] covered) { this.context = context; this.dir = dir; this.fs = fs; @@@ -548,16 -541,16 +544,16 @@@ * as opposed to query execution (split generation does not read or cache file footers). 
*/ static final class BISplitStrategy extends ACIDSplitStrategy { - List<FileStatus> fileStatuses; + List<HdfsFileStatusWithId> fileStatuses; boolean isOriginal; - List<Long> deltas; + List<DeltaMetaData> deltas; FileSystem fs; Context context; Path dir; public BISplitStrategy(Context context, FileSystem fs, - Path dir, List<FileStatus> fileStatuses, boolean isOriginal, + Path dir, List<HdfsFileStatusWithId> fileStatuses, boolean isOriginal, - List<Long> deltas, boolean[] covered) { + List<DeltaMetaData> deltas, boolean[] covered) { super(dir, context.numBuckets, deltas, covered); this.context = context; this.fileStatuses = fileStatuses; @@@ -650,10 -639,10 +646,10 @@@ public SplitStrategy call() throws IOException { final SplitStrategy splitStrategy; AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir, - context.conf, context.transactionList); + context.conf, context.transactionList, useFileIds); - List<Long> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories()); + List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories()); Path base = dirInfo.getBaseDirectory(); - List<FileStatus> original = dirInfo.getOriginalFiles(); + List<HdfsFileStatusWithId> original = dirInfo.getOriginalFiles(); boolean[] covered = new boolean[context.numBuckets]; boolean isOriginal = base == null; @@@ -741,11 -714,11 +737,11 @@@ private final TreeMap<Long, BlockLocation> locations; private final FileInfo fileInfo; private List<StripeInformation> stripes; - private ReaderImpl.FileMetaInfo fileMetaInfo; - private Metadata metadata; + private FileMetaInfo fileMetaInfo; + private List<StripeStatistics> stripeStats; private List<OrcProto.Type> types; private final boolean isOriginal; - private final List<Long> deltas; + private final List<DeltaMetaData> deltas; private final boolean hasBase; private OrcFile.WriterVersion writerVersion; private long projColsUncompressedSize; @@@ -894,9 -867,9 +890,9 @@@ includeStripe[i] = (i >= stripeStats.size()) || isStripeSatisfyPredicate(stripeStats.get(i), sarg, filterColumns); - if (LOG.isDebugEnabled() && !includeStripe[i]) { + if (isDebugEnabled && !includeStripe[i]) { LOG.debug("Eliminating ORC stripe-" + i + " of file '" + - file.getPath() + "' as it did not satisfy " + + fileWithId.getFileStatus().getPath() + "' as it did not satisfy " + "predicate condition."); } } http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java index 40675c6,8cf4cc0..dfe042d --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java @@@ -25,10 -25,9 +25,12 @@@ import java.nio.ByteBuffer import java.util.ArrayList; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.ql.io.ColumnarSplit; + import org.apache.hadoop.hive.ql.io.AcidInputFormat; + import org.apache.hadoop.hive.ql.io.AcidUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.mapred.FileSplit; @@@ -46,12 -43,10 +48,12 @@@ public class OrcSplit extends FileSplit
private boolean hasFooter; private boolean isOriginal; private boolean hasBase; - private final List<Long> deltas = new ArrayList<Long>(); + private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>(); private OrcFile.WriterVersion writerVersion; + private Long fileId; private long projColsUncompressedSize; + static final int HAS_FILEID_FLAG = 8; static final int BASE_FLAG = 4; static final int ORIGINAL_FLAG = 2; static final int FOOTER_FLAG = 1; @@@ -63,13 -58,10 +65,13 @@@ super(null, 0, 0, (String[]) null); } - public OrcSplit(Path path, long offset, long length, String[] hosts, - ReaderImpl.FileMetaInfo fileMetaInfo, boolean isOriginal, boolean hasBase, + public OrcSplit(Path path, Long fileId, long offset, long length, String[] hosts, + FileMetaInfo fileMetaInfo, boolean isOriginal, boolean hasBase, - List<Long> deltas, long projectedDataSize) { + List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize) { super(path, offset, length, hosts); + // We could avoid serializing file ID and just replace the path with inode-based path. + // However, that breaks bunch of stuff because Hive later looks up things by split path. + this.fileId = fileId; this.fileMetaInfo = fileMetaInfo; hasFooter = this.fileMetaInfo != null; this.isOriginal = isOriginal; @@@ -85,12 -77,11 +87,12 @@@ int flags = (hasBase ? BASE_FLAG : 0) | (isOriginal ? ORIGINAL_FLAG : 0) | - (hasFooter ? FOOTER_FLAG : 0); + (hasFooter ? FOOTER_FLAG : 0) | + (fileId != null ? HAS_FILEID_FLAG : 0); out.writeByte(flags); out.writeInt(deltas.size()); - for(Long delta: deltas) { - out.writeLong(delta); + for(AcidInputFormat.DeltaMetaData delta: deltas) { + delta.write(out); } if (hasFooter) { // serialize FileMetaInfo fields http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java ---------------------------------------------------------------------- diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java index 5117baf,f85420d..4c8389e --- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java @@@ -38,10 -37,8 +36,9 @@@ import org.apache.hadoop.fs.FileSystem import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.DiskRange; import org.apache.hadoop.hive.common.DiskRangeList; -import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper; +import org.apache.hadoop.hive.common.DiskRangeList.CreateHelper; +import org.apache.hadoop.hive.common.io.storage_api.DataReader; import org.apache.hadoop.hive.common.type.HiveDecimal; - import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO; @@@ -49,18 -47,22 +46,18 @@@ import org.apache.hadoop.hive.ql.io.orc import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue; - import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc; import org.apache.hadoop.hive.serde2.io.DateWritable; + import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.io.TimestampWritable; 
-import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim; import org.apache.hadoop.io.Text; -class RecordReaderImpl implements RecordReader { - +public class RecordReaderImpl implements RecordReader { static final Log LOG = LogFactory.getLog(RecordReaderImpl.class); private static final boolean isLogDebugEnabled = LOG.isDebugEnabled(); - private final Path path; - private final FSDataInputStream file; private final long firstRow; private final List<StripeInformation> stripes = - new ArrayList<StripeInformation>(); + new ArrayList<StripeInformation>(); private OrcProto.StripeFooter stripeFooter; private final long totalRowCount; private final CompressionCodec codec; @@@ -147,16 -150,17 +144,16 @@@ } protected RecordReaderImpl(List<StripeInformation> stripes, - FileSystem fileSystem, - Path path, - Reader.Options options, - List<OrcProto.Type> types, - CompressionCodec codec, - int bufferSize, - long strideRate, - Configuration conf - ) throws IOException { + FileSystem fileSystem, + Path path, + Reader.Options options, + List<OrcProto.Type> types, + CompressionCodec codec, + int bufferSize, + long strideRate, + Configuration conf + ) throws IOException { this.path = path; - this.file = fileSystem.open(path); this.codec = codec; this.types = types; this.bufferSize = bufferSize; @@@ -185,10 -189,11 +182,13 @@@ } } - final boolean zeroCopy = (conf != null) && (HiveConf.getBoolVar(conf, HIVE_ORC_ZEROCOPY)); + Boolean zeroCopy = options.getUseZeroCopy(); + if (zeroCopy == null) { + zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(conf); + } - zcr = zeroCopy ? RecordReaderUtils.createZeroCopyShim(file, codec, pool) : null; + // TODO: we could change the ctor to pass this externally + this.dataReader = RecordReaderUtils.createDefaultDataReader(fileSystem, path, zeroCopy, codec); + this.dataReader.open(); firstRow = skippedRows; totalRowCount = rows; http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ----------------------------------------------------------------------
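Two closing notes on hunks shown above. First, the RecordReaderImpl change replaces a direct HiveConf lookup for zero-copy reads with two-level resolution: an explicit per-reader option wins, and only an unset option falls through to the ORC configuration default. A boxed Boolean is used precisely because it carries three states (true, false, and null for "not specified"). The resolved flag then feeds a DataReader built by RecordReaderUtils.createDefaultDataReader(), which hides the raw FSDataInputStream behind an interface, presumably so LLAP can substitute a cache-backed implementation. Here is a self-contained sketch of the fallback pattern; the property name and Map-based config are illustrative, not the real OrcConf API.

    import java.util.Map;

    /** Illustrative model of "explicit option, else configuration default". */
    public class OptionFallbackSketch {

      /** Resolve a tri-state Boolean: an explicit value wins; null falls back to config. */
      static boolean resolveZeroCopy(Boolean explicit, Map<String, String> conf) {
        if (explicit != null) {
          return explicit; // the caller pinned the behavior for this reader
        }
        // Fall back to the configuration default (false if unset there as well).
        return Boolean.parseBoolean(conf.getOrDefault("orc.use.zerocopy.example", "false"));
      }

      public static void main(String[] args) {
        Map<String, String> conf = Map.of("orc.use.zerocopy.example", "true");
        System.out.println(resolveZeroCopy(null, conf));  // true  (config default applies)
        System.out.println(resolveZeroCopy(false, conf)); // false (explicit override wins)
      }
    }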
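Second, the OrcFile.WriterVersion hunk earlier in this diff trades a linear values() scan for an id-indexed array built once in a static initializer, so WriterVersion.from(id) becomes a single array load. The hunk's own comment records the assumption that makes this safe: ids are few, non-negative, and close to zero, which bounds the array size. The same idiom reduced to a self-contained example; the enum constants and ids below are an illustrative subset, not the full ORC set.

    /** Self-contained model of the id-indexed enum lookup built for OrcFile.WriterVersion. */
    public enum VersionLookupSketch {
      ORIGINAL(0), HIVE_8732(1), HIVE_4243(2); // illustrative subset of ids

      private final int id;

      VersionLookupSketch(int id) { this.id = id; }

      // Built once, after all constants exist. Assumes small non-negative ids,
      // as the hunk's comment notes; a huge or negative id would be a bug.
      private static final VersionLookupSketch[] BY_ID;
      static {
        int max = -1;
        for (VersionLookupSketch v : values()) {
          if (v.id < 0) throw new AssertionError("negative id");
          max = Math.max(max, v.id);
        }
        BY_ID = new VersionLookupSketch[max + 1];
        for (VersionLookupSketch v : values()) {
          BY_ID[v.id] = v;
        }
      }

      /** O(1) lookup; ids outside [0, max] throw ArrayIndexOutOfBoundsException. */
      public static VersionLookupSketch from(int id) {
        return BY_ID[id];
      }
    }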