[ https://issues.apache.org/jira/browse/SPARK-26428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16727288#comment-16727288 ]

ASF GitHub Bot commented on SPARK-26428:
----------------------------------------

asfgit closed pull request #23367: [SPARK-26428][SS][TEST] Minimize deprecated `ProcessingTime` usage
URL: https://github.com/apache/spark/pull/23367

This is a PR merged from a forked repository. Since GitHub hides the
original diff on merge, it is reproduced below for the sake of provenance:

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 61cbb3285a4f0..d4eb526540053 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -42,7 +42,7 @@ import org.apache.spark.sql.functions.{count, window}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.kafka010.KafkaSourceProvider._
 import org.apache.spark.sql.sources.v2.DataSourceOptions
-import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest}
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
 import org.apache.spark.sql.streaming.util.StreamManualClock
 import org.apache.spark.sql.test.SharedSQLContext
 
@@ -236,7 +236,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     }
 
     testStream(mapped)(
-      StartStream(ProcessingTime(100), clock),
+      StartStream(Trigger.ProcessingTime(100), clock),
       waitUntilBatchProcessed,
       // 1 from smallest, 1 from middle, 8 from biggest
       CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
@@ -247,7 +247,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
         11, 108, 109, 110, 111, 112, 113, 114, 115, 116
       ),
       StopStream,
-      StartStream(ProcessingTime(100), clock),
+      StartStream(Trigger.ProcessingTime(100), clock),
       waitUntilBatchProcessed,
       // smallest now empty, 1 more from middle, 9 more from biggest
       CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
@@ -282,7 +282,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
 
     val mapped = kafka.map(kv => kv._2.toInt + 1)
     testStream(mapped)(
-      StartStream(trigger = ProcessingTime(1)),
+      StartStream(trigger = Trigger.ProcessingTime(1)),
       makeSureGetOffsetCalled,
       AddKafkaData(Set(topic), 1, 2, 3),
       CheckAnswer(2, 3, 4),
@@ -605,7 +605,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     }
 
     testStream(kafka)(
-      StartStream(ProcessingTime(100), clock),
+      StartStream(Trigger.ProcessingTime(100), clock),
       waitUntilBatchProcessed,
       // 5 from smaller topic, 5 from bigger one
       CheckLastBatch((0 to 4) ++ (100 to 104): _*),
@@ -618,7 +618,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       // smaller topic empty, 5 from bigger one
       CheckLastBatch(110 to 114: _*),
       StopStream,
-      StartStream(ProcessingTime(100), clock),
+      StartStream(Trigger.ProcessingTime(100), clock),
       waitUntilBatchProcessed,
       // smallest now empty, 5 from bigger one
       CheckLastBatch(115 to 119: _*),
@@ -727,7 +727,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     // The message values are the same as their offsets to make the test easy to follow
     testUtils.withTranscationalProducer { producer =>
       testStream(mapped)(
-        StartStream(ProcessingTime(100), clock),
+        StartStream(Trigger.ProcessingTime(100), clock),
         waitUntilBatchProcessed,
         CheckAnswer(),
         WithOffsetSync(topicPartition, expectedOffset = 5) { () =>
@@ -850,7 +850,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
     // The message values are the same as their offsets to make the test easy to follow
     testUtils.withTranscationalProducer { producer =>
       testStream(mapped)(
-        StartStream(ProcessingTime(100), clock),
+        StartStream(Trigger.ProcessingTime(100), clock),
         waitUntilBatchProcessed,
         CheckNewAnswer(),
         WithOffsetSync(topicPartition, expectedOffset = 5) { () =>
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
index d4bd9c7987f2d..de664cafed3b6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala
@@ -1360,7 +1360,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest {
       options = srcOptions)
     val clock = new StreamManualClock()
     testStream(fileStream)(
-      StartStream(trigger = ProcessingTime(10), triggerClock = clock),
+      StartStream(trigger = Trigger.ProcessingTime(10), triggerClock = clock),
       AssertOnQuery { _ =>
         // Block until the first batch finishes.
         eventually(timeout(streamingTimeout)) {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
index f55ddb5419d20..55fdcee83f114 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala
@@ -312,7 +312,7 @@ class StreamSuite extends StreamTest {
 
     val inputData = MemoryStream[Int]
     testStream(inputData.toDS())(
-      StartStream(ProcessingTime("10 seconds"), new StreamManualClock),
+      StartStream(Trigger.ProcessingTime("10 seconds"), new StreamManualClock),
 
       /* -- batch 0 ----------------------- */
       // Add some data in batch 0
@@ -353,7 +353,7 @@ class StreamSuite extends StreamTest {
 
       /* Stop then restart the Stream  */
       StopStream,
-      StartStream(ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)),
+      StartStream(Trigger.ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)),
 
       /* -- batch 1 no rerun ----------------- */
       // batch 1 would not re-run because the latest batch id logged in commit log is 1
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
index fe77a1b4469c5..d00f2e3bf4d1a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala
@@ -82,7 +82,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
       testStream(df, OutputMode.Append)(
 
         // Start event generated when query started
-        StartStream(ProcessingTime(100), triggerClock = clock),
+        StartStream(Trigger.ProcessingTime(100), triggerClock = clock),
         AssertOnQuery { query =>
           assert(listener.startEvent !== null)
           assert(listener.startEvent.id === query.id)
@@ -124,7 +124,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         },
 
         // Termination event generated with exception message when stopped with error
-        StartStream(ProcessingTime(100), triggerClock = clock),
+        StartStream(Trigger.ProcessingTime(100), triggerClock = clock),
         AssertStreamExecThreadToWaitForClock(),
         AddData(inputData, 0),
         AdvanceManualClock(100), // process bad data
@@ -306,7 +306,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter {
         }
         val clock = new StreamManualClock()
         val actions = mutable.ArrayBuffer[StreamAction]()
-        actions += StartStream(trigger = ProcessingTime(10), triggerClock = clock)
+        actions += StartStream(trigger = Trigger.ProcessingTime(10), triggerClock = clock)
         for (_ <- 1 to 100) {
           actions += AdvanceManualClock(10)
         }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index c170641372d61..29b816486a1fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -257,7 +257,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
     var lastProgressBeforeStop: StreamingQueryProgress = null
 
     testStream(mapped, OutputMode.Complete)(
-      StartStream(ProcessingTime(1000), triggerClock = clock),
+      StartStream(Trigger.ProcessingTime(1000), triggerClock = clock),
       AssertStreamExecThreadIsWaitingForTime(1000),
       AssertOnQuery(_.status.isDataAvailable === false),
       AssertOnQuery(_.status.isTriggerActive === false),
@@ -370,7 +370,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
       AssertOnQuery(_.status.message === "Stopped"),
 
       // Test status and progress after query terminated with error
-      StartStream(ProcessingTime(1000), triggerClock = clock),
+      StartStream(Trigger.ProcessingTime(1000), triggerClock = clock),
       AdvanceManualClock(1000), // ensure initial trigger completes before AddData
       AddData(inputData, 0),
       AdvanceManualClock(1000), // allow another trigger
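
For context, the same substitution applies on the public `DataStreamWriter` API, not just the `StartStream` test helper. A minimal, self-contained sketch, assuming a local session and the built-in `rate` source and `console` sink (the app name and master below are illustrative, not part of this PR):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().master("local[2]").appName("trigger-demo").getOrCreate()

// The `rate` source emits rows on a schedule, which keeps the demo self-contained.
val rates = spark.readStream.format("rate").load()

val query = rates.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds")) // was: trigger(ProcessingTime("10 seconds"))
  .start()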
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Minimize deprecated `ProcessingTime` usage
> ------------------------------------------
>
>                 Key: SPARK-26428
>                 URL: https://issues.apache.org/jira/browse/SPARK-26428
>             Project: Spark
>          Issue Type: Improvement
>          Components: Tests
>    Affects Versions: 3.0.0
>            Reporter: Dongjoon Hyun
>            Assignee: Dongjoon Hyun
>            Priority: Minor
>             Fix For: 3.0.0
>
>
> The `ProcessingTime` class was deprecated in favor of `Trigger.ProcessingTime` 
> in Spark 2.2, and SPARK-21464 minimized its usage in 2.2.1. Recently, its use 
> has grown again in the test suites. This issue aims to clean up the newly 
> introduced deprecation warnings for Spark 3.0.
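
For reference, a minimal sketch of the substitution itself, assuming Spark 2.2+ (`Trigger` is the factory class in `org.apache.spark.sql.streaming`; `StartStream` is the `StreamTest` helper seen throughout the diff):

import org.apache.spark.sql.streaming.Trigger

// Deprecated since Spark 2.2; still compiles but emits a warning:
//   StartStream(ProcessingTime(100), clock)
// Preferred form, as applied throughout the PR above:
//   StartStream(Trigger.ProcessingTime(100), clock)
val byMillis = Trigger.ProcessingTime(100)          // interval in milliseconds
val byString = Trigger.ProcessingTime("10 seconds") // interval parsed from a string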


