This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new d3813d8  [SPARK-27064][SS] create StreamingWrite at the beginning of streaming execution
d3813d8 is described below

commit d3813d8b210d127ea278015ef27ead9348365787
Author: Wenchen Fan <wenc...@databricks.com>
AuthorDate: Wed Mar 13 19:47:54 2019 +0800

    [SPARK-27064][SS] create StreamingWrite at the beginning of streaming execution
    
    ## What changes were proposed in this pull request?
    
    According to the [design](https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing), the life cycle of `StreamingWrite` should match that of the read-side `MicroBatch/ContinuousStream`: one instance per run of the streaming query, rather than one per epoch.
    
    This PR fixes that by creating the `StreamingWrite` once, when the streaming execution starts, and reusing it for every epoch/micro-batch.
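    
    A minimal, self-contained sketch of that life cycle; the `StreamingWrite`, `ConsoleWrite` and `StreamRun` types below are simplified stand-ins, not the real DSv2 interfaces:
    
    ```scala
    // Hypothetical, simplified types -- NOT the real DSv2 API -- used only to
    // illustrate "one StreamingWrite per query run" instead of "one per epoch".
    trait StreamingWrite {
      def commit(epochId: Long): Unit
    }
    
    class ConsoleWrite extends StreamingWrite {
      override def commit(epochId: Long): Unit = println(s"committed epoch $epochId")
    }
    
    class StreamRun(createWrite: () => StreamingWrite) {
      // Before this change the write was created per epoch; now it is created once,
      // when the run (and its logical plan) is set up.
      private val write: StreamingWrite = createWrite()
    
      def runEpoch(epochId: Long): Unit = {
        // Every epoch/micro-batch reuses the same write instance.
        write.commit(epochId)
      }
    }
    
    object LifeCycleDemo extends App {
      val run = new StreamRun(() => new ConsoleWrite)
      (0L until 3L).foreach(run.runEpoch)
    }
    ```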
    
    ## How was this patch tested?
    
    Existing tests.
    
    Closes #23981 from cloud-fan/dsv2.
    
    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/kafka010/KafkaContinuousSinkSuite.scala    | 101 +++++----------------
 .../execution/streaming/MicroBatchExecution.scala  |  18 ++--
 .../sql/execution/streaming/StreamExecution.scala  |   2 +-
 .../streaming/continuous/ContinuousExecution.scala |  20 ++--
 .../sources/WriteToMicroBatchDataSource.scala      |  39 ++++++++
 .../sources/StreamingDataSourceV2Suite.scala       |  18 +++-
 6 files changed, 104 insertions(+), 94 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
index b21037b..3c3aeeb 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
@@ -22,9 +22,8 @@ import java.util.Locale
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.scalatest.time.SpanSugar._
-import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.types.{BinaryType, DataType}
@@ -227,39 +226,23 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     val topic = newTopic()
     testUtils.createTopic(topic)
 
-    /* No topic field or topic option */
-    var writer: StreamingQuery = null
-    var ex: Exception = null
-    try {
-      writer = createKafkaWriter(input.toDF())(
+    val ex = intercept[AnalysisException] {
+      /* No topic field or topic option */
+      createKafkaWriter(input.toDF())(
         withSelectExpr = "value as key", "value"
       )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
     }
     assert(ex.getMessage
       .toLowerCase(Locale.ROOT)
       .contains("topic option required when no 'topic' attribute is present"))
 
-    try {
+    val ex2 = intercept[AnalysisException] {
       /* No value field */
-      writer = createKafkaWriter(input.toDF())(
+      createKafkaWriter(input.toDF())(
         withSelectExpr = s"'$topic' as topic", "value as key"
       )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
     }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+    assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
       "required attribute 'value' not found"))
   }
 
@@ -278,53 +261,30 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     val topic = newTopic()
     testUtils.createTopic(topic)
 
-    var writer: StreamingQuery = null
-    var ex: Exception = null
-    try {
+    val ex = intercept[AnalysisException] {
       /* topic field wrong type */
-      writer = createKafkaWriter(input.toDF())(
+      createKafkaWriter(input.toDF())(
         withSelectExpr = s"CAST('1' as INT) as topic", "value"
       )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
     }
     assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string"))
 
-    try {
+    val ex2 = intercept[AnalysisException] {
       /* value field wrong type */
-      writer = createKafkaWriter(input.toDF())(
+      createKafkaWriter(input.toDF())(
         withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value"
       )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
     }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+    assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
       "value attribute type must be a string or binary"))
 
-    try {
+    val ex3 = intercept[AnalysisException] {
       /* key field wrong type */
-      writer = createKafkaWriter(input.toDF())(
+      createKafkaWriter(input.toDF())(
         withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", "value"
       )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
     }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+    assert(ex3.getMessage.toLowerCase(Locale.ROOT).contains(
       "key attribute type must be a string or binary"))
   }
 
@@ -369,35 +329,22 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("subscribe", inputTopic)
       .load()
-    var writer: StreamingQuery = null
-    var ex: Exception = null
-    try {
-      writer = createKafkaWriter(
+
+    val ex = intercept[IllegalArgumentException] {
+      createKafkaWriter(
         input.toDF(),
         withOptions = Map("kafka.key.serializer" -> "foo"))()
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-      assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
-        "kafka option 'key.serializer' is not supported"))
-    } finally {
-      writer.stop()
     }
+    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+      "kafka option 'key.serializer' is not supported"))
 
-    try {
-      writer = createKafkaWriter(
+    val ex2 = intercept[IllegalArgumentException] {
+      createKafkaWriter(
         input.toDF(),
         withOptions = Map("kafka.value.serializer" -> "foo"))()
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-      assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
-        "kafka option 'value.serializer' is not supported"))
-    } finally {
-      writer.stop()
     }
+    assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
+      "kafka option 'value.serializer' is not supported"))
   }
 
   test("generic - write big data with small producer buffer") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index de7cbe2..bedcb9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -26,8 +26,8 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatch
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2, WriteToDataSourceV2Exec}
-import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, RateControlMicroBatchStream}
+import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
+import org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchStream, WriteToMicroBatchDataSource}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2}
@@ -122,7 +122,14 @@ class MicroBatchExecution(
       case r: StreamingDataSourceV2Relation => r.stream
     }
     uniqueSources = sources.distinct
-    _logicalPlan
+
+    sink match {
+      case s: SupportsStreamingWrite =>
+        val streamingWrite = createStreamingWrite(s, extraOptions, _logicalPlan)
+        WriteToMicroBatchDataSource(streamingWrite, _logicalPlan)
+
+      case _ => _logicalPlan
+    }
   }
 
   /**
@@ -513,9 +520,8 @@ class MicroBatchExecution(
 
     val triggerLogicalPlan = sink match {
       case _: Sink => newAttributePlan
-      case s: SupportsStreamingWrite =>
-        val streamingWrite = createStreamingWrite(s, extraOptions, newAttributePlan)
-        WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, streamingWrite), newAttributePlan)
+      case _: SupportsStreamingWrite =>
+        newAttributePlan.asInstanceOf[WriteToMicroBatchDataSource].createPlan(currentBatchId)
       case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index bba640e..180a23c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -585,7 +585,7 @@ abstract class StreamExecution(
       options: Map[String, String],
       inputPlan: LogicalPlan): StreamingWrite = {
     val writeBuilder = table.newWriteBuilder(new DataSourceOptions(options.asJava))
-      .withQueryId(runId.toString)
+      .withQueryId(id.toString)
       .withInputDataSchema(inputPlan.schema)
     outputMode match {
       case Append =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index aef556d..f55a45d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -61,7 +61,7 @@ class ContinuousExecution(
   // Throwable that caused the execution to fail
   private val failure: AtomicReference[Throwable] = new AtomicReference[Throwable](null)
 
-  override val logicalPlan: LogicalPlan = {
+  override val logicalPlan: WriteToContinuousDataSource = {
     val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]()
     var nextSourceId = 0
     val _logicalPlan = analyzedPlan.transform {
@@ -88,7 +88,8 @@ class ContinuousExecution(
     }
     uniqueSources = sources.distinct
 
-    _logicalPlan
+    WriteToContinuousDataSource(
+      createStreamingWrite(sink, extraOptions, _logicalPlan), _logicalPlan)
   }
 
   private val triggerExecutor = trigger match {
@@ -178,13 +179,10 @@ class ContinuousExecution(
           "CurrentTimestamp and CurrentDate not yet supported for continuous 
processing")
     }
 
-    val streamingWrite = createStreamingWrite(sink, extraOptions, withNewSources)
-    val planWithSink = WriteToContinuousDataSource(streamingWrite, withNewSources)
-
     reportTimeTaken("queryPlanning") {
       lastExecution = new IncrementalExecution(
         sparkSessionForQuery,
-        planWithSink,
+        withNewSources,
         outputMode,
         checkpointFile("state"),
         id,
@@ -194,7 +192,7 @@ class ContinuousExecution(
       lastExecution.executedPlan // Force the lazy generation of execution plan
     }
 
-    val stream = planWithSink.collect {
+    val stream = withNewSources.collect {
       case relation: StreamingDataSourceV2Relation =>
         relation.stream.asInstanceOf[ContinuousStream]
     }.head
@@ -215,7 +213,13 @@ class ContinuousExecution(
 
     // Use the parent Spark session for the endpoint since it's where this query ID is registered.
     val epochEndpoint = EpochCoordinatorRef.create(
-      streamingWrite, stream, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
+      logicalPlan.write,
+      stream,
+      this,
+      epochCoordinatorId,
+      currentBatchId,
+      sparkSession,
+      SparkEnv.get)
     val epochUpdateThread = new Thread(new Runnable {
       override def run: Unit = {
         try {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala
new file mode 100644
index 0000000..a3f58fa
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
+
+/**
+ * The logical plan for writing data to a micro-batch stream.
+ *
+ * Note that this logical plan does not have a corresponding physical plan, as it will be converted
+ * to [[WriteToDataSourceV2]] with [[MicroBatchWrite]] before execution.
+ */
+case class WriteToMicroBatchDataSource(write: StreamingWrite, query: LogicalPlan)
+  extends LogicalPlan {
+  override def children: Seq[LogicalPlan] = Seq(query)
+  override def output: Seq[Attribute] = Nil
+
+  def createPlan(batchId: Long): WriteToDataSourceV2 = {
+    WriteToDataSourceV2(new MicroBatchWrite(batchId, write), query)
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 97b694e..3c2c700 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming._
-import org.apache.spark.sql.sources.v2.writer.WriteBuilder
+import org.apache.spark.sql.sources.v2.writer.{WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, Trigger}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -59,6 +60,19 @@ class FakeScanBuilder extends ScanBuilder with Scan {
   override def toContinuousStream(checkpointLocation: String): ContinuousStream = new FakeDataStream
 }
 
+class FakeWriteBuilder extends WriteBuilder with StreamingWrite {
+  override def buildForStreaming(): StreamingWrite = this
+  override def createStreamingWriterFactory(): StreamingDataWriterFactory = {
+    throw new IllegalStateException("fake sink - cannot actually write")
+  }
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+    throw new IllegalStateException("fake sink - cannot actually write")
+  }
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+    throw new IllegalStateException("fake sink - cannot actually write")
+  }
+}
+
 trait FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead {
   override def name(): String = "fake"
   override def schema(): StructType = StructType(Seq())
@@ -75,7 +89,7 @@ trait FakeStreamingWriteTable extends Table with SupportsStreamingWrite {
   override def name(): String = "fake"
   override def schema(): StructType = StructType(Seq())
   override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
-    throw new IllegalStateException("fake sink - cannot actually write")
+    new FakeWriteBuilder
   }
 }
 

