Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19943#discussion_r160017124
  
    --- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala
 ---
    @@ -0,0 +1,493 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.sql.execution.datasources.orc
    +
    +import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, 
TaskAttemptContext}
    +import org.apache.hadoop.mapreduce.lib.input.FileSplit
    +import org.apache.orc._
    +import org.apache.orc.mapred.OrcInputFormat
    +import org.apache.orc.storage.ql.exec.vector._
    +import org.apache.orc.storage.serde2.io.HiveDecimalWritable
    +
    +import org.apache.spark.memory.MemoryMode
    +import org.apache.spark.sql.catalyst.InternalRow
    +import org.apache.spark.sql.execution.vectorized._
    +import org.apache.spark.sql.types._
    +import org.apache.spark.sql.vectorized._
    +
    +
    +/**
    + * To support vectorization in WholeStageCodeGen, this reader returns 
ColumnarBatch.
    + * After creating, `initialize` and `setRequiredSchema` should be called 
sequentially.
    + */
    +private[orc] class OrcColumnarBatchReader extends RecordReader[Void, 
ColumnarBatch] {
    +  import OrcColumnarBatchReader._
    +
    +  /**
    +   * ORC File Reader.
    +   */
    +  private var reader: Reader = _
    +
    +  /**
    +   * Vectorized Row Batch.
    +   */
    +  private var batch: VectorizedRowBatch = _
    +
    +  /**
    +   * Requested Column IDs.
    +   */
    +  private var requestedColIds: Array[Int] = _
    +
    +  /**
    +   * Record reader from row batch.
    +   */
    +  private var recordReader: org.apache.orc.RecordReader = _
    +
    +  /**
    +   * Required Schema.
    +   */
    +  private var requiredSchema: StructType = _
    +
    +  /**
    +   * ColumnarBatch for vectorized execution by whole-stage codegen.
    +   */
    +  private var columnarBatch: ColumnarBatch = _
    +
    +  /**
    +   * Writable column vectors of ColumnarBatch.
    +   */
    +  private var columnVectors: Seq[WritableColumnVector] = _
    +
    +  /**
    +   * The number of rows read and considered to be returned.
    +   */
    +  private var rowsReturned: Long = 0L
    +
    +  /**
    +   * Total number of rows.
    +   */
    +  private var totalRowCount: Long = 0L
    +
    +  override def getCurrentKey: Void = null
    +
    +  override def getCurrentValue: ColumnarBatch = columnarBatch
    +
    +  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
    +
    +  override def nextKeyValue(): Boolean = nextBatch()
    +
    +  override def close(): Unit = {
    +    if (columnarBatch != null) {
    +      columnarBatch.close()
    +      columnarBatch = null
    +    }
    +    if (recordReader != null) {
    +      recordReader.close()
    +      recordReader = null
    +    }
    +  }
    +
    +  /**
    +   * Initialize ORC file reader and batch record reader.
    +   * Please note that `setRequiredSchema` is needed to be called after 
this.
    +   */
    +  override def initialize(inputSplit: InputSplit, taskAttemptContext: 
TaskAttemptContext): Unit = {
    +    val fileSplit = inputSplit.asInstanceOf[FileSplit]
    +    val conf = taskAttemptContext.getConfiguration
    +    reader = OrcFile.createReader(
    +      fileSplit.getPath,
    +      OrcFile.readerOptions(conf)
    +        .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
    +        .filesystem(fileSplit.getPath.getFileSystem(conf)))
    +
    +    val options = OrcInputFormat.buildOptions(conf, reader, 
fileSplit.getStart, fileSplit.getLength)
    +    recordReader = reader.rows(options)
    +    totalRowCount = reader.getNumberOfRows
    +  }
    +
    +  /**
    +   * Set required schema and partition information.
    +   * With this information, this creates ColumnarBatch with the full 
schema.
    +   */
    +  def setRequiredSchema(
    +      orcSchema: TypeDescription,
    +      requestedColIds: Array[Int],
    +      resultSchema: StructType,
    +      requiredSchema: StructType,
    +      partitionValues: InternalRow): Unit = {
    +    batch = orcSchema.createRowBatch(DEFAULT_SIZE)
    +    assert(!batch.selectedInUse, "No filters are allowed")
    --- End diff --
    
    No, ORC fills the batch after applying the push-downed filters.
    `selectedInUse` is used by some operators of Hive in order to generate 
derived row-batches. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to