This is an automated email from the ASF dual-hosted git repository.

gurwls223 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d72571e  [SPARK-26246][SQL] Inferring TimestampType from JSON
d72571e is described below

commit d72571e51d8b41e2287750759e120547afeeb7d7
Author: Maxim Gekk <maxim.g...@databricks.com>
AuthorDate: Tue Dec 18 13:50:55 2018 +0800

    [SPARK-26246][SQL] Inferring TimestampType from JSON
    
    ## What changes were proposed in this pull request?
    
    The `JsonInferSchema` class is extended to support inferring `TimestampType` from string fields in JSON input:
    - If the `prefersDecimal` option is set to `true`, it first tries to infer a decimal type from the string field.
    - If decimal type inference fails or `prefersDecimal` is disabled, `JsonInferSchema` tries to infer `TimestampType`.
    - If timestamp type inference fails as well, `StringType` is returned as the inferred type (see the sketch below).
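    
    For illustration only, a minimal sketch of the expected behavior (not part of the patch; it assumes a `spark-shell` session where `spark` and its implicits are available, and reuses the column name `a` and timestamp pattern from the new tests):
    
    ```scala
    import spark.implicits._
    
    // A JSON string field that matches the configured timestamp pattern
    // should now be inferred as TimestampType instead of StringType.
    val ds = Seq("""{"a": "2018-12-02T21:04:00.123"}""").toDS()
    val df = spark.read
      .option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss.SSS")
      .json(ds)
    
    df.printSchema()
    // Expected with this change:
    // root
    //  |-- a: timestamp (nullable = true)
    ```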
    
    ## How was this patch tested?
    
    Added a new test suite, `JsonInferSchemaSuite`, to check date and timestamp type inference from JSON using `JsonInferSchema` directly. A few tests were added to `JsonSuite` to check type merging and roundtrips. These changes are also covered by `JsonSuite`, `JsonExpressionsSuite` and `JsonFunctionsSuite`.
    
    Closes #23201 from MaxGekk/json-infer-time.
    
    Lead-authored-by: Maxim Gekk <maxim.g...@databricks.com>
    Co-authored-by: Maxim Gekk <max.g...@gmail.com>
    Signed-off-by: Hyukjin Kwon <gurwls...@apache.org>
---
 .../spark/sql/catalyst/json/JsonInferSchema.scala  |  22 ++++-
 .../sql/catalyst/json/JsonInferSchemaSuite.scala   | 102 +++++++++++++++++++++
 .../sql/execution/datasources/json/JsonSuite.scala |  52 +++++++++++
 3 files changed, 171 insertions(+), 5 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
index 263e05d..d1bc00c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JsonInferSchema.scala
@@ -28,7 +28,7 @@ import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.catalyst.analysis.TypeCoercion
 import org.apache.spark.sql.catalyst.expressions.ExprUtils
 import org.apache.spark.sql.catalyst.json.JacksonUtils.nextUntil
-import org.apache.spark.sql.catalyst.util.{DropMalformedMode, FailFastMode, ParseMode, PermissiveMode}
+import org.apache.spark.sql.catalyst.util._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
@@ -37,6 +37,12 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
 
   private val decimalParser = ExprUtils.getDecimalParser(options.locale)
 
+  @transient
+  private lazy val timestampFormatter = TimestampFormatter(
+    options.timestampFormat,
+    options.timeZone,
+    options.locale)
+
   /**
    * Infer the type of a collection of json records in three stages:
    *   1. Infer the type of each record
@@ -115,13 +121,19 @@ private[sql] class JsonInferSchema(options: JSONOptions) extends Serializable {
         // record fields' types have been combined.
         NullType
 
-      case VALUE_STRING if options.prefersDecimal =>
+      case VALUE_STRING =>
+        val field = parser.getText
         val decimalTry = allCatch opt {
-          val bigDecimal = decimalParser(parser.getText)
+          val bigDecimal = decimalParser(field)
             DecimalType(bigDecimal.precision, bigDecimal.scale)
         }
-        decimalTry.getOrElse(StringType)
-      case VALUE_STRING => StringType
+        if (options.prefersDecimal && decimalTry.isDefined) {
+          decimalTry.get
+        } else if ((allCatch opt timestampFormatter.parse(field)).isDefined) {
+          TimestampType
+        } else {
+          StringType
+        }
 
       case START_OBJECT =>
         val builder = Array.newBuilder[StructField]
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala
new file mode 100644
index 0000000..9307f9b
--- /dev/null
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/json/JsonInferSchemaSuite.scala
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.catalyst.json
+
+import com.fasterxml.jackson.core.JsonFactory
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.catalyst.plans.SQLHelper
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+class JsonInferSchemaSuite extends SparkFunSuite with SQLHelper {
+
+  def checkType(options: Map[String, String], json: String, dt: DataType): Unit = {
+    val jsonOptions = new JSONOptions(options, "UTC", "")
+    val inferSchema = new JsonInferSchema(jsonOptions)
+    val factory = new JsonFactory()
+    jsonOptions.setJacksonOptions(factory)
+    val parser = CreateJacksonParser.string(factory, json)
+    parser.nextToken()
+    val expectedType = StructType(Seq(StructField("a", dt, true)))
+
+    assert(inferSchema.inferField(parser) === expectedType)
+  }
+
+  def checkTimestampType(pattern: String, json: String): Unit = {
+    checkType(Map("timestampFormat" -> pattern), json, TimestampType)
+  }
+
+  test("inferring timestamp type") {
+    Seq(true, false).foreach { legacyParser =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+        checkTimestampType("yyyy", """{"a": "2018"}""")
+        checkTimestampType("yyyy=MM", """{"a": "2018=12"}""")
+        checkTimestampType("yyyy MM dd", """{"a": "2018 12 02"}""")
+        checkTimestampType(
+          "yyyy-MM-dd'T'HH:mm:ss.SSS",
+          """{"a": "2018-12-02T21:04:00.123"}""")
+        checkTimestampType(
+          "yyyy-MM-dd'T'HH:mm:ss.SSSSSSXXX",
+          """{"a": "2018-12-02T21:04:00.123567+01:00"}""")
+      }
+    }
+  }
+
+  test("prefer decimals over timestamps") {
+    Seq(true, false).foreach { legacyParser =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+        checkType(
+          options = Map(
+            "prefersDecimal" -> "true",
+            "timestampFormat" -> "yyyyMMdd.HHmmssSSS"
+          ),
+          json = """{"a": "20181202.210400123"}""",
+          dt = DecimalType(17, 9)
+        )
+      }
+    }
+  }
+
+  test("skip decimal type inferring") {
+    Seq(true, false).foreach { legacyParser =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+        checkType(
+          options = Map(
+            "prefersDecimal" -> "false",
+            "timestampFormat" -> "yyyyMMdd.HHmmssSSS"
+          ),
+          json = """{"a": "20181202.210400123"}""",
+          dt = TimestampType
+        )
+      }
+    }
+  }
+
+  test("fallback to string type") {
+    Seq(true, false).foreach { legacyParser =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+        checkType(
+          options = Map("timestampFormat" -> "yyyy,MM,dd.HHmmssSSS"),
+          json = """{"a": "20181202.210400123"}""",
+          dt = StringType
+        )
+      }
+    }
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
index 786335b..8f575a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/json/JsonSuite.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types._
+import org.apache.spark.sql.types.StructType.fromDDL
 import org.apache.spark.util.Utils
 
 class TestFileFilter extends PathFilter {
@@ -2589,4 +2590,55 @@ class JsonSuite extends QueryTest with SharedSQLContext with TestJsonData {
       Row(null, Array(0, 1, 2), "abc", """{"a":"-","b":[0, 1, 2],"c":"abc"}""") ::
       Row(0.1, null, "def", """{"a":0.1,"b":{},"c":"def"}""") :: Nil)
   }
+
+  test("inferring timestamp type") {
+    Seq(true, false).foreach { legacyParser =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+        def schemaOf(jsons: String*): StructType = spark.read.json(jsons.toDS).schema
+
+        assert(schemaOf(
+          """{"a":"2018-12-17T10:11:12.123-01:00"}""",
+          """{"a":"2018-12-16T22:23:24.123-02:00"}""") === fromDDL("a 
timestamp"))
+
+        assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", 
"""{"a":1}""")
+          === fromDDL("a string"))
+        assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", 
"""{"a":"123"}""")
+          === fromDDL("a string"))
+
+        assert(schemaOf("""{"a":"2018-12-17T10:11:12.123-01:00"}""", 
"""{"a":null}""")
+          === fromDDL("a timestamp"))
+        assert(schemaOf("""{"a":null}""", 
"""{"a":"2018-12-17T10:11:12.123-01:00"}""")
+          === fromDDL("a timestamp"))
+      }
+    }
+  }
+
+  test("roundtrip for timestamp type inferring") {
+    Seq(true, false).foreach { legacyParser =>
+      withSQLConf(SQLConf.LEGACY_TIME_PARSER_ENABLED.key -> legacyParser.toString) {
+        val customSchema = new StructType().add("date", TimestampType)
+        withTempDir { dir =>
+          val timestampsWithFormatPath = s"${dir.getCanonicalPath}/timestampsWithFormat.json"
+          val timestampsWithFormat = spark.read
+            .option("timestampFormat", "dd/MM/yyyy HH:mm")
+            .json(datesRecords)
+          assert(timestampsWithFormat.schema === customSchema)
+
+          timestampsWithFormat.write
+            .format("json")
+            .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
+            .option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
+            .save(timestampsWithFormatPath)
+
+          val readBack = spark.read
+            .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
+            .option(DateTimeUtils.TIMEZONE_OPTION, "UTC")
+            .json(timestampsWithFormatPath)
+
+          assert(readBack.schema === customSchema)
+          checkAnswer(readBack, timestampsWithFormat)
+        }
+      }
+    }
+  }
 }

