This is an automated email from the ASF dual-hosted git repository.

kabhwan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 66b1f79b728 [SPARK-39805][SS] Deprecate Trigger.Once and Promote Trigger.AvailableNow
66b1f79b728 is described below

commit 66b1f79b72855af35351ff995492f2c13872dac5
Author: Jungtaek Lim <kabhwan.opensou...@gmail.com>
AuthorDate: Thu Jul 21 07:35:00 2022 +0900

    [SPARK-39805][SS] Deprecate Trigger.Once and Promote Trigger.AvailableNow
    
    ### What changes were proposed in this pull request?
    
    This PR proposes to deprecate Trigger.Once and suggest Trigger.AvailableNow as a replacement.
    
    This PR also replaces Trigger.Once with Trigger.AvailableNow in the test code, except in cases where Trigger.Once is used intentionally.
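    
    For illustration, here is a minimal sketch of the migration (the source, sink, and checkpoint path below are hypothetical and not part of this PR):
    
    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.streaming.Trigger
    
    val spark = SparkSession.builder().appName("trigger-migration").getOrCreate()
    val df = spark.readStream.format("rate").load()
    
    // Before: runs exactly one micro-batch and stops (now deprecated).
    // df.writeStream.format("console").trigger(Trigger.Once()).start()
    
    // After: processes everything available at start time, possibly across
    // multiple micro-batches, then stops.
    df.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/trigger-migration-ckpt") // hypothetical path
      .trigger(Trigger.AvailableNow())
      .start()
      .awaitTermination()
    ```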
    
    ### Why are the changes needed?
    
    Trigger.Once() has several issues, including:
    
    1) weak guarantee of the contract
    
    This is the javadoc content of `Trigger.Once`:
    
    > A trigger that processes all available data in a single batch then terminates the query.
    
    Spark does not respect this contract when there is an "uncommitted" batch left over from the previous run. The trigger really works as its name suggests, "just run a single batch": if there is an "uncommitted" batch, Spark executes that batch and terminates without processing any new data.
    
    2) scalability issue with batch size
    
    This is the main reason we introduced Trigger.AvailableNow: it can split the available data into multiple micro-batches based on the source options, instead of forcing everything into one huge batch (see the sketch after item 3).
    
    3) huge output latency for stateful operators due to the lack of a no-data batch
    
    Since Trigger.Once executes a single batch and terminates, the processing triggered by watermark advancement is deferred to the next run of the query, which tends to happen hours or even days later.
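    
    The sketch below illustrates points 2) and 3). It is a rough, hypothetical example (the schema, paths, and option values are made up for illustration) of how Trigger.AvailableNow lets source options bound each micro-batch and lets the watermark advance per batch, including a final no-data batch:
    
    ```scala
    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions.window
    import org.apache.spark.sql.streaming.Trigger
    
    val spark = SparkSession.builder().appName("available-now-sketch").getOrCreate()
    import spark.implicits._
    
    // The source option bounds the size of each micro-batch; with
    // Trigger.AvailableNow the backlog is split into multiple batches of at
    // most 10 files each, rather than one potentially huge batch.
    val events = spark.readStream
      .format("json")
      .schema("eventTime TIMESTAMP, value INT")   // hypothetical schema
      .option("maxFilesPerTrigger", "10")         // hypothetical limit
      .load("/data/events")                       // hypothetical path
    
    // The watermark advances after each micro-batch, and a no-data batch can
    // run before termination to emit windows closed by the last advancement,
    // instead of deferring that output to the next run of the query.
    val counts = events
      .withWatermark("eventTime", "10 minutes")
      .groupBy(window($"eventTime", "5 minutes"))
      .count()
    
    counts.writeStream
      .format("parquet")
      .option("path", "/data/out")                // hypothetical path
      .option("checkpointLocation", "/data/ckpt") // hypothetical path
      .outputMode("append")
      .trigger(Trigger.AvailableNow())
      .start()
      .awaitTermination()
    ```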
    
    ### Does this PR introduce _any_ user-facing change?
    
    Yes, end users will start to see a deprecation message when they use Trigger.Once. The message guides them to migrate to Trigger.AvailableNow and explains the rationale for the migration.
    
    ### How was this patch tested?
    
    Existing UTs
    
    Closes #37213 from HeartSaVioR/SPARK-39805.
    
    Authored-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
    Signed-off-by: Jungtaek Lim <kabhwan.opensou...@gmail.com>
---
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  |  3 +++
 docs/ss-migration-guide.md                         |  4 ++++
 docs/structured-streaming-programming-guide.md     | 20 ++++++++++++++++----
 .../org/apache/spark/sql/streaming/Trigger.java    | 19 ++++++++++++++++---
 .../streaming/MicroBatchExecutionSuite.scala       |  2 +-
 .../streaming/sources/ForeachBatchSinkSuite.scala  |  4 ++--
 .../sources/RatePerMicroBatchProviderSuite.scala   |  1 +
 .../sql/streaming/EventTimeWatermarkSuite.scala    | 22 ++++++++++------------
 .../sql/streaming/FileStreamSourceSuite.scala      |  2 ++
 .../spark/sql/streaming/StreamingQuerySuite.scala  | 17 ++++++++++++++++-
 .../sources/StreamingDataSourceV2Suite.scala       | 12 ++++++++----
 11 files changed, 79 insertions(+), 27 deletions(-)

diff --git 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 5a8caef9e5e..af66ecd21c0 100644
--- 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -338,6 +338,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
     )
 
     // When Trigger.Once() is used, the read limit should be ignored
+    // NOTE: the test uses the deprecated Trigger.Once() by intention, do not 
change.
     val allData = Seq(1) ++ (10 to 20) ++ (100 to 200)
     withTempDir { dir =>
       testStream(mapped)(
@@ -435,6 +436,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
         13, 14, 15, 16, 17, 18, 19, 2, 20, 21, 22, 23, 24, 25)
     )
     // When Trigger.Once() is used, the read limit should be ignored
+    // NOTE: the test uses the deprecated Trigger.Once() by intention, do not 
change.
     val allData = Seq(1, 2) ++ (10 to 25) ++ (100 to 125)
     withTempDir { dir =>
       testStream(mapped)(
@@ -537,6 +539,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends 
KafkaSourceSuiteBase {
     )
 
     // When Trigger.Once() is used, the read limit should be ignored
+    // NOTE: the test uses the deprecated Trigger.Once() by intention, do not 
change.
     val allData = Seq(1, 2) ++ (10 to 30) ++ (100 to 128)
     withTempDir { dir =>
       testStream(mapped)(
diff --git a/docs/ss-migration-guide.md b/docs/ss-migration-guide.md
index c28724576bc..0ca5b00debc 100644
--- a/docs/ss-migration-guide.md
+++ b/docs/ss-migration-guide.md
@@ -26,6 +26,10 @@ Note that this migration guide describes the items specific 
to Structured Stream
 Many items of SQL migration can be applied when migrating Structured Streaming 
to higher versions.
 Please refer [Migration Guide: SQL, Datasets and 
DataFrame](sql-migration-guide.html).
 
+## Upgrading from Structured Streaming 3.3 to 3.4
+
+- Since Spark 3.4, `Trigger.Once` is deprecated, and users are encouraged to 
migrate from `Trigger.Once` to `Trigger.AvailableNow`. Please refer 
[SPARK-39805](https://issues.apache.org/jira/browse/SPARK-39805) for more 
details.
+
 ## Upgrading from Structured Streaming 3.2 to 3.3
 
 - Since Spark 3.3, all stateful operators require hash partitioning with exact 
grouping keys. In previous versions, all stateful operators except 
stream-stream join require loose partitioning criteria which opens the 
possibility on correctness issue. (See 
[SPARK-38204](https://issues.apache.org/jira/browse/SPARK-38204) for more 
details.) To ensure backward compatibility, we retain the old behavior with the 
checkpoint built from older versions.
diff --git a/docs/structured-streaming-programming-guide.md 
b/docs/structured-streaming-programming-guide.md
index c0f501a3d92..c3b88a6d165 100644
--- a/docs/structured-streaming-programming-guide.md
+++ b/docs/structured-streaming-programming-guide.md
@@ -2779,12 +2779,15 @@ Here are the different kinds of triggers that are 
supported.
     </td>
   </tr>
   <tr>
-    <td><b>One-time micro-batch</b></td>
+    <td><b>One-time micro-batch</b><i>(deprecated)</i></td>
     <td>
         The query will execute <strong>only one</strong> micro-batch to 
process all the available data and then
         stop on its own. This is useful in scenarios you want to periodically 
spin up a cluster,
         process everything that is available since the last period, and then 
shutdown the
         cluster. In some case, this may lead to significant cost savings.
+        Note that this trigger is deprecated and users are encouraged to 
migrate to <b>Available-now micro-batch</b>,
+        as it provides the better guarantee of processing, fine-grained scale 
of batches, and better gradual processing
+        of watermark advancement including no-data batch.
     </td>
   </tr>
   <tr>
@@ -2794,6 +2797,15 @@ Here are the different kinds of triggers that are 
supported.
         stop on its own. The difference is that, it will process the data in 
(possibly) multiple micro-batches
         based on the source options (e.g. <code>maxFilesPerTrigger</code> for 
file source), which will result
         in better query scalability.
+        <ul>
+            <li>This trigger provides a strong guarantee of processing: 
regardless of how many batches were
+                left over in previous run, it ensures all available data at 
the time of execution gets
+                processed before termination. All uncommitted batches will be 
processed first.</li>
+
+            <li>Watermark gets advanced per each batch, and no-data batch gets 
executed before termination
+                if the last batch advances the watermark. This helps to 
maintain smaller and predictable
+                state size and smaller latency on the output of stateful 
operators.</li>
+        </ul>
     </td>
   </tr>
   <tr>
@@ -2824,7 +2836,7 @@ df.writeStream
   .trigger(Trigger.ProcessingTime("2 seconds"))
   .start()
 
-// One-time trigger
+// One-time trigger (Deprecated, encouraged to use Available-now trigger)
 df.writeStream
   .format("console")
   .trigger(Trigger.Once())
@@ -2862,7 +2874,7 @@ df.writeStream
   .trigger(Trigger.ProcessingTime("2 seconds"))
   .start();
 
-// One-time trigger
+// One-time trigger (Deprecated, encouraged to use Available-now trigger)
 df.writeStream
   .format("console")
   .trigger(Trigger.Once())
@@ -2898,7 +2910,7 @@ df.writeStream \
   .trigger(processingTime='2 seconds') \
   .start()
 
-# One-time trigger
+# One-time trigger (Deprecated, encouraged to use Available-now trigger)
 df.writeStream \
   .format("console") \
   .trigger(once=True) \
diff --git a/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java 
b/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java
index b6e105cfe91..328c5290e77 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java
@@ -92,11 +92,13 @@ public class Trigger {
   /**
    * A trigger that processes all available data in a single batch then 
terminates the query.
    *
-   * For better scalability, AvailableNow can be used alternatively to process 
the data in
-   * multiple batches.
-   *
    * @since 2.2.0
+   * @deprecated This is deprecated as of Spark 3.4.0. Use {@link 
#AvailableNow()} to leverage
+   *             better guarantee of processing, fine-grained scale of 
batches, and better gradual
+   *             processing of watermark advancement including no-data batch.
+   *             See the NOTES in {@link #AvailableNow()} for details.
    */
+  @Deprecated
   public static Trigger Once() {
     return OneTimeTrigger$.MODULE$;
   }
@@ -105,6 +107,17 @@ public class Trigger {
    * A trigger that processes all available data at the start of the query in 
one or multiple
    * batches, then terminates the query.
    *
+   * Users are encouraged to set the source options to control the size of the 
batch as similar as
+   * controlling the size of the batch in {@link #ProcessingTime(long)} 
trigger.
+   *
+   * NOTES:
+   * - This trigger provides a strong guarantee of processing: regardless of 
how many batches were
+   *   left over in previous run, it ensures all available data at the time of 
execution gets
+   *   processed before termination. All uncommitted batches will be processed 
first.
+   * - Watermark gets advanced per each batch, and no-data batch gets executed 
before termination
+   *   if the last batch advances the watermark. This helps to maintain 
smaller and predictable
+   *   state size and smaller latency on the output of stateful operators.
+   *
    * @since 3.3.0
    */
   public static Trigger AvailableNow() {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
index f06e62b33b1..749ca9d06ea 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecutionSuite.scala
@@ -92,7 +92,7 @@ class MicroBatchExecutionSuite extends StreamTest with 
BeforeAndAfter {
 
     testStream(streamEvent) (
       AddData(inputData, 1, 2, 3, 4, 5, 6),
-      StartStream(Trigger.Once, checkpointLocation = 
checkpointDir.getAbsolutePath),
+      StartStream(Trigger.AvailableNow(), checkpointLocation = 
checkpointDir.getAbsolutePath),
       ExpectFailure[IllegalStateException] { e =>
         assert(e.getMessage.contains("batch 3 doesn't exist"))
       }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
index dbac4af90c0..91e915235d7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/ForeachBatchSinkSuite.scala
@@ -169,7 +169,7 @@ class ForeachBatchSinkSuite extends StreamTest {
 
       stream.addData(1, 2, 3, 4, 5)
 
-      val query = 
ds.writeStream.trigger(Trigger.Once()).foreachBatch(writer).start()
+      val query = 
ds.writeStream.trigger(Trigger.AvailableNow()).foreachBatch(writer).start()
       query.awaitTermination()
 
       assert(planAsserted, "ForeachBatch writer should be called!")
@@ -210,7 +210,7 @@ class ForeachBatchSinkSuite extends StreamTest {
 
       stream.addData(1, 2, 3, 4, 5)
 
-      val query = 
ds.writeStream.trigger(Trigger.Once()).foreachBatch(writer).start()
+      val query = 
ds.writeStream.trigger(Trigger.AvailableNow()).foreachBatch(writer).start()
       query.awaitTermination()
 
       assert(planAsserted, "ForeachBatch writer should be called!")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
index 0084e0c65b2..5ef531d4540 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
@@ -84,6 +84,7 @@ class RatePerMicroBatchProviderSuite extends StreamTest {
   }
 
   test("Trigger.Once") {
+    // NOTE: the test uses the deprecated Trigger.Once() by intention, do not 
change.
     testTrigger(Trigger.Once())
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
index 5c74176bf8e..058c335ad43 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/EventTimeWatermarkSuite.scala
@@ -184,6 +184,8 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
     // Also, the data to process in the next trigger is added *before* 
starting the stream in
     // Trigger.Once to ensure that first and only trigger picks up the new 
data.
 
+    // NOTE: the test uses the deprecated Trigger.Once() by intention, do not 
change.
+
     testStream(aggWithWatermark)(
       StartStream(Trigger.Once),  // to make sure the query is not running 
when adding data 1st time
       awaitTermination(),
@@ -261,28 +263,24 @@ class EventTimeWatermarkSuite extends StreamTest with 
BeforeAndAfter with Matche
       // Offset log should have watermark recorded as 5.
       */
 
-      StartStream(Trigger.Once),
+      StartStream(Trigger.AvailableNow),
       awaitTermination(),
 
       AddData(inputData, 25),
-      StartStream(Trigger.Once, checkpointLocation = 
checkpointDir.getAbsolutePath),
+      StartStream(Trigger.AvailableNow, checkpointLocation = 
checkpointDir.getAbsolutePath),
       awaitTermination(),
-      CheckNewAnswer(),
-      assertEventStats(min = 25, max = 25, avg = 25, wtrmark = 5),
-      // watermark should be updated to 25 - 10 = 15
+      CheckNewAnswer((10, 3)), // watermark should be updated to 25 - 10 = 15
 
       AddData(inputData, 50),
-      StartStream(Trigger.Once, checkpointLocation = 
checkpointDir.getAbsolutePath),
+      StartStream(Trigger.AvailableNow, checkpointLocation = 
checkpointDir.getAbsolutePath),
       awaitTermination(),
-      CheckNewAnswer((10, 3)),   // watermark = 15 is used to generate this
-      assertEventStats(min = 50, max = 50, avg = 50, wtrmark = 15),
-      // watermark should be updated to 50 - 10 = 40
+      CheckNewAnswer((15, 1), (25, 1)), // watermark should be updated to 50 - 
10 = 40
 
       AddData(inputData, 50),
-      StartStream(Trigger.Once, checkpointLocation = 
checkpointDir.getAbsolutePath),
+      StartStream(Trigger.AvailableNow, checkpointLocation = 
checkpointDir.getAbsolutePath),
       awaitTermination(),
-      CheckNewAnswer((15, 1), (25, 1)), // watermark = 40 is used to generate 
this
-      assertEventStats(min = 50, max = 50, avg = 50, wtrmark = 40))
+      CheckNewAnswer()
+    )
   }
 
   test("append mode") {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index 92819338843..b81b0f775a5 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1273,6 +1273,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         .text(src.getCanonicalPath)
 
       def startQuery(): StreamingQuery = {
+        // NOTE: the test uses the deprecated Trigger.Once() by intention, do 
not change.
         df.writeStream
           .format("parquet")
           .trigger(Trigger.Once)
@@ -1328,6 +1329,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
         .text(src.getCanonicalPath)
 
       def startTriggerOnceQuery(): StreamingQuery = {
+        // NOTE: the test uses the deprecated Trigger.Once() by intention, do 
not change.
         df.writeStream
           .foreachBatch((_: Dataset[Row], _: Long) => {})
           .trigger(Trigger.Once)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index d47c3ac3a56..9499b20ada6 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -174,6 +174,7 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
   }
 
   testQuietly("OneTime trigger, commit log, and exception") {
+    // NOTE: the test uses the deprecated Trigger.Once() by intention, do not 
change.
     import Trigger.Once
     val inputData = MemoryStream[Int]
     val mapped = inputData.toDS().map { 6 / _}
@@ -182,7 +183,7 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
       AssertOnQuery(_.isActive),
       StopStream,
       AddData(inputData, 1, 2),
-      StartStream(trigger = Once),
+      StartStream(trigger = Trigger.Once),
       CheckAnswer(6, 3),
       StopStream, // clears out StreamTest state
       AssertOnQuery { q =>
@@ -846,6 +847,7 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
   }
 
   test("processAllAvailable should not block forever when a query is stopped") 
{
+    // NOTE: the test uses the deprecated Trigger.Once() by intention, do not 
change.
     val input = MemoryStream[Int]
     input.addData(1)
     val query = input.toDF().writeStream
@@ -857,6 +859,19 @@ class StreamingQuerySuite extends StreamTest with 
BeforeAndAfter with Logging wi
     }
   }
 
+  test("processAllAvailable should not block forever when a query is stopped 
-" +
+    " Trigger.AvailableNow") {
+    val input = MemoryStream[Int]
+    input.addData(1)
+    val query = input.toDF().writeStream
+      .trigger(Trigger.AvailableNow())
+      .format("console")
+      .start()
+    failAfter(streamingTimeout) {
+      query.processAllAvailable()
+    }
+  }
+
   test("SPARK-22238: don't check for RDD partitions during streaming 
aggregation preparation") {
     val stream = MemoryStream[(Int, Int)]
     val baseDf = Seq((1, "A"), (2, "b")).toDF("num", "char").where("char = 
'A'")
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 251a02d922e..9906defa96e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -291,7 +291,9 @@ class StreamingDataSourceV2Suite extends StreamTest {
     "fake-write-microbatch-continuous",
     "fake-write-neither-mode")
   val triggers = Seq(
+    // NOTE: the test uses the deprecated Trigger.Once() by intention, do not 
change.
     Trigger.Once(),
+    Trigger.AvailableNow(),
     Trigger.ProcessingTime(1000),
     Trigger.Continuous(1000))
 
@@ -349,7 +351,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
     "supports external metadata") {
     testPositiveCaseWithQuery(
       "fake-read-microbatch-continuous", 
"fake-write-supporting-external-metadata",
-      Trigger.Once()) { v2Query =>
+      Trigger.AvailableNow()) { v2Query =>
       val sink = 
v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
       assert(sink.isInstanceOf[Table])
       assert(sink.schema() == StructType(Nil))
@@ -359,7 +361,8 @@ class StreamingDataSourceV2Suite extends StreamTest {
   test("disabled v2 write") {
     // Ensure the V2 path works normally and generates a V2 sink..
     testPositiveCaseWithQuery(
-      "fake-read-microbatch-continuous", "fake-write-v1-fallback", 
Trigger.Once()) { v2Query =>
+      "fake-read-microbatch-continuous", "fake-write-v1-fallback",
+      Trigger.AvailableNow()) { v2Query =>
       assert(v2Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
         .isInstanceOf[Table])
     }
@@ -369,7 +372,8 @@ class StreamingDataSourceV2Suite extends StreamTest {
     val fullSinkName = classOf[FakeWriteSupportProviderV1Fallback].getName
     withSQLConf(SQLConf.DISABLED_V2_STREAMING_WRITERS.key -> 
s"a,b,c,test,$fullSinkName,d,e") {
       testPositiveCaseWithQuery(
-        "fake-read-microbatch-continuous", "fake-write-v1-fallback", 
Trigger.Once()) { v1Query =>
+        "fake-read-microbatch-continuous", "fake-write-v1-fallback",
+        Trigger.AvailableNow()) { v1Query =>
         assert(v1Query.asInstanceOf[StreamingQueryWrapper].streamingQuery.sink
           .isInstanceOf[FakeSink])
       }
@@ -377,7 +381,7 @@ class StreamingDataSourceV2Suite extends StreamTest {
   }
 
   Seq(
-    Tuple2(classOf[FakeReadMicroBatchOnly], Trigger.Once()),
+    Tuple2(classOf[FakeReadMicroBatchOnly], Trigger.AvailableNow()),
     Tuple2(classOf[FakeReadContinuousOnly], Trigger.Continuous(1000))
   ).foreach { case (source, trigger) =>
     test(s"SPARK-25460: session options are respected in structured streaming 
sources - $source") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
