http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
----------------------------------------------------------------------
diff --cc llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
index 152230c,0000000..333772c
mode 100644,000000..100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/api/impl/LlapInputFormat.java
@@@ -1,323 -1,0 +1,322 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +
 +package org.apache.hadoop.hive.llap.io.api.impl;
 +
 +import java.io.IOException;
 +import java.util.LinkedList;
 +import java.util.List;
 +
 +import org.apache.hadoop.hive.llap.ConsumerFeedback;
 +import org.apache.hadoop.hive.llap.DebugUtils;
 +import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 +import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters.Counter;
 +import org.apache.hadoop.hive.llap.io.decode.ColumnVectorProducer;
 +import org.apache.hadoop.hive.llap.io.decode.ReadPipeline;
 +import org.apache.hadoop.hive.ql.exec.Utilities;
- import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
 +import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface;
 +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
 +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
 +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 +import org.apache.hadoop.hive.ql.io.orc.llap.Consumer;
++import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
- import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory;
 +import org.apache.hadoop.hive.ql.metadata.HiveException;
 +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
 +import org.apache.hadoop.io.NullWritable;
 +import org.apache.hadoop.mapred.FileSplit;
 +import org.apache.hadoop.mapred.InputFormat;
 +import org.apache.hadoop.mapred.InputSplit;
 +import org.apache.hadoop.mapred.JobConf;
 +import org.apache.hadoop.mapred.RecordReader;
 +import org.apache.hadoop.mapred.Reporter;
 +import org.apache.hive.common.util.HiveStringUtils;
 +
 +import com.google.common.util.concurrent.FutureCallback;
 +import com.google.common.util.concurrent.Futures;
 +import com.google.common.util.concurrent.ListenableFuture;
 +import com.google.common.util.concurrent.ListeningExecutorService;
 +
 +public class LlapInputFormat
 +  implements InputFormat<NullWritable, VectorizedRowBatch>, VectorizedInputFormatInterface {
 +  @SuppressWarnings("rawtypes")
 +  private final InputFormat sourceInputFormat;
 +  private final ColumnVectorProducer cvp;
 +  private final ListeningExecutorService executor;
 +  private final String hostName;
 +
 +  @SuppressWarnings("rawtypes")
 +  LlapInputFormat(InputFormat sourceInputFormat, ColumnVectorProducer cvp,
 +      ListeningExecutorService executor) {
 +    // TODO: right now, we do nothing with source input format, ORC-only in the first cut.
 +    //       We'd need to plumb it thru and use it to get data to cache/etc.
 +    assert sourceInputFormat instanceof OrcInputFormat;
 +    this.executor = executor;
 +    this.cvp = cvp;
 +    this.sourceInputFormat = sourceInputFormat;
 +    this.hostName = HiveStringUtils.getHostname();
 +  }
 +
 +  @Override
 +  public RecordReader<NullWritable, VectorizedRowBatch> getRecordReader(
 +      InputSplit split, JobConf job, Reporter reporter) throws IOException {
 +    boolean isVectorMode = Utilities.isVectorMode(job);
 +    if (!isVectorMode) {
 +      LlapIoImpl.LOG.error("No llap in non-vectorized mode");
 +      throw new UnsupportedOperationException("No llap in non-vectorized mode");
 +    }
 +    FileSplit fileSplit = (FileSplit)split;
 +    reporter.setStatus(fileSplit.toString());
 +    try {
 +      List<Integer> includedCols = ColumnProjectionUtils.isReadAllColumns(job)
 +          ? null : ColumnProjectionUtils.getReadColumnIDs(job);
 +      return new LlapRecordReader(job, fileSplit, includedCols, hostName);
 +    } catch (Exception ex) {
 +      throw new IOException(ex);
 +    }
 +  }
 +
 +  @Override
 +  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
 +    return sourceInputFormat.getSplits(job, numSplits);
 +  }
 +
 +  private class LlapRecordReader
 +      implements RecordReader<NullWritable, VectorizedRowBatch>, Consumer<ColumnVectorBatch> {
 +    private final InputSplit split;
 +    private final List<Integer> columnIds;
 +    private final SearchArgument sarg;
 +    private final String[] columnNames;
 +    private final VectorizedRowBatchCtx rbCtx;
 +
 +    private final LinkedList<ColumnVectorBatch> pendingData = new LinkedList<ColumnVectorBatch>();
 +    private ColumnVectorBatch lastCvb = null;
 +    private boolean isFirst = true;
 +
 +    private Throwable pendingError = null;
 +    /** Vector that is currently being processed by our user. */
 +    private boolean isDone = false, isClosed = false;
 +    private ConsumerFeedback<ColumnVectorBatch> feedback;
 +    private final QueryFragmentCounters counters;
 +    private long firstReturnTime;
 +
 +    public LlapRecordReader(
 +        JobConf job, FileSplit split, List<Integer> includedCols, String hostName) {
 +      this.split = split;
 +      this.columnIds = includedCols;
-       this.sarg = SearchArgumentFactory.createFromConf(job);
++      this.sarg = ConvertAstToSearchArg.createFromConf(job);
 +      this.columnNames = ColumnProjectionUtils.getReadColumnNames(job);
 +      this.counters = new QueryFragmentCounters(job);
 +      this.counters.setDesc(QueryFragmentCounters.Desc.MACHINE, hostName);
 +      try {
 +        rbCtx = new VectorizedRowBatchCtx();
 +        rbCtx.init(job, split);
 +      } catch (Exception e) {
 +        throw new RuntimeException(e);
 +      }
 +      startRead();
 +    }
 +
 +    @Override
 +    public boolean next(NullWritable key, VectorizedRowBatch value) throws IOException {
 +      assert value != null;
 +      if (isClosed) {
 +        throw new AssertionError("next called after close");
 +      }
 +      // Add partition cols if necessary (see VectorizedOrcInputFormat for details).
 +      boolean wasFirst = isFirst;
 +      if (isFirst) {
 +        try {
 +          rbCtx.addPartitionColsToBatch(value);
 +        } catch (HiveException e) {
 +          throw new IOException(e);
 +        }
 +        isFirst = false;
 +      }
 +      ColumnVectorBatch cvb = null;
 +      try {
 +        cvb = nextCvb();
 +      } catch (InterruptedException e) {
 +        // Query might have been canceled. Stop the background processing.
 +        feedback.stop();
 +        throw new IOException(e);
 +      }
 +      if (cvb == null) {
 +        if (wasFirst) {
 +          firstReturnTime = counters.startTimeCounter();
 +        }
 +        counters.incrTimeCounter(Counter.CONSUMER_TIME_US, firstReturnTime);
 +        return false;
 +      }
 +      if (columnIds.size() != cvb.cols.length) {
 +        throw new RuntimeException("Unexpected number of columns, VRB has " + columnIds.size()
 +            + " included, but the reader returned " + cvb.cols.length);
 +      }
 +      // VRB was created from VrbCtx, so we already have pre-allocated column vectors
 +      for (int i = 0; i < cvb.cols.length; ++i) {
 +        // Return old CVs (if any) to caller. We assume these things all have the same schema.
 +        cvb.swapColumnVector(i, value.cols, columnIds.get(i));
 +      }
 +      value.selectedInUse = false;
 +      value.size = cvb.size;
 +      if (wasFirst) {
 +        firstReturnTime = counters.startTimeCounter();
 +      }
 +      return true;
 +    }
 +
 +
 +    private final class UncaughtErrorHandler implements FutureCallback<Void> {
 +      @Override
 +      public void onSuccess(Void result) {
 +        // Successful execution of reader is supposed to call setDone.
 +      }
 +
 +      @Override
 +      public void onFailure(Throwable t) {
 +        // Reader is not supposed to throw AFTER calling setError.
 +        LlapIoImpl.LOG.error("Unhandled error from reader thread " + t.getMessage());
 +        setError(t);
 +      }
 +    }
 +
 +    private void startRead() {
 +      // Create the consumer of encoded data; it will coordinate decoding to CVBs.
 +      ReadPipeline rp = cvp.createReadPipeline(
 +          this, split, columnIds, sarg, columnNames, counters);
 +      feedback = rp;
 +      ListenableFuture<Void> future = executor.submit(rp.getReadCallable());
 +      // TODO: we should NOT do this thing with handler. Reader needs to do cleanup in most cases.
 +      Futures.addCallback(future, new UncaughtErrorHandler());
 +    }
 +
 +    ColumnVectorBatch nextCvb() throws InterruptedException, IOException {
 +      boolean isFirst = (lastCvb == null);
 +      if (!isFirst) {
 +        feedback.returnData(lastCvb);
 +      }
 +      synchronized (pendingData) {
 +        // We are waiting for next block. Either we will get it, or be told we are done.
 +        boolean doLogBlocking = DebugUtils.isTraceMttEnabled() && isNothingToReport();
 +        if (doLogBlocking) {
 +          LlapIoImpl.LOG.info("next will block");
 +        }
 +        while (isNothingToReport()) {
 +          pendingData.wait(100);
 +        }
 +        if (doLogBlocking) {
 +          LlapIoImpl.LOG.info("next is unblocked");
 +        }
 +        rethrowErrorIfAny();
 +        lastCvb = pendingData.poll();
 +      }
 +      if (DebugUtils.isTraceMttEnabled() && lastCvb != null) {
 +        LlapIoImpl.LOG.info("Processing will receive vector " + lastCvb);
 +      }
 +      return lastCvb;
 +    }
 +
 +    private boolean isNothingToReport() {
 +      return !isDone && pendingData.isEmpty() && pendingError == null;
 +    }
 +
 +    @Override
 +    public NullWritable createKey() {
 +      return NullWritable.get();
 +    }
 +
 +    @Override
 +    public VectorizedRowBatch createValue() {
 +      try {
 +        return rbCtx.createVectorizedRowBatch();
 +      } catch (HiveException e) {
 +        throw new RuntimeException("Error creating a batch", e);
 +      }
 +    }
 +
 +    @Override
 +    public long getPos() throws IOException {
 +      return -1; // Position doesn't make sense for async reader, chunk order is arbitrary.
 +    }
 +
 +    @Override
 +    public void close() throws IOException {
 +      if (DebugUtils.isTraceMttEnabled()) {
 +        LlapIoImpl.LOG.info("close called; closed " + isClosed + ", done " + isDone
 +            + ", err " + pendingError + ", pending " + pendingData.size());
 +      }
 +      LlapIoImpl.LOG.info(counters); // This is where counters are logged!
 +      feedback.stop();
 +      rethrowErrorIfAny();
 +    }
 +
 +    private void rethrowErrorIfAny() throws IOException {
 +      if (pendingError == null) return;
 +      if (pendingError instanceof IOException) {
 +        throw (IOException)pendingError;
 +      }
 +      throw new IOException(pendingError);
 +    }
 +
 +    @Override
 +    public void setDone() {
 +      if (DebugUtils.isTraceMttEnabled()) {
 +        LlapIoImpl.LOG.info("setDone called; closed " + isClosed
 +          + ", done " + isDone + ", err " + pendingError + ", pending " + pendingData.size());
 +      }
 +      synchronized (pendingData) {
 +        isDone = true;
 +        pendingData.notifyAll();
 +      }
 +    }
 +
 +    @Override
 +    public void consumeData(ColumnVectorBatch data) {
 +      if (DebugUtils.isTraceMttEnabled()) {
 +        LlapIoImpl.LOG.info("consume called; closed " + isClosed + ", done " + isDone
 +            + ", err " + pendingError + ", pending " + pendingData.size());
 +      }
 +      synchronized (pendingData) {
 +        if (isClosed) {
 +          return;
 +        }
 +        pendingData.add(data);
 +        pendingData.notifyAll();
 +      }
 +    }
 +
 +    @Override
 +    public void setError(Throwable t) {
 +      counters.incrCounter(QueryFragmentCounters.Counter.NUM_ERRORS);
 +      LlapIoImpl.LOG.info("setError called; closed " + isClosed
 +        + ", done " + isDone + ", err " + pendingError + ", pending " + pendingData.size());
 +      assert t != null;
 +      synchronized (pendingData) {
 +        pendingError = t;
 +        pendingData.notifyAll();
 +      }
 +    }
 +
 +    @Override
 +    public float getProgress() throws IOException {
 +      // TODO: plumb progress info thru the reader if we can get metadata from loader first.
 +      return 0.0f;
 +    }
 +  }
 +}

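The LlapRecordReader above hands decoded batches from the background read pipeline to the caller through a plain wait/notify handoff on pendingData (see nextCvb(), consumeData(), setDone() and setError()). Below is a minimal, self-contained sketch of that handoff pattern using only JDK classes; the BatchQueue name and its generic Batch type are illustrative stand-ins, not part of the patch.

import java.io.IOException;
import java.util.LinkedList;

// Minimal sketch of the wait/notify handoff used by LlapRecordReader above.
// Batch and the producer wiring are placeholders for illustration only.
final class BatchQueue<Batch> {
  private final LinkedList<Batch> pending = new LinkedList<>();
  private boolean done = false;
  private Throwable error = null;

  // Called by the background reader thread for every decoded batch.
  void offer(Batch b) {
    synchronized (pending) {
      pending.add(b);
      pending.notifyAll();
    }
  }

  // Called by the background reader thread when it finishes or fails.
  void finish(Throwable t) {
    synchronized (pending) {
      done = true;
      error = t;
      pending.notifyAll();
    }
  }

  // Called by the consumer (next()); returns null once the reader is done and drained.
  Batch take() throws InterruptedException, IOException {
    synchronized (pending) {
      while (!done && pending.isEmpty() && error == null) {
        pending.wait(100); // bounded wait, same spirit as nextCvb()
      }
      if (error != null) {
        throw (error instanceof IOException) ? (IOException) error : new IOException(error);
      }
      return pending.poll();
    }
  }
}

Synchronizing on the list itself keeps producer and consumer on a single monitor, which is why consumeData(), setDone() and setError() in the patch all notify on pendingData.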
http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
----------------------------------------------------------------------
diff --cc llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
index f88a943,0000000..f310fd5
mode 100644,000000..100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/io/encoded/OrcEncodedDataReader.java
@@@ -1,906 -1,0 +1,906 @@@
 +package org.apache.hadoop.hive.llap.io.encoded;
 +
 +import java.io.IOException;
 +import java.nio.ByteBuffer;
 +import java.util.ArrayList;
 +import java.util.Arrays;
 +import java.util.Collections;
 +import java.util.List;
 +
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
 +import org.apache.hadoop.conf.Configuration;
 +import org.apache.hadoop.fs.FileSystem;
 +import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hive.common.CallableWithNdc;
 +import org.apache.hadoop.hive.common.DiskRange;
 +import org.apache.hadoop.hive.common.DiskRangeList;
 +import org.apache.hadoop.hive.common.io.storage_api.Allocator;
 +import org.apache.hadoop.hive.common.io.storage_api.DataCache;
 +import org.apache.hadoop.hive.common.io.storage_api.DataReader;
 +import org.apache.hadoop.hive.common.io.storage_api.MemoryBuffer;
 +import org.apache.hadoop.hive.common.io.storage_api.EncodedColumnBatch.ColumnStreamData;
 +import org.apache.hadoop.hive.conf.HiveConf;
 +import org.apache.hadoop.hive.llap.ConsumerFeedback;
 +import org.apache.hadoop.hive.llap.DebugUtils;
 +import org.apache.hadoop.hive.llap.cache.Cache;
 +import org.apache.hadoop.hive.llap.cache.LowLevelCache;
 +import org.apache.hadoop.hive.llap.cache.LowLevelCache.Priority;
 +import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters;
 +import org.apache.hadoop.hive.llap.counters.QueryFragmentCounters.Counter;
 +import org.apache.hadoop.hive.llap.io.api.impl.LlapIoImpl;
 +import org.apache.hadoop.hive.llap.io.decode.OrcEncodedDataConsumer;
 +import org.apache.hadoop.hive.llap.io.metadata.OrcFileMetadata;
 +import org.apache.hadoop.hive.llap.io.metadata.OrcMetadataCache;
 +import org.apache.hadoop.hive.llap.io.metadata.OrcStripeMetadata;
 +import org.apache.hadoop.hive.ql.exec.DDLTask;
 +import org.apache.hadoop.hive.ql.io.AcidUtils;
 +import org.apache.hadoop.hive.ql.io.HdfsUtils;
 +import org.apache.hadoop.hive.ql.io.orc.CompressionKind;
 +import org.apache.hadoop.hive.ql.io.orc.EncodedReader;
 +import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl;
 +import org.apache.hadoop.hive.ql.io.orc.EncodedReaderImpl.OrcEncodedColumnBatch;
 +import org.apache.hadoop.hive.ql.io.orc.MetadataReader;
 +import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 +import org.apache.hadoop.hive.ql.io.orc.OrcFile.ReaderOptions;
++import org.apache.hadoop.hive.ql.io.orc.OrcConf;
 +import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 +import org.apache.hadoop.hive.ql.io.orc.OrcProto;
 +import org.apache.hadoop.hive.ql.io.orc.OrcSplit;
 +import org.apache.hadoop.hive.ql.io.orc.Reader;
 +import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl;
 +import org.apache.hadoop.hive.ql.io.orc.RecordReaderImpl.SargApplier;
 +import org.apache.hadoop.hive.ql.io.orc.llap.Consumer;
 +import org.apache.hadoop.hive.ql.io.orc.llap.OrcBatchKey;
 +import org.apache.hadoop.hive.ql.io.orc.llap.OrcCacheKey;
 +import org.apache.hadoop.hive.ql.io.orc.RecordReaderUtils;
 +import org.apache.hadoop.hive.ql.io.orc.StripeInformation;
 +import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 +import org.apache.hadoop.hive.shims.HadoopShims;
 +import org.apache.hadoop.hive.shims.ShimLoader;
 +import org.apache.hadoop.mapred.FileSplit;
 +import org.apache.hadoop.mapred.InputSplit;
 +
 +/**
 + * This produces EncodedColumnBatch via ORC EncodedDataImpl.
 + * It serves as Consumer for EncodedColumnBatch too, for the high-level cache scenario where
 + * it inserts itself into the pipeline to put the data in cache, before passing it to the real
 + * consumer. It also serves as ConsumerFeedback that receives processed EncodedColumnBatch-es.
 + */
 +public class OrcEncodedDataReader extends CallableWithNdc<Void>
 +    implements ConsumerFeedback<OrcEncodedColumnBatch>, Consumer<OrcEncodedColumnBatch> {
 +  private static final Log LOG = LogFactory.getLog(OrcEncodedDataReader.class);
 +
 +  private final OrcMetadataCache metadataCache;
 +  private final LowLevelCache lowLevelCache;
 +  private final Configuration conf;
 +  private final Cache<OrcCacheKey> cache;
 +  private final FileSplit split;
 +  private List<Integer> columnIds;
 +  private final SearchArgument sarg;
 +  private final String[] columnNames;
 +  private final OrcEncodedDataConsumer consumer;
 +  private final QueryFragmentCounters counters;
 +
 +  // Read state.
 +  private int stripeIxFrom;
 +  private OrcFileMetadata fileMetadata;
 +  private Reader orcReader;
 +  private MetadataReader metadataReader;
 +  private EncodedReader stripeReader;
 +  private long fileId;
 +  private FileSystem fs;
 +  /**
 +   * readState[stripeIx'][colIx'] => boolean array (could be a bitmask) of rg-s that need to be
 +   * read. Contains only stripes that are read, and only columns included. null => read all RGs.
 +   */
 +  private boolean[][][] readState;
 +  private volatile boolean isStopped = false;
 +  @SuppressWarnings("unused")
 +  private volatile boolean isPaused = false;
 +
 +  public OrcEncodedDataReader(LowLevelCache lowLevelCache, Cache<OrcCacheKey> cache,
 +      OrcMetadataCache metadataCache, Configuration conf, InputSplit split,
 +      List<Integer> columnIds, SearchArgument sarg, String[] columnNames,
 +      OrcEncodedDataConsumer consumer, QueryFragmentCounters counters) {
 +    this.lowLevelCache = lowLevelCache;
 +    this.metadataCache = metadataCache;
 +    this.cache = cache;
 +    this.conf = conf;
 +    this.split = (FileSplit)split;
 +    this.columnIds = columnIds;
 +    if (this.columnIds != null) {
 +      Collections.sort(this.columnIds);
 +    }
 +    this.sarg = sarg;
 +    this.columnNames = columnNames;
 +    this.consumer = consumer;
 +    this.counters = counters;
 +  }
 +
 +  @Override
 +  public void stop() {
 +    if (LOG.isInfoEnabled()) {
 +      LOG.info("Encoded reader is being stopped");
 +    }
 +    isStopped = true;
 +  }
 +
 +  @Override
 +  public void pause() {
 +    isPaused = true;
 +    // TODO: pause fetching
 +  }
 +
 +  @Override
 +  public void unpause() {
 +    isPaused = false;
 +    // TODO: unpause fetching
 +  }
 +
 +  @Override
 +  protected Void callInternal() throws IOException {
 +    long startTime = counters.startTimeCounter();
 +    if (LlapIoImpl.LOGL.isInfoEnabled()) {
 +      LlapIoImpl.LOG.info("Processing data for " + split.getPath());
 +    }
 +    if (processStop()) {
 +      recordReaderTime(startTime);
 +      return null;
 +    }
 +    counters.setDesc(QueryFragmentCounters.Desc.TABLE, getDbAndTableName(split.getPath()));
 +    orcReader = null;
 +    // 1. Get file metadata from cache, or create the reader and read it.
 +    // Don't cache the filesystem object for now; Tez closes it and FS cache will fix all that
 +    fs = split.getPath().getFileSystem(conf);
 +    fileId = determineFileId(fs, split);
 +    counters.setDesc(QueryFragmentCounters.Desc.FILE, fileId);
 +
 +    try {
 +      fileMetadata = getOrReadFileMetadata();
 +      consumer.setFileMetadata(fileMetadata);
 +      validateFileMetadata();
 +      if (columnIds == null) {
 +        columnIds = createColumnIds(fileMetadata);
 +      }
 +
 +      // 2. Determine which stripes to read based on the split.
 +      determineStripesToRead();
 +    } catch (Throwable t) {
 +      recordReaderTime(startTime);
 +      consumer.setError(t);
 +      return null;
 +    }
 +
 +    if (readState.length == 0) {
 +      consumer.setDone();
 +      recordReaderTime(startTime);
 +      return null; // No data to read.
 +    }
 +    counters.setDesc(QueryFragmentCounters.Desc.STRIPES, stripeIxFrom + "," + readState.length);
 +
 +    // 3. Apply SARG if needed, and otherwise determine what RGs to read.
 +    int stride = fileMetadata.getRowIndexStride();
 +    ArrayList<OrcStripeMetadata> stripeMetadatas = null;
 +    boolean[] globalIncludes = null;
 +    boolean[] sargColumns = null;
 +    try {
 +      globalIncludes = OrcInputFormat.genIncludedColumns(fileMetadata.getTypes(), columnIds, true);
 +      if (sarg != null && stride != 0) {
 +        // TODO: move this to a common method
 +        int[] filterColumns = RecordReaderImpl.mapSargColumns(sarg.getLeaves(), columnNames, 0);
 +        // included will not be null, row options will fill the array with trues if null
 +        sargColumns = new boolean[globalIncludes.length];
 +        for (int i : filterColumns) {
 +          // filter columns may have -1 as index which could be partition column in SARG.
 +          if (i > 0) {
 +            sargColumns[i] = true;
 +          }
 +        }
 +
 +        // If SARG is present, get relevant stripe metadata from cache or readers.
 +        stripeMetadatas = readStripesMetadata(globalIncludes, sargColumns);
 +      }
 +
 +      // Now, apply SARG if any; w/o sarg, this will just initialize readState.
 +      boolean hasData = determineRgsToRead(globalIncludes, stride, stripeMetadatas);
 +      if (!hasData) {
 +        consumer.setDone();
 +        recordReaderTime(startTime);
 +        return null; // No data to read.
 +      }
 +    } catch (Throwable t) {
 +      cleanupReaders();
 +      consumer.setError(t);
 +      recordReaderTime(startTime);
 +      return null;
 +    }
 +
 +    if (processStop()) {
 +      cleanupReaders();
 +      recordReaderTime(startTime);
 +      return null;
 +    }
 +
 +    // 4. Get data from high-level cache.
 +    //    If some cols are fully in cache, this will also give us the modified list of columns to
 +    //    read for every stripe (null means read all of them - the usual path). In any case,
 +    //    readState will be modified for column x rgs that were fetched from high-level cache.
 +    List<Integer>[] stripeColsToRead = null;
 +    if (cache != null) {
 +      try {
 +        stripeColsToRead = produceDataFromCache(stride);
 +      } catch (Throwable t) {
 +        // produceDataFromCache handles its own cleanup.
 +        consumer.setError(t);
 +        cleanupReaders();
 +        recordReaderTime(startTime);
 +        return null;
 +      }
 +    }
 +
 +    // 5. Create encoded data reader.
 +    // In case if we have high-level cache, we will intercept the data and add it there;
 +    // otherwise just pass the data directly to the consumer.
 +    Consumer<OrcEncodedColumnBatch> dataConsumer = (cache == null) ? this.consumer : this;
 +    try {
 +      ensureOrcReader();
 +      // Reader creating updates HDFS counters, don't do it here.
 +      DataWrapperForOrc dw = new DataWrapperForOrc();
 +      stripeReader = orcReader.encodedReader(fileId, dw, dw);
 +      stripeReader.setDebugTracing(DebugUtils.isTraceOrcEnabled());
 +    } catch (Throwable t) {
 +      consumer.setError(t);
 +      recordReaderTime(startTime);
 +      cleanupReaders();
 +      return null;
 +    }
 +
 +    // 6. Read data.
 +    // TODO: I/O threadpool could be here - one thread per stripe; for now, linear.
 +    OrcBatchKey stripeKey = new OrcBatchKey(fileId, -1, 0);
 +    for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
 +      if (processStop()) {
 +        cleanupReaders();
 +        recordReaderTime(startTime);
 +        return null;
 +      }
 +      int stripeIx = stripeIxFrom + stripeIxMod;
 +      boolean[][] colRgs = null;
 +      boolean[] stripeIncludes = null;
 +      OrcStripeMetadata stripeMetadata = null;
 +      StripeInformation stripe;
 +      try {
 +        List<Integer> cols = stripeColsToRead == null ? null : stripeColsToRead[stripeIxMod];
 +        if (cols != null && cols.isEmpty()) continue; // No need to read this stripe.
 +        stripe = fileMetadata.getStripes().get(stripeIx);
 +
 +        if (DebugUtils.isTraceOrcEnabled()) {
 +          LlapIoImpl.LOG.info("Reading stripe " + stripeIx + ": "
 +              + stripe.getOffset() + ", " + stripe.getLength());
 +        }
 +        colRgs = readState[stripeIxMod];
 +        // We assume that NO_RGS value is only set from SARG filter and for all columns;
 +        // intermediate changes for individual columns will unset values in the array.
 +        // Skip this case for 0-column read. We could probably special-case it just like we do
 +        // in EncodedReaderImpl, but for now it's not that important.
 +        if (colRgs.length > 0 && colRgs[0] == SargApplier.READ_NO_RGS) continue;
 +
 +        // 6.1. Determine the columns to read (usually the same as requested).
 +        if (cache == null || cols == null || cols.size() == colRgs.length) {
 +          cols = columnIds;
 +          stripeIncludes = globalIncludes;
 +        } else {
 +          // We are reading subset of the original columns, remove unnecessary bitmasks/etc.
 +          // This will never happen w/o high-level cache.
 +          stripeIncludes = OrcInputFormat.genIncludedColumns(fileMetadata.getTypes(), cols, true);
 +          colRgs = genStripeColRgs(cols, colRgs);
 +        }
 +
 +        // 6.2. Ensure we have stripe metadata. We might have read it before for RG filtering.
 +        boolean isFoundInCache = false;
 +        if (stripeMetadatas != null) {
 +          stripeMetadata = stripeMetadatas.get(stripeIxMod);
 +        } else {
 +          stripeKey.stripeIx = stripeIx;
 +          stripeMetadata = metadataCache.getStripeMetadata(stripeKey);
 +          isFoundInCache = (stripeMetadata != null);
 +          if (!isFoundInCache) {
 +            counters.incrCounter(Counter.METADATA_CACHE_MISS);
 +            ensureMetadataReader();
 +            long startTimeHdfs = counters.startTimeCounter();
 +            stripeMetadata = new OrcStripeMetadata(
 +                stripeKey, metadataReader, stripe, stripeIncludes, sargColumns);
 +            counters.incrTimeCounter(Counter.HDFS_TIME_US, startTimeHdfs);
 +            stripeMetadata = metadataCache.putStripeMetadata(stripeMetadata);
 +            if (DebugUtils.isTraceOrcEnabled()) {
 +              LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx
 +                  + " metadata with includes: " + DebugUtils.toString(stripeIncludes));
 +            }
 +            stripeKey = new OrcBatchKey(fileId, -1, 0);
 +          }
 +          consumer.setStripeMetadata(stripeMetadata);
 +        }
 +        if (!stripeMetadata.hasAllIndexes(stripeIncludes)) {
 +          if (DebugUtils.isTraceOrcEnabled()) {
 +            LlapIoImpl.LOG.info("Updating indexes in stripe " + stripeKey.stripeIx
 +                + " metadata for includes: " + DebugUtils.toString(stripeIncludes));
 +          }
 +          assert isFoundInCache;
 +          counters.incrCounter(Counter.METADATA_CACHE_MISS);
 +          ensureMetadataReader();
 +          updateLoadedIndexes(stripeMetadata, stripe, stripeIncludes, sargColumns);
 +        } else if (isFoundInCache) {
 +          counters.incrCounter(Counter.METADATA_CACHE_HIT);
 +        }
 +      } catch (Throwable t) {
 +        consumer.setError(t);
 +        cleanupReaders();
 +        recordReaderTime(startTime);
 +        return null;
 +      }
 +      if (processStop()) {
 +        cleanupReaders();
 +        recordReaderTime(startTime);
 +        return null;
 +      }
 +
 +      // 6.3. Finally, hand off to the stripe reader to produce the data.
 +      //      This is a sync call that will feed data to the consumer.
 +      try {
 +        // TODO: readEncodedColumns is not supposed to throw; errors should be propagated thru
 +        // consumer. It is potentially holding locked buffers, and must perform its own cleanup.
 +        // Also, currently readEncodedColumns is not stoppable. The consumer will discard the
 +        // data it receives for one stripe. We could probably interrupt it, if it checked that.
 +        stripeReader.readEncodedColumns(stripeIx, stripe, stripeMetadata.getRowIndexes(),
 +            stripeMetadata.getEncodings(), stripeMetadata.getStreams(), stripeIncludes,
 +            colRgs, dataConsumer);
 +      } catch (Throwable t) {
 +        consumer.setError(t);
 +        cleanupReaders();
 +        recordReaderTime(startTime);
 +        return null;
 +      }
 +    }
 +
 +    // Done with all the things.
 +    recordReaderTime(startTime);
 +    dataConsumer.setDone();
 +    if (DebugUtils.isTraceMttEnabled()) {
 +      LlapIoImpl.LOG.info("done processing " + split);
 +    }
 +
 +    // Close the stripe reader, we are done reading.
 +    cleanupReaders();
 +    return null;
 +  }
 +
 +  private void recordReaderTime(long startTime) {
 +    counters.incrTimeCounter(Counter.TOTAL_IO_TIME_US, startTime);
 +  }
 +
 +  private static String getDbAndTableName(Path path) {
 +    // Ideally, we'd get this from split; however, split doesn't contain any such thing and it's
 +    // actually pretty hard to get cause even split generator only uses paths. We only need this
 +    // for metrics; therefore, brace for BLACK MAGIC!
 +    String[] parts = path.toUri().getPath().toString().split(Path.SEPARATOR);
 +    int dbIx = -1;
 +    // Try to find the default db postfix; don't check two last components - at least there
 +    // should be a table and file (we could also try to throw away partition/bucket/acid stuff).
 +    for (int i = 0; i < parts.length - 2; ++i) {
 +      if (!parts[i].endsWith(DDLTask.DATABASE_PATH_SUFFIX)) continue;
 +      if (dbIx >= 0) {
 +        dbIx = -1; // Let's not guess.
 +        break;
 +      }
 +      dbIx = i;
 +    }
 +    if (dbIx >= 0) {
 +      return parts[dbIx].substring(0, parts[dbIx].length() - 3) + "." + parts[dbIx + 1];
 +    }
 +
 +    // Just go from the back and throw away everything we think is wrong; skip last item, the file.
 +    boolean isInPartFields = false;
 +    for (int i = parts.length - 2; i >= 0; --i) {
 +      String p = parts[i];
 +      boolean isPartField = p.contains("=");
 +      if ((isInPartFields && !isPartField) || (!isPartField && !p.startsWith(AcidUtils.BASE_PREFIX)
 +          && !p.startsWith(AcidUtils.DELTA_PREFIX) && !p.startsWith(AcidUtils.BUCKET_PREFIX))) {
 +        dbIx = i - 1;
 +        break;
 +      }
 +      isInPartFields = isPartField;
 +    }
 +    // If we found something before we ran out of components, use it.
 +    if (dbIx >= 0) {
 +      String dbName = parts[dbIx];
 +      if (dbName.endsWith(DDLTask.DATABASE_PATH_SUFFIX)) {
 +        dbName = dbName.substring(0, dbName.length() - 3);
 +      }
 +      return dbName + "." + parts[dbIx + 1];
 +    }
 +    return "unknown";
 +  }
 +
 +  private void validateFileMetadata() throws IOException {
 +    if (fileMetadata.getCompressionKind() == CompressionKind.NONE) return;
 +    int bufferSize = fileMetadata.getCompressionBufferSize();
 +    int minAllocSize = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC);
 +    if (bufferSize < minAllocSize) {
 +      LOG.warn("ORC compression buffer size (" + bufferSize + ") is smaller than LLAP low-level "
 +            + "cache minimum allocation size (" + minAllocSize + "). Decrease the value for "
 +            + HiveConf.ConfVars.LLAP_ORC_CACHE_MIN_ALLOC.toString() + " to avoid wasting memory");
 +    }
 +  }
 +
 +  private boolean processStop() {
 +    if (!isStopped) return false;
 +    LOG.info("Encoded data reader is stopping");
 +    cleanupReaders();
 +    return true;
 +  }
 +
 +  private static long determineFileId(FileSystem fs, FileSplit split) throws IOException {
 +    if (split instanceof OrcSplit) {
 +      Long fileId = ((OrcSplit)split).getFileId();
 +      if (fileId != null) {
 +        return fileId;
 +      }
 +    }
 +    LOG.warn("Split for " + split.getPath() + " (" + split.getClass() + ") does not have file ID");
 +    return HdfsUtils.getFileId(fs, split.getPath());
 +  }
 +
 +  private boolean[][] genStripeColRgs(List<Integer> stripeCols, boolean[][] globalColRgs) {
 +    boolean[][] stripeColRgs = new boolean[stripeCols.size()][];
 +    for (int i = 0, i2 = -1; i < globalColRgs.length; ++i) {
 +      if (globalColRgs[i] == null) continue;
 +      ++i2;
 +      stripeColRgs[i2] = globalColRgs[i];
 +    }
 +    return stripeColRgs;
 +  }
 +
 +  /**
 +   * Puts all column indexes from metadata to make a column list to read all column.
 +   */
 +  private static List<Integer> createColumnIds(OrcFileMetadata metadata) {
 +    List<Integer> columnIds = new ArrayList<Integer>(metadata.getTypes().size());
 +    for (int i = 1; i < metadata.getTypes().size(); ++i) {
 +      columnIds.add(i);
 +    }
 +    return columnIds;
 +  }
 +
 +  /**
 +   * In case if stripe metadata in cache does not have all indexes for current query, load
 +   * the missing one. This is a temporary cludge until real metadata cache becomes available.
 +   */
 +  private void updateLoadedIndexes(OrcStripeMetadata stripeMetadata,
 +      StripeInformation stripe, boolean[] stripeIncludes, boolean[] sargColumns) throws IOException {
 +    // We only synchronize on write for now - design of metadata cache is very temporary;
 +    // we pre-allocate the array and never remove entries; so readers should be safe.
 +    synchronized (stripeMetadata) {
 +      if (stripeMetadata.hasAllIndexes(stripeIncludes)) return;
 +      long startTime = counters.startTimeCounter();
 +      stripeMetadata.loadMissingIndexes(metadataReader, stripe, stripeIncludes, sargColumns);
 +      counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime);
 +    }
 +  }
 +
 +  /**
 +   * Closes the stripe readers (on error).
 +   */
 +  private void cleanupReaders() {
 +    if (metadataReader != null) {
 +      try {
 +        metadataReader.close();
 +      } catch (IOException ex) {
 +        // Ignore.
 +      }
 +    }
 +    if (stripeReader != null) {
 +      try {
 +        stripeReader.close();
 +      } catch (IOException ex) {
 +        // Ignore.
 +      }
 +    }
 +  }
 +
 +  /**
 +   * Ensures orcReader is initialized for the split.
 +   */
 +  private void ensureOrcReader() throws IOException {
 +    if (orcReader != null) return;
 +    Path path = HdfsUtils.getFileIdPath(fs, split.getPath(), fileId);
 +    if (DebugUtils.isTraceOrcEnabled()) {
 +      LOG.info("Creating reader for " + path + " (" + split.getPath() + ")");
 +    }
 +    long startTime = counters.startTimeCounter();
 +    ReaderOptions opts = OrcFile.readerOptions(conf).filesystem(fs).fileMetadata(fileMetadata);
 +    orcReader = OrcFile.createReader(path, opts);
 +    counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime);
 +  }
 +
 +  /**
 +   *  Gets file metadata for the split from cache, or reads it from the file.
 +   */
 +  private OrcFileMetadata getOrReadFileMetadata() throws IOException {
 +    OrcFileMetadata metadata = metadataCache.getFileMetadata(fileId);
 +    if (metadata != null) {
 +      counters.incrCounter(Counter.METADATA_CACHE_HIT);
 +      return metadata;
 +    }
 +    counters.incrCounter(Counter.METADATA_CACHE_MISS);
 +    ensureOrcReader();
 +    // We assume this call doesn't touch HDFS because everything is already read; don't add time.
 +    metadata = new OrcFileMetadata(fileId, orcReader);
 +    return metadataCache.putFileMetadata(metadata);
 +  }
 +
 +  /**
 +   * Reads the metadata for all stripes in the file.
 +   */
 +  private ArrayList<OrcStripeMetadata> readStripesMetadata(
 +      boolean[] globalInc, boolean[] sargColumns) throws IOException {
 +    ArrayList<OrcStripeMetadata> result = new ArrayList<OrcStripeMetadata>(readState.length);
 +    OrcBatchKey stripeKey = new OrcBatchKey(fileId, 0, 0);
 +    for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
 +      stripeKey.stripeIx = stripeIxMod + stripeIxFrom;
 +      OrcStripeMetadata value = metadataCache.getStripeMetadata(stripeKey);
 +      if (value == null || !value.hasAllIndexes(globalInc)) {
 +        counters.incrCounter(Counter.METADATA_CACHE_MISS);
 +        ensureMetadataReader();
 +        StripeInformation si = fileMetadata.getStripes().get(stripeKey.stripeIx);
 +        if (value == null) {
 +          long startTime = counters.startTimeCounter();
 +          value = new OrcStripeMetadata(stripeKey, metadataReader, si, globalInc, sargColumns);
 +          counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime);
 +          value = metadataCache.putStripeMetadata(value);
 +          if (DebugUtils.isTraceOrcEnabled()) {
 +            LlapIoImpl.LOG.info("Caching stripe " + stripeKey.stripeIx
 +                + " metadata with includes: " + DebugUtils.toString(globalInc));
 +          }
 +          // Create new key object to reuse for gets; we've used the old one to put in cache.
 +          stripeKey = new OrcBatchKey(fileId, 0, 0);
 +        }
 +        // We might have got an old value from cache; recheck it has indexes.
 +        if (!value.hasAllIndexes(globalInc)) {
 +          if (DebugUtils.isTraceOrcEnabled()) {
 +            LlapIoImpl.LOG.info("Updating indexes in stripe " + stripeKey.stripeIx
 +                + " metadata for includes: " + DebugUtils.toString(globalInc));
 +          }
 +          updateLoadedIndexes(value, si, globalInc, sargColumns);
 +        }
 +      } else {
 +        counters.incrCounter(Counter.METADATA_CACHE_HIT);
 +      }
 +      result.add(value);
 +      consumer.setStripeMetadata(value);
 +    }
 +    return result;
 +  }
 +
 +  private void ensureMetadataReader() throws IOException {
 +    ensureOrcReader();
 +    if (metadataReader != null) return;
 +    long startTime = counters.startTimeCounter();
 +    metadataReader = orcReader.metadata();
 +    counters.incrTimeCounter(Counter.HDFS_TIME_US, startTime);
 +  }
 +
 +  @Override
 +  public void returnData(OrcEncodedColumnBatch ecb) {
 +    for (ColumnStreamData[] datas : ecb.getColumnData()) {
 +      for (ColumnStreamData data : datas) {
 +        if (data.decRef() != 0) continue;
 +        if (DebugUtils.isTraceLockingEnabled()) {
 +          for (MemoryBuffer buf : data.getCacheBuffers()) {
 +            LlapIoImpl.LOG.info("Unlocking " + buf + " at the end of processing");
 +          }
 +        }
 +        lowLevelCache.releaseBuffers(data.getCacheBuffers());
 +        EncodedReaderImpl.SB_POOL.offer(data);
 +      }
 +    }
 +    // We can offer ECB even with some streams not discarded; reset() will clear the arrays.
 +    EncodedReaderImpl.ECB_POOL.offer(ecb);
 +  }
 +
 +  /**
 +   * Determines which RGs need to be read, after stripes have been determined.
 +   * SARG is applied, and readState is populated for each stripe accordingly.
 +   * @param stripes All stripes in the file (field state is used to determine stripes to read).
 +   */
 +  private boolean determineRgsToRead(boolean[] globalIncludes, int rowIndexStride,
 +      ArrayList<OrcStripeMetadata> metadata) throws IOException {
 +    SargApplier sargApp = null;
 +    if (sarg != null && rowIndexStride != 0) {
 +      List<OrcProto.Type> types = fileMetadata.getTypes();
 +      String[] colNamesForSarg = OrcInputFormat.getSargColumnNames(
 +          columnNames, types, globalIncludes, fileMetadata.isOriginalFormat());
 +      sargApp = new SargApplier(sarg, colNamesForSarg, rowIndexStride, types, globalIncludes.length);
 +    }
 +    boolean hasAnyData = false;
 +    // readState should have been initialized by this time with an empty array.
 +    for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
 +      int stripeIx = stripeIxMod + stripeIxFrom;
 +      StripeInformation stripe = fileMetadata.getStripes().get(stripeIx);
 +      int rgCount = getRgCount(stripe, rowIndexStride);
 +      boolean[] rgsToRead = null;
 +      if (sargApp != null) {
 +        OrcStripeMetadata stripeMetadata = metadata.get(stripeIxMod);
 +        rgsToRead = sargApp.pickRowGroups(stripe, stripeMetadata.getRowIndexes(),
 +            stripeMetadata.getBloomFilterIndexes(), true);
 +      }
 +      boolean isNone = rgsToRead == SargApplier.READ_NO_RGS,
 +          isAll = rgsToRead == SargApplier.READ_ALL_RGS;
 +      hasAnyData = hasAnyData || !isNone;
 +      if (DebugUtils.isTraceOrcEnabled()) {
 +        if (isNone) {
 +          LlapIoImpl.LOG.info("SARG eliminated all RGs for stripe " + stripeIx);
 +        } else if (!isAll) {
 +          LlapIoImpl.LOG.info("SARG picked RGs for stripe " + stripeIx + ": "
 +              + DebugUtils.toString(rgsToRead));
 +        } else {
 +          LlapIoImpl.LOG.info("Will read all " + rgCount + " RGs for stripe " + stripeIx);
 +        }
 +      }
 +      assert isAll || isNone || rgsToRead.length == rgCount;
 +      readState[stripeIxMod] = new boolean[columnIds.size()][];
 +      for (int j = 0; j < columnIds.size(); ++j) {
 +        readState[stripeIxMod][j] = (isAll || isNone) ? rgsToRead :
 +          Arrays.copyOf(rgsToRead, rgsToRead.length);
 +      }
 +
 +      adjustRgMetric(rgCount, rgsToRead, isNone, isAll);
 +    }
 +    return hasAnyData;
 +  }
 +
 +  private void adjustRgMetric(int rgCount, boolean[] rgsToRead, boolean isNone,
 +      boolean isAll) {
 +    int count = 0;
 +    if (!isAll) {
 +      for (boolean b : rgsToRead) {
 +        if (b)
 +          count++;
 +      }
 +    } else if (!isNone) {
 +      count = rgCount;
 +    }
 +    counters.setCounter(QueryFragmentCounters.Counter.SELECTED_ROWGROUPS, count);
 +  }
 +
 +
 +  private int getRgCount(StripeInformation stripe, int rowIndexStride) {
 +    return (int)Math.ceil((double)stripe.getNumberOfRows() / rowIndexStride);
 +  }
 +
 +  /**
 +   * Determine which stripes to read for a split. Populates stripeIxFrom and readState.
 +   */
 +  public void determineStripesToRead() {
 +    // The unit of caching for ORC is (rg x column) (see OrcBatchKey).
 +    List<StripeInformation> stripes = fileMetadata.getStripes();
 +    long offset = split.getStart(), maxOffset = offset + split.getLength();
 +    stripeIxFrom = -1;
 +    int stripeIxTo = -1;
 +    if (LlapIoImpl.LOGL.isDebugEnabled()) {
 +      String tmp = "FileSplit {" + split.getStart() + ", " + split.getLength() + "}; stripes ";
 +      for (StripeInformation stripe : stripes) {
 +        tmp += "{" + stripe.getOffset() + ", " + stripe.getLength() + "}, ";
 +      }
 +      LlapIoImpl.LOG.debug(tmp);
 +    }
 +
 +    int stripeIx = 0;
 +    for (StripeInformation stripe : stripes) {
 +      long stripeStart = stripe.getOffset();
 +      if (offset > stripeStart) {
 +        // We assume splits will never start in the middle of the stripe.
 +        ++stripeIx;
 +        continue;
 +      }
 +      if (stripeIxFrom == -1) {
 +        if (DebugUtils.isTraceOrcEnabled()) {
 +          LlapIoImpl.LOG.info("Including stripes from " + stripeIx
 +              + " (" + stripeStart + " >= " + offset + ")");
 +        }
 +        stripeIxFrom = stripeIx;
 +      }
 +      if (stripeStart >= maxOffset) {
 +        stripeIxTo = stripeIx;
 +        if (DebugUtils.isTraceOrcEnabled()) {
 +          LlapIoImpl.LOG.info("Including stripes until " + stripeIxTo + " (" + stripeStart
 +              + " >= " + maxOffset + "); " + (stripeIxTo - stripeIxFrom) + " stripes");
 +        }
 +        break;
 +      }
 +      ++stripeIx;
 +    }
 +    if (stripeIxTo == -1) {
 +      stripeIxTo = stripeIx;
 +      if (DebugUtils.isTraceOrcEnabled()) {
 +        LlapIoImpl.LOG.info("Including stripes until " + stripeIx + " (end of file); "
 +            + (stripeIxTo - stripeIxFrom) + " stripes");
 +      }
 +    }
 +    readState = new boolean[stripeIxTo - stripeIxFrom][][];
 +  }
 +
 +  // TODO: split by stripe? we do everything by stripe, and it might be faster
 +  /**
 +   * Takes the data from high-level cache for all stripes and returns to consumer.
 +   * @return List of columns to read per stripe, if any columns were fully eliminated by cache.
 +   */
 +  private List<Integer>[] produceDataFromCache(int rowIndexStride) throws IOException {
 +    OrcCacheKey key = new OrcCacheKey(fileId, -1, -1, -1);
 +    // For each stripe, keep a list of columns that are not fully in cache (null => all of them).
 +    @SuppressWarnings("unchecked")
 +    List<Integer>[] stripeColsNotInCache = new List[readState.length];
 +    for (int stripeIxMod = 0; stripeIxMod < readState.length; ++stripeIxMod) {
 +      key.stripeIx = stripeIxFrom + stripeIxMod;
 +      boolean[][] cols = readState[stripeIxMod];
 +      boolean[] isMissingAnyRgs = new boolean[cols.length];
 +      int totalRgCount = getRgCount(fileMetadata.getStripes().get(key.stripeIx), rowIndexStride);
 +      for (int rgIx = 0; rgIx < totalRgCount; ++rgIx) {
 +        OrcEncodedColumnBatch col = EncodedReaderImpl.ECB_POOL.take();
 +        col.init(fileId, key.stripeIx, rgIx, cols.length);
 +        boolean hasAnyCached = false;
 +        try {
 +          key.rgIx = rgIx;
 +          for (int colIxMod = 0; colIxMod < cols.length; ++colIxMod) {
 +            boolean[] readMask = cols[colIxMod];
 +            // Check if RG is eliminated by SARG
 +            if ((readMask == SargApplier.READ_NO_RGS) || (readMask != SargApplier.READ_ALL_RGS
 +                && (readMask.length <= rgIx || !readMask[rgIx]))) continue;
 +            key.colIx = columnIds.get(colIxMod);
 +            ColumnStreamData[] cached = cache.get(key);
 +            if (cached == null) {
 +              isMissingAnyRgs[colIxMod] = true;
 +              continue;
 +            }
 +            col.setAllStreamsData(colIxMod, key.colIx, cached);
 +            hasAnyCached = true;
 +            if (readMask == SargApplier.READ_ALL_RGS) {
 +              // We were going to read all RGs, but some were in cache, allocate the mask.
 +              cols[colIxMod] = readMask = new boolean[totalRgCount];
 +              Arrays.fill(readMask, true);
 +            }
 +            readMask[rgIx] = false; // Got from cache, don't read from disk.
 +          }
 +        } catch (Throwable t) {
 +          // TODO: Any cleanup needed to release data in col back to cache should be here.
 +          throw (t instanceof IOException) ? (IOException)t : new IOException(t);
 +        }
 +        if (hasAnyCached) {
 +          consumer.consumeData(col);
 +        }
 +      }
 +      boolean makeStripeColList = false; // By default assume we'll fetch all original columns.
 +      for (int colIxMod = 0; colIxMod < cols.length; ++colIxMod) {
 +        if (isMissingAnyRgs[colIxMod]) {
 +          if (makeStripeColList) {
 +            stripeColsNotInCache[stripeIxMod].add(columnIds.get(colIxMod));
 +          }
 +        } else if (!makeStripeColList) {
 +          // Some columns were fully in cache. Make a per-stripe col list, add previous columns.
 +          makeStripeColList = true;
 +          stripeColsNotInCache[stripeIxMod] = new ArrayList<Integer>(cols.length - 1);
 +          for (int i = 0; i < colIxMod; ++i) {
 +            stripeColsNotInCache[stripeIxMod].add(columnIds.get(i));
 +          }
 +        }
 +      }
 +    }
 +    return stripeColsNotInCache;
 +  }
 +
 +  @Override
 +  public void setDone() {
 +    consumer.setDone();
 +  }
 +
 +  @Override
 +  public void consumeData(OrcEncodedColumnBatch data) {
 +    // Store object in cache; create new key object - cannot be reused.
 +    assert cache != null;
 +    for (int i = 0; i < data.getColumnData().length; ++i) {
 +      OrcCacheKey key = new OrcCacheKey(data.getBatchKey(), data.getColumnIxs()[i]);
 +      ColumnStreamData[] toCache = data.getColumnData()[i];
 +      ColumnStreamData[] cached = cache.cacheOrGet(key, toCache);
 +      if (toCache != cached) {
 +        for (ColumnStreamData sb : toCache) {
 +          if (sb.decRef() != 0) continue;
 +          lowLevelCache.releaseBuffers(sb.getCacheBuffers());
 +        }
 +        data.getColumnData()[i] = cached;
 +      }
 +    }
 +    consumer.consumeData(data);
 +  }
 +
 +  @Override
 +  public void setError(Throwable t) {
 +    consumer.setError(t);
 +  }
 +
 +  private class DataWrapperForOrc implements DataReader, DataCache {
 +    private DataReader orcDataReader;
 +
 +    public DataWrapperForOrc() {
-       boolean useZeroCopy = (conf != null)
-           && HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_ORC_ZEROCOPY);
++      boolean useZeroCopy = (conf != null) && OrcConf.USE_ZEROCOPY.getBoolean(conf);
 +      if (useZeroCopy && !lowLevelCache.getAllocator().isDirectAlloc()) {
 +        throw new UnsupportedOperationException("Cannot use zero-copy reader with non-direct cache "
 +            + "buffers; either disable zero-copy or enable direct cache allocation");
 +      }
 +      this.orcDataReader = orcReader.createDefaultDataReader(useZeroCopy);
 +    }
 +
 +    @Override
 +    public DiskRangeList getFileData(long fileId, DiskRangeList range,
 +        long baseOffset, DiskRangeListFactory factory, BooleanRef gotAllData) {
 +      return lowLevelCache.getFileData(fileId, range, baseOffset, factory, counters, gotAllData);
 +    }
 +
 +    @Override
 +    public long[] putFileData(long fileId, DiskRange[] ranges,
 +        MemoryBuffer[] data, long baseOffset) {
 +      return lowLevelCache.putFileData(
 +          fileId, ranges, data, baseOffset, Priority.NORMAL, counters);
 +    }
 +
 +    @Override
 +    public void releaseBuffer(MemoryBuffer buffer) {
 +      lowLevelCache.releaseBuffer(buffer);
 +    }
 +
 +    @Override
 +    public void reuseBuffer(MemoryBuffer buffer) {
 +      boolean isReused = lowLevelCache.reuseBuffer(buffer);
 +      assert isReused;
 +    }
 +
 +    @Override
 +    public Allocator getAllocator() {
 +      return lowLevelCache.getAllocator();
 +    }
 +
 +    @Override
 +    public void close() throws IOException {
 +      orcDataReader.close();
 +    }
 +
 +    @Override
 +    public DiskRangeList readFileData(DiskRangeList range, long baseOffset,
 +        boolean doForceDirect) throws IOException {
 +      long startTime = counters.startTimeCounter();
 +      DiskRangeList result = orcDataReader.readFileData(range, baseOffset, doForceDirect);
 +      counters.recordHdfsTime(startTime);
 +      if (DebugUtils.isTraceOrcEnabled() && LOG.isInfoEnabled()) {
 +        LOG.info("Disk ranges after disk read (file " + fileId + ", base offset " + baseOffset
 +              + "): " + RecordReaderUtils.stringifyDiskRanges(result));
 +      }
 +      return result;
 +    }
 +
 +    @Override
 +    public boolean isTrackingDiskRanges() {
 +      return orcDataReader.isTrackingDiskRanges();
 +    }
 +
 +    @Override
 +    public void releaseBuffer(ByteBuffer buffer) {
 +      orcDataReader.releaseBuffer(buffer);
 +    }
 +
 +    @Override
 +    public void open() throws IOException {
 +      long startTime = counters.startTimeCounter();
 +      orcDataReader.open();
 +      counters.recordHdfsTime(startTime);
 +    }
 +  }
 +}

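determineStripesToRead() above selects the contiguous range of stripes whose start offsets fall inside the split, and getRgCount() derives the row-group count per stripe from the row index stride. The sketch below isolates that selection arithmetic under the same assumption the reader makes (splits never start in the middle of a stripe); StripeInfo, pickStripes and rgCount are simplified stand-ins for illustration, not the patch's API.

import java.util.List;

// Standalone sketch of the stripe/row-group selection math used above.
// StripeInfo is a simplified stand-in for ORC's StripeInformation.
final class StripeMath {
  static final class StripeInfo {
    final long offset;
    final long numRows;
    StripeInfo(long offset, long numRows) { this.offset = offset; this.numRows = numRows; }
  }

  // Returns {fromIx, toIx} (toIx exclusive) of stripes whose start offset lies in
  // [splitStart, splitStart + splitLength), mirroring determineStripesToRead().
  static int[] pickStripes(List<StripeInfo> stripes, long splitStart, long splitLength) {
    long maxOffset = splitStart + splitLength;
    int from = -1, to = -1;
    for (int ix = 0; ix < stripes.size(); ++ix) {
      long start = stripes.get(ix).offset;
      if (start < splitStart) continue;      // split never starts mid-stripe
      if (from == -1) from = ix;             // first stripe owned by this split
      if (start >= maxOffset) { to = ix; break; }
    }
    if (from == -1) return new int[] {0, 0}; // nothing to read
    if (to == -1) to = stripes.size();       // ran to the end of the file
    return new int[] {from, to};
  }

  // Row groups per stripe: ceil(numRows / rowIndexStride), as in getRgCount().
  static int rgCount(StripeInfo stripe, int rowIndexStride) {
    return (int) Math.ceil((double) stripe.numRows / rowIndexStride);
  }
}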
http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/pom.xml
----------------------------------------------------------------------
diff --cc pom.xml
index 24b8b49,d2a5d52..9ff08a8
--- a/pom.xml
+++ b/pom.xml
@@@ -47,9 -47,9 +47,10 @@@
      <module>ql</module>
      <module>serde</module>
      <module>service</module>
 +    <module>llap-client</module>
      <module>shims</module>
      <module>spark-client</module>
+     <module>storage-api</module>
      <module>testutils</module>
      <module>packaging</module>
    </modules>
@@@ -160,9 -160,9 +161,9 @@@
      <stax.version>1.0.1</stax.version>
      <slf4j.version>1.7.5</slf4j.version>
      <ST4.version>4.0.4</ST4.version>
 -    <tez.version>0.5.2</tez.version>
 +    <tez.version>0.8.0-TEZ-2003-SNAPSHOT</tez.version>
      <super-csv.version>2.2.0</super-csv.version>
-     <spark.version>1.3.1</spark.version>
+     <spark.version>1.4.0</spark.version>
      <scala.binary.version>2.10</scala.binary.version>
      <scala.version>2.10.4</scala.version>
      <tempus-fugit.version>1.1</tempus-fugit.version>

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/pom.xml
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/HashTableSinkOperator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
index ed75639,1b9d7ef..d4bb38c
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MapJoinOperator.java
@@@ -171,9 -177,8 +173,8 @@@ public class MapJoinOperator extends Ab
                    return loadHashTable(mapContext, mrContext);
                  }
                });
 -      result.add(future);
 +      asyncInitOperations.add(future);
-     } else if (mapContext == null || mapContext.getLocalWork() == null
-         || mapContext.getLocalWork().getInputFileChangeSensitive() == false) {
+     } else if (!isInputFileChangeSensitive(mapContext)) {
        loadHashTable(mapContext, mrContext);
        hashTblInitedOnce = true;
      }

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
index 861f536,0f02737..9f1b20a
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
@@@ -1385,7 -1350,14 +1385,13 @@@ public abstract class Operator<T extend
      }
  
      @Override
 -    protected Collection<Future<?>> initializeOp(Configuration conf) {
 -      return childOperators;
 +    protected void initializeOp(Configuration conf) {
      }
    }
+ 
+   public void removeParents() {
+     for (Operator<?> parent : new ArrayList<Operator<?>>(getParentOperators())) {
+       removeParent(parent);
+     }
+   }
  }

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
index 4a5f0b9,aa8808a..8dbe45c
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/SparkHashTableSinkOperator.java
@@@ -61,9 -58,10 +58,10 @@@ public class SparkHashTableSinkOperato
    }
  
    @Override
 -  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
 -    Collection<Future<?>> result = super.initializeOp(hconf);
 +  protected void initializeOp(Configuration hconf) throws HiveException {
 +    super.initializeOp(hconf);
      ObjectInspector[] inputOIs = new ObjectInspector[conf.getTagLength()];
+     byte tag = conf.getTag();
      inputOIs[tag] = inputObjInspectors[0];
      conf.setTagOrder(new Byte[]{ tag });
      htsOperator.setConf(conf);

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkPlanGenerator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/SparkReduceRecordHandler.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/MergeFileRecordProcessor.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
index 77677a0,d649672..f177f0d
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java
@@@ -52,10 -50,9 +52,11 @@@ import org.apache.tez.runtime.api.Input
  import org.apache.tez.runtime.api.LogicalInput;
  import org.apache.tez.runtime.api.LogicalOutput;
  import org.apache.tez.runtime.api.ProcessorContext;
+ import org.apache.tez.runtime.api.Reader;
  import org.apache.tez.runtime.library.api.KeyValuesReader;
  
 +import com.google.common.collect.Lists;
 +
  /**
   * Process input from tez LogicalInput and write output - for a map plan
   * Just pump the records through the query plan.

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index 7f50bea,73263ee..8d35855
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@@ -173,9 -171,15 +173,15 @@@ public class TezTask extends Task<TezWo
        }
  
        // fetch the counters
-       Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
-       counters = client.getDAGStatus(statusGetOpts).getDAGCounters();
+       try {
+         Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
+         counters = client.getDAGStatus(statusGetOpts).getDAGCounters();
+       } catch (Exception err) {
+         // Don't fail execution due to counters - just don't print summary info
+         LOG.error("Failed to get counters: " + err, err);
+         counters = null;
+       }
 -      TezSessionPoolManager.getInstance().returnSession(session);
 +      TezSessionPoolManager.getInstance().returnSession(session, getWork().getLlapMode());
  
        if (LOG.isInfoEnabled() && counters != null
            && (conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||
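
The change above makes counter retrieval best-effort: a failure to fetch DAG counters is logged and execution continues with counters left null, which only suppresses the summary output. The same pattern in isolation, as a sketch that assumes the usual Tez client types (DAGClient, DAGStatus, StatusGetOpts, TezCounters) and is not the committed code:

    // Fetch DAG counters defensively; a counter failure must not fail the query.
    private static TezCounters fetchCountersOrNull(DAGClient client, Log log) {
      try {
        Set<StatusGetOpts> opts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
        return client.getDAGStatus(opts).getDAGCounters();
      } catch (Exception err) {
        log.error("Failed to get counters: " + err, err);
        return null;
      }
    }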

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
index 007782d,9bd811c..243017a
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java
@@@ -99,8 -99,7 +99,7 @@@ public class VectorMapJoinOperator exte
    }
  
    @Override
 -  public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
 +  public void initializeOp(Configuration hconf) throws HiveException {
- 
      // Use a final variable to properly parameterize the processVectorInspector closure.
      // Using a member variable in the closure will not do the right thing...
      final int parameterizePosBigTable = conf.getPosBigTable();

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java
index 0000000,6b9ac26..8486d12
mode 000000,100644..100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkHashTableSinkOperator.java
@@@ -1,0 -1,104 +1,101 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.hadoop.hive.ql.exec.vector;
+ 
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.hive.ql.exec.SparkHashTableSinkOperator;
+ import org.apache.hadoop.hive.ql.metadata.HiveException;
+ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+ import org.apache.hadoop.hive.ql.plan.SparkHashTableSinkDesc;
+ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+ 
+ import java.util.Collection;
+ import java.util.concurrent.Future;
+ 
+ /**
+  * Vectorized version of SparkHashTableSinkOperator
+  * Currently the implementation just delegates all the work to super class
+  *
+  * Copied from VectorFileSinkOperator
+  */
+ public class VectorSparkHashTableSinkOperator extends SparkHashTableSinkOperator {
+ 
+   private static final long serialVersionUID = 1L;
+ 
+   private VectorizationContext vContext;
+ 
+   // The above members are initialized by the constructor and must not be
+   // transient.
+   //---------------------------------------------------------------------------
+ 
+   private transient boolean firstBatch;
+ 
+   private transient VectorExtractRowDynBatch vectorExtractRowDynBatch;
+ 
+   protected transient Object[] singleRow;
+ 
+   public VectorSparkHashTableSinkOperator() {
+   }
+ 
+   public VectorSparkHashTableSinkOperator(VectorizationContext vContext, OperatorDesc conf) {
+     super();
+     this.vContext = vContext;
+     this.conf = (SparkHashTableSinkDesc) conf;
+   }
+ 
+   @Override
 -  protected Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
++  protected void initializeOp(Configuration hconf) throws HiveException {
+     inputObjInspectors[0] =
+         VectorizedBatchUtil.convertToStandardStructObjectInspector((StructObjectInspector) inputObjInspectors[0]);
+ 
 -    Collection<Future<?>> result = super.initializeOp(hconf);
 -    assert result.isEmpty();
++    super.initializeOp(hconf);
+ 
+     firstBatch = true;
 -
 -    return result;
+   }
+ 
+   @Override
+   public void process(Object row, int tag) throws HiveException {
+     VectorizedRowBatch batch = (VectorizedRowBatch) row;
+ 
+     if (firstBatch) {
+       vectorExtractRowDynBatch = new VectorExtractRowDynBatch();
+       vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0], vContext.getProjectedColumns());
+ 
+       singleRow = new Object[vectorExtractRowDynBatch.getCount()];
+ 
+       firstBatch = false;
+     }
+     vectorExtractRowDynBatch.setBatchOnEntry(batch);
+     if (batch.selectedInUse) {
+       int selected[] = batch.selected;
+       for (int logical = 0 ; logical < batch.size; logical++) {
+         int batchIndex = selected[logical];
+         vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+         super.process(singleRow, tag);
+       }
+     } else {
+       for (int batchIndex = 0 ; batchIndex < batch.size; batchIndex++) {
+         vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+         super.process(singleRow, tag);
+       }
+     }
+ 
+     vectorExtractRowDynBatch.forgetBatchOnExit();
+   }
+ }

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java
index 0000000,3bce49d..eb0b408
mode 000000,100644..100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSparkPartitionPruningSinkOperator.java
@@@ -1,0 -1,99 +1,96 @@@
+ /**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  *     http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+ 
+ package org.apache.hadoop.hive.ql.exec.vector;
+ 
+ import java.util.Collection;
+ import java.util.concurrent.Future;
+ 
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.hive.ql.metadata.HiveException;
+ import org.apache.hadoop.hive.ql.optimizer.spark.SparkPartitionPruningSinkDesc;
+ import org.apache.hadoop.hive.ql.parse.spark.SparkPartitionPruningSinkOperator;
+ import org.apache.hadoop.hive.ql.plan.OperatorDesc;
+ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+ import org.apache.hadoop.io.Writable;
+ 
+ /**
+  * Vectorized version for SparkPartitionPruningSinkOperator.
+  * Forked from VectorAppMasterEventOperator.
+  **/
+ public class VectorSparkPartitionPruningSinkOperator extends SparkPartitionPruningSinkOperator {
+ 
+   private static final long serialVersionUID = 1L;
+ 
+   private VectorizationContext vContext;
+ 
+   protected transient boolean firstBatch;
+ 
+   protected transient VectorExtractRowDynBatch vectorExtractRowDynBatch;
+ 
+   protected transient Object[] singleRow;
+ 
+   public VectorSparkPartitionPruningSinkOperator(VectorizationContext context,
+       OperatorDesc conf) {
+     super();
+     this.conf = (SparkPartitionPruningSinkDesc) conf;
+     this.vContext = context;
+   }
+ 
+   public VectorSparkPartitionPruningSinkOperator() {
+   }
+ 
+   @Override
 -  public Collection<Future<?>> initializeOp(Configuration hconf) throws HiveException {
++  public void initializeOp(Configuration hconf) throws HiveException {
+     inputObjInspectors[0] =
+         VectorizedBatchUtil.convertToStandardStructObjectInspector(
+             (StructObjectInspector) inputObjInspectors[0]);
 -    Collection<Future<?>> result = super.initializeOp(hconf);
 -    assert result.isEmpty();
++    super.initializeOp(hconf);
+ 
+     firstBatch = true;
 -
 -    return result;
+   }
+ 
+   @Override
+   public void process(Object data, int tag) throws HiveException {
+     VectorizedRowBatch batch = (VectorizedRowBatch) data;
+     if (firstBatch) {
+       vectorExtractRowDynBatch = new VectorExtractRowDynBatch();
+       vectorExtractRowDynBatch.init((StructObjectInspector) inputObjInspectors[0],
+           vContext.getProjectedColumns());
+       singleRow = new Object[vectorExtractRowDynBatch.getCount()];
+       firstBatch = false;
+     }
+ 
+     vectorExtractRowDynBatch.setBatchOnEntry(batch);
+     ObjectInspector rowInspector = inputObjInspectors[0];
+     try {
+       Writable writableRow;
+       for (int logical = 0; logical < batch.size; logical++) {
+         int batchIndex = batch.selectedInUse ? batch.selected[logical] : logical;
+         vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
+         writableRow = serializer.serialize(singleRow, rowInspector);
+         writableRow.write(buffer);
+       }
+     } catch (Exception e) {
+       throw new HiveException(e);
+     }
+ 
+     vectorExtractRowDynBatch.forgetBatchOnExit();
+   }
+ }
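
Both vectorized operators added above bridge batch mode back to their row-mode super class the same way: on the first batch they initialize a VectorExtractRowDynBatch against the standardized input object inspector, then walk the batch row by row, honoring the selected[] indirection when selectedInUse is set. The core of that walk, condensed from the two process() methods above:

    // Condensed batch walk shared by both operators: map the logical index
    // through selected[] only when the batch carries a selection vector.
    for (int logical = 0; logical < batch.size; logical++) {
      int batchIndex = batch.selectedInUse ? batch.selected[logical] : logical;
      vectorExtractRowDynBatch.extractRow(batchIndex, singleRow);
      super.process(singleRow, tag);  // the pruning sink serializes the row instead
    }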

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 0600deb,c7e0780..30db513
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@@ -412,7 -493,8 +499,8 @@@ public class AcidUtils 
      }
  
      Collections.sort(working);
 -    long current = bestBaseTxn;
 +    long current = bestBase.txn;
+     int lastStmtId = -1;
      for(ParsedDelta next: working) {
        if (next.maxTransaction > current) {
          // are any of the new transactions ones that we care about?

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
index 0d9b644,fd16b35..3501d19
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
@@@ -35,10 -36,9 +36,12 @@@ import org.apache.hadoop.conf.Configura
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.conf.HiveConf;
 +import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
  import org.apache.hadoop.hive.io.HiveIOExceptionHandlerUtil;
 +import org.apache.hadoop.hive.llap.io.api.LlapIo;
 +import org.apache.hadoop.hive.llap.io.api.LlapIoProxy;
+ import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+ import org.apache.hadoop.hive.ql.exec.spark.SparkDynamicPartitionPruner;
  import org.apache.hadoop.hive.ql.plan.TableDesc;
  import org.apache.hadoop.hive.ql.exec.Operator;
  import org.apache.hadoop.hive.ql.exec.TableScanOperator;

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/io/IOContextMap.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
index 8bb2fc9,a60ebb4..bdfe26c
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcFile.java
@@@ -120,75 -110,17 +110,37 @@@ public final class OrcFile 
        return id;
      }
  
-     private WriterVersion(int id) {
+     WriterVersion(int id) {
        this.id = id;
      }
 +
 +    private static final WriterVersion[] values;
 +    static {
 +      // Assumes few non-negative values close to zero.
 +      int max = Integer.MIN_VALUE;
 +      for (WriterVersion v : WriterVersion.values()) {
 +        if (v.id < 0) throw new AssertionError();
 +        if (v.id > max) {
 +          max = v.id;
 +        }
 +      }
 +      values = new WriterVersion[max + 1];
 +      for (WriterVersion v : WriterVersion.values()) {
 +        values[v.id] = v;
 +      }
 +    }
 +
 +    public static WriterVersion from(int val) {
 +      return values[val];
 +    }
    }
  
-   public static enum EncodingStrategy {
-     SPEED, COMPRESSION;
-   }
- 
-   public static enum CompressionStrategy {
-     SPEED, COMPRESSION;
+   public enum EncodingStrategy {
+     SPEED, COMPRESSION
    }
  
-   // Note : these string definitions for table properties are deprecated,
-   // and retained only for backward compatibility, please do not add to
-   // them, add to OrcTableProperties below instead
-   @Deprecated public static final String COMPRESSION = "orc.compress";
-   @Deprecated public static final String COMPRESSION_BLOCK_SIZE = "orc.compress.size";
-   @Deprecated public static final String STRIPE_SIZE = "orc.stripe.size";
-   @Deprecated public static final String ROW_INDEX_STRIDE = "orc.row.index.stride";
-   @Deprecated public static final String ENABLE_INDEXES = "orc.create.index";
-   @Deprecated public static final String BLOCK_PADDING = "orc.block.padding";
- 
-   /**
-    * Enum container for all orc table properties.
-    * If introducing a new orc-specific table property,
-    * add it here.
-    */
-   public static enum OrcTableProperties {
-     COMPRESSION("orc.compress"),
-     COMPRESSION_BLOCK_SIZE("orc.compress.size"),
-     STRIPE_SIZE("orc.stripe.size"),
-     BLOCK_SIZE("orc.block.size"),
-     ROW_INDEX_STRIDE("orc.row.index.stride"),
-     ENABLE_INDEXES("orc.create.index"),
-     BLOCK_PADDING("orc.block.padding"),
-     ENCODING_STRATEGY("orc.encoding.strategy"),
-     BLOOM_FILTER_COLUMNS("orc.bloom.filter.columns"),
-     BLOOM_FILTER_FPP("orc.bloom.filter.fpp");
- 
-     private final String propName;
- 
-     OrcTableProperties(String propName) {
-       this.propName = propName;
-     }
- 
-     public String getPropName(){
-       return this.propName;
-     }
+   public enum CompressionStrategy {
+     SPEED, COMPRESSION
    }
  
    // unused
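
WriterVersion.from(int) resolves an id to its enum constant in O(1): the static initializer builds a dense array indexed by id (asserting ids are small and non-negative) instead of scanning values() on every call. A hedged usage sketch; the numeric version would come from the file metadata, and the local names here are illustrative:

    // Map the numeric writer version recorded in the file back to the enum
    // via the dense lookup table built in the static initializer.
    int rawVersion = 1;  // e.g. the value read from the ORC postscript
    OrcFile.WriterVersion version = OrcFile.WriterVersion.from(rawVersion);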

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
index 10c19f1,4e6dd7a..eed7000
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcInputFormat.java
@@@ -52,9 -52,9 +52,10 @@@ import org.apache.hadoop.hive.ql.io.Aci
  import org.apache.hadoop.hive.ql.io.AcidUtils;
  import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
  import org.apache.hadoop.hive.ql.io.InputFormatChecker;
 +import org.apache.hadoop.hive.ql.io.LlapWrappableInputFormatInterface;
  import org.apache.hadoop.hive.ql.io.RecordIdentifier;
  import org.apache.hadoop.hive.ql.io.StatsProvidingRecordReader;
+ import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
  import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
  import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
  import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
@@@ -443,16 -436,16 +439,16 @@@ public class OrcInputFormat  implement
    static final class SplitInfo extends ACIDSplitStrategy {
      private final Context context;
      private final FileSystem fs;
 -    private final FileStatus file;
 +    private final HdfsFileStatusWithId file;
      private final FileInfo fileInfo;
      private final boolean isOriginal;
-     private final List<Long> deltas;
+     private final List<DeltaMetaData> deltas;
      private final boolean hasBase;
  
      SplitInfo(Context context, FileSystem fs,
 -        FileStatus file, FileInfo fileInfo,
 +        HdfsFileStatusWithId file, FileInfo fileInfo,
          boolean isOriginal,
-         List<Long> deltas,
+         List<DeltaMetaData> deltas,
          boolean hasBase, Path dir, boolean[] covered) throws IOException {
        super(dir, context.numBuckets, deltas, covered);
        this.context = context;
@@@ -472,14 -465,14 +468,14 @@@
    static final class ETLSplitStrategy implements SplitStrategy<SplitInfo> {
      Context context;
      FileSystem fs;
 -    List<FileStatus> files;
 +    List<HdfsFileStatusWithId> files;
      boolean isOriginal;
-     List<Long> deltas;
+     List<DeltaMetaData> deltas;
      Path dir;
      boolean[] covered;
  
 -    public ETLSplitStrategy(Context context, FileSystem fs, Path dir, List<FileStatus> children,
 +    public ETLSplitStrategy(Context context, FileSystem fs, Path dir, List<HdfsFileStatusWithId> children,
-         boolean isOriginal, List<Long> deltas, boolean[] covered) {
+         boolean isOriginal, List<DeltaMetaData> deltas, boolean[] covered) {
        this.context = context;
        this.dir = dir;
        this.fs = fs;
@@@ -548,16 -541,16 +544,16 @@@
     * as opposed to query execution (split generation does not read or cache 
file footers).
     */
    static final class BISplitStrategy extends ACIDSplitStrategy {
 -    List<FileStatus> fileStatuses;
 +    List<HdfsFileStatusWithId> fileStatuses;
      boolean isOriginal;
-     List<Long> deltas;
+     List<DeltaMetaData> deltas;
      FileSystem fs;
      Context context;
      Path dir;
  
      public BISplitStrategy(Context context, FileSystem fs,
 -        Path dir, List<FileStatus> fileStatuses, boolean isOriginal,
 +        Path dir, List<HdfsFileStatusWithId> fileStatuses, boolean isOriginal,
-         List<Long> deltas, boolean[] covered) {
+         List<DeltaMetaData> deltas, boolean[] covered) {
        super(dir, context.numBuckets, deltas, covered);
        this.context = context;
        this.fileStatuses = fileStatuses;
@@@ -650,10 -639,10 +646,10 @@@
      public SplitStrategy call() throws IOException {
        final SplitStrategy splitStrategy;
        AcidUtils.Directory dirInfo = AcidUtils.getAcidState(dir,
 -          context.conf, context.transactionList);
 +          context.conf, context.transactionList, useFileIds);
-       List<Long> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
+       List<DeltaMetaData> deltas = AcidUtils.serializeDeltas(dirInfo.getCurrentDirectories());
        Path base = dirInfo.getBaseDirectory();
 -      List<FileStatus> original = dirInfo.getOriginalFiles();
 +      List<HdfsFileStatusWithId> original = dirInfo.getOriginalFiles();
        boolean[] covered = new boolean[context.numBuckets];
        boolean isOriginal = base == null;
  
@@@ -741,11 -714,11 +737,11 @@@
      private final TreeMap<Long, BlockLocation> locations;
      private final FileInfo fileInfo;
      private List<StripeInformation> stripes;
 -    private ReaderImpl.FileMetaInfo fileMetaInfo;
 -    private Metadata metadata;
 +    private FileMetaInfo fileMetaInfo;
 +    private List<StripeStatistics> stripeStats;
      private List<OrcProto.Type> types;
      private final boolean isOriginal;
-     private final List<Long> deltas;
+     private final List<DeltaMetaData> deltas;
      private final boolean hasBase;
      private OrcFile.WriterVersion writerVersion;
      private long projColsUncompressedSize;
@@@ -894,9 -867,9 +890,9 @@@
                includeStripe[i] = (i >= stripeStats.size()) ||
                    isStripeSatisfyPredicate(stripeStats.get(i), sarg,
                        filterColumns);
-               if (LOG.isDebugEnabled() && !includeStripe[i]) {
+               if (isDebugEnabled && !includeStripe[i]) {
                  LOG.debug("Eliminating ORC stripe-" + i + " of file '" +
 -                    file.getPath() + "'  as it did not satisfy " +
 +                    fileWithId.getFileStatus().getPath() + "'  as it did not satisfy " +
                      "predicate condition.");
                }
              }

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcNewSplit.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
index 40675c6,8cf4cc0..dfe042d
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/OrcSplit.java
@@@ -25,10 -25,9 +25,12 @@@ import java.nio.ByteBuffer
  import java.util.ArrayList;
  import java.util.List;
  
 +import org.apache.commons.logging.Log;
 +import org.apache.commons.logging.LogFactory;
  import org.apache.hadoop.fs.Path;
 +import org.apache.hadoop.hive.ql.io.ColumnarSplit;
+ import org.apache.hadoop.hive.ql.io.AcidInputFormat;
+ import org.apache.hadoop.hive.ql.io.AcidUtils;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.io.WritableUtils;
  import org.apache.hadoop.mapred.FileSplit;
@@@ -46,12 -43,10 +48,12 @@@ public class OrcSplit extends FileSpli
    private boolean hasFooter;
    private boolean isOriginal;
    private boolean hasBase;
-   private final List<Long> deltas = new ArrayList<Long>();
+   private final List<AcidInputFormat.DeltaMetaData> deltas = new ArrayList<>();
    private OrcFile.WriterVersion writerVersion;
 +  private Long fileId;
    private long projColsUncompressedSize;
  
 +  static final int HAS_FILEID_FLAG = 8;
    static final int BASE_FLAG = 4;
    static final int ORIGINAL_FLAG = 2;
    static final int FOOTER_FLAG = 1;
@@@ -63,13 -58,10 +65,13 @@@
      super(null, 0, 0, (String[]) null);
    }
  
 -  public OrcSplit(Path path, long offset, long length, String[] hosts,
 -      ReaderImpl.FileMetaInfo fileMetaInfo, boolean isOriginal, boolean hasBase,
 +  public OrcSplit(Path path, Long fileId, long offset, long length, String[] hosts,
 +      FileMetaInfo fileMetaInfo, boolean isOriginal, boolean hasBase,
-       List<Long> deltas, long projectedDataSize) {
+       List<AcidInputFormat.DeltaMetaData> deltas, long projectedDataSize) {
      super(path, offset, length, hosts);
 +    // We could avoid serializing file ID and just replace the path with inode-based path.
 +    // However, that breaks bunch of stuff because Hive later looks up things by split path.
 +    this.fileId = fileId;
      this.fileMetaInfo = fileMetaInfo;
      hasFooter = this.fileMetaInfo != null;
      this.isOriginal = isOriginal;
@@@ -85,12 -77,11 +87,12 @@@
  
      int flags = (hasBase ? BASE_FLAG : 0) |
          (isOriginal ? ORIGINAL_FLAG : 0) |
 -        (hasFooter ? FOOTER_FLAG : 0);
 +        (hasFooter ? FOOTER_FLAG : 0) |
 +        (fileId != null ? HAS_FILEID_FLAG : 0);
      out.writeByte(flags);
      out.writeInt(deltas.size());
-     for(Long delta: deltas) {
-       out.writeLong(delta);
+     for(AcidInputFormat.DeltaMetaData delta: deltas) {
+       delta.write(out);
      }
      if (hasFooter) {
        // serialize FileMetaInfo fields
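
The write path above packs four booleans into a single flags byte and, new in this change, records the optional file id via HAS_FILEID_FLAG. The matching read side is not shown in this hunk; a minimal sketch of how such a byte would be decoded against the constants defined above (the surrounding readFields-style context is assumed):

    // Hypothetical counterpart to the write path above: unpack the flags
    // byte, then read the file id only if the writer recorded one.
    int flags = in.readByte();
    boolean hasBase = (flags & BASE_FLAG) != 0;
    boolean isOriginal = (flags & ORIGINAL_FLAG) != 0;
    boolean hasFooter = (flags & FOOTER_FLAG) != 0;
    boolean hasFileId = (flags & HAS_FILEID_FLAG) != 0;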

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/io/orc/Reader.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
----------------------------------------------------------------------
diff --cc ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
index 5117baf,f85420d..4c8389e
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/RecordReaderImpl.java
@@@ -38,10 -37,8 +36,9 @@@ import org.apache.hadoop.fs.FileSystem
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hive.common.DiskRange;
  import org.apache.hadoop.hive.common.DiskRangeList;
 -import org.apache.hadoop.hive.common.DiskRangeList.DiskRangeListCreateHelper;
 +import org.apache.hadoop.hive.common.DiskRangeList.CreateHelper;
 +import org.apache.hadoop.hive.common.io.storage_api.DataReader;
  import org.apache.hadoop.hive.common.type.HiveDecimal;
- import org.apache.hadoop.hive.conf.HiveConf;
  import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
  import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
  import org.apache.hadoop.hive.ql.io.filters.BloomFilterIO;
@@@ -49,18 -47,22 +46,18 @@@ import org.apache.hadoop.hive.ql.io.orc
  import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
  import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
  import org.apache.hadoop.hive.ql.io.sarg.SearchArgument.TruthValue;
- import org.apache.hadoop.hive.ql.plan.ExprNodeConstantDesc;
  import org.apache.hadoop.hive.serde2.io.DateWritable;
+ import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
  import org.apache.hadoop.hive.serde2.io.TimestampWritable;
 -import org.apache.hadoop.hive.shims.HadoopShims.ZeroCopyReaderShim;
  import org.apache.hadoop.io.Text;
  
 -class RecordReaderImpl implements RecordReader {
 -
 +public class RecordReaderImpl implements RecordReader {
    static final Log LOG = LogFactory.getLog(RecordReaderImpl.class);
    private static final boolean isLogDebugEnabled = LOG.isDebugEnabled();
 -
    private final Path path;
 -  private final FSDataInputStream file;
    private final long firstRow;
    private final List<StripeInformation> stripes =
 -    new ArrayList<StripeInformation>();
 +      new ArrayList<StripeInformation>();
    private OrcProto.StripeFooter stripeFooter;
    private final long totalRowCount;
    private final CompressionCodec codec;
@@@ -147,16 -150,17 +144,16 @@@
    }
  
    protected RecordReaderImpl(List<StripeInformation> stripes,
-                    FileSystem fileSystem,
-                    Path path,
-                    Reader.Options options,
-                    List<OrcProto.Type> types,
-                    CompressionCodec codec,
-                    int bufferSize,
-                    long strideRate,
-                    Configuration conf
-                    ) throws IOException {
+                              FileSystem fileSystem,
+                              Path path,
+                              Reader.Options options,
+                              List<OrcProto.Type> types,
+                              CompressionCodec codec,
+                              int bufferSize,
+                              long strideRate,
+                              Configuration conf
+                              ) throws IOException {
      this.path = path;
 -    this.file = fileSystem.open(path);
      this.codec = codec;
      this.types = types;
      this.bufferSize = bufferSize;
@@@ -185,10 -189,11 +182,13 @@@
        }
      }
  
-     final boolean zeroCopy = (conf != null) && (HiveConf.getBoolVar(conf, HIVE_ORC_ZEROCOPY));
+     Boolean zeroCopy = options.getUseZeroCopy();
+     if (zeroCopy == null) {
+       zeroCopy = OrcConf.USE_ZEROCOPY.getBoolean(conf);
+     }
 -    zcr = zeroCopy ? RecordReaderUtils.createZeroCopyShim(file, codec, pool) : null;
 +    // TODO: we could change the ctor to pass this externally
 +    this.dataReader = RecordReaderUtils.createDefaultDataReader(fileSystem, path, zeroCopy, codec);
 +    this.dataReader.open();
  
      firstRow = skippedRows;
      totalRowCount = rows;
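
Zero-copy is now resolved in two steps: an explicit per-reader override carried in Reader.Options wins, and only when that is unset (null) does the reader fall back to OrcConf.USE_ZEROCOPY from the configuration. The same null-means-unset pattern in isolation; the helper name is illustrative, not part of the patch:

    // Per-call override first, configuration default second.
    static boolean resolveZeroCopy(Boolean optionOverride, Configuration conf) {
      return optionOverride != null
          ? optionOverride
          : OrcConf.USE_ZEROCOPY.getBoolean(conf);
    }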

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/io/orc/TreeReaderFactory.java
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/c8ae1fbf/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java
----------------------------------------------------------------------
