Repository: spark
Updated Branches:
  refs/heads/master 2cb976355 -> bc0498d58


[SPARK-24583][SQL] Wrong schema type in InsertIntoDataSourceCommand

## What changes were proposed in this pull request?

Change the insert input schema type from "insertRelationType" to 
"insertRelationType.asNullable", so that the nullability of the query output is 
not overridden by the target relation's schema.

## How was this patch tested?

Added one test in InsertSuite.

Author: Maryann Xue <maryann...@apache.org>

Closes #21585 from maryannxue/spark-24583.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/bc0498d5
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/bc0498d5
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/bc0498d5

Branch: refs/heads/master
Commit: bc0498d5820ded2b428277e396502e74ef0ce36d
Parents: 2cb9763
Author: Maryann Xue <maryann...@apache.org>
Authored: Tue Jun 19 15:27:20 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Tue Jun 19 15:27:20 2018 -0700

----------------------------------------------------------------------
 .../InsertIntoDataSourceCommand.scala           |  5 +-
 .../apache/spark/sql/sources/InsertSuite.scala  | 51 +++++++++++++++++++-
 2 files changed, 52 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/bc0498d5/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
index a813829..80d7608 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/InsertIntoDataSourceCommand.scala
@@ -38,9 +38,8 @@ case class InsertIntoDataSourceCommand(
   override def run(sparkSession: SparkSession): Seq[Row] = {
     val relation = logicalRelation.relation.asInstanceOf[InsertableRelation]
     val data = Dataset.ofRows(sparkSession, query)
-    // Apply the schema of the existing table to the new data.
-    val df = sparkSession.internalCreateDataFrame(data.queryExecution.toRdd, logicalRelation.schema)
-    relation.insert(df, overwrite)
+    // Data has been cast to the target relation's schema by the PreprocessTableInsertion rule.
+    relation.insert(data, overwrite)

    // Re-cache all cached plans (including this relation itself, if it's cached) that refer to this
    // data source relation.
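
For reference, the `InsertableRelation` contract that the fixed code now calls
directly (a sketch of the trait as defined in org.apache.spark.sql.sources; the
comment is ours, not from the patch):

```scala
import org.apache.spark.sql.DataFrame

// The relation receives the query's DataFrame as-is, so its schema, including
// nullability, is the one the analyzer produced after PreprocessTableInsertion
// added any required casts to match the target table.
trait InsertableRelation {
  def insert(data: DataFrame, overwrite: Boolean): Unit
}
```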

http://git-wip-us.apache.org/repos/asf/spark/blob/bc0498d5/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index fef01c8..438d5d8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -20,12 +20,36 @@ package org.apache.spark.sql.sources
 import java.io.File
 
 import org.apache.spark.SparkException
-import org.apache.spark.sql.{AnalysisException, Row}
+import org.apache.spark.sql._
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogStorageFormat, CatalogTable, CatalogTableType}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.SQLConf.PartitionOverwriteMode
 import org.apache.spark.sql.test.SharedSQLContext
+import org.apache.spark.sql.types._
 import org.apache.spark.util.Utils
 
+class SimpleInsertSource extends SchemaRelationProvider {
+  override def createRelation(
+      sqlContext: SQLContext,
+      parameters: Map[String, String],
+      schema: StructType): BaseRelation = {
+    SimpleInsert(schema)(sqlContext.sparkSession)
+  }
+}
+
+case class SimpleInsert(userSpecifiedSchema: StructType)(@transient val sparkSession: SparkSession)
+  extends BaseRelation with InsertableRelation {
+
+  override def sqlContext: SQLContext = sparkSession.sqlContext
+
+  override def schema: StructType = userSpecifiedSchema
+
+  override def insert(input: DataFrame, overwrite: Boolean): Unit = {
+    input.collect
+  }
+}
+
 class InsertSuite extends DataSourceTest with SharedSQLContext {
   import testImplicits._
 
@@ -520,4 +544,29 @@ class InsertSuite extends DataSourceTest with SharedSQLContext {
       }
     }
   }
+
+  test("SPARK-24583 Wrong schema type in InsertIntoDataSourceCommand") {
+    withTable("test_table") {
+      val schema = new StructType()
+        .add("i", LongType, false)
+        .add("s", StringType, false)
+      val newTable = CatalogTable(
+        identifier = TableIdentifier("test_table", None),
+        tableType = CatalogTableType.EXTERNAL,
+        storage = CatalogStorageFormat(
+          locationUri = None,
+          inputFormat = None,
+          outputFormat = None,
+          serde = None,
+          compressed = false,
+          properties = Map.empty),
+        schema = schema,
+        provider = Some(classOf[SimpleInsertSource].getName))
+
+      spark.sessionState.catalog.createTable(newTable, false)
+
+      sql("INSERT INTO TABLE test_table SELECT 1, 'a'")
+      sql("INSERT INTO TABLE test_table SELECT 2, null")
+    }
+  }
 }
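
A possible follow-on check, hypothetical and not part of this commit: a variant of
SimpleInsert (using the same imports as InsertSuite above) that inspects the schema
it receives, confirming that nullability now comes from the query rather than from
the catalog table:

```scala
case class SchemaInspectingInsert(userSpecifiedSchema: StructType)(
    @transient val sparkSession: SparkSession)
  extends BaseRelation with InsertableRelation {

  override def sqlContext: SQLContext = sparkSession.sqlContext

  override def schema: StructType = userSpecifiedSchema

  override def insert(input: DataFrame, overwrite: Boolean): Unit = {
    // After SPARK-24583 this is the query's schema: for
    // `INSERT INTO TABLE test_table SELECT 2, null`, column `s` arrives as
    // nullable even though the catalog table declares it non-nullable.
    println(s"insert() sees schema: ${input.schema.simpleString}")
    input.collect()
  }
}
```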

