Repository: spark Updated Branches: refs/heads/master ad459cfb1 -> 7a00c658d
[SPARK-21147][SS] Throws an analysis exception when a user-specified schema is given in socket/rate sources ## What changes were proposed in this pull request? This PR proposes to throw an exception if the user provides a schema to the socket or rate source, as shown below: **socket source** ```scala import org.apache.spark.sql.types._ val userSpecifiedSchema = StructType( StructField("name", StringType) :: StructField("area", StringType) :: Nil) val df = spark.readStream.format("socket").option("host", "localhost").option("port", 9999).schema(userSpecifiedSchema).load df.printSchema ``` Before ``` root |-- value: string (nullable = true) ``` After ``` org.apache.spark.sql.AnalysisException: The socket source does not support a user-specified schema.; at org.apache.spark.sql.execution.streaming.TextSocketSourceProvider.sourceSchema(socket.scala:199) at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:192) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87) at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30) at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150) ... 
50 elided ``` **rate source** ```scala spark.readStream.format("rate").schema(spark.range(1).schema).load().printSchema() ``` Before ``` root |-- timestamp: timestamp (nullable = true) |-- value: long (nullable = true) ``` After ``` org.apache.spark.sql.AnalysisException: The rate source does not support a user-specified schema.; at org.apache.spark.sql.execution.streaming.RateSourceProvider.sourceSchema(RateSourceProvider.scala:57) at org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:192) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:87) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:87) at org.apache.spark.sql.execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30) at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:150) ... 48 elided ``` ## How was this patch tested? Unit tests in `TextSocketStreamSuite` and `RateSourceSuite`. Author: hyukjinkwon <gurwls...@gmail.com> Closes #18365 from HyukjinKwon/SPARK-21147. 
Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7a00c658 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7a00c658 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7a00c658 Branch: refs/heads/master Commit: 7a00c658d44139d950b7d3ecd670d79f76e2e747 Parents: ad459cf Author: hyukjinkwon <gurwls...@gmail.com> Authored: Wed Jun 21 10:51:17 2017 -0700 Committer: Shixiong Zhu <shixi...@databricks.com> Committed: Wed Jun 21 10:51:17 2017 -0700 ---------------------------------------------------------------------- .../sql/execution/streaming/RateSourceProvider.scala | 9 +++++++-- .../spark/sql/execution/streaming/socket.scala | 8 ++++++-- .../sql/execution/streaming/RateSourceSuite.scala | 12 ++++++++++++ .../execution/streaming/TextSocketStreamSuite.scala | 15 +++++++++++++++ 4 files changed, 40 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/7a00c658/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala index e61a8eb..e76d4dc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/RateSourceProvider.scala @@ -25,7 +25,7 @@ import org.apache.commons.io.IOUtils import org.apache.spark.internal.Logging import org.apache.spark.network.util.JavaUtils -import org.apache.spark.sql.{DataFrame, SQLContext} +import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext} import org.apache.spark.sql.catalyst.InternalRow import 
org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils} import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider} @@ -52,8 +52,13 @@ class RateSourceProvider extends StreamSourceProvider with DataSourceRegister { sqlContext: SQLContext, schema: Option[StructType], providerName: String, - parameters: Map[String, String]): (String, StructType) = + parameters: Map[String, String]): (String, StructType) = { + if (schema.nonEmpty) { + throw new AnalysisException("The rate source does not support a user-specified schema.") + } + (shortName(), RateSourceProvider.SCHEMA) + } override def createSource( sqlContext: SQLContext, http://git-wip-us.apache.org/repos/asf/spark/blob/7a00c658/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala index 58bff27..8e63207 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/socket.scala @@ -195,13 +195,17 @@ class TextSocketSourceProvider extends StreamSourceProvider with DataSourceRegis if (!parameters.contains("port")) { throw new AnalysisException("Set a port to read from with option(\"port\", ...).") } - val schema = + if (schema.nonEmpty) { + throw new AnalysisException("The socket source does not support a user-specified schema.") + } + + val sourceSchema = if (parseIncludeTimestamp(parameters)) { TextSocketSource.SCHEMA_TIMESTAMP } else { TextSocketSource.SCHEMA_REGULAR } - ("textSocket", schema) + ("textSocket", sourceSchema) } override def createSource( http://git-wip-us.apache.org/repos/asf/spark/blob/7a00c658/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala 
---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala index bdba536..03d0f63 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/RateSourceSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.streaming import java.util.concurrent.TimeUnit +import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.functions._ import org.apache.spark.sql.streaming.{StreamingQueryException, StreamTest} import org.apache.spark.util.ManualClock @@ -179,4 +180,15 @@ class RateSourceSuite extends StreamTest { testIllegalOptionValue("rowsPerSecond", "-1", Seq("-1", "rowsPerSecond", "positive")) testIllegalOptionValue("numPartitions", "-1", Seq("-1", "numPartitions", "positive")) } + + test("user-specified schema given") { + val exception = intercept[AnalysisException] { + spark.readStream + .format("rate") + .schema(spark.range(1).schema) + .load() + } + assert(exception.getMessage.contains( + "rate source does not support a user-specified schema")) + } } http://git-wip-us.apache.org/repos/asf/spark/blob/7a00c658/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala index 5174a04..9ebf4d2 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/TextSocketStreamSuite.scala @@ -148,6 +148,21 @@ class 
TextSocketStreamSuite extends StreamTest with SharedSQLContext with Before } } + test("user-specified schema given") { + val provider = new TextSocketSourceProvider + val userSpecifiedSchema = StructType( + StructField("name", StringType) :: + StructField("area", StringType) :: Nil) + val exception = intercept[AnalysisException] { + provider.sourceSchema( + sqlContext, Some(userSpecifiedSchema), + "", + Map("host" -> "localhost", "port" -> "1234")) + } + assert(exception.getMessage.contains( + "socket source does not support a user-specified schema")) + } + test("no server up") { val provider = new TextSocketSourceProvider val parameters = Map("host" -> "localhost", "port" -> "0") --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org