This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 7548a88 [SPARK-28199][SS] Move Trigger implementations to Triggers.scala and avoid exposing these to the end users 7548a88 is described below commit 7548a8826d6113121b6bda1ea99ca835374220b6 Author: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com> AuthorDate: Sun Jul 14 14:46:01 2019 -0500 [SPARK-28199][SS] Move Trigger implementations to Triggers.scala and avoid exposing these to the end users ## What changes were proposed in this pull request? This patch proposes moving all Trigger implementations to `Triggers.scala`, to avoid exposing these implementations to the end users and let end users only deal with `Trigger.xxx` static methods. This fits the intention of deprecation of `ProcessingTIme`, and we agree to move others without deprecation as this patch will be shipped in major version (Spark 3.0.0). ## How was this patch tested? UTs modified to work with newly introduced class. Closes #24996 from HeartSaVioR/SPARK-28199. Authored-by: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com> Signed-off-by: Sean Owen <sean.o...@databricks.com> --- .../sql/kafka010/KafkaContinuousSourceSuite.scala | 2 +- project/MimaExcludes.scala | 6 +- .../org/apache/spark/sql/streaming/Trigger.java | 11 +- .../execution/streaming/MicroBatchExecution.scala | 4 +- .../sql/execution/streaming/TriggerExecutor.scala | 7 +- .../spark/sql/execution/streaming/Triggers.scala | 83 ++++++++++++- .../streaming/continuous/ContinuousExecution.scala | 4 +- .../streaming/continuous/ContinuousTrigger.scala | 57 --------- .../spark/sql/streaming/DataStreamWriter.scala | 1 - .../spark/sql/streaming/ProcessingTime.scala | 133 --------------------- .../sql/streaming/StreamingQueryManager.scala | 2 +- .../org/apache/spark/sql/ProcessingTimeSuite.scala | 7 +- .../streaming/ProcessingTimeExecutorSuite.scala | 11 +- .../sources/StreamingDataSourceV2Suite.scala | 3 +- 14 files changed, 114 insertions(+), 217 deletions(-) diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala index 9b3e78c..76c2598 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala @@ -21,7 +21,7 @@ import org.apache.kafka.clients.producer.ProducerRecord import org.apache.spark.sql.Dataset import org.apache.spark.sql.execution.datasources.v2.ContinuousScanExec -import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger +import org.apache.spark.sql.execution.streaming.ContinuousTrigger import org.apache.spark.sql.streaming.Trigger // Run tests in KafkaSourceSuiteBase in continuous execution mode. diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index cb3b803..5978f88 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -372,7 +372,11 @@ object MimaExcludes { // [SPARK-26616][MLlib] Expose document frequency in IDFModel ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.feature.IDFModel.this"), - ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.feature.IDF#DocumentFrequencyAggregator.idf") + ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.feature.IDF#DocumentFrequencyAggregator.idf"), + + // [SPARK-28199][SS] Remove deprecated ProcessingTime + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime"), + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime$") ) // Exclude rules for 2.4.x diff --git a/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java b/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java index fd6f7be..1bd7b82 100644 --- a/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java +++ b/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java @@ -20,9 +20,10 @@ package org.apache.spark.sql.streaming; import java.util.concurrent.TimeUnit; import org.apache.spark.annotation.Evolving; +import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger; import scala.concurrent.duration.Duration; -import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger; +import org.apache.spark.sql.execution.streaming.ContinuousTrigger; import org.apache.spark.sql.execution.streaming.OneTimeTrigger$; /** @@ -40,7 +41,7 @@ public class Trigger { * @since 2.2.0 */ public static Trigger ProcessingTime(long intervalMs) { - return ProcessingTime.create(intervalMs, TimeUnit.MILLISECONDS); + return ProcessingTimeTrigger.create(intervalMs, TimeUnit.MILLISECONDS); } /** @@ -56,7 +57,7 @@ public class Trigger { * @since 2.2.0 */ public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) { - return ProcessingTime.create(interval, timeUnit); + return ProcessingTimeTrigger.create(interval, timeUnit); } /** @@ -71,7 +72,7 @@ public class Trigger { * @since 2.2.0 */ public static Trigger ProcessingTime(Duration interval) { - return ProcessingTime.apply(interval); + return ProcessingTimeTrigger.apply(interval); } /** @@ -84,7 +85,7 @@ public class Trigger { * @since 2.2.0 */ public static Trigger ProcessingTime(String interval) { - return ProcessingTime.apply(interval); + return ProcessingTimeTrigger.apply(interval); } /** diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala index fd2638f..e7eb2cb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala @@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchSt import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.v2._ import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2, SparkDataStream} -import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} +import org.apache.spark.sql.streaming.{OutputMode, Trigger} import org.apache.spark.util.Clock class MicroBatchExecution( @@ -51,7 +51,7 @@ class MicroBatchExecution( @volatile protected var sources: Seq[SparkDataStream] = Seq.empty private val triggerExecutor = trigger match { - case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock) + case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock) case OneTimeTrigger => OneTimeExecutor() case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala index d188566..0884710 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala @@ -18,7 +18,6 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.internal.Logging -import org.apache.spark.sql.streaming.ProcessingTime import org.apache.spark.util.{Clock, SystemClock} trait TriggerExecutor { @@ -43,10 +42,12 @@ case class OneTimeExecutor() extends TriggerExecutor { /** * A trigger executor that runs a batch every `intervalMs` milliseconds. */ -case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = new SystemClock()) +case class ProcessingTimeExecutor( + processingTimeTrigger: ProcessingTimeTrigger, + clock: Clock = new SystemClock()) extends TriggerExecutor with Logging { - private val intervalMs = processingTime.intervalMs + private val intervalMs = processingTimeTrigger.intervalMs require(intervalMs >= 0) override def execute(triggerHandler: () => Boolean): Unit = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala index 4c0db3c..aede088 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala @@ -17,8 +17,31 @@ package org.apache.spark.sql.execution.streaming +import java.util.concurrent.TimeUnit + +import scala.concurrent.duration.Duration + import org.apache.spark.annotation.{Evolving, Experimental} import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.unsafe.types.CalendarInterval + +private object Triggers { + def validate(intervalMs: Long): Unit = { + require(intervalMs >= 0, "the interval of trigger should not be negative") + } + + def convert(interval: String): Long = { + val cal = CalendarInterval.fromCaseInsensitiveString(interval) + if (cal.months > 0) { + throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval") + } + TimeUnit.MICROSECONDS.toMillis(cal.microseconds) + } + + def convert(interval: Duration): Long = interval.toMillis + + def convert(interval: Long, unit: TimeUnit): Long = unit.toMillis(interval) +} /** * A [[Trigger]] that processes only one batch of data in a streaming query then terminates @@ -26,4 +49,62 @@ import org.apache.spark.sql.streaming.Trigger */ @Experimental @Evolving -case object OneTimeTrigger extends Trigger +private[sql] case object OneTimeTrigger extends Trigger + +/** + * A [[Trigger]] that runs a query periodically based on the processing time. If `interval` is 0, + * the query will run as fast as possible. + */ +@Evolving +private[sql] case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger { + Triggers.validate(intervalMs) +} + +private[sql] object ProcessingTimeTrigger { + import Triggers._ + + def apply(interval: String): ProcessingTimeTrigger = { + ProcessingTimeTrigger(convert(interval)) + } + + def apply(interval: Duration): ProcessingTimeTrigger = { + ProcessingTimeTrigger(convert(interval)) + } + + def create(interval: String): ProcessingTimeTrigger = { + apply(interval) + } + + def create(interval: Long, unit: TimeUnit): ProcessingTimeTrigger = { + ProcessingTimeTrigger(convert(interval, unit)) + } +} + +/** + * A [[Trigger]] that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. + */ +@Evolving +private[sql] case class ContinuousTrigger(intervalMs: Long) extends Trigger { + Triggers.validate(intervalMs) +} + +private[sql] object ContinuousTrigger { + import Triggers._ + + def apply(interval: String): ContinuousTrigger = { + ContinuousTrigger(convert(interval)) + } + + def apply(interval: Duration): ContinuousTrigger = { + ContinuousTrigger(convert(interval)) + } + + def create(interval: String): ContinuousTrigger = { + apply(interval) + } + + def create(interval: Long, unit: TimeUnit): ContinuousTrigger = { + ContinuousTrigger(convert(interval, unit)) + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala index 509b103..f6d156d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _} import org.apache.spark.sql.sources.v2 import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, TableCapability} import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset} -import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger} +import org.apache.spark.sql.streaming.{OutputMode, Trigger} import org.apache.spark.util.Clock class ContinuousExecution( @@ -93,7 +93,7 @@ class ContinuousExecution( } private val triggerExecutor = trigger match { - case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock) + case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTimeTrigger(t), triggerClock) case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala deleted file mode 100644 index bd343f3..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming.continuous - -import java.util.concurrent.TimeUnit - -import scala.concurrent.duration.Duration - -import org.apache.spark.annotation.Evolving -import org.apache.spark.sql.streaming.Trigger -import org.apache.spark.unsafe.types.CalendarInterval - -/** - * A [[Trigger]] that continuously processes streaming data, asynchronously checkpointing at - * the specified interval. - */ -@Evolving -case class ContinuousTrigger(intervalMs: Long) extends Trigger { - require(intervalMs >= 0, "the interval of trigger should not be negative") -} - -private[sql] object ContinuousTrigger { - def apply(interval: String): ContinuousTrigger = { - val cal = CalendarInterval.fromCaseInsensitiveString(interval) - if (cal.months > 0) { - throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval") - } - new ContinuousTrigger(TimeUnit.MICROSECONDS.toMillis(cal.microseconds)) - } - - def apply(interval: Duration): ContinuousTrigger = { - ContinuousTrigger(interval.toMillis) - } - - def create(interval: String): ContinuousTrigger = { - apply(interval) - } - - def create(interval: Long, unit: TimeUnit): ContinuousTrigger = { - ContinuousTrigger(unit.toMillis(interval)) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala index d051cf9..36104d7 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala @@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.command.DDLUtils import org.apache.spark.sql.execution.datasources.DataSource import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger import org.apache.spark.sql.execution.streaming.sources._ import org.apache.spark.sql.sources.v2.{SupportsWrite, TableProvider} import org.apache.spark.sql.sources.v2.TableCapability._ diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala deleted file mode 100644 index 417d698..0000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala +++ /dev/null @@ -1,133 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.streaming - -import java.util.concurrent.TimeUnit - -import scala.concurrent.duration.Duration - -import org.apache.spark.annotation.Evolving -import org.apache.spark.unsafe.types.CalendarInterval - -/** - * A trigger that runs a query periodically based on the processing time. If `interval` is 0, - * the query will run as fast as possible. - * - * Scala Example: - * {{{ - * df.writeStream.trigger(ProcessingTime("10 seconds")) - * - * import scala.concurrent.duration._ - * df.writeStream.trigger(ProcessingTime(10.seconds)) - * }}} - * - * Java Example: - * {{{ - * df.writeStream.trigger(ProcessingTime.create("10 seconds")) - * - * import java.util.concurrent.TimeUnit - * df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) - * }}} - * - * @since 2.0.0 - */ -@Evolving -@deprecated("use Trigger.ProcessingTime(intervalMs)", "2.2.0") -case class ProcessingTime(intervalMs: Long) extends Trigger { - require(intervalMs >= 0, "the interval of trigger should not be negative") -} - -/** - * Used to create [[ProcessingTime]] triggers for [[StreamingQuery]]s. - * - * @since 2.0.0 - */ -@Evolving -@deprecated("use Trigger.ProcessingTime(intervalMs)", "2.2.0") -object ProcessingTime { - - /** - * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. - * - * Example: - * {{{ - * df.writeStream.trigger(ProcessingTime("10 seconds")) - * }}} - * - * @since 2.0.0 - * @deprecated use Trigger.ProcessingTime(interval) - */ - @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0") - def apply(interval: String): ProcessingTime = { - val cal = CalendarInterval.fromCaseInsensitiveString(interval) - if (cal.months > 0) { - throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval") - } - new ProcessingTime(TimeUnit.MICROSECONDS.toMillis(cal.microseconds)) - } - - /** - * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. - * - * Example: - * {{{ - * import scala.concurrent.duration._ - * df.writeStream.trigger(ProcessingTime(10.seconds)) - * }}} - * - * @since 2.0.0 - * @deprecated use Trigger.ProcessingTime(interval) - */ - @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0") - def apply(interval: Duration): ProcessingTime = { - new ProcessingTime(interval.toMillis) - } - - /** - * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. - * - * Example: - * {{{ - * df.writeStream.trigger(ProcessingTime.create("10 seconds")) - * }}} - * - * @since 2.0.0 - * @deprecated use Trigger.ProcessingTime(interval) - */ - @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0") - def create(interval: String): ProcessingTime = { - apply(interval) - } - - /** - * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible. - * - * Example: - * {{{ - * import java.util.concurrent.TimeUnit - * df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS)) - * }}} - * - * @since 2.0.0 - * @deprecated use Trigger.ProcessingTime(interval, unit) - */ - @deprecated("use Trigger.ProcessingTime(interval, unit)", "2.2.0") - def create(interval: Long, unit: TimeUnit): ProcessingTime = { - new ProcessingTime(unit.toMillis(interval)) - } -} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala index 1705d56..abee5f6 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala @@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession} import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, ContinuousTrigger} +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala index 623a1b6..e33870d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala @@ -22,12 +22,15 @@ import java.util.concurrent.TimeUnit import scala.concurrent.duration._ import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.streaming.{ProcessingTime, Trigger} +import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger +import org.apache.spark.sql.streaming.Trigger class ProcessingTimeSuite extends SparkFunSuite { test("create") { - def getIntervalMs(trigger: Trigger): Long = trigger.asInstanceOf[ProcessingTime].intervalMs + def getIntervalMs(trigger: Trigger): Long = { + trigger.asInstanceOf[ProcessingTimeTrigger].intervalMs + } assert(getIntervalMs(Trigger.ProcessingTime(10.seconds)) === 10 * 1000) assert(getIntervalMs(Trigger.ProcessingTime(10, TimeUnit.SECONDS)) === 10 * 1000) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala index 723764c..c0fd3fe 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala @@ -24,7 +24,6 @@ import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkFunSuite -import org.apache.spark.sql.streaming.ProcessingTime import org.apache.spark.sql.streaming.util.StreamManualClock class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits { @@ -35,7 +34,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits { val timeout = 10.seconds test("nextBatchTime") { - val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(100)) + val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTimeTrigger(100)) assert(processingTimeExecutor.nextBatchTime(0) === 100) assert(processingTimeExecutor.nextBatchTime(1) === 100) assert(processingTimeExecutor.nextBatchTime(99) === 100) @@ -49,7 +48,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits { val clock = new StreamManualClock() @volatile var continueExecuting = true @volatile var clockIncrementInTrigger = 0L - val executor = ProcessingTimeExecutor(ProcessingTime("1000 milliseconds"), clock) + val executor = ProcessingTimeExecutor(ProcessingTimeTrigger("1000 milliseconds"), clock) val executorThread = new Thread() { override def run(): Unit = { executor.execute(() => { @@ -97,7 +96,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits { test("calling nextBatchTime with the result of a previous call should return the next interval") { val intervalMS = 100 - val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMS)) + val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTimeTrigger(intervalMS)) val ITERATION = 10 var nextBatchTime: Long = 0 @@ -111,7 +110,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits { private def testBatchTermination(intervalMs: Long): Unit = { var batchCounts = 0 - val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMs)) + val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTimeTrigger(intervalMs)) processingTimeExecutor.execute(() => { batchCounts += 1 // If the batch termination works correctly, batchCounts should be 3 after `execute` @@ -130,7 +129,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits { @volatile var batchFallingBehindCalled = false val t = new Thread() { override def run(): Unit = { - val processingTimeExecutor = new ProcessingTimeExecutor(ProcessingTime(100), clock) { + val processingTimeExecutor = new ProcessingTimeExecutor(ProcessingTimeTrigger(100), clock) { override def notifyBatchFallingBehind(realElapsedTimeMs: Long): Unit = { batchFallingBehindCalled = true } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala index 79016b5..4db605e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala @@ -24,8 +24,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.{DataFrame, SQLContext} import org.apache.spark.sql.execution.datasources.DataSource -import org.apache.spark.sql.execution.streaming.{RateStreamOffset, Sink, StreamingQueryWrapper} -import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger +import org.apache.spark.sql.execution.streaming.{ContinuousTrigger, RateStreamOffset, Sink, StreamingQueryWrapper} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider} import org.apache.spark.sql.sources.v2._ --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org