This is an automated email from the ASF dual-hosted git repository.

maxgekk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ec6a3ae  [SPARK-37176][SQL] Sync JsonInferSchema#infer method's exception handling logic with JacksonParser#parse method
ec6a3ae is described below

commit ec6a3ae6dff1dc9c63978ae14a1793ccd771ffff
Author: Xianjin YE <yexianjin....@bytedance.com>
AuthorDate: Tue Nov 2 12:40:09 2021 +0300

    [SPARK-37176][SQL] Sync JsonInferSchema#infer method's exception handling logic with JacksonParser#parse method
    
    ### What changes were proposed in this pull request?
    Change `JsonInferSchema#infer`'s exception handling logic to align with `JacksonParser#parse`.
    
    ### Why are the changes needed?
    To reduce behavior inconsistency: users can then have the same expectations for schema inference and JSON parsing when dealing with malformed input.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes.
    Before this patch, JSON schema inference could fail on some malformed input that JSON parsing handled successfully.
    After this patch, both apply the same exception handling logic.
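
    As a minimal sketch of the behavior being aligned (assuming a `spark-shell` session; the path `malformed.json` and its trailing malformed UTF-8 byte are illustrative assumptions mirroring the `test-data/malformed_utf8.json` fixture added below):

    ```scala
    import org.apache.spark.sql.types.{IntegerType, StructType}

    val path = "malformed.json" // hypothetical file ending with a malformed UTF-8 byte

    // With a user-supplied schema, JacksonParser#parse applies the parse mode:
    // under DROPMALFORMED the malformed record is simply dropped.
    val schema = new StructType().add("a", IntegerType)
    spark.read.schema(schema).option("mode", "DROPMALFORMED").json(path).show()

    // Without a schema, JsonInferSchema#infer runs first. Before this patch the
    // MalformedInputException escaped its catch block and failed the whole read;
    // after it, inference honors the same parse mode and the read succeeds.
    spark.read.option("mode", "DROPMALFORMED").json(path).show()
    ```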
    
    ### How was this patch tested?
    Added one new test and modified one existing test to cover the new case.
    
    Closes #34455 from advancedxy/SPARK-37176.
    
    Authored-by: Xianjin YE <yexianjin....@bytedance.com>
    Signed-off-by: Max Gekk <max.g...@gmail.com>
---
 .../spark/sql/catalyst/json/JsonInferSchema.scala  | 33 +++++++++++++++-----
 .../test/resources/test-data/malformed_utf8.json   |  3 ++
 .../sql/execution/datasources/json/JsonSuite.scala | 35 ++++++++++++++++++++++
 3 files changed, 63 insertions(+), 8 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index 3b17cde..3b62b16 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql.catalyst.json
 
+import java.io.CharConversionException
+import java.nio.charset.MalformedInputException
 import java.util.Comparator
 
 import scala.util.control.Exception.allCatch
@@ -45,6 +47,18 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
     legacyFormat = FAST_DATE_FORMAT,
     isParsing = true)
 
+  private def handleJsonErrorsByParseMode(parseMode: ParseMode,
+      columnNameOfCorruptRecord: String, e: Throwable): Option[StructType] = {
+    parseMode match {
+      case PermissiveMode =>
+        Some(StructType(Seq(StructField(columnNameOfCorruptRecord, StringType))))
+      case DropMalformedMode =>
+        None
+      case FailFastMode =>
+        throw QueryExecutionErrors.malformedRecordsDetectedInSchemaInferenceError(e)
+    }
+  }
+
   /**
    * Infer the type of a collection of json records in three stages:
    *   1. Infer the type of each record
@@ -68,14 +82,17 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
             Some(inferField(parser))
           }
         } catch {
-          case  e @ (_: RuntimeException | _: JsonProcessingException) => parseMode match {
-            case PermissiveMode =>
-              Some(StructType(Seq(StructField(columnNameOfCorruptRecord, StringType))))
-            case DropMalformedMode =>
-              None
-            case FailFastMode =>
-              throw QueryExecutionErrors.malformedRecordsDetectedInSchemaInferenceError(e)
-          }
+          case e @ (_: RuntimeException | _: JsonProcessingException |
+                    _: MalformedInputException) =>
+            handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord, e)
+          case e: CharConversionException if options.encoding.isEmpty =>
+            val msg =
+              """JSON parser cannot handle a character in its input.
+                |Specifying encoding as an input option explicitly might help to resolve the issue.
+                |""".stripMargin + e.getMessage
+            val wrappedCharException = new CharConversionException(msg)
+            wrappedCharException.initCause(e)
+            handleJsonErrorsByParseMode(parseMode, columnNameOfCorruptRecord, wrappedCharException)
         }
       }.reduceOption(typeMerger).toIterator
     }
diff --git a/sql/core/src/test/resources/test-data/malformed_utf8.json b/sql/core/src/test/resources/test-data/malformed_utf8.json
new file mode 100644
index 0000000..c57eb43
--- /dev/null
+++ b/sql/core/src/test/resources/test-data/malformed_utf8.json
@@ -0,0 +1,3 @@
+{"a": 1}
+{"a": 1}
+�
\ No newline at end of file
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 3a30e1b..075d6e9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -2453,6 +2453,41 @@ abstract class JsonSuite
     checkAnswer(
       spark.read.option("mode", "PERMISSIVE").option("encoding", "UTF-8").json(Seq(badJson).toDS()),
       Row(badJson))
+    checkAnswer(
+      // encoding auto detection should also be possible with json schema infer
+      spark.read.option("mode", "PERMISSIVE").json(Seq(badJson).toDS()),
+      Row(badJson))
+  }
+
+  test("SPARK-37176: inferring should be possible when parse mode is 
permissive") {
+    withTempPath { tempDir =>
+      val path = tempDir.getAbsolutePath
+      // normal input to let spark correctly infer schema
+      val record = """{"a":1}"""
+      Seq(record, badJson + record).toDS().write.text(path)
+      val expected = s"""${badJson}{"a":1}"""
+      val df = spark.read.format("json")
+        .option("mode", "PERMISSIVE")
+        .load(path)
+      checkAnswer(df, Seq(Row(null, 1), Row(expected, null)))
+    }
+  }
+
+  test("SPARK-31716: inferring should handle malformed input") {
+    val schema = new StructType().add("a", IntegerType)
+    val dfWithSchema = spark.read.format("json")
+      .option("mode", "DROPMALFORMED")
+      .option("encoding", "utf-8")
+      .schema(schema)
+      .load(testFile("test-data/malformed_utf8.json"))
+    checkAnswer(dfWithSchema, Seq(Row(1), Row(1)))
+
+    val df = spark.read.format("json")
+      .option("mode", "DROPMALFORMED")
+      .option("encoding", "utf-8")
+      .load(testFile("test-data/malformed_utf8.json"))
+
+    checkAnswer(df, Seq(Row(1), Row(1)))
   }
 
   test("SPARK-23772 ignore column of all null values or empty array during 
schema inference") {
