This is an automated email from the ASF dual-hosted git repository. sunchao pushed a commit to branch branch-3.3 in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.3 by this push: new 80efaa2e979 [SPARK-34863][SQL][FOLLOW-UP] Handle IsAllNull in OffHeapColumnVector 80efaa2e979 is described below commit 80efaa2e979276643df35b03f9c44c31340a62b3 Author: Ivan Sadikov <ivan.sadi...@databricks.com> AuthorDate: Tue Apr 26 20:44:50 2022 -0700 [SPARK-34863][SQL][FOLLOW-UP] Handle IsAllNull in OffHeapColumnVector ### What changes were proposed in this pull request? This PR fixes an issue with reading null columns with the vectorised Parquet reader when the entire column is null or does not exist. This is especially noticeable when performing a merge or schema evolution in Parquet. The issue is only exposed with the `OffHeapColumnVector`, which does not handle the `isAllNull` flag; `OnHeapColumnVector` already handles `isAllNull`, so everything works fine there. ### Why are the changes needed? The change is needed to correctly read null columns using the vectorised reader in off-heap mode. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? I updated the existing unit tests to ensure we cover off-heap mode. I confirmed that the tests pass with the fix and fail without it. Closes #36366 from sadikovi/fix-off-heap-cv. 
Authored-by: Ivan Sadikov <ivan.sadi...@databricks.com> Signed-off-by: Chao Sun <sunc...@apple.com> --- .../execution/vectorized/OffHeapColumnVector.java | 2 +- .../datasources/parquet/ParquetIOSuite.scala | 102 ++++++++++++--------- .../parquet/ParquetInteroperabilitySuite.scala | 80 ++++++++-------- 3 files changed, 100 insertions(+), 84 deletions(-) diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java index 42552c7afc6..711c00856e9 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java +++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java @@ -132,7 +132,7 @@ public final class OffHeapColumnVector extends WritableColumnVector { @Override public boolean isNullAt(int rowId) { - return Platform.getByte(null, nulls + rowId) == 1; + return isAllNull || Platform.getByte(null, nulls + rowId) == 1; } // diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 4d01db999fb..5cd1bffdb50 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -376,23 +376,27 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } test("vectorized reader: missing array") { - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") { - val data = Seq( - Tuple1(null), - Tuple1(Seq()), - Tuple1(Seq("a", "b", "c")), - Tuple1(Seq(null)) - ) - - val readSchema = new StructType().add("_2", new ArrayType( - new StructType().add("a", LongType, nullable = true), - containsNull = true) - ) + Seq(true, 
false).foreach { offheapEnabled => + withSQLConf( + SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true", + SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offheapEnabled.toString) { + val data = Seq( + Tuple1(null), + Tuple1(Seq()), + Tuple1(Seq("a", "b", "c")), + Tuple1(Seq(null)) + ) - withParquetFile(data) { file => - checkAnswer(spark.read.schema(readSchema).parquet(file), - Row(null) :: Row(null) :: Row(null) :: Row(null) :: Nil + val readSchema = new StructType().add("_2", new ArrayType( + new StructType().add("a", LongType, nullable = true), + containsNull = true) ) + + withParquetFile(data) { file => + checkAnswer(spark.read.schema(readSchema).parquet(file), + Row(null) :: Row(null) :: Row(null) :: Row(null) :: Nil + ) + } } } } @@ -666,45 +670,53 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } test("vectorized reader: missing all struct fields") { - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") { - val data = Seq( - Tuple1((1, "a")), - Tuple1((2, null)), - Tuple1(null) - ) + Seq(true, false).foreach { offheapEnabled => + withSQLConf( + SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true", + SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offheapEnabled.toString) { + val data = Seq( + Tuple1((1, "a")), + Tuple1((2, null)), + Tuple1(null) + ) - val readSchema = new StructType().add("_1", - new StructType() - .add("_3", IntegerType, nullable = true) - .add("_4", LongType, nullable = true), - nullable = true) + val readSchema = new StructType().add("_1", + new StructType() + .add("_3", IntegerType, nullable = true) + .add("_4", LongType, nullable = true), + nullable = true) - withParquetFile(data) { file => - checkAnswer(spark.read.schema(readSchema).parquet(file), - Row(null) :: Row(null) :: Row(null) :: Nil - ) + withParquetFile(data) { file => + checkAnswer(spark.read.schema(readSchema).parquet(file), + Row(null) :: Row(null) :: Row(null) :: Nil + ) + } } 
} } test("vectorized reader: missing some struct fields") { - withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true") { - val data = Seq( - Tuple1((1, "a")), - Tuple1((2, null)), - Tuple1(null) - ) + Seq(true, false).foreach { offheapEnabled => + withSQLConf( + SQLConf.PARQUET_VECTORIZED_READER_NESTED_COLUMN_ENABLED.key -> "true", + SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offheapEnabled.toString) { + val data = Seq( + Tuple1((1, "a")), + Tuple1((2, null)), + Tuple1(null) + ) - val readSchema = new StructType().add("_1", - new StructType() - .add("_1", IntegerType, nullable = true) - .add("_3", LongType, nullable = true), - nullable = true) + val readSchema = new StructType().add("_1", + new StructType() + .add("_1", IntegerType, nullable = true) + .add("_3", LongType, nullable = true), + nullable = true) - withParquetFile(data) { file => - checkAnswer(spark.read.schema(readSchema).parquet(file), - Row(null) :: Row(Row(1, null)) :: Row(Row(2, null)) :: Nil - ) + withParquetFile(data) { file => + checkAnswer(spark.read.schema(readSchema).parquet(file), + Row(null) :: Row(Row(1, null)) :: Row(Row(2, null)) :: Nil + ) + } } } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala index a7395a61992..8b386e8f689 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetInteroperabilitySuite.scala @@ -102,47 +102,51 @@ class ParquetInteroperabilitySuite extends ParquetCompatibilityTest with SharedS // the data can be correctly read back. 
Seq(false, true).foreach { legacyMode => - withSQLConf(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> legacyMode.toString) { - withTempPath { tableDir => - val schema1 = StructType( - StructField("col-0", ArrayType( - StructType( - StructField("col-0", IntegerType, true) :: - Nil - ), - containsNull = false // allows to create 2-level Parquet LIST type in legacy mode - )) :: - Nil - ) - val row1 = Row(Seq(Row(1))) - val df1 = spark.createDataFrame(spark.sparkContext.parallelize(row1 :: Nil, 1), schema1) - df1.write.parquet(tableDir.getAbsolutePath) + Seq(false, true).foreach { offheapEnabled => + withSQLConf( + SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key -> legacyMode.toString, + SQLConf.COLUMN_VECTOR_OFFHEAP_ENABLED.key -> offheapEnabled.toString) { + withTempPath { tableDir => + val schema1 = StructType( + StructField("col-0", ArrayType( + StructType( + StructField("col-0", IntegerType, true) :: + Nil + ), + containsNull = false // allows to create 2-level Parquet LIST type in legacy mode + )) :: + Nil + ) + val row1 = Row(Seq(Row(1))) + val df1 = spark.createDataFrame(spark.sparkContext.parallelize(row1 :: Nil, 1), schema1) + df1.write.parquet(tableDir.getAbsolutePath) - val schema2 = StructType( - StructField("col-0", ArrayType( - StructType( - StructField("col-0", IntegerType, true) :: - StructField("col-1", IntegerType, true) :: // additional field - Nil - ), - containsNull = false - )) :: - Nil - ) - val row2 = Row(Seq(Row(1, 2))) - val df2 = spark.createDataFrame(spark.sparkContext.parallelize(row2 :: Nil, 1), schema2) - df2.write.mode("append").parquet(tableDir.getAbsolutePath) + val schema2 = StructType( + StructField("col-0", ArrayType( + StructType( + StructField("col-0", IntegerType, true) :: + StructField("col-1", IntegerType, true) :: // additional field + Nil + ), + containsNull = false + )) :: + Nil + ) + val row2 = Row(Seq(Row(1, 2))) + val df2 = spark.createDataFrame(spark.sparkContext.parallelize(row2 :: Nil, 1), schema2) + 
df2.write.mode("append").parquet(tableDir.getAbsolutePath) - // Reading of data should succeed and should not fail with - // java.lang.ClassCastException: optional int32 col-0 is not a group - withAllParquetReaders { - checkAnswer( - spark.read.schema(schema2).parquet(tableDir.getAbsolutePath), - Seq( - Row(Seq(Row(1, null))), - Row(Seq(Row(1, 2))) + // Reading of data should succeed and should not fail with + // java.lang.ClassCastException: optional int32 col-0 is not a group + withAllParquetReaders { + checkAnswer( + spark.read.schema(schema2).parquet(tableDir.getAbsolutePath), + Seq( + Row(Seq(Row(1, null))), + Row(Seq(Row(1, 2))) + ) ) - ) + } } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org