Repository: spark
Updated Branches:
  refs/heads/master 22b111ef9 -> 4decedfdb


[SPARK-22002][SQL] Read JDBC table with custom schema: support specifying partial fields.

## What changes were proposed in this pull request?

https://github.com/apache/spark/pull/18266 added a new feature to support reading a JDBC table with a custom schema, but all of the fields had to be specified. For simplicity, this PR adds support for specifying only a subset of the fields; the remaining columns keep the default type mapping. A usage sketch follows.
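
A minimal usage sketch (not part of the patch), assuming a SparkSession `spark`, a reachable JDBC URL in `jdbcUrl`, and a hypothetical TEST.PEOPLE table with NAME and THEID columns; only THEID is given a custom type, NAME falls back to the default mapping:

```scala
// Minimal sketch, assuming `spark`, `jdbcUrl` and TEST.PEOPLE exist.
val df = spark.read
  .format("jdbc")
  .option("url", jdbcUrl)
  .option("dbtable", "TEST.PEOPLE")
  .option("customSchema", "THEID DECIMAL(38, 0)") // partial: NAME keeps the default type
  .load()

df.printSchema()
```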

## How was this patch tested?
Unit tests.

Author: Yuming Wang <wgy...@gmail.com>

Closes #19231 from wangyum/SPARK-22002.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/4decedfd
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/4decedfd
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/4decedfd

Branch: refs/heads/master
Commit: 4decedfdbd31525e66d33eecc9d38a747c98547b
Parents: 22b111e
Author: Yuming Wang <wgy...@gmail.com>
Authored: Thu Sep 14 23:35:55 2017 -0700
Committer: gatorsmile <gatorsm...@gmail.com>
Committed: Thu Sep 14 23:35:55 2017 -0700

----------------------------------------------------------------------
 docs/sql-programming-guide.md                   |  2 +-
 .../execution/datasources/jdbc/JdbcUtils.scala  | 36 +++++++--------
 .../datasources/jdbc/JdbcUtilsSuite.scala       | 47 ++++++--------------
 .../org/apache/spark/sql/jdbc/JDBCSuite.scala   | 17 ++++---
 4 files changed, 40 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/4decedfd/docs/sql-programming-guide.md
----------------------------------------------------------------------
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index 95d7040..5db60cc 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1333,7 +1333,7 @@ the following case-insensitive options:
   <tr>
     <td><code>customSchema</code></td>
     <td>
-     The custom schema to use for reading data from JDBC connectors. For example, "id DECIMAL(38, 0), name STRING"). The column names should be identical to the corresponding column names of JDBC table. Users can specify the corresponding data types of Spark SQL instead of using the defaults. This option applies only to reading.
+     The custom schema to use for reading data from JDBC connectors. For example, <code>"id DECIMAL(38, 0), name STRING"</code>. You can also specify partial fields, and the others use the default type mapping. For example, <code>"id DECIMAL(38, 0)"</code>. The column names should be identical to the corresponding column names of JDBC table. Users can specify the corresponding data types of Spark SQL instead of using the defaults. This option applies only to reading.
     </td>
   </tr>
 </table>
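
The same partial customSchema can also be passed through the OPTIONS clause of a DDL statement. A hedged sketch, where `jdbcUrl`, the user and the password are placeholders rather than values from this patch:

```scala
// Sketch only: connection settings are placeholders for a real JDBC source.
spark.sql(
  s"""
     |CREATE TEMPORARY VIEW people_view
     |USING org.apache.spark.sql.jdbc
     |OPTIONS (url '$jdbcUrl', dbtable 'TEST.PEOPLE', user 'testUser', password 'testPass',
     |  customSchema 'THEID DECIMAL(38, 0)')
   """.stripMargin)

// THEID is read as decimal(38,0); NAME keeps the default JDBC type mapping.
spark.table("people_view").printSchema()
```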

http://git-wip-us.apache.org/repos/asf/spark/blob/4decedfd/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
index 75327f0..7113366 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala
@@ -301,12 +301,11 @@ object JdbcUtils extends Logging {
       } else {
         rsmd.isNullable(i + 1) != ResultSetMetaData.columnNoNulls
       }
-      val metadata = new MetadataBuilder()
-        .putLong("scale", fieldScale)
+      val metadata = new MetadataBuilder().putLong("scale", fieldScale)
       val columnType =
        dialect.getCatalystType(dataType, typeName, fieldSize, metadata).getOrElse(
          getCatalystType(dataType, fieldSize, fieldScale, isSigned))
-      fields(i) = StructField(columnName, columnType, nullable, metadata.build())
+      fields(i) = StructField(columnName, columnType, nullable)
       i = i + 1
     }
     new StructType(fields)
@@ -768,31 +767,30 @@ object JdbcUtils extends Logging {
   }
 
   /**
-   * Parses the user specified customSchema option value to DataFrame schema,
-   * and returns it if it's all columns are equals to default schema's.
+   * Parses the user specified customSchema option value to DataFrame schema, and
+   * returns a schema that is replaced by the custom schema's dataType if column name is matched.
    */
   def getCustomSchema(
       tableSchema: StructType,
       customSchema: String,
       nameEquality: Resolver): StructType = {
-    val userSchema = CatalystSqlParser.parseTableSchema(customSchema)
+    if (null != customSchema && customSchema.nonEmpty) {
+      val userSchema = CatalystSqlParser.parseTableSchema(customSchema)
 
-    SchemaUtils.checkColumnNameDuplication(
-      userSchema.map(_.name), "in the customSchema option value", nameEquality)
-
-    val colNames = tableSchema.fieldNames.mkString(",")
-    val errorMsg = s"Please provide all the columns, all columns are: $colNames"
-    if (userSchema.size != tableSchema.size) {
-      throw new AnalysisException(errorMsg)
-    }
+      SchemaUtils.checkColumnNameDuplication(
+        userSchema.map(_.name), "in the customSchema option value", nameEquality)
 
-    // This is resolved by names, only check the column names.
-    userSchema.fieldNames.foreach { col =>
-      tableSchema.find(f => nameEquality(f.name, col)).getOrElse {
-        throw new AnalysisException(errorMsg)
+      // This is resolved by names, use the custom field dataType to replace the default dataType.
+      val newSchema = tableSchema.map { col =>
+        userSchema.find(f => nameEquality(f.name, col.name)) match {
+          case Some(c) => col.copy(dataType = c.dataType)
+          case None => col
+        }
       }
+      StructType(newSchema)
+    } else {
+      tableSchema
     }
-    userSchema
   }
 
   /**
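
Standalone, the merge behaviour introduced above can be pictured as follows; a minimal sketch with made-up column names, using a plain case-insensitive string comparison in place of the analyzer's Resolver:

```scala
import org.apache.spark.sql.types._

// Default schema as inferred from JDBC metadata (hypothetical).
val tableSchema = StructType(Seq(
  StructField("C1", StringType, nullable = false),
  StructField("C2", IntegerType, nullable = false)))

// User-supplied partial schema: only C1 is overridden.
val userSchema = StructType(Seq(StructField("C1", DateType)))

// For each default column, take the data type from the matching user column
// (if any); unmatched columns keep their default type.
val merged = StructType(tableSchema.map { col =>
  userSchema.find(_.name.equalsIgnoreCase(col.name))
    .map(u => col.copy(dataType = u.dataType))
    .getOrElse(col)
})
// merged: C1 becomes DateType, C2 stays IntegerType.
```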

http://git-wip-us.apache.org/repos/asf/spark/blob/4decedfd/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala
index 1255f26..7d277c1 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtilsSuite.scala
@@ -30,57 +30,38 @@ class JdbcUtilsSuite extends SparkFunSuite {
   val caseInsensitive = org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution
 
   test("Parse user specified column types") {
-    assert(
-      JdbcUtils.getCustomSchema(tableSchema, "C1 DATE, C2 STRING", caseInsensitive) ===
-      StructType(Seq(StructField("C1", DateType, true), StructField("C2", StringType, true))))
-    assert(JdbcUtils.getCustomSchema(tableSchema, "C1 DATE, C2 STRING", caseSensitive) ===
-      StructType(Seq(StructField("C1", DateType, true), StructField("C2", StringType, true))))
+    assert(JdbcUtils.getCustomSchema(tableSchema, null, caseInsensitive) === tableSchema)
+    assert(JdbcUtils.getCustomSchema(tableSchema, "", caseInsensitive) === tableSchema)
+
+    assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE", caseInsensitive) ===
+      StructType(Seq(StructField("C1", DateType, false), StructField("C2", IntegerType, false))))
+    assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE", caseSensitive) ===
+      StructType(Seq(StructField("C1", StringType, false), StructField("C2", IntegerType, false))))
+
     assert(
       JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseInsensitive) ===
-        StructType(Seq(StructField("c1", DateType, true), StructField("C2", StringType, true))))
-    assert(JdbcUtils.getCustomSchema(
-      tableSchema, "c1 DECIMAL(38, 0), C2 STRING", caseInsensitive) ===
-      StructType(Seq(StructField("c1", DecimalType(38, 0), true),
-        StructField("C2", StringType, true))))
+      StructType(Seq(StructField("C1", DateType, false), StructField("C2", StringType, false))))
+    assert(JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseSensitive) ===
+      StructType(Seq(StructField("C1", StringType, false), StructField("C2", StringType, false))))
 
     // Throw AnalysisException
     val duplicate = intercept[AnalysisException]{
       JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, c1 STRING", caseInsensitive) ===
-        StructType(Seq(StructField("c1", DateType, true), StructField("c1", StringType, true)))
+        StructType(Seq(StructField("c1", DateType, false), StructField("c1", StringType, false)))
     }
     assert(duplicate.getMessage.contains(
       "Found duplicate column(s) in the customSchema option value"))
 
-    val allColumns = intercept[AnalysisException]{
-      JdbcUtils.getCustomSchema(tableSchema, "C1 STRING", caseSensitive) ===
-        StructType(Seq(StructField("C1", DateType, true)))
-    }
-    assert(allColumns.getMessage.contains("Please provide all the columns,"))
-
-    val caseSensitiveColumnNotFound = intercept[AnalysisException]{
-      JdbcUtils.getCustomSchema(tableSchema, "c1 DATE, C2 STRING", caseSensitive) ===
-        StructType(Seq(StructField("c1", DateType, true), StructField("C2", StringType, true)))
-    }
-    assert(caseSensitiveColumnNotFound.getMessage.contains(
-      "Please provide all the columns, all columns are: C1,C2;"))
-
-    val caseInsensitiveColumnNotFound = intercept[AnalysisException]{
-      JdbcUtils.getCustomSchema(tableSchema, "c3 DATE, C2 STRING", caseInsensitive) ===
-        StructType(Seq(StructField("c3", DateType, true), StructField("C2", StringType, true)))
-    }
-    assert(caseInsensitiveColumnNotFound.getMessage.contains(
-      "Please provide all the columns, all columns are: C1,C2;"))
-
     // Throw ParseException
     val dataTypeNotSupported = intercept[ParseException]{
       JdbcUtils.getCustomSchema(tableSchema, "c3 DATEE, C2 STRING", caseInsensitive) ===
-        StructType(Seq(StructField("c3", DateType, true), StructField("C2", StringType, true)))
+        StructType(Seq(StructField("c3", DateType, false), StructField("C2", StringType, false)))
     }
     assert(dataTypeNotSupported.getMessage.contains("DataType datee is not supported"))
 
     val mismatchedInput = intercept[ParseException]{
       JdbcUtils.getCustomSchema(tableSchema, "c3 DATE. C2 STRING", caseInsensitive) ===
-        StructType(Seq(StructField("c3", DateType, true), StructField("C2", StringType, true)))
+        StructType(Seq(StructField("c3", DateType, false), StructField("C2", StringType, false)))
     }
     assert(mismatchedInput.getMessage.contains("mismatched input '.' expecting"))
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/4decedfd/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
index 4017926..689f410 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala
@@ -26,6 +26,7 @@ import org.scalatest.{BeforeAndAfter, PrivateMethodTester}
 
 import org.apache.spark.{SparkException, SparkFunSuite}
 import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
+import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.DataSourceScanExec
 import org.apache.spark.sql.execution.command.ExplainCommand
@@ -970,30 +971,28 @@ class JDBCSuite extends SparkFunSuite
 
   test("jdbc API support custom schema") {
     val parts = Array[String]("THEID < 2", "THEID >= 2")
+    val customSchema = "NAME STRING, THEID INT"
     val props = new Properties()
-    props.put("customSchema", "NAME STRING, THEID BIGINT")
-    val schema = StructType(Seq(
-      StructField("NAME", StringType, true), StructField("THEID", LongType, true)))
+    props.put("customSchema", customSchema)
     val df = spark.read.jdbc(urlWithUserAndPass, "TEST.PEOPLE", parts, props)
     assert(df.schema.size === 2)
-    assert(df.schema === schema)
+    assert(df.schema === CatalystSqlParser.parseTableSchema(customSchema))
     assert(df.count() === 3)
   }
 
   test("jdbc API custom schema DDL-like strings.") {
     withTempView("people_view") {
+      val customSchema = "NAME STRING, THEID INT"
       sql(
         s"""
            |CREATE TEMPORARY VIEW people_view
            |USING org.apache.spark.sql.jdbc
           |OPTIONS (uRl '$url', DbTaBlE 'TEST.PEOPLE', User 'testUser', PassWord 'testPass',
-           |customSchema 'NAME STRING, THEID INT')
+           |customSchema '$customSchema')
         """.stripMargin.replaceAll("\n", " "))
-      val schema = StructType(
-        Seq(StructField("NAME", StringType, true), StructField("THEID", IntegerType, true)))
       val df = sql("select * from people_view")
-      assert(df.schema.size === 2)
-      assert(df.schema === schema)
+      assert(df.schema.length === 2)
+      assert(df.schema === CatalystSqlParser.parseTableSchema(customSchema))
       assert(df.count() === 3)
     }
   }

