This is an automated email from the ASF dual-hosted git repository. kabhwan pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 66b1f79b728 [SPARK-39805][SS] Deprecate Trigger.Once and Promote Trigger.AvailableNow 66b1f79b728 is described below commit 66b1f79b72855af35351ff995492f2c13872dac5 Author: Jungtaek Lim <kabhwan.opensou...@gmail.com> AuthorDate: Thu Jul 21 07:35:00 2022 +0900 [SPARK-39805][SS] Deprecate Trigger.Once and Promote Trigger.AvailableNow ### What changes were proposed in this pull request? This PR proposes to deprecate Trigger.Once and suggest Trigger.AvailableNow as a replacement. This PR also replaces Trigger.Once with Trigger.AvailableNow in the test code, except for the cases where Trigger.Once is used intentionally. ### Why are the changes needed? Trigger.Once() exposes various issues, including: 1) weak guarantee of the contract This is the javadoc content of `Trigger.Once`: > A trigger that processes all available data in a single batch then terminates the query. Spark does not respect the contract when there is an "uncommitted" batch in the previous run. It really works as the name suggests, "just run a single batch", hence if there is an "uncommitted" batch, Spark will execute the "uncommitted" batch and terminate without processing new data. 2) scalability issue on batch This is the main rationale for which we introduced Trigger.AvailableNow. 3) huge output latency for stateful operator due to the lack of no-data batch Since Trigger.Once executes the single batch and terminates, the processing for watermark advancement is deferred to the next execution of the query, which tends to be multiple hours or even day(s). ### Does this PR introduce _any_ user-facing change? Yes, end users will start to see the deprecation message when they use Trigger.Once. The deprecation message guides the end users to migrate to Trigger.AvailableNow, with the rationale for the migration. ### How was this patch tested? Existing UTs Closes #37213 from HeartSaVioR/SPARK-39805. 
Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com> --- .../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 3 +++ docs/ss-migration-guide.md | 4 ++++ docs/structured-streaming-programming-guide.md | 20 ++++++++++++++++---- .../org/apache/spark/sql/streaming/Trigger.java | 19 ++++++++++++++++--- .../streaming/MicroBatchExecutionSuite.scala | 2 +- .../streaming/sources/ForeachBatchSinkSuite.scala | 4 ++-- .../sources/RatePerMicroBatchProviderSuite.scala | 1 + .../sql/streaming/EventTimeWatermarkSuite.scala | 22 ++++++++++------------ .../sql/streaming/FileStreamSourceSuite.scala | 2 ++ .../spark/sql/streaming/StreamingQuerySuite.scala | 17 ++++++++++++++++- .../sources/StreamingDataSourceV2Suite.scala | 12 ++++++++---- 11 files changed, 79 insertions(+), 27 deletions(-) diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 5a8caef9e5e..af66ecd21c0 100644 --- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -338,6 +338,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { ) // When Trigger.Once() is used, the read limit should be ignored + // NOTE: the test uses the deprecated Trigger.Once() by intention, do not change. val allData = Seq(1) ++ (10 to 20) ++ (100 to 200) withTempDir { dir => testStream(mapped)( @@ -435,6 +436,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { 13, 14, 15, 16, 17, 18, 19, 2, 20, 21, 22, 23, 24, 25) ) // When Trigger.Once() is used, the read limit should be ignored + // NOTE: the test uses the deprecated Trigger.Once() by intention, do not change. 
val allData = Seq(1, 2) ++ (10 to 25) ++ (100 to 125) withTempDir { dir => testStream(mapped)( @@ -537,6 +539,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { ) // When Trigger.Once() is used, the read limit should be ignored + // NOTE: the test uses the deprecated Trigger.Once() by intention, do not change. val allData = Seq(1, 2) ++ (10 to 30) ++ (100 to 128) withTempDir { dir => testStream(mapped)( diff --git a/docs/ss-migration-guide.md b/docs/ss-migration-guide.md index c28724576bc..0ca5b00debc 100644 --- a/docs/ss-migration-guide.md +++ b/docs/ss-migration-guide.md @@ -26,6 +26,10 @@ Note that this migration guide describes the items specific to Structured Stream Many items of SQL migration can be applied when migrating Structured Streaming to higher versions. Please refer [Migration Guide: SQL, Datasets and DataFrame](sql-migration-guide.html). +## Upgrading from Structured Streaming 3.3 to 3.4 + +- Since Spark 3.4, `Trigger.Once` is deprecated, and users are encouraged to migrate from `Trigger.Once` to `Trigger.AvailableNow`. Please refer [SPARK-39805](https://issues.apache.org/jira/browse/SPARK-39805) for more details. + ## Upgrading from Structured Streaming 3.2 to 3.3 - Since Spark 3.3, all stateful operators require hash partitioning with exact grouping keys. In previous versions, all stateful operators except stream-stream join require loose partitioning criteria which opens the possibility on correctness issue. (See [SPARK-38204](https://issues.apache.org/jira/browse/SPARK-38204) for more details.) To ensure backward compatibility, we retain the old behavior with the checkpoint built from older versions. 
diff --git a/docs/structured-streaming-programming-guide.md b/docs/structured-streaming-programming-guide.md index c0f501a3d92..c3b88a6d165 100644 --- a/docs/structured-streaming-programming-guide.md +++ b/docs/structured-streaming-programming-guide.md @@ -2779,12 +2779,15 @@ Here are the different kinds of triggers that are supported. </td> </tr> <tr> - <td><b>One-time micro-batch</b></td> + <td><b>One-time micro-batch</b><i>(deprecated)</i></td> <td> The query will execute <strong>only one</strong> micro-batch to process all the available data and then stop on its own. This is useful in scenarios you want to periodically spin up a cluster, process everything that is available since the last period, and then shutdown the cluster. In some case, this may lead to significant cost savings. + Note that this trigger is deprecated and users are encouraged to migrate to <b>Available-now micro-batch</b>, + as it provides the better guarantee of processing, fine-grained scale of batches, and better gradual processing + of watermark advancement including no-data batch. </td> </tr> <tr> @@ -2794,6 +2797,15 @@ Here are the different kinds of triggers that are supported. stop on its own. The difference is that, it will process the data in (possibly) multiple micro-batches based on the source options (e.g. <code>maxFilesPerTrigger</code> for file source), which will result in better query scalability. + <ul> + <li>This trigger provides a strong guarantee of processing: regardless of how many batches were + left over in previous run, it ensures all available data at the time of execution gets + processed before termination. All uncommitted batches will be processed first.</li> + + <li>Watermark gets advanced per each batch, and no-data batch gets executed before termination + if the last batch advances the watermark. 
This helps to maintain smaller and predictable + state size and smaller latency on the output of stateful operators.</li> + </ul> </td> </tr> <tr> @@ -2824,7 +2836,7 @@ df.writeStream .trigger(Trigger.ProcessingTime("2 seconds")) .start() -// One-time trigger +// One-time trigger (Deprecated, encouraged to use Available-now trigger) df.writeStream .format("console") .trigger(Trigger.Once()) @@ -2862,7 +2874,7 @@ df.writeStream .trigger(Trigger.ProcessingTime("2 seconds")) .start(); -// One-time trigger +// One-time trigger (Deprecated, encouraged to use Available-now trigger) df.writeStream .format("console") .trigger(Trigger.Once()) @@ -2898,7 +2910,7 @@ df.writeStream \ .trigger(processingTime='2 seconds') \ .start() -# One-time trigger +# One-time trigger (Deprecated, encouraged to use Available-now trigger) df.writeStream \ .format("console") \ .trigger(once=True) \ diff --git a/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java b/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java index b6e105cfe91..328c5290e77 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java +++ b/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java @@ -92,11 +92,13 @@ public class Trigger { /** * A trigger that processes all available data in a single batch then terminates the query. * - * For better scalability, AvailableNow can be used alternatively to process the data in - * multiple batches. - * * @since 2.2.0 + * @deprecated This is deprecated as of Spark 3.4.0. Use {@link #AvailableNow()} to leverage + * better guarantee of processing, fine-grained scale of batches, and better gradual + * processing of watermark advancement including no-data batch. + * See the NOTES in {@link #AvailableNow()} for details. 
*/ + @Deprecated public static Trigger Once() { return OneTimeTrigger$.MODULE$; } @@ -105,6 +107,17 @@ public class Trigger { * A trigger that processes all available data at the start of the query in one or multiple * batches, then terminates the query. * + * Users are encouraged to set the source options to control the size of the batch as similar as + * controlling the size of the batch in {@link #ProcessingTime(long)} trigger. + * + * NOTES: + * - This trigger provides a strong guarantee of processing: regardless of how many batches were + * left over in previous run, it ensures all available data at the time of execution gets + * processed before termination. All uncommitted batches will be processed first. + * - Watermark gets advanced per each batch, and no-data batch gets executed before termination + * if the last batch advances the watermark. This helps to maintain smaller and predictable + * state size and smaller latency on the output of stateful operators. + * * @since 3.3.0 */ public static Trigger AvailableNow() { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala index f06e62b33b1..749ca9d06ea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala @@ -92,7 +92,7 @@ class MicroBatchExecutionSuite extends StreamTest with BeforeAndAfter { testStream(streamEvent) ( AddData(inputData, 1, 2, 3, 4, 5, 6), - StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath), + StartStream(Trigger.AvailableNow(), checkpointLocation = checkpointDir.getAbsolutePath), ExpectFailure[IllegalStateException] { e => assert(e.getMessage.contains("batch 3 doesn't exist")) } diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala index dbac4af90c0..91e915235d7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala @@ -169,7 +169,7 @@ class ForeachBatchSinkSuite extends StreamTest { stream.addData(1, 2, 3, 4, 5) - val query = ds.writeStream.trigger(Trigger.Once()).foreachBatch(writer).start() + val query = ds.writeStream.trigger(Trigger.AvailableNow()).foreachBatch(writer).start() query.awaitTermination() assert(planAsserted, "ForeachBatch writer should be called!") @@ -210,7 +210,7 @@ class ForeachBatchSinkSuite extends StreamTest { stream.addData(1, 2, 3, 4, 5) - val query = ds.writeStream.trigger(Trigger.Once()).foreachBatch(writer).start() + val query = ds.writeStream.trigger(Trigger.AvailableNow()).foreachBatch(writer).start() query.awaitTermination() assert(planAsserted, "ForeachBatch writer should be called!") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala index 0084e0c65b2..5ef531d4540 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala @@ -84,6 +84,7 @@ class RatePerMicroBatchProviderSuite extends StreamTest { } test("Trigger.Once") { + // NOTE: the test uses the deprecated Trigger.Once() by intention, do not change. 
testTrigger(Trigger.Once()) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala index 5c74176bf8e..058c335ad43 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala @@ -184,6 +184,8 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche // Also, the data to process in the next trigger is added *before* starting the stream in // Trigger.Once to ensure that first and only trigger picks up the new data. + // NOTE: the test uses the deprecated Trigger.Once() by intention, do not change. + testStream(aggWithWatermark)( StartStream(Trigger.Once), // to make sure the query is not running when adding data 1st time awaitTermination(), @@ -261,28 +263,24 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche // Offset log should have watermark recorded as 5. 
*/ - StartStream(Trigger.Once), + StartStream(Trigger.AvailableNow), awaitTermination(), AddData(inputData, 25), - StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath), + StartStream(Trigger.AvailableNow, checkpointLocation = checkpointDir.getAbsolutePath), awaitTermination(), - CheckNewAnswer(), - assertEventStats(min = 25, max = 25, avg = 25, wtrmark = 5), - // watermark should be updated to 25 - 10 = 15 + CheckNewAnswer((10, 3)), // watermark should be updated to 25 - 10 = 15 AddData(inputData, 50), - StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath), + StartStream(Trigger.AvailableNow, checkpointLocation = checkpointDir.getAbsolutePath), awaitTermination(), - CheckNewAnswer((10, 3)), // watermark = 15 is used to generate this - assertEventStats(min = 50, max = 50, avg = 50, wtrmark = 15), - // watermark should be updated to 50 - 10 = 40 + CheckNewAnswer((15, 1), (25, 1)), // watermark should be updated to 50 - 10 = 40 AddData(inputData, 50), - StartStream(Trigger.Once, checkpointLocation = checkpointDir.getAbsolutePath), + StartStream(Trigger.AvailableNow, checkpointLocation = checkpointDir.getAbsolutePath), awaitTermination(), - CheckNewAnswer((15, 1), (25, 1)), // watermark = 40 is used to generate this - assertEventStats(min = 50, max = 50, avg = 50, wtrmark = 40)) + CheckNewAnswer() + ) } test("append mode") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index 92819338843..b81b0f775a5 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1273,6 +1273,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { .text(src.getCanonicalPath) def startQuery(): StreamingQuery = { + // NOTE: the test uses the deprecated Trigger.Once() by 
intention, do not change. df.writeStream .format("parquet") .trigger(Trigger.Once) @@ -1328,6 +1329,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { .text(src.getCanonicalPath) def startTriggerOnceQuery(): StreamingQuery = { + // NOTE: the test uses the deprecated Trigger.Once() by intention, do not change. df.writeStream .foreachBatch((_: Dataset[Row], _: Long) => {}) .trigger(Trigger.Once) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index d47c3ac3a56..9499b20ada6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -174,6 +174,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi } testQuietly("OneTime trigger, commit log, and exception") { + // NOTE: the test uses the deprecated Trigger.Once() by intention, do not change. import Trigger.Once val inputData = MemoryStream[Int] val mapped = inputData.toDS().map { 6 / _} @@ -182,7 +183,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi AssertOnQuery(_.isActive), StopStream, AddData(inputData, 1, 2), - StartStream(trigger = Once), + StartStream(trigger = Trigger.Once), CheckAnswer(6, 3), StopStream, // clears out StreamTest state AssertOnQuery { q => @@ -846,6 +847,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi } test("processAllAvailable should not block forever when a query is stopped") { + // NOTE: the test uses the deprecated Trigger.Once() by intention, do not change. 
val input = MemoryStream[Int] input.addData(1) val query = input.toDF().writeStream @@ -857,6 +859,19 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi } } + test("processAllAvailable should not block forever when a query is stopped -" + + " Trigger.AvailableNow") { + val input = MemoryStream[Int] + input.addData(1) + val query = input.toDF().writeStream + .trigger(Trigger.AvailableNow()) + .format("console") + .start() + failAfter(streamingTimeout) { + query.processAllAvailable() + } + } + test("SPARK-22238: don't check for RDD partitions during streaming aggregation preparation") { val stream = MemoryStream[(Int, Int)] val baseDf = Seq((1, "A"), (2, "b")).toDF("num", "char").where("char = 'A'") diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala index 251a02d922e..9906defa96e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala @@ -291,7 +291,9 @@ class StreamingDataSourceV2Suite extends StreamTest { "fake-write-microbatch-continuous", "fake-write-neither-mode") val triggers = Seq( + // NOTE: the test uses the deprecated Trigger.Once() by intention, do not change. 
Trigger.Once(), + Trigger.AvailableNow(), Trigger.ProcessingTime(1000), Trigger.Continuous(1000)) @@ -349,7 +351,7 @@ class StreamingDataSourceV2Suite extends StreamTest { "supports external metadata") { testPositiveCaseWithQuery( "fake-read-microbatch-continuous", "fake-write-supporting-external-metadata", - Trigger.Once()) { v2Query => + Trigger.AvailableNow()) { v2Query => val sink = v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink assert(sink.isInstanceOf[Table]) assert(sink.schema() == StructType(Nil)) @@ -359,7 +361,8 @@ class StreamingDataSourceV2Suite extends StreamTest { test("disabled v2 write") { // Ensure the V2 path works normally and generates a V2 sink.. testPositiveCaseWithQuery( - "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once()) { v2Query => + "fake-read-microbatch-continuous", "fake-write-v1-fallback", + Trigger.AvailableNow()) { v2Query => assert(v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink .isInstanceOf[Table]) } @@ -369,7 +372,8 @@ class StreamingDataSourceV2Suite extends StreamTest { val fullSinkName = classOf[FakeWriteSupportProviderV1Fallback].getName withSQLConf(SQLConf.DISABLED_V2_STREAMING_WRITERS.key -> s"a,b,c,test,$fullSinkName,d,e") { testPositiveCaseWithQuery( - "fake-read-microbatch-continuous", "fake-write-v1-fallback", Trigger.Once()) { v1Query => + "fake-read-microbatch-continuous", "fake-write-v1-fallback", + Trigger.AvailableNow()) { v1Query => assert(v1Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink .isInstanceOf[FakeSink]) } @@ -377,7 +381,7 @@ class StreamingDataSourceV2Suite extends StreamTest { } Seq( - Tuple2(classOf[FakeReadMicroBatchOnly], Trigger.Once()), + Tuple2(classOf[FakeReadMicroBatchOnly], Trigger.AvailableNow()), Tuple2(classOf[FakeReadContinuousOnly], Trigger.Continuous(1000)) ).foreach { case (source, trigger) => test(s"SPARK-25460: session options are respected in structured streaming sources - $source") { 
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org