Github user dongjoon-hyun commented on a diff in the pull request:

    https://github.com/apache/spark/pull/19943#discussion_r160017124

--- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.scala ---
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.orc
+
+import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
+import org.apache.hadoop.mapreduce.lib.input.FileSplit
+import org.apache.orc._
+import org.apache.orc.mapred.OrcInputFormat
+import org.apache.orc.storage.ql.exec.vector._
+import org.apache.orc.storage.serde2.io.HiveDecimalWritable
+
+import org.apache.spark.memory.MemoryMode
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.execution.vectorized._
+import org.apache.spark.sql.types._
+import org.apache.spark.sql.vectorized._
+
+
+/**
+ * To support vectorization in whole-stage codegen, this reader returns a ColumnarBatch.
+ * After creation, `initialize` and `setRequiredSchema` should be called sequentially.
+ */
+private[orc] class OrcColumnarBatchReader extends RecordReader[Void, ColumnarBatch] {
+  import OrcColumnarBatchReader._
+
+  /**
+   * ORC file reader.
+   */
+  private var reader: Reader = _
+
+  /**
+   * Vectorized row batch.
+   */
+  private var batch: VectorizedRowBatch = _
+
+  /**
+   * Requested column IDs.
+   */
+  private var requestedColIds: Array[Int] = _
+
+  /**
+   * Record reader that returns row batches.
+   */
+  private var recordReader: org.apache.orc.RecordReader = _
+
+  /**
+   * Required schema.
+   */
+  private var requiredSchema: StructType = _
+
+  /**
+   * ColumnarBatch for vectorized execution by whole-stage codegen.
+   */
+  private var columnarBatch: ColumnarBatch = _
+
+  /**
+   * Writable column vectors of ColumnarBatch.
+   */
+  private var columnVectors: Seq[WritableColumnVector] = _
+
+  /**
+   * The number of rows read and considered to be returned.
+   */
+  private var rowsReturned: Long = 0L
+
+  /**
+   * Total number of rows.
+   */
+  private var totalRowCount: Long = 0L
+
+  override def getCurrentKey: Void = null
+
+  override def getCurrentValue: ColumnarBatch = columnarBatch
+
+  override def getProgress: Float = rowsReturned.toFloat / totalRowCount
+
+  override def nextKeyValue(): Boolean = nextBatch()
+
+  override def close(): Unit = {
+    if (columnarBatch != null) {
+      columnarBatch.close()
+      columnarBatch = null
+    }
+    if (recordReader != null) {
+      recordReader.close()
+      recordReader = null
+    }
+  }
+
+  /**
+   * Initialize the ORC file reader and the batch record reader.
+   * Note that `setRequiredSchema` needs to be called after this.
+   */
+  override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): Unit = {
+    val fileSplit = inputSplit.asInstanceOf[FileSplit]
+    val conf = taskAttemptContext.getConfiguration
+    reader = OrcFile.createReader(
+      fileSplit.getPath,
+      OrcFile.readerOptions(conf)
+        .maxLength(OrcConf.MAX_FILE_LENGTH.getLong(conf))
+        .filesystem(fileSplit.getPath.getFileSystem(conf)))
+
+    val options = OrcInputFormat.buildOptions(conf, reader, fileSplit.getStart, fileSplit.getLength)
+    recordReader = reader.rows(options)
+    totalRowCount = reader.getNumberOfRows
+  }
+
+  /**
+   * Set the required schema and partition information.
+   * With this information, this method creates a ColumnarBatch with the full schema.
+   */
+  def setRequiredSchema(
+      orcSchema: TypeDescription,
+      requestedColIds: Array[Int],
+      resultSchema: StructType,
+      requiredSchema: StructType,
+      partitionValues: InternalRow): Unit = {
+    batch = orcSchema.createRowBatch(DEFAULT_SIZE)
+    assert(!batch.selectedInUse, "No filters are allowed")
--- End diff --

No, ORC fills the batch after applying the pushed-down filters. `selectedInUse` is used by some Hive operators to generate derived row batches.
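To make that concrete, here is a minimal sketch of how a predicate reaches ORC on this code path. The builder calls mirror what Spark's `OrcFilters` produces; the column name `id` and the literal are made-up examples, and it is my reading of the `initialize` code above that `OrcInputFormat.buildOptions` picks the serialized predicate back up from the same configuration:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.orc.mapred.OrcInputFormat
import org.apache.orc.storage.ql.io.sarg.{PredicateLeaf, SearchArgumentFactory}

val conf = new Configuration()

// Build a SearchArgument for `id < 10` and store it in the configuration.
// ORC uses it to skip row groups whose statistics rule out a match, so the
// batches handed back to Spark never rely on `selected`/`selectedInUse`.
val sarg = SearchArgumentFactory.newBuilder()
  .startAnd()
    .lessThan("id", PredicateLeaf.Type.LONG, java.lang.Long.valueOf(10L))
  .end()
  .build()
OrcInputFormat.setSearchArgument(conf, sarg, Array("id"))
```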
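For reference, the call sequence the class Scaladoc describes ("`initialize` and `setRequiredSchema` should be called sequentially") looks roughly like this. A sketch under assumptions: the caller sits in the same `org.apache.spark.sql.execution.datasources.orc` package (the class is `private[orc]`), the split, task context, schemas, and requested column IDs are prepared elsewhere (in the PR they come from the file-format integration), and deriving the ORC schema via `catalogString` stands in for however the real caller obtains it:

```scala
import org.apache.hadoop.mapreduce.TaskAttemptContext
import org.apache.hadoop.mapreduce.lib.input.FileSplit
import org.apache.orc.TypeDescription
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical driver: all parameters are assumed to be supplied by the caller.
def readSplit(
    split: FileSplit,
    context: TaskAttemptContext,
    resultSchema: StructType,
    requiredSchema: StructType,
    requestedColIds: Array[Int]): Unit = {
  val batchReader = new OrcColumnarBatchReader
  batchReader.initialize(split, context)  // opens the file and the record reader
  batchReader.setRequiredSchema(          // then allocates the ColumnarBatch
    TypeDescription.fromString(resultSchema.catalogString),  // assumed schema derivation
    requestedColIds,
    resultSchema,
    requiredSchema,
    InternalRow.empty)                    // no partition columns in this sketch
  try {
    while (batchReader.nextKeyValue()) {
      val batch: ColumnarBatch = batchReader.getCurrentValue
      // Consume batch.numRows() rows in a vectorized fashion here.
    }
  } finally {
    batchReader.close()
  }
}
```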