Github user viirya commented on a diff in the pull request:

    https://github.com/apache/spark/pull/20205#discussion_r160613754
  
    --- Diff: sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnarBatchReader.java ---
    @@ -167,27 +172,61 @@ public void initBatch(
         }
     
         int capacity = DEFAULT_SIZE;
    -    if (MEMORY_MODE == MemoryMode.OFF_HEAP) {
    -      columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema);
    -    } else {
    -      columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema);
    -    }
    -    columnarBatch = new ColumnarBatch(resultSchema, columnVectors, capacity);
     
    -    if (partitionValues.numFields() > 0) {
    -      int partitionIdx = requiredFields.length;
    -      for (int i = 0; i < partitionValues.numFields(); i++) {
    -        ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i);
    -        columnVectors[i + partitionIdx].setIsConstant();
    +    if (copyToSpark) {
    +      if (MEMORY_MODE == MemoryMode.OFF_HEAP) {
    +        columnVectors = OffHeapColumnVector.allocateColumns(capacity, resultSchema);
    +      } else {
    +        columnVectors = OnHeapColumnVector.allocateColumns(capacity, resultSchema);
           }
    -    }
     
    -    // Initialize the missing columns once.
    -    for (int i = 0; i < requiredFields.length; i++) {
    -      if (requestedColIds[i] == -1) {
    -        columnVectors[i].putNulls(0, columnarBatch.capacity());
    -        columnVectors[i].setIsConstant();
    +      // Initialize the missing columns once.
    +      for (int i = 0; i < requiredFields.length; i++) {
    +        if (requestedColIds[i] == -1) {
    +          columnVectors[i].putNulls(0, capacity);
    +          columnVectors[i].setIsConstant();
    +        }
    +      }
    +
    +      if (partitionValues.numFields() > 0) {
    +        int partitionIdx = requiredFields.length;
    +        for (int i = 0; i < partitionValues.numFields(); i++) {
    +          ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i);
    +          columnVectors[i + partitionIdx].setIsConstant();
    +        }
    +      }
    +
    +      columnarBatch = new ColumnarBatch(resultSchema, columnVectors, capacity);
    +    } else {
    +      // Just wrap the ORC column vector instead of copying it to Spark column vector.
    +      orcVectorWrappers = new org.apache.spark.sql.vectorized.ColumnVector[resultSchema.length()];
    +
    +      for (int i = 0; i < requiredFields.length; i++) {
    +        DataType dt = requiredFields[i].dataType();
    +        int colId = requestedColIds[i];
    +        // Initialize the missing columns once.
    +        if (colId == -1) {
    +          OnHeapColumnVector missingCol = new OnHeapColumnVector(capacity, dt);
    +          missingCol.putNulls(0, capacity);
    +          missingCol.setIsConstant();
    +          orcVectorWrappers[i] = missingCol;
    +        } else {
    +          orcVectorWrappers[i] = new OrcColumnVector(dt, batch.cols[colId]);
    +        }
           }
    +
    +      if (partitionValues.numFields() > 0) {
    +        int partitionIdx = requiredFields.length;
    +        for (int i = 0; i < partitionValues.numFields(); i++) {
    +          DataType dt = partitionSchema.fields()[i].dataType();
    +          OnHeapColumnVector partitionCol = new OnHeapColumnVector(capacity, dt);
    --- End diff ---
    
    ditto.
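
    For readers following along, a minimal, self-contained Java sketch of the wrap-versus-copy distinction this branch introduces (all type names below are toy stand-ins, not the real Spark or ORC APIs): the copy path materializes every value into a freshly allocated vector once per batch, while the wrap path keeps a reference to the source buffer and reads through it.

        // Illustrative sketch only: these toy types stand in for Spark's
        // ColumnVector and ORC's LongColumnVector; they are not the real APIs.
        interface ReadableColumn {
          long getLong(int rowId);
        }

        // Source vector, analogous to ORC's on-heap long column.
        final class SourceLongColumn {
          final long[] vector;
          SourceLongColumn(long[] vector) { this.vector = vector; }
        }

        // Copy path: allocate a new array and copy every value up front,
        // as the copyToSpark == true branch does for each batch.
        final class CopiedLongColumn implements ReadableColumn {
          private final long[] data;
          CopiedLongColumn(SourceLongColumn src, int numRows) {
            data = new long[numRows];
            System.arraycopy(src.vector, 0, data, 0, numRows);
          }
          public long getLong(int rowId) { return data[rowId]; }
        }

        // Wrap path: keep a reference to the source and read through it,
        // avoiding the per-batch copy (the idea behind OrcColumnVector).
        final class WrappedLongColumn implements ReadableColumn {
          private final SourceLongColumn src;
          WrappedLongColumn(SourceLongColumn src) { this.src = src; }
          public long getLong(int rowId) { return src.vector[rowId]; }
        }

        public class WrapVsCopyDemo {
          public static void main(String[] args) {
            SourceLongColumn src = new SourceLongColumn(new long[] {1L, 2L, 3L});
            ReadableColumn copied = new CopiedLongColumn(src, 3);
            ReadableColumn wrapped = new WrappedLongColumn(src);
            System.out.println(copied.getLong(1) + " " + wrapped.getLong(1)); // "2 2"
          }
        }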


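    The missing-column and partition-column handling in both branches relies on the same trick: write the value (or nulls) once, call setIsConstant(), and let every row read back that single entry. A toy version of the idea (hypothetical names, not Spark's ColumnVector API):

        // Toy column showing the setIsConstant() idea: once the flag is set,
        // every rowId resolves to the value written at row 0, so a partition
        // value or a null fill is stored once per batch instead of per row.
        final class ToyLongColumn {
          private final long[] data;
          private boolean isConstant;

          ToyLongColumn(int capacity) { data = new long[capacity]; }

          void putLong(int rowId, long value) { data[rowId] = value; }

          // Marks the vector constant; a single write covers the whole batch.
          void setIsConstant() { isConstant = true; }

          long getLong(int rowId) { return data[isConstant ? 0 : rowId]; }
        }

        public class ConstantColumnDemo {
          public static void main(String[] args) {
            ToyLongColumn partitionCol = new ToyLongColumn(1024);
            partitionCol.putLong(0, 20180109L); // write the partition value once
            partitionCol.setIsConstant();
            System.out.println(partitionCol.getLong(500)); // prints 20180109
          }
        }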