This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d245aef  [SPARK-37728][SQL] Reading nested columns with ORC vectorized reader can cause ArrayIndexOutOfBoundsException
d245aef is described below

commit d245aefe87cc974fe7fc37587d09bec12c1bf911
Author: Yimin <yimi...@outlook.com>
AuthorDate: Mon Dec 27 22:09:51 2021 -0800

    [SPARK-37728][SQL] Reading nested columns with ORC vectorized reader can cause ArrayIndexOutOfBoundsException
    
    ### What changes were proposed in this pull request?
    
    When an OrcColumnarBatchReader is created, its initBatch method is called only once. In initBatch:

    `orcVectorWrappers[i] = OrcColumnVectorUtils.toOrcColumnVector(dt, wrap.batch().cols[colId]);`
    
    When the second argument of toOrcColumnVector is a ListColumnVector or MapColumnVector, orcVectorWrappers[i] is initialized with references to that vector's offsets and lengths arrays.
    
    However, when the nextBatch method of OrcColumnarBatchReader is called, the ensureSize method of ColumnVector (and of its subclasses, such as MultiValuedColumnVector) may be invoked, after which the ListColumnVector/MapColumnVector's offsets and lengths fields can point to newly allocated arrays. The wrappers then hold stale references, which can result in an ArrayIndexOutOfBoundsException.
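    To illustrate the hazard, here is a minimal standalone sketch against the Hive vector classes (the class and variable names are illustrative; this is not the actual Spark reader code):

    ```java
    import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;

    // Hypothetical repro: cache the offsets array once, then let the vector grow.
    public class StaleOffsetsSketch {
      public static void main(String[] args) {
        ListColumnVector list = new ListColumnVector(); // default capacity is 1024
        long[] cachedOffsets = list.offsets;            // captured once, as initBatch did

        // A later batch may grow the vector; ensureSize reallocates the
        // offsets/lengths arrays when the requested size exceeds the capacity.
        list.ensureSize(4096, false);

        // The cached reference still points at the old, smaller array, so
        // indexing it with a row id from the grown batch can go out of bounds.
        System.out.println(cachedOffsets == list.offsets); // false after reallocation
      }
    }
    ```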
    
    This PR makes OrcArrayColumnVector.getArray and OrcMapColumnVector.getMap always read the offsets and lengths from the underlying ColumnVector at access time, which resolves the issue.
    
    ### Why are the changes needed?
    
    Bugfix
    
    ### Does this PR introduce _any_ user-facing change?
    
    No
    
    ### How was this patch tested?
    
    Pass the CIs with the newly added test case.
    
    Closes #35002 from yym1995/fix-nested.
    
    Lead-authored-by: Yimin <yimi...@outlook.com>
    Co-authored-by: Yimin <26797163+yym1...@users.noreply.github.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../datasources/orc/OrcArrayColumnVector.java      | 13 +++++--------
 .../execution/datasources/orc/OrcColumnVector.java |  2 +-
 .../datasources/orc/OrcColumnVectorUtils.java      |  6 ++----
 .../datasources/orc/OrcMapColumnVector.java        | 13 +++++--------
 .../execution/datasources/orc/OrcQuerySuite.scala  | 22 ++++++++++++++++++++++
 5 files changed, 35 insertions(+), 21 deletions(-)

diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java
index 6e13e97..b0c818f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcArrayColumnVector.java
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.orc;
 
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.ListColumnVector;
 
 import org.apache.spark.sql.types.ArrayType;
 import org.apache.spark.sql.types.DataType;
@@ -31,26 +32,22 @@ import org.apache.spark.unsafe.types.UTF8String;
  */
 public class OrcArrayColumnVector extends OrcColumnVector {
   private final OrcColumnVector data;
-  private final long[] offsets;
-  private final long[] lengths;
 
   OrcArrayColumnVector(
       DataType type,
       ColumnVector vector,
-      OrcColumnVector data,
-      long[] offsets,
-      long[] lengths) {
+      OrcColumnVector data) {
 
     super(type, vector);
 
     this.data = data;
-    this.offsets = offsets;
-    this.lengths = lengths;
   }
 
   @Override
   public ColumnarArray getArray(int rowId) {
-    return new ColumnarArray(data, (int) offsets[rowId], (int) lengths[rowId]);
+    int offsets = (int) ((ListColumnVector) baseData).offsets[rowId];
+    int lengths = (int) ((ListColumnVector) baseData).lengths[rowId];
+    return new ColumnarArray(data, offsets, lengths);
   }
 
   @Override
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
index 0becd25..7fe1b30 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVector.java
@@ -29,7 +29,7 @@ import org.apache.spark.sql.vectorized.ColumnarBatch;
  * this column vector is used to adapt Hive ColumnVector with Spark ColumnarVector.
  */
 public abstract class OrcColumnVector extends org.apache.spark.sql.vectorized.ColumnVector {
-  private final ColumnVector baseData;
+  protected final ColumnVector baseData;
   private int batchSize;
 
   OrcColumnVector(DataType type, ColumnVector vector) {
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVectorUtils.java
index 3bc7cc8..89f6996e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcColumnVectorUtils.java
@@ -53,15 +53,13 @@ class OrcColumnVectorUtils {
       ListColumnVector listVector = (ListColumnVector) vector;
       OrcColumnVector dataVector = toOrcColumnVector(
         ((ArrayType) type).elementType(), listVector.child);
-      return new OrcArrayColumnVector(
-        type, vector, dataVector, listVector.offsets, listVector.lengths);
+      return new OrcArrayColumnVector(type, vector, dataVector);
     } else if (vector instanceof MapColumnVector) {
       MapColumnVector mapVector = (MapColumnVector) vector;
       MapType mapType = (MapType) type;
       OrcColumnVector keysVector = toOrcColumnVector(mapType.keyType(), mapVector.keys);
       OrcColumnVector valuesVector = toOrcColumnVector(mapType.valueType(), mapVector.values);
-      return new OrcMapColumnVector(
-        type, vector, keysVector, valuesVector, mapVector.offsets, 
mapVector.lengths);
+      return new OrcMapColumnVector(type, vector, keysVector, valuesVector);
     } else {
       throw new IllegalArgumentException(
         String.format("OrcColumnVectorUtils.toOrcColumnVector should not take 
%s as type " +
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java
index ace8d15..7eedd8b 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/orc/OrcMapColumnVector.java
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.execution.datasources.orc;
 
 import org.apache.hadoop.hive.ql.exec.vector.ColumnVector;
+import org.apache.hadoop.hive.ql.exec.vector.MapColumnVector;
 
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.Decimal;
@@ -32,28 +33,24 @@ import org.apache.spark.unsafe.types.UTF8String;
 public class OrcMapColumnVector extends OrcColumnVector {
   private final OrcColumnVector keys;
   private final OrcColumnVector values;
-  private final long[] offsets;
-  private final long[] lengths;
 
   OrcMapColumnVector(
       DataType type,
       ColumnVector vector,
       OrcColumnVector keys,
-      OrcColumnVector values,
-      long[] offsets,
-      long[] lengths) {
+      OrcColumnVector values) {
 
     super(type, vector);
 
     this.keys = keys;
     this.values = values;
-    this.offsets = offsets;
-    this.lengths = lengths;
   }
 
   @Override
   public ColumnarMap getMap(int ordinal) {
-    return new ColumnarMap(keys, values, (int) offsets[ordinal], (int) lengths[ordinal]);
+    int offsets = (int) ((MapColumnVector) baseData).offsets[ordinal];
+    int lengths = (int) ((MapColumnVector) baseData).lengths[ordinal];
+    return new ColumnarMap(keys, values, offsets, lengths);
   }
 
   @Override
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
index 8bc92f8..038606b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/orc/OrcQuerySuite.scala
@@ -744,6 +744,28 @@ abstract class OrcQuerySuite extends OrcQueryTest with SharedSparkSession {
     }
   }
 
+  test("SPARK-37728: Reading nested columns with ORC vectorized reader should 
not " +
+    "cause ArrayIndexOutOfBoundsException") {
+    withTempPath { dir =>
+      val path = dir.getCanonicalPath
+      val df = spark.range(100).map { _ =>
+        val arrayColumn = (0 until 50).map(_ => (0 until 1000).map(k => k.toString))
+        arrayColumn
+      }.toDF("record").repartition(1)
+      df.write.format("orc").save(path)
+
+      withSQLConf(SQLConf.ORC_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") {
+        val readDf = spark.read.orc(path)
+        val vectorizationEnabled = readDf.queryExecution.executedPlan.find {
+          case scan @ (_: FileSourceScanExec | _: BatchScanExec) => scan.supportsColumnar
+          case _ => false
+        }.isDefined
+        assert(vectorizationEnabled)
+        checkAnswer(readDf, df)
+      }
+    }
+  }
+
   test("SPARK-36594: ORC vectorized reader should properly check maximal 
number of fields") {
     withTempPath { dir =>
       val path = dir.getCanonicalPath
