This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new da1f95b  [SPARK-31116][SQL] Fix nested schema case-sensitivity in 
ParquetRowConverter
da1f95b is described below

commit da1f95be6b9af59a91a14e01613bdc4e8ac35374
Author: Tae-kyeom, Kim <kimtky...@devsisters.com>
AuthorDate: Mon Mar 16 10:31:56 2020 -0700

    [SPARK-31116][SQL] Fix nested schema case-sensitivity in ParquetRowConverter
    
    ### What changes were proposed in this pull request?
    
    This PR (SPARK-31116) adds a caseSensitive parameter to ParquetRowConverter so 
that it materializes Parquet data properly with respect to case sensitivity.
    
    ### Why are the changes needed?
    
    Since Spark 3.0.0, the statement below throws an IllegalArgumentException in 
case-insensitive mode because of the explicit field index lookup in 
ParquetRowConverter. As we already construct the Parquet requested schema and the 
Catalyst requested schema during schema clipping in ParquetReadSupport, the 
converter should just follow that behavior.
    
    ```scala
    val path = "/some/temp/path"
    
    spark
      .range(1L)
      .selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) AS 
StructColumn")
      .write.parquet(path)
    
    val caseInsensitiveSchema = new StructType()
      .add(
        "StructColumn",
        new StructType()
          .add("LowerCase", LongType)
          .add("camelcase", LongType))
    
    spark.read.schema(caseInsensitiveSchema).parquet(path).show()
    ```
    
    ### Does this PR introduce any user-facing change?
    
    No. The changes are only in unreleased branches (`master` and `branch-3.0`).
    
    ### How was this patch tested?
    
    Passed new test cases that check Parquet column selection with respect to 
schemas and case sensitivity.
    
    Closes #27888 from kimtkyeom/parquet_row_converter_case_sensitivity.
    
    Authored-by: Tae-kyeom, Kim <kimtky...@devsisters.com>
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
    (cherry picked from commit e736c62764137b2c3af90d2dc8a77e391891200a)
    Signed-off-by: Dongjoon Hyun <dongj...@apache.org>
---
 .../datasources/parquet/ParquetRowConverter.scala  | 12 +++++--
 .../spark/sql/FileBasedDataSourceSuite.scala       | 40 ++++++++++++++++++++++
 2 files changed, 50 insertions(+), 2 deletions(-)

diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
index 850adae..22422c0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
@@ -33,8 +33,9 @@ import 
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.{BINARY, DOUBLE
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils, 
GenericArrayData}
+import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, 
CaseInsensitiveMap, DateTimeUtils, GenericArrayData}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils.SQLTimestamp
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.types.UTF8String
 
@@ -178,8 +179,15 @@ private[parquet] class ParquetRowConverter(
 
   // Converters for each field.
   private[this] val fieldConverters: Array[Converter with 
HasParentContainerUpdater] = {
+    // (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is 
false
+    // to prevent throwing IllegalArgumentException when searching catalyst 
type's field index
+    val catalystFieldNameToIndex = if (SQLConf.get.caseSensitiveAnalysis) {
+      catalystType.fieldNames.zipWithIndex.toMap
+    } else {
+      CaseInsensitiveMap(catalystType.fieldNames.zipWithIndex.toMap)
+    }
     parquetType.getFields.asScala.map { parquetField =>
-      val fieldIndex = catalystType.fieldIndex(parquetField.getName)
+      val fieldIndex = catalystFieldNameToIndex(parquetField.getName)
       val catalystField = catalystType(fieldIndex)
       // Converted field value should be set to the `fieldIndex`-th cell of 
`currentRow`
       newConverter(parquetField, catalystField.dataType, new 
RowUpdater(currentRow, fieldIndex))
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
index c870958..cb410b4 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/FileBasedDataSourceSuite.scala
@@ -842,6 +842,46 @@ class FileBasedDataSourceSuite extends QueryTest
       }
     }
   }
+
+  test("SPARK-31116: Select nested schema with case insensitive mode") {
+    // This test case failed at only Parquet. ORC is added for test coverage 
parity.
+    Seq("orc", "parquet").foreach { format =>
+      Seq("true", "false").foreach { nestedSchemaPruningEnabled =>
+        withSQLConf(
+          SQLConf.CASE_SENSITIVE.key -> "false",
+          SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key -> 
nestedSchemaPruningEnabled) {
+          withTempPath { dir =>
+            val path = dir.getCanonicalPath
+
+            // Prepare values for testing nested parquet data
+            spark
+              .range(1L)
+              .selectExpr("NAMED_STRUCT('lowercase', id, 'camelCase', id + 1) 
AS StructColumn")
+              .write
+              .format(format)
+              .save(path)
+
+            val exactSchema = "StructColumn struct<lowercase: LONG, camelCase: 
LONG>"
+
+            
checkAnswer(spark.read.schema(exactSchema).format(format).load(path), 
Row(Row(0, 1)))
+
+            // In case insensitive manner, parquet's column cases are ignored
+            val innerColumnCaseInsensitiveSchema =
+              "StructColumn struct<Lowercase: LONG, camelcase: LONG>"
+            checkAnswer(
+              
spark.read.schema(innerColumnCaseInsensitiveSchema).format(format).load(path),
+              Row(Row(0, 1)))
+
+            val rootColumnCaseInsensitiveSchema =
+              "structColumn struct<lowercase: LONG, camelCase: LONG>"
+            checkAnswer(
+              
spark.read.schema(rootColumnCaseInsensitiveSchema).format(format).load(path),
+              Row(Row(0, 1)))
+          }
+        }
+      }
+    }
+  }
 }
 
 object TestingUDT {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to