[spark] branch branch-3.0 updated: [SPARK-31116][SQL] Fix nested schema case-sensitivity in ParquetRowConverter
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new da1f95b [SPARK-31116][SQL] Fix nested schema case-sensitivity in ParquetRowConverter da1f95b is described below commit da1f95be6b9af59a91a14e01613bdc4e8ac35374 Author: Tae-kyeom, Kim AuthorDate: Mon Mar 16 10:31:56 2020 -0700 [SPARK-31116][SQL] Fix nested schema case-sensitivity in ParquetRowConverter ### What changes were proposed in this pull request? This PR (SPARK-31116) adds a caseSensitive parameter to ParquetRowConverter so that it handles materializing Parquet data properly with respect to case sensitivity ### Why are the changes needed? Since Spark 3.0.0, the statement below throws IllegalArgumentException in case-insensitive mode because of explicit field index searching in ParquetRowConverter. As we already constructed the Parquet requested schema and the Catalyst requested schema during schema clipping in ParquetReadSupport, we just follow that behavior. ```scala val path = "/some/temp/path" spark .range(1L) .selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn") .write.parquet(path) val caseInsensitiveSchema = new StructType() .add( "StructColumn", new StructType() .add("LowerCase", LongType) .add("camelcase", LongType)) spark.read.schema(caseInsensitiveSchema).parquet(path).show() ``` ### Does this PR introduce any user-facing change? No. The changes are only in unreleased branches (`master` and `branch-3.0`). ### How was this patch tested? Passed new test cases that check Parquet column selection with respect to schemas and case sensitivities Closes #27888 from kimtkyeom/parquet_row_converter_case_sensitivity. 
Authored-by: Tae-kyeom, Kim Signed-off-by: Dongjoon Hyun (cherry picked from commit e736c62764137b2c3af90d2dc8a77e391891200a) Signed-off-by: Dongjoon Hyun --- .../datasources/parquet/ParquetRowConverter.scala | 12 +-- .../spark/sql/FileBasedDataSourceSuite.scala | 40 ++ 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 850adae..22422c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -33,8 +33,9 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -178,8 +179,15 @@ private[parquet] class ParquetRowConverter( // Converters for each field. 
private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = { +// (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false +// to prevent throwing IllegalArgumentException when searching catalyst type's field index +val catalystFieldNameToIndex = if (SQLConf.get.caseSensitiveAnalysis) { + catalystType.fieldNames.zipWithIndex.toMap +} else { + CaseInsensitiveMap(catalystType.fieldNames.zipWithIndex.toMap) +} parquetType.getFields.asScala.map { parquetField => - val fieldIndex = catalystType.fieldIndex(parquetField.getName) + val fieldIndex = catalystFieldNameToIndex(parquetField.getName) val catalystField = catalystType(fieldIndex) // Converted field value should be set to the `fieldIndex`-th cell of `currentRow` newConverter(parquetField, catalystField.dataType, new RowUpdater(currentRow, fieldIndex)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index c870958..cb410b4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -842,6 +842,46 @@ class FileBasedDataSourceSuite extends QueryTest } }
[spark] branch branch-3.0 updated: [SPARK-31116][SQL] Fix nested schema case-sensitivity in ParquetRowConverter
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new da1f95b [SPARK-31116][SQL] Fix nested schema case-sensitivity in ParquetRowConverter da1f95b is described below commit da1f95be6b9af59a91a14e01613bdc4e8ac35374 Author: Tae-kyeom, Kim AuthorDate: Mon Mar 16 10:31:56 2020 -0700 [SPARK-31116][SQL] Fix nested schema case-sensitivity in ParquetRowConverter ### What changes were proposed in this pull request? This PR (SPARK-31116) adds a caseSensitive parameter to ParquetRowConverter so that it handles materializing Parquet data properly with respect to case sensitivity ### Why are the changes needed? Since Spark 3.0.0, the statement below throws IllegalArgumentException in case-insensitive mode because of explicit field index searching in ParquetRowConverter. As we already constructed the Parquet requested schema and the Catalyst requested schema during schema clipping in ParquetReadSupport, we just follow that behavior. ```scala val path = "/some/temp/path" spark .range(1L) .selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn") .write.parquet(path) val caseInsensitiveSchema = new StructType() .add( "StructColumn", new StructType() .add("LowerCase", LongType) .add("camelcase", LongType)) spark.read.schema(caseInsensitiveSchema).parquet(path).show() ``` ### Does this PR introduce any user-facing change? No. The changes are only in unreleased branches (`master` and `branch-3.0`). ### How was this patch tested? Passed new test cases that check Parquet column selection with respect to schemas and case sensitivities Closes #27888 from kimtkyeom/parquet_row_converter_case_sensitivity. 
Authored-by: Tae-kyeom, Kim Signed-off-by: Dongjoon Hyun (cherry picked from commit e736c62764137b2c3af90d2dc8a77e391891200a) Signed-off-by: Dongjoon Hyun --- .../datasources/parquet/ParquetRowConverter.scala | 12 +-- .../spark/sql/FileBasedDataSourceSuite.scala | 40 ++ 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 850adae..22422c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -33,8 +33,9 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -178,8 +179,15 @@ private[parquet] class ParquetRowConverter( // Converters for each field. 
private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = { +// (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false +// to prevent throwing IllegalArgumentException when searching catalyst type's field index +val catalystFieldNameToIndex = if (SQLConf.get.caseSensitiveAnalysis) { + catalystType.fieldNames.zipWithIndex.toMap +} else { + CaseInsensitiveMap(catalystType.fieldNames.zipWithIndex.toMap) +} parquetType.getFields.asScala.map { parquetField => - val fieldIndex = catalystType.fieldIndex(parquetField.getName) + val fieldIndex = catalystFieldNameToIndex(parquetField.getName) val catalystField = catalystType(fieldIndex) // Converted field value should be set to the `fieldIndex`-th cell of `currentRow` newConverter(parquetField, catalystField.dataType, new RowUpdater(currentRow, fieldIndex)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index c870958..cb410b4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -842,6 +842,46 @@ class FileBasedDataSourceSuite extends QueryTest } }
[spark] branch branch-3.0 updated: [SPARK-31116][SQL] Fix nested schema case-sensitivity in ParquetRowConverter
This is an automated email from the ASF dual-hosted git repository. dongjoon pushed a commit to branch branch-3.0 in repository https://gitbox.apache.org/repos/asf/spark.git The following commit(s) were added to refs/heads/branch-3.0 by this push: new da1f95b [SPARK-31116][SQL] Fix nested schema case-sensitivity in ParquetRowConverter da1f95b is described below commit da1f95be6b9af59a91a14e01613bdc4e8ac35374 Author: Tae-kyeom, Kim AuthorDate: Mon Mar 16 10:31:56 2020 -0700 [SPARK-31116][SQL] Fix nested schema case-sensitivity in ParquetRowConverter ### What changes were proposed in this pull request? This PR (SPARK-31116) adds a caseSensitive parameter to ParquetRowConverter so that it handles materializing Parquet data properly with respect to case sensitivity ### Why are the changes needed? Since Spark 3.0.0, the statement below throws IllegalArgumentException in case-insensitive mode because of explicit field index searching in ParquetRowConverter. As we already constructed the Parquet requested schema and the Catalyst requested schema during schema clipping in ParquetReadSupport, we just follow that behavior. ```scala val path = "/some/temp/path" spark .range(1L) .selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS StructColumn") .write.parquet(path) val caseInsensitiveSchema = new StructType() .add( "StructColumn", new StructType() .add("LowerCase", LongType) .add("camelcase", LongType)) spark.read.schema(caseInsensitiveSchema).parquet(path).show() ``` ### Does this PR introduce any user-facing change? No. The changes are only in unreleased branches (`master` and `branch-3.0`). ### How was this patch tested? Passed new test cases that check Parquet column selection with respect to schemas and case sensitivities Closes #27888 from kimtkyeom/parquet_row_converter_case_sensitivity. 
Authored-by: Tae-kyeom, Kim Signed-off-by: Dongjoon Hyun (cherry picked from commit e736c62764137b2c3af90d2dc8a77e391891200a) Signed-off-by: Dongjoon Hyun --- .../datasources/parquet/ParquetRowConverter.scala | 12 +-- .../spark/sql/FileBasedDataSourceSuite.scala | 40 ++ 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala index 850adae..22422c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala @@ -33,8 +33,9 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE import org.apache.spark.internal.Logging import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions._ -import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, GenericArrayData} +import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, CaseInsensitiveMap, DateTimeUtils, GenericArrayData} import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types._ import org.apache.spark.unsafe.types.UTF8String @@ -178,8 +179,15 @@ private[parquet] class ParquetRowConverter( // Converters for each field. 
private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = { +// (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false +// to prevent throwing IllegalArgumentException when searching catalyst type's field index +val catalystFieldNameToIndex = if (SQLConf.get.caseSensitiveAnalysis) { + catalystType.fieldNames.zipWithIndex.toMap +} else { + CaseInsensitiveMap(catalystType.fieldNames.zipWithIndex.toMap) +} parquetType.getFields.asScala.map { parquetField => - val fieldIndex = catalystType.fieldIndex(parquetField.getName) + val fieldIndex = catalystFieldNameToIndex(parquetField.getName) val catalystField = catalystType(fieldIndex) // Converted field value should be set to the `fieldIndex`-th cell of `currentRow` newConverter(parquetField, catalystField.dataType, new RowUpdater(currentRow, fieldIndex)) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala index c870958..cb410b4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala @@ -842,6 +842,46 @@ class FileBasedDataSourceSuite extends QueryTest } }