[ https://issues.apache.org/jira/browse/SPARK-26428?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16727288#comment-16727288 ]
ASF GitHub Bot commented on SPARK-26428: ---------------------------------------- asfgit closed pull request #23367: [SPARK-26428][SS][TEST] Minimize deprecated `ProcessingTime` usage URL: https://github.com/apache/spark/pull/23367 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 61cbb3285a4f0..d4eb526540053 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -42,7 +42,7 @@ import org.apache.spark.sql.functions.{count, window} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.kafka010.KafkaSourceProvider._ import org.apache.spark.sql.sources.v2.DataSourceOptions -import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest} +import org.apache.spark.sql.streaming.{StreamTest, Trigger} import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.test.SharedSQLContext @@ -236,7 +236,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { } testStream(mapped)( - StartStream(ProcessingTime(100), clock), + StartStream(Trigger.ProcessingTime(100), clock), waitUntilBatchProcessed, // 1 from smallest, 1 from middle, 8 from biggest CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107), @@ -247,7 +247,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { 11, 108, 109, 110, 111, 112, 113, 114, 115, 116 ), StopStream, - StartStream(ProcessingTime(100), clock), + StartStream(Trigger.ProcessingTime(100), clock), waitUntilBatchProcessed, // smallest now empty, 1 more from middle, 9 more from biggest CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, @@ -282,7 +282,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { val mapped = kafka.map(kv => kv._2.toInt + 1) testStream(mapped)( - StartStream(trigger = ProcessingTime(1)), + StartStream(trigger = Trigger.ProcessingTime(1)), makeSureGetOffsetCalled, AddKafkaData(Set(topic), 1, 2, 3), CheckAnswer(2, 3, 4), @@ -605,7 +605,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { } testStream(kafka)( - StartStream(ProcessingTime(100), clock), + StartStream(Trigger.ProcessingTime(100), clock), waitUntilBatchProcessed, // 5 from smaller topic, 5 from bigger one CheckLastBatch((0 to 4) ++ (100 to 104): _*), @@ -618,7 +618,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { // smaller topic empty, 5 from bigger one CheckLastBatch(110 to 114: _*), StopStream, - StartStream(ProcessingTime(100), clock), + StartStream(Trigger.ProcessingTime(100), clock), waitUntilBatchProcessed, // smallest now empty, 5 from bigger one CheckLastBatch(115 to 119: _*), @@ -727,7 +727,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { // The message values are the same as their offsets to make the test easy to follow testUtils.withTranscationalProducer { producer => testStream(mapped)( - StartStream(ProcessingTime(100), clock), + StartStream(Trigger.ProcessingTime(100), clock), waitUntilBatchProcessed, CheckAnswer(), WithOffsetSync(topicPartition, expectedOffset = 5) { () => @@ -850,7 +850,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase { // The message values are the same as their offsets to make the test easy to follow testUtils.withTranscationalProducer { producer => testStream(mapped)( - StartStream(ProcessingTime(100), clock), + StartStream(Trigger.ProcessingTime(100), clock), waitUntilBatchProcessed, CheckNewAnswer(), WithOffsetSync(topicPartition, expectedOffset = 5) { () => diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala index d4bd9c7987f2d..de664cafed3b6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/FileStreamSourceSuite.scala @@ -1360,7 +1360,7 @@ class FileStreamSourceSuite extends FileStreamSourceTest { options = srcOptions) val clock = new StreamManualClock() testStream(fileStream)( - StartStream(trigger = ProcessingTime(10), triggerClock = clock), + StartStream(trigger = Trigger.ProcessingTime(10), triggerClock = clock), AssertOnQuery { _ => // Block until the first batch finishes. eventually(timeout(streamingTimeout)) { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala index f55ddb5419d20..55fdcee83f114 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamSuite.scala @@ -312,7 +312,7 @@ class StreamSuite extends StreamTest { val inputData = MemoryStream[Int] testStream(inputData.toDS())( - StartStream(ProcessingTime("10 seconds"), new StreamManualClock), + StartStream(Trigger.ProcessingTime("10 seconds"), new StreamManualClock), /* -- batch 0 ----------------------- */ // Add some data in batch 0 @@ -353,7 +353,7 @@ class StreamSuite extends StreamTest { /* Stop then restart the Stream */ StopStream, - StartStream(ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)), + StartStream(Trigger.ProcessingTime("10 seconds"), new StreamManualClock(60 * 1000)), /* -- batch 1 no rerun ----------------- */ // batch 1 would not re-run because the latest batch id logged in commit log is 1 diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala index fe77a1b4469c5..d00f2e3bf4d1a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryListenerSuite.scala @@ -82,7 +82,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { testStream(df, OutputMode.Append)( // Start event generated when query started - StartStream(ProcessingTime(100), triggerClock = clock), + StartStream(Trigger.ProcessingTime(100), triggerClock = clock), AssertOnQuery { query => assert(listener.startEvent !== null) assert(listener.startEvent.id === query.id) @@ -124,7 +124,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { }, // Termination event generated with exception message when stopped with error - StartStream(ProcessingTime(100), triggerClock = clock), + StartStream(Trigger.ProcessingTime(100), triggerClock = clock), AssertStreamExecThreadToWaitForClock(), AddData(inputData, 0), AdvanceManualClock(100), // process bad data @@ -306,7 +306,7 @@ class StreamingQueryListenerSuite extends StreamTest with BeforeAndAfter { } val clock = new StreamManualClock() val actions = mutable.ArrayBuffer[StreamAction]() - actions += StartStream(trigger = ProcessingTime(10), triggerClock = clock) + actions += StartStream(trigger = Trigger.ProcessingTime(10), triggerClock = clock) for (_ <- 1 to 100) { actions += AdvanceManualClock(10) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala index c170641372d61..29b816486a1fe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala @@ -257,7 +257,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi var lastProgressBeforeStop: StreamingQueryProgress = null testStream(mapped, OutputMode.Complete)( - StartStream(ProcessingTime(1000), triggerClock = clock), + StartStream(Trigger.ProcessingTime(1000), triggerClock = clock), AssertStreamExecThreadIsWaitingForTime(1000), AssertOnQuery(_.status.isDataAvailable === false), AssertOnQuery(_.status.isTriggerActive === false), @@ -370,7 +370,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi AssertOnQuery(_.status.message === "Stopped"), // Test status and progress after query terminated with error - StartStream(ProcessingTime(1000), triggerClock = clock), + StartStream(Trigger.ProcessingTime(1000), triggerClock = clock), AdvanceManualClock(1000), // ensure initial trigger completes before AddData AddData(inputData, 0), AdvanceManualClock(1000), // allow another trigger ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Minimize deprecated `ProcessingTime` usage > ------------------------------------------ > > Key: SPARK-26428 > URL: https://issues.apache.org/jira/browse/SPARK-26428 > Project: Spark > Issue Type: Improvement > Components: Tests > Affects Versions: 3.0.0 > Reporter: Dongjoon Hyun > Assignee: Dongjoon Hyun > Priority: Minor > Fix For: 3.0.0 > > > Use of `ProcessingTime` class was deprecated in favor of > `Trigger.ProcessingTime` in Spark 2.2. And, SPARK-21464 minimized it at > 2.2.1. Recently, it grows again in test suites. This issue aims to clean up > newly introduced deprecation warnings for Spark 3.0. -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org