This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new d3813d8  [SPARK-27064][SS] create StreamingWrite at the beginning of streaming execution
d3813d8 is described below

commit d3813d8b210d127ea278015ef27ead9348365787
Author:     Wenchen Fan <wenc...@databricks.com>
AuthorDate: Wed Mar 13 19:47:54 2019 +0800

    [SPARK-27064][SS] create StreamingWrite at the beginning of streaming execution

    ## What changes were proposed in this pull request?

    According to the [design](https://docs.google.com/document/d/1vI26UEuDpVuOjWw4WPoH2T6y8WAekwtI7qoowhOFnI4/edit?usp=sharing),
    the life cycle of `StreamingWrite` should be the same as the read side `MicroBatch/ContinuousStream`,
    i.e. each run of the stream query, instead of each epoch. This PR fixes it.

    ## How was this patch tested?

    existing tests

    Closes #23981 from cloud-fan/dsv2.

    Authored-by: Wenchen Fan <wenc...@databricks.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
---
 .../sql/kafka010/KafkaContinuousSinkSuite.scala    | 101 +++++----------------
 .../execution/streaming/MicroBatchExecution.scala  |  18 ++--
 .../sql/execution/streaming/StreamExecution.scala  |   2 +-
 .../streaming/continuous/ContinuousExecution.scala |  20 ++--
 .../sources/WriteToMicroBatchDataSource.scala      |  39 ++++++++
 .../sources/StreamingDataSourceV2Suite.scala       |  18 +++-
 6 files changed, 104 insertions(+), 94 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
index b21037b..3c3aeeb 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
@@ -22,9 +22,8 @@ import java.util.Locale
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.scalatest.time.SpanSugar._
-import scala.collection.JavaConverters._
 
-import org.apache.spark.sql.{DataFrame, Row}
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row}
 import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
 import org.apache.spark.sql.streaming._
 import org.apache.spark.sql.types.{BinaryType, DataType}
@@ -227,39 +226,23 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     val topic = newTopic()
     testUtils.createTopic(topic)
 
-    /* No topic field or topic option */
-    var writer: StreamingQuery = null
-    var ex: Exception = null
-    try {
-      writer = createKafkaWriter(input.toDF())(
+    val ex = intercept[AnalysisException] {
+      /* No topic field or topic option */
+      createKafkaWriter(input.toDF())(
        withSelectExpr = "value as key", "value"
      )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
     }
     assert(ex.getMessage
       .toLowerCase(Locale.ROOT)
       .contains("topic option required when no 'topic' attribute is present"))
 
-    try {
+    val ex2 = intercept[AnalysisException] {
       /* No value field */
-      writer = createKafkaWriter(input.toDF())(
+      createKafkaWriter(input.toDF())(
        withSelectExpr = s"'$topic' as topic", "value as key"
      )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
     }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+    assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
       "required attribute 'value' not found"))
   }
 
@@ -278,53 +261,30 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
     val topic = newTopic()
     testUtils.createTopic(topic)
 
-    var writer: StreamingQuery = null
-    var ex: Exception = null
-    try {
+    val ex = intercept[AnalysisException] {
       /* topic field wrong type */
-      writer = createKafkaWriter(input.toDF())(
+      createKafkaWriter(input.toDF())(
        withSelectExpr = s"CAST('1' as INT) as topic", "value"
      )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
     }
     assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string"))
 
-    try {
+    val ex2 = intercept[AnalysisException] {
       /* value field wrong type */
-      writer = createKafkaWriter(input.toDF())(
+      createKafkaWriter(input.toDF())(
        withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value"
      )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
     }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+    assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
       "value attribute type must be a string or binary"))
 
-    try {
+    val ex3 = intercept[AnalysisException] {
       /* key field wrong type */
-      writer = createKafkaWriter(input.toDF())(
+      createKafkaWriter(input.toDF())(
        withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", "value"
      )
-      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-    } finally {
-      writer.stop()
     }
-    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+    assert(ex3.getMessage.toLowerCase(Locale.ROOT).contains(
       "key attribute type must be a string or binary"))
   }
 
@@ -369,35 +329,22 @@ class KafkaContinuousSinkSuite extends KafkaContinuousTest {
       .option("kafka.bootstrap.servers", testUtils.brokerAddress)
       .option("subscribe", inputTopic)
       .load()
-    var writer: StreamingQuery = null
-    var ex: Exception = null
-    try {
-      writer = createKafkaWriter(
+
+    val ex = intercept[IllegalArgumentException] {
+      createKafkaWriter(
        input.toDF(),
        withOptions = Map("kafka.key.serializer" -> "foo"))()
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-      assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
-        "kafka option 'key.serializer' is not supported"))
-    } finally {
-      writer.stop()
     }
+    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+      "kafka option 'key.serializer' is not supported"))
 
-    try {
-      writer = createKafkaWriter(
+    val ex2 = intercept[IllegalArgumentException] {
+      createKafkaWriter(
        input.toDF(),
        withOptions = Map("kafka.value.serializer" -> "foo"))()
-      eventually(timeout(streamingTimeout)) {
-        assert(writer.exception.isDefined)
-        ex = writer.exception.get
-      }
-      assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
-        "kafka option 'value.serializer' is not supported"))
-    } finally {
-      writer.stop()
     }
+    assert(ex2.getMessage.toLowerCase(Locale.ROOT).contains(
+      "kafka option 'value.serializer' is not supported"))
   }
 
   test("generic - write big data with small producer buffer") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index de7cbe2..bedcb9f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -26,8 +26,8 @@ import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, CurrentBatch
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LocalRelation, LogicalPlan, Project}
 import org.apache.spark.sql.catalyst.util.truncatedString
 import org.apache.spark.sql.execution.SQLExecution
-import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2, WriteToDataSourceV2Exec}
-import org.apache.spark.sql.execution.streaming.sources.{MicroBatchWrite, RateControlMicroBatchStream}
+import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, StreamWriterCommitProgress, WriteToDataSourceV2Exec}
+import org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchStream, WriteToMicroBatchDataSource}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2}
@@ -122,7 +122,14 @@ class MicroBatchExecution(
       case r: StreamingDataSourceV2Relation => r.stream
     }
     uniqueSources = sources.distinct
-    _logicalPlan
+
+    sink match {
+      case s: SupportsStreamingWrite =>
+        val streamingWrite = createStreamingWrite(s, extraOptions, _logicalPlan)
+        WriteToMicroBatchDataSource(streamingWrite, _logicalPlan)
+
+      case _ => _logicalPlan
+    }
   }
 
   /**
@@ -513,9 +520,8 @@ class MicroBatchExecution(
 
     val triggerLogicalPlan = sink match {
       case _: Sink => newAttributePlan
-      case s: SupportsStreamingWrite =>
-        val streamingWrite = createStreamingWrite(s, extraOptions, newAttributePlan)
-        WriteToDataSourceV2(new MicroBatchWrite(currentBatchId, streamingWrite), newAttributePlan)
+      case _: SupportsStreamingWrite =>
+        newAttributePlan.asInstanceOf[WriteToMicroBatchDataSource].createPlan(currentBatchId)
       case _ => throw new IllegalArgumentException(s"unknown sink type for $sink")
     }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
index bba640e..180a23c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
@@ -585,7 +585,7 @@ abstract class StreamExecution(
       options: Map[String, String],
       inputPlan: LogicalPlan): StreamingWrite = {
     val writeBuilder = table.newWriteBuilder(new DataSourceOptions(options.asJava))
-      .withQueryId(runId.toString)
+      .withQueryId(id.toString)
       .withInputDataSchema(inputPlan.schema)
     outputMode match {
       case Append =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index aef556d..f55a45d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -61,7 +61,7 @@ class ContinuousExecution(
   // Throwable that caused the execution to fail
   private val failure: AtomicReference[Throwable] = new AtomicReference[Throwable](null)
 
-  override val logicalPlan: LogicalPlan = {
+  override val logicalPlan: WriteToContinuousDataSource = {
     val v2ToRelationMap = MutableMap[StreamingRelationV2, StreamingDataSourceV2Relation]()
     var nextSourceId = 0
     val _logicalPlan = analyzedPlan.transform {
@@ -88,7 +88,8 @@ class ContinuousExecution(
     }
 
     uniqueSources = sources.distinct
-    _logicalPlan
+    WriteToContinuousDataSource(
+      createStreamingWrite(sink, extraOptions, _logicalPlan), _logicalPlan)
   }
 
   private val triggerExecutor = trigger match {
@@ -178,13 +179,10 @@ class ContinuousExecution(
         "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
     }
 
-    val streamingWrite = createStreamingWrite(sink, extraOptions, withNewSources)
-    val planWithSink = WriteToContinuousDataSource(streamingWrite, withNewSources)
-
     reportTimeTaken("queryPlanning") {
       lastExecution = new IncrementalExecution(
         sparkSessionForQuery,
-        planWithSink,
+        withNewSources,
         outputMode,
         checkpointFile("state"),
         id,
@@ -194,7 +192,7 @@ class ContinuousExecution(
       lastExecution.executedPlan // Force the lazy generation of execution plan
     }
 
-    val stream = planWithSink.collect {
+    val stream = withNewSources.collect {
       case relation: StreamingDataSourceV2Relation => relation.stream.asInstanceOf[ContinuousStream]
     }.head
 
@@ -215,7 +213,13 @@ class ContinuousExecution(
 
     // Use the parent Spark session for the endpoint since it's where this query ID is registered.
     val epochEndpoint = EpochCoordinatorRef.create(
-      streamingWrite, stream, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
+      logicalPlan.write,
+      stream,
+      this,
+      epochCoordinatorId,
+      currentBatchId,
+      sparkSession,
+      SparkEnv.get)
     val epochUpdateThread = new Thread(new Runnable {
       override def run: Unit = {
         try {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala
new file mode 100644
index 0000000..a3f58fa
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/sources/WriteToMicroBatchDataSource.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.streaming.sources
+
+import org.apache.spark.sql.catalyst.expressions.Attribute
+import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
+import org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2
+import org.apache.spark.sql.sources.v2.writer.streaming.StreamingWrite
+
+/**
+ * The logical plan for writing data to a micro-batch stream.
+ *
+ * Note that this logical plan does not have a corresponding physical plan, as it will be converted
+ * to [[WriteToDataSourceV2]] with [[MicroBatchWrite]] before execution.
+ */
+case class WriteToMicroBatchDataSource(write: StreamingWrite, query: LogicalPlan)
+  extends LogicalPlan {
+  override def children: Seq[LogicalPlan] = Seq(query)
+  override def output: Seq[Attribute] = Nil
+
+  def createPlan(batchId: Long): WriteToDataSourceV2 = {
+    WriteToDataSourceV2(new MicroBatchWrite(batchId, write), query)
+  }
+}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 97b694e..3c2c700 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -26,7 +26,8 @@ import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader._
 import org.apache.spark.sql.sources.v2.reader.streaming._
-import org.apache.spark.sql.sources.v2.writer.WriteBuilder
+import org.apache.spark.sql.sources.v2.writer.{WriteBuilder, WriterCommitMessage}
+import org.apache.spark.sql.sources.v2.writer.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.streaming.{OutputMode, StreamingQuery, StreamTest, Trigger}
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.Utils
@@ -59,6 +60,19 @@ class FakeScanBuilder extends ScanBuilder with Scan {
   override def toContinuousStream(checkpointLocation: String): ContinuousStream = new FakeDataStream
 }
 
+class FakeWriteBuilder extends WriteBuilder with StreamingWrite {
+  override def buildForStreaming(): StreamingWrite = this
+  override def createStreamingWriterFactory(): StreamingDataWriterFactory = {
+    throw new IllegalStateException("fake sink - cannot actually write")
+  }
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+    throw new IllegalStateException("fake sink - cannot actually write")
+  }
+  override def abort(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {
+    throw new IllegalStateException("fake sink - cannot actually write")
+  }
+}
+
 trait FakeMicroBatchReadTable extends Table with SupportsMicroBatchRead {
   override def name(): String = "fake"
   override def schema(): StructType = StructType(Seq())
@@ -75,7 +89,7 @@ trait FakeStreamingWriteTable extends Table with SupportsStreamingWrite {
   override def name(): String = "fake"
   override def schema(): StructType = StructType(Seq())
   override def newWriteBuilder(options: DataSourceOptions): WriteBuilder = {
-    throw new IllegalStateException("fake sink - cannot actually write")
+    new FakeWriteBuilder
   }
 }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
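For readers skimming the raw diff, the sketch below condenses the lifecycle the patch establishes: the streaming write is built once per query run, and each micro-batch only wraps it in a thin per-epoch object. The `*Like`/`Sketch` names are illustrative stand-ins rather than Spark classes; the real implementation is `WriteToMicroBatchDataSource.createPlan` in the new file above.

```scala
// Minimal, self-contained sketch (stand-in types, not the real Spark classes).
trait StreamingWriteLike                                                  // stands in for StreamingWrite
case class MicroBatchWriteLike(batchId: Long, write: StreamingWriteLike)  // stands in for MicroBatchWrite

// Stand-in for the WriteToMicroBatchDataSource node added by this commit:
// it holds the write created once per run and wraps it per epoch.
case class WriteToMicroBatchDataSourceSketch(write: StreamingWriteLike) {
  def createPlan(batchId: Long): MicroBatchWriteLike = MicroBatchWriteLike(batchId, write)
}

object LifecycleSketch extends App {
  val write = new StreamingWriteLike {}                // built once, when the query run starts
  val plan = WriteToMicroBatchDataSourceSketch(write)  // becomes part of the analyzed logical plan
  (0L until 3L).foreach { batchId =>
    // Each trigger creates only a per-epoch wrapper; the underlying write is reused.
    println(s"batch $batchId reuses write: ${plan.createPlan(batchId).write eq write}")
  }
}
```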