Repository: spark
Updated Branches:
  refs/heads/master 8c911adac -> 733b81b83


[SPARK-20496][SS] Bug in KafkaWriter Looks at Unanalyzed Plans

## What changes were proposed in this pull request?

In Spark 2.1, KafkaWriter read the output schema from the unanalyzed logical plan (`queryExecution.logical.output`) when writing out to Kafka; on an unresolved plan this can throw UnresolvedException. This patch reads the schema from the analyzed plan (`queryExecution.analyzed.output`) instead.
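
For illustration, a minimal sketch (not part of the patch; the session setup and the `df` query are assumptions) of why the schema must come from the analyzed plan rather than the raw logical plan:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Hypothetical setup for illustration only; not part of this patch.
val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()

// Same shape of query as the new unit test below.
val df = spark.range(1, 1000).select(to_json(struct("*")) as "value")
val qe = df.queryExecution

// qe.logical is the plan before analysis: its attributes may still be
// unresolved, so inspecting qe.logical.output can throw UnresolvedException.
// qe.analyzed has been run through the analyzer, so its output attributes
// are resolved and safe to scan for the "topic"/"key"/"value" columns.
val schema = qe.analyzed.output
schema.foreach(attr => println(s"${attr.name}: ${attr.dataType}"))
```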

## How was this patch tested?

A new unit test in KafkaSinkSuite that writes a batch query to Kafka; before this change it threw UnresolvedException.


Author: Bill Chambers <b...@databricks.com>

Closes #17804 from anabranch/SPARK-20496-2.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/733b81b8
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/733b81b8
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/733b81b8

Branch: refs/heads/master
Commit: 733b81b835f952ab96723c749461d6afc0c71974
Parents: 8c911ad
Author: Bill Chambers <b...@databricks.com>
Authored: Fri Apr 28 10:18:31 2017 -0700
Committer: Burak Yavuz <brk...@gmail.com>
Committed: Fri Apr 28 10:18:31 2017 -0700

----------------------------------------------------------------------
 .../org/apache/spark/sql/kafka010/KafkaWriter.scala |  4 ++--
 .../apache/spark/sql/kafka010/KafkaSinkSuite.scala  | 16 ++++++++++++++++
 2 files changed, 18 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/733b81b8/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
index a637d52..61936e3 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
@@ -47,7 +47,7 @@ private[kafka010] object KafkaWriter extends Logging {
       queryExecution: QueryExecution,
       kafkaParameters: ju.Map[String, Object],
       topic: Option[String] = None): Unit = {
-    val schema = queryExecution.logical.output
+    val schema = queryExecution.analyzed.output
     schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
       if (topic == None) {
         throw new AnalysisException(s"topic option required when no " +
@@ -84,7 +84,7 @@ private[kafka010] object KafkaWriter extends Logging {
       queryExecution: QueryExecution,
       kafkaParameters: ju.Map[String, Object],
       topic: Option[String] = None): Unit = {
-    val schema = queryExecution.logical.output
+    val schema = queryExecution.analyzed.output
     validateQuery(queryExecution, kafkaParameters, topic)
     SQLExecution.withNewExecutionId(sparkSession, queryExecution) {
       queryExecution.toRdd.foreachPartition { iter =>

http://git-wip-us.apache.org/repos/asf/spark/blob/733b81b8/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
index 4bd052d..2ab336c 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSinkSuite.scala
@@ -28,6 +28,7 @@ import org.apache.spark.SparkException
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
 import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.functions._
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.{BinaryType, DataType}
@@ -108,6 +109,21 @@ class KafkaSinkSuite extends StreamTest with SharedSQLContext {
       s"save mode overwrite not allowed for kafka"))
   }
 
+  test("SPARK-20496: batch - enforce analyzed plans") {
+    val inputEvents =
+      spark.range(1, 1000)
+        .select(to_json(struct("*")) as 'value)
+
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+    // used to throw UnresolvedException
+    inputEvents.write
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("topic", topic)
+      .save()
+  }
+
   test("streaming - write to kafka with topic field") {
     val input = MemoryStream[String]
     val topic = newTopic()

