Repository: spark
Updated Branches:
  refs/heads/master cb324f611 -> 3a80f92f8


[SPARK-17492][SQL] Fix Reading Cataloged Data Sources without Extending 
SchemaRelationProvider

### What changes were proposed in this pull request?
For data sources that do not extend `SchemaRelationProvider`, we expect users 
not to specify schemas when they create tables. If a schema is supplied by 
users, an exception is thrown.

Since Spark 2.1, to avoid inferring the schema every time, we store the schema 
of every data source table in the metastore catalog. Thus, when reading a 
cataloged data source table, the schema can be read from the metastore catalog. 
In this case, we also got an exception. For example,

```Scala
sql(
  s"""
     |CREATE TABLE relationProvierWithSchema
     |USING org.apache.spark.sql.sources.SimpleScanSource
     |OPTIONS (
     |  From '1',
     |  To '10'
     |)
   """.stripMargin)
spark.table(tableName).show()
```
```
org.apache.spark.sql.sources.SimpleScanSource does not allow user-specified 
schemas.;
```

This PR fixes the above issue. When building a data source, we introduce a 
flag `isSchemaFromUsers` to indicate whether the schema was really supplied by 
users. If true, we issue an exception. Otherwise, we call the 
`createRelation` method of `RelationProvider` to generate the `BaseRelation`, 
which contains the actual schema.

### How was this patch tested?
Added a few test cases.

Author: gatorsmile <gatorsm...@gmail.com>

Closes #15046 from gatorsmile/tempViewCases.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3a80f92f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3a80f92f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3a80f92f

Branch: refs/heads/master
Commit: 3a80f92f8f4b91d0a85724bca7d81c6f5bbb78fd
Parents: cb324f6
Author: gatorsmile <gatorsm...@gmail.com>
Authored: Thu Sep 22 13:19:06 2016 +0800
Committer: Wenchen Fan <wenc...@databricks.com>
Committed: Thu Sep 22 13:19:06 2016 +0800

----------------------------------------------------------------------
 .../sql/execution/datasources/DataSource.scala  |  9 ++-
 .../apache/spark/sql/sources/InsertSuite.scala  | 20 ++++++
 .../spark/sql/sources/TableScanSuite.scala      | 64 +++++++++++++-------
 .../sql/test/DataFrameReaderWriterSuite.scala   | 33 ++++++++++
 4 files changed, 102 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3a80f92f/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index 413976a..3206701 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -333,8 +333,13 @@ case class DataSource(
         dataSource.createRelation(sparkSession.sqlContext, 
caseInsensitiveOptions)
       case (_: SchemaRelationProvider, None) =>
         throw new AnalysisException(s"A schema needs to be specified when 
using $className.")
-      case (_: RelationProvider, Some(_)) =>
-        throw new AnalysisException(s"$className does not allow user-specified 
schemas.")
+      case (dataSource: RelationProvider, Some(schema)) =>
+        val baseRelation =
+          dataSource.createRelation(sparkSession.sqlContext, 
caseInsensitiveOptions)
+        if (baseRelation.schema != schema) {
+          throw new AnalysisException(s"$className does not allow 
user-specified schemas.")
+        }
+        baseRelation
 
       // We are reading from the results of a streaming query. Load files from 
the metadata log
       // instead of listing them using HDFS APIs.

http://git-wip-us.apache.org/repos/asf/spark/blob/3a80f92f/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
index 6454d71..5eb5464 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/InsertSuite.scala
@@ -65,6 +65,26 @@ class InsertSuite extends DataSourceTest with 
SharedSQLContext {
     )
   }
 
+  test("insert into a temp view that does not point to an insertable data 
source") {
+    import testImplicits._
+    withTempView("t1", "t2") {
+      sql(
+        """
+          |CREATE TEMPORARY VIEW t1
+          |USING org.apache.spark.sql.sources.SimpleScanSource
+          |OPTIONS (
+          |  From '1',
+          |  To '10')
+        """.stripMargin)
+      sparkContext.parallelize(1 to 10).toDF("a").createOrReplaceTempView("t2")
+
+      val message = intercept[AnalysisException] {
+        sql("INSERT INTO TABLE t1 SELECT a FROM t2")
+      }.getMessage
+      assert(message.contains("does not allow insertion"))
+    }
+  }
+
   test("PreInsert casting and renaming") {
     sql(
       s"""

http://git-wip-us.apache.org/repos/asf/spark/blob/3a80f92f/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
index e8fed03..86bcb4d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/sources/TableScanSuite.scala
@@ -348,31 +348,51 @@ class TableScanSuite extends DataSourceTest with 
SharedSQLContext {
   test("exceptions") {
     // Make sure we do throw correct exception when users use a relation 
provider that
     // only implements the RelationProvider or the SchemaRelationProvider.
-    val schemaNotAllowed = intercept[Exception] {
-      sql(
-        """
-          |CREATE TEMPORARY VIEW relationProvierWithSchema (i int)
-          |USING org.apache.spark.sql.sources.SimpleScanSource
-          |OPTIONS (
-          |  From '1',
-          |  To '10'
-          |)
-        """.stripMargin)
+    Seq("TEMPORARY VIEW", "TABLE").foreach { tableType =>
+      val schemaNotAllowed = intercept[Exception] {
+        sql(
+          s"""
+             |CREATE $tableType relationProvierWithSchema (i int)
+             |USING org.apache.spark.sql.sources.SimpleScanSource
+             |OPTIONS (
+             |  From '1',
+             |  To '10'
+             |)
+           """.stripMargin)
+      }
+      assert(schemaNotAllowed.getMessage.contains("does not allow 
user-specified schemas"))
+
+      val schemaNeeded = intercept[Exception] {
+        sql(
+          s"""
+             |CREATE $tableType schemaRelationProvierWithoutSchema
+             |USING org.apache.spark.sql.sources.AllDataTypesScanSource
+             |OPTIONS (
+             |  From '1',
+             |  To '10'
+             |)
+           """.stripMargin)
+      }
+      assert(schemaNeeded.getMessage.contains("A schema needs to be specified 
when using"))
     }
-    assert(schemaNotAllowed.getMessage.contains("does not allow user-specified 
schemas"))
+  }
 
-    val schemaNeeded = intercept[Exception] {
-      sql(
-        """
-          |CREATE TEMPORARY VIEW schemaRelationProvierWithoutSchema
-          |USING org.apache.spark.sql.sources.AllDataTypesScanSource
-          |OPTIONS (
-          |  From '1',
-          |  To '10'
-          |)
-        """.stripMargin)
+  test("read the data source tables that do not extend 
SchemaRelationProvider") {
+    Seq("TEMPORARY VIEW", "TABLE").foreach { tableType =>
+      val tableName = "relationProvierWithSchema"
+      withTable (tableName) {
+        sql(
+          s"""
+             |CREATE $tableType $tableName
+             |USING org.apache.spark.sql.sources.SimpleScanSource
+             |OPTIONS (
+             |  From '1',
+             |  To '10'
+             |)
+           """.stripMargin)
+        checkAnswer(spark.table(tableName), spark.range(1, 11).toDF())
+      }
     }
-    assert(schemaNeeded.getMessage.contains("A schema needs to be specified 
when using"))
   }
 
   test("SPARK-5196 schema field with comment") {

http://git-wip-us.apache.org/repos/asf/spark/blob/3a80f92f/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
----------------------------------------------------------------------
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
index 7368dad..a7fda01 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/test/DataFrameReaderWriterSuite.scala
@@ -293,6 +293,39 @@ class DataFrameReaderWriterSuite extends QueryTest with 
SharedSQLContext with Be
     Option(dir).map(spark.read.format("org.apache.spark.sql.test").load)
   }
 
+  test("read a data source that does not extend SchemaRelationProvider") {
+    val dfReader = spark.read
+      .option("from", "1")
+      .option("TO", "10")
+      .format("org.apache.spark.sql.sources.SimpleScanSource")
+
+    // when users do not specify the schema
+    checkAnswer(dfReader.load(), spark.range(1, 11).toDF())
+
+    // when users specify the schema
+    val inputSchema = new StructType().add("s", IntegerType, nullable = false)
+    val e = intercept[AnalysisException] { dfReader.schema(inputSchema).load() 
}
+    assert(e.getMessage.contains(
+      "org.apache.spark.sql.sources.SimpleScanSource does not allow 
user-specified schemas"))
+  }
+
+  test("read a data source that does not extend RelationProvider") {
+    val dfReader = spark.read
+      .option("from", "1")
+      .option("TO", "10")
+      .option("option_with_underscores", "someval")
+      .option("option.with.dots", "someval")
+      .format("org.apache.spark.sql.sources.AllDataTypesScanSource")
+
+    // when users do not specify the schema
+    val e = intercept[AnalysisException] { dfReader.load() }
+    assert(e.getMessage.contains("A schema needs to be specified when using"))
+
+    // when users specify the schema
+    val inputSchema = new StructType().add("s", StringType, nullable = false)
+    assert(dfReader.schema(inputSchema).load().count() == 10)
+  }
+
   test("text - API and behavior regarding schema") {
     // Writer
     spark.createDataset(data).write.mode(SaveMode.Overwrite).text(dir)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to