This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 7548a88  [SPARK-28199][SS] Move Trigger implementations to Triggers.scala and avoid exposing these to the end users
7548a88 is described below

commit 7548a8826d6113121b6bda1ea99ca835374220b6
Author: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
AuthorDate: Sun Jul 14 14:46:01 2019 -0500

    [SPARK-28199][SS] Move Trigger implementations to Triggers.scala and avoid exposing these to the end users
    
    ## What changes were proposed in this pull request?
    
    This patch moves all Trigger implementations to `Triggers.scala` so that the
    implementations are no longer exposed to end users, who should only deal with
    the `Trigger.xxx` static methods. This fits the intent behind the deprecation
    of `ProcessingTime`, and the remaining implementations are moved without
    deprecation because this patch will ship in a major version (Spark 3.0.0).
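    
    For illustration, a minimal sketch of the user-facing API that remains after
    this change (it assumes a SparkSession named `spark` and uses the built-in
    rate and console sources purely as examples; none of this code is added by
    the patch itself):
    
    ```scala
    import java.util.concurrent.TimeUnit
    import org.apache.spark.sql.streaming.Trigger
    
    // End users only touch the Trigger.xxx factory methods; the concrete
    // implementations (ProcessingTimeTrigger, ContinuousTrigger, OneTimeTrigger)
    // are now private[sql].
    val df = spark.readStream.format("rate").load()
    
    // Micro-batch query triggered every 10 seconds.
    df.writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
      .start()
    
    // Continuous query checkpointing every second.
    df.writeStream
      .format("console")
      .trigger(Trigger.Continuous("1 second"))
      .start()
    ```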
    
    ## How was this patch tested?
    
    UTs modified to work with newly introduced class.
    
    Closes #24996 from HeartSaVioR/SPARK-28199.
    
    Authored-by: Jungtaek Lim (HeartSaVioR) <kabh...@gmail.com>
    Signed-off-by: Sean Owen <sean.o...@databricks.com>
---
 .../sql/kafka010/KafkaContinuousSourceSuite.scala  |   2 +-
 project/MimaExcludes.scala                         |   6 +-
 .../org/apache/spark/sql/streaming/Trigger.java    |  11 +-
 .../execution/streaming/MicroBatchExecution.scala  |   4 +-
 .../sql/execution/streaming/TriggerExecutor.scala  |   7 +-
 .../spark/sql/execution/streaming/Triggers.scala   |  83 ++++++++++++-
 .../streaming/continuous/ContinuousExecution.scala |   4 +-
 .../streaming/continuous/ContinuousTrigger.scala   |  57 ---------
 .../spark/sql/streaming/DataStreamWriter.scala     |   1 -
 .../spark/sql/streaming/ProcessingTime.scala       | 133 ---------------------
 .../sql/streaming/StreamingQueryManager.scala      |   2 +-
 .../org/apache/spark/sql/ProcessingTimeSuite.scala |   7 +-
 .../streaming/ProcessingTimeExecutorSuite.scala    |  11 +-
 .../sources/StreamingDataSourceV2Suite.scala       |   3 +-
 14 files changed, 114 insertions(+), 217 deletions(-)

diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index 9b3e78c..76c2598 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -21,7 +21,7 @@ import org.apache.kafka.clients.producer.ProducerRecord
 
 import org.apache.spark.sql.Dataset
 import org.apache.spark.sql.execution.datasources.v2.ContinuousScanExec
-import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
+import org.apache.spark.sql.execution.streaming.ContinuousTrigger
 import org.apache.spark.sql.streaming.Trigger
 
 // Run tests in KafkaSourceSuiteBase in continuous execution mode.
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index cb3b803..5978f88 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -372,7 +372,11 @@ object MimaExcludes {
 
     // [SPARK-26616][MLlib] Expose document frequency in IDFModel
     ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.mllib.feature.IDFModel.this"),
-    ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.feature.IDF#DocumentFrequencyAggregator.idf")
+    ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.spark.mllib.feature.IDF#DocumentFrequencyAggregator.idf"),
+
+    // [SPARK-28199][SS] Remove deprecated ProcessingTime
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime"),
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.ProcessingTime$")
   )
 
   // Exclude rules for 2.4.x
diff --git a/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java b/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java
index fd6f7be..1bd7b82 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java
@@ -20,9 +20,10 @@ package org.apache.spark.sql.streaming;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
 import scala.concurrent.duration.Duration;
 
-import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger;
+import org.apache.spark.sql.execution.streaming.ContinuousTrigger;
 import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
 
 /**
@@ -40,7 +41,7 @@ public class Trigger {
    * @since 2.2.0
    */
   public static Trigger ProcessingTime(long intervalMs) {
-      return ProcessingTime.create(intervalMs, TimeUnit.MILLISECONDS);
+      return ProcessingTimeTrigger.create(intervalMs, TimeUnit.MILLISECONDS);
   }
 
   /**
@@ -56,7 +57,7 @@ public class Trigger {
    * @since 2.2.0
    */
   public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
-      return ProcessingTime.create(interval, timeUnit);
+      return ProcessingTimeTrigger.create(interval, timeUnit);
   }
 
   /**
@@ -71,7 +72,7 @@ public class Trigger {
    * @since 2.2.0
    */
   public static Trigger ProcessingTime(Duration interval) {
-      return ProcessingTime.apply(interval);
+      return ProcessingTimeTrigger.apply(interval);
   }
 
   /**
@@ -84,7 +85,7 @@ public class Trigger {
    * @since 2.2.0
    */
   public static Trigger ProcessingTime(String interval) {
-      return ProcessingTime.apply(interval);
+      return ProcessingTimeTrigger.apply(interval);
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
index fd2638f..e7eb2cb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/MicroBatchExecution.scala
@@ -30,7 +30,7 @@ import org.apache.spark.sql.execution.streaming.sources.{RateControlMicroBatchSt
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.v2._
 import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchStream, Offset => OffsetV2, SparkDataStream}
-import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.streaming.{OutputMode, Trigger}
 import org.apache.spark.util.Clock
 
 class MicroBatchExecution(
@@ -51,7 +51,7 @@ class MicroBatchExecution(
   @volatile protected var sources: Seq[SparkDataStream] = Seq.empty
 
   private val triggerExecutor = trigger match {
-    case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
+    case t: ProcessingTimeTrigger => ProcessingTimeExecutor(t, triggerClock)
     case OneTimeTrigger => OneTimeExecutor()
     case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
index d188566..0884710 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/TriggerExecutor.scala
@@ -18,7 +18,6 @@
 package org.apache.spark.sql.execution.streaming
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.streaming.ProcessingTime
 import org.apache.spark.util.{Clock, SystemClock}
 
 trait TriggerExecutor {
@@ -43,10 +42,12 @@ case class OneTimeExecutor() extends TriggerExecutor {
 /**
  * A trigger executor that runs a batch every `intervalMs` milliseconds.
  */
-case class ProcessingTimeExecutor(processingTime: ProcessingTime, clock: Clock = new SystemClock())
+case class ProcessingTimeExecutor(
+    processingTimeTrigger: ProcessingTimeTrigger,
+    clock: Clock = new SystemClock())
   extends TriggerExecutor with Logging {
 
-  private val intervalMs = processingTime.intervalMs
+  private val intervalMs = processingTimeTrigger.intervalMs
   require(intervalMs >= 0)
 
   override def execute(triggerHandler: () => Boolean): Unit = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
index 4c0db3c..aede088 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
@@ -17,8 +17,31 @@
 
 package org.apache.spark.sql.execution.streaming
 
+import java.util.concurrent.TimeUnit
+
+import scala.concurrent.duration.Duration
+
 import org.apache.spark.annotation.{Evolving, Experimental}
 import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.unsafe.types.CalendarInterval
+
+private object Triggers {
+  def validate(intervalMs: Long): Unit = {
+    require(intervalMs >= 0, "the interval of trigger should not be negative")
+  }
+
+  def convert(interval: String): Long = {
+    val cal = CalendarInterval.fromCaseInsensitiveString(interval)
+    if (cal.months > 0) {
+      throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
+    }
+    TimeUnit.MICROSECONDS.toMillis(cal.microseconds)
+  }
+
+  def convert(interval: Duration): Long = interval.toMillis
+
+  def convert(interval: Long, unit: TimeUnit): Long = unit.toMillis(interval)
+}
 
 /**
 * A [[Trigger]] that processes only one batch of data in a streaming query then terminates
@@ -26,4 +49,62 @@ import org.apache.spark.sql.streaming.Trigger
  */
 @Experimental
 @Evolving
-case object OneTimeTrigger extends Trigger
+private[sql] case object OneTimeTrigger extends Trigger
+
+/**
+ * A [[Trigger]] that runs a query periodically based on the processing time. If `interval` is 0,
+ * the query will run as fast as possible.
+ */
+@Evolving
+private[sql] case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger {
+  Triggers.validate(intervalMs)
+}
+
+private[sql] object ProcessingTimeTrigger {
+  import Triggers._
+
+  def apply(interval: String): ProcessingTimeTrigger = {
+    ProcessingTimeTrigger(convert(interval))
+  }
+
+  def apply(interval: Duration): ProcessingTimeTrigger = {
+    ProcessingTimeTrigger(convert(interval))
+  }
+
+  def create(interval: String): ProcessingTimeTrigger = {
+    apply(interval)
+  }
+
+  def create(interval: Long, unit: TimeUnit): ProcessingTimeTrigger = {
+    ProcessingTimeTrigger(convert(interval, unit))
+  }
+}
+
+/**
+ * A [[Trigger]] that continuously processes streaming data, asynchronously checkpointing at
+ * the specified interval.
+ */
+@Evolving
+private[sql] case class ContinuousTrigger(intervalMs: Long) extends Trigger {
+  Triggers.validate(intervalMs)
+}
+
+private[sql] object ContinuousTrigger {
+  import Triggers._
+
+  def apply(interval: String): ContinuousTrigger = {
+    ContinuousTrigger(convert(interval))
+  }
+
+  def apply(interval: Duration): ContinuousTrigger = {
+    ContinuousTrigger(convert(interval))
+  }
+
+  def create(interval: String): ContinuousTrigger = {
+    apply(interval)
+  }
+
+  def create(interval: Long, unit: TimeUnit): ContinuousTrigger = {
+    ContinuousTrigger(convert(interval, unit))
+  }
+}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
index 509b103..f6d156d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousExecution.scala
@@ -34,7 +34,7 @@ import org.apache.spark.sql.execution.streaming.{StreamingRelationV2, _}
 import org.apache.spark.sql.sources.v2
 import org.apache.spark.sql.sources.v2.{SupportsRead, SupportsWrite, TableCapability}
 import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousStream, PartitionOffset}
-import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
+import org.apache.spark.sql.streaming.{OutputMode, Trigger}
 import org.apache.spark.util.Clock
 
 class ContinuousExecution(
@@ -93,7 +93,7 @@ class ContinuousExecution(
   }
 
   private val triggerExecutor = trigger match {
-    case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTime(t), triggerClock)
+    case ContinuousTrigger(t) => ProcessingTimeExecutor(ProcessingTimeTrigger(t), triggerClock)
     case _ => throw new IllegalStateException(s"Unsupported type of trigger: $trigger")
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala
deleted file mode 100644
index bd343f3..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/ContinuousTrigger.scala
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming.continuous
-
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.duration.Duration
-
-import org.apache.spark.annotation.Evolving
-import org.apache.spark.sql.streaming.Trigger
-import org.apache.spark.unsafe.types.CalendarInterval
-
-/**
- * A [[Trigger]] that continuously processes streaming data, asynchronously checkpointing at
- * the specified interval.
- */
-@Evolving
-case class ContinuousTrigger(intervalMs: Long) extends Trigger {
-  require(intervalMs >= 0, "the interval of trigger should not be negative")
-}
-
-private[sql] object ContinuousTrigger {
-  def apply(interval: String): ContinuousTrigger = {
-    val cal = CalendarInterval.fromCaseInsensitiveString(interval)
-    if (cal.months > 0) {
-      throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
-    }
-    new ContinuousTrigger(TimeUnit.MICROSECONDS.toMillis(cal.microseconds))
-  }
-
-  def apply(interval: Duration): ContinuousTrigger = {
-    ContinuousTrigger(interval.toMillis)
-  }
-
-  def create(interval: String): ContinuousTrigger = {
-    apply(interval)
-  }
-
-  def create(interval: Long, unit: TimeUnit): ContinuousTrigger = {
-    ContinuousTrigger(unit.toMillis(interval))
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index d051cf9..36104d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -29,7 +29,6 @@ import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Utils
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
 import org.apache.spark.sql.execution.streaming.sources._
 import org.apache.spark.sql.sources.v2.{SupportsWrite, TableProvider}
 import org.apache.spark.sql.sources.v2.TableCapability._
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
deleted file mode 100644
index 417d698..0000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/ProcessingTime.scala
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming
-
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.duration.Duration
-
-import org.apache.spark.annotation.Evolving
-import org.apache.spark.unsafe.types.CalendarInterval
-
-/**
- * A trigger that runs a query periodically based on the processing time. If `interval` is 0,
- * the query will run as fast as possible.
- *
- * Scala Example:
- * {{{
- *   df.writeStream.trigger(ProcessingTime("10 seconds"))
- *
- *   import scala.concurrent.duration._
- *   df.writeStream.trigger(ProcessingTime(10.seconds))
- * }}}
- *
- * Java Example:
- * {{{
- *   df.writeStream.trigger(ProcessingTime.create("10 seconds"))
- *
- *   import java.util.concurrent.TimeUnit
- *   df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
- * }}}
- *
- * @since 2.0.0
- */
-@Evolving
-@deprecated("use Trigger.ProcessingTime(intervalMs)", "2.2.0")
-case class ProcessingTime(intervalMs: Long) extends Trigger {
-  require(intervalMs >= 0, "the interval of trigger should not be negative")
-}
-
-/**
- * Used to create [[ProcessingTime]] triggers for [[StreamingQuery]]s.
- *
- * @since 2.0.0
- */
-@Evolving
-@deprecated("use Trigger.ProcessingTime(intervalMs)", "2.2.0")
-object ProcessingTime {
-
-  /**
-   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
-   *
-   * Example:
-   * {{{
-   *   df.writeStream.trigger(ProcessingTime("10 seconds"))
-   * }}}
-   *
-   * @since 2.0.0
-   * @deprecated use Trigger.ProcessingTime(interval)
-   */
-  @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0")
-  def apply(interval: String): ProcessingTime = {
-    val cal = CalendarInterval.fromCaseInsensitiveString(interval)
-    if (cal.months > 0) {
-      throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
-    }
-    new ProcessingTime(TimeUnit.MICROSECONDS.toMillis(cal.microseconds))
-  }
-
-  /**
-   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
-   *
-   * Example:
-   * {{{
-   *   import scala.concurrent.duration._
-   *   df.writeStream.trigger(ProcessingTime(10.seconds))
-   * }}}
-   *
-   * @since 2.0.0
-   * @deprecated use Trigger.ProcessingTime(interval)
-   */
-  @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0")
-  def apply(interval: Duration): ProcessingTime = {
-    new ProcessingTime(interval.toMillis)
-  }
-
-  /**
-   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
-   *
-   * Example:
-   * {{{
-   *   df.writeStream.trigger(ProcessingTime.create("10 seconds"))
-   * }}}
-   *
-   * @since 2.0.0
-   * @deprecated use Trigger.ProcessingTime(interval)
-   */
-  @deprecated("use Trigger.ProcessingTime(interval)", "2.2.0")
-  def create(interval: String): ProcessingTime = {
-    apply(interval)
-  }
-
-  /**
-   * Create a [[ProcessingTime]]. If `interval` is 0, the query will run as fast as possible.
-   *
-   * Example:
-   * {{{
-   *   import java.util.concurrent.TimeUnit
-   *   df.writeStream.trigger(ProcessingTime.create(10, TimeUnit.SECONDS))
-   * }}}
-   *
-   * @since 2.0.0
-   * @deprecated use Trigger.ProcessingTime(interval, unit)
-   */
-  @deprecated("use Trigger.ProcessingTime(interval, unit)", "2.2.0")
-  def create(interval: Long, unit: TimeUnit): ProcessingTime = {
-    new ProcessingTime(unit.toMillis(interval))
-  }
-}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
index 1705d56..abee5f6 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/StreamingQueryManager.scala
@@ -31,7 +31,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, SparkSession}
 import org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker
 import org.apache.spark.sql.execution.streaming._
-import org.apache.spark.sql.execution.streaming.continuous.{ContinuousExecution, ContinuousTrigger}
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
 import org.apache.spark.sql.execution.streaming.state.StateStoreCoordinatorRef
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.internal.StaticSQLConf.STREAMING_QUERY_LISTENERS
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
index 623a1b6..e33870d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/ProcessingTimeSuite.scala
@@ -22,12 +22,15 @@ import java.util.concurrent.TimeUnit
 import scala.concurrent.duration._
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.streaming.{ProcessingTime, Trigger}
+import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger
+import org.apache.spark.sql.streaming.Trigger
 
 class ProcessingTimeSuite extends SparkFunSuite {
 
   test("create") {
-    def getIntervalMs(trigger: Trigger): Long = trigger.asInstanceOf[ProcessingTime].intervalMs
+    def getIntervalMs(trigger: Trigger): Long = {
+      trigger.asInstanceOf[ProcessingTimeTrigger].intervalMs
+    }
 
     assert(getIntervalMs(Trigger.ProcessingTime(10.seconds)) === 10 * 1000)
     assert(getIntervalMs(Trigger.ProcessingTime(10, TimeUnit.SECONDS)) === 10 * 1000)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
index 723764c..c0fd3fe 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.scala
@@ -24,7 +24,6 @@ import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.scalatest.time.SpanSugar._
 
 import org.apache.spark.SparkFunSuite
-import org.apache.spark.sql.streaming.ProcessingTime
 import org.apache.spark.sql.streaming.util.StreamManualClock
 
 class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {
@@ -35,7 +34,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {
   val timeout = 10.seconds
 
   test("nextBatchTime") {
-    val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(100))
+    val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTimeTrigger(100))
     assert(processingTimeExecutor.nextBatchTime(0) === 100)
     assert(processingTimeExecutor.nextBatchTime(1) === 100)
     assert(processingTimeExecutor.nextBatchTime(99) === 100)
@@ -49,7 +48,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {
     val clock = new StreamManualClock()
     @volatile var continueExecuting = true
     @volatile var clockIncrementInTrigger = 0L
-    val executor = ProcessingTimeExecutor(ProcessingTime("1000 milliseconds"), clock)
+    val executor = ProcessingTimeExecutor(ProcessingTimeTrigger("1000 milliseconds"), clock)
     val executorThread = new Thread() {
       override def run(): Unit = {
         executor.execute(() => {
@@ -97,7 +96,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {
 
   test("calling nextBatchTime with the result of a previous call should return 
the next interval") {
     val intervalMS = 100
-    val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMS))
+    val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTimeTrigger(intervalMS))
 
     val ITERATION = 10
     var nextBatchTime: Long = 0
@@ -111,7 +110,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {
 
   private def testBatchTermination(intervalMs: Long): Unit = {
     var batchCounts = 0
-    val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTime(intervalMs))
+    val processingTimeExecutor = ProcessingTimeExecutor(ProcessingTimeTrigger(intervalMs))
     processingTimeExecutor.execute(() => {
       batchCounts += 1
      // If the batch termination works correctly, batchCounts should be 3 after `execute`
@@ -130,7 +129,7 @@ class ProcessingTimeExecutorSuite extends SparkFunSuite with TimeLimits {
     @volatile var batchFallingBehindCalled = false
     val t = new Thread() {
       override def run(): Unit = {
-        val processingTimeExecutor = new ProcessingTimeExecutor(ProcessingTime(100), clock) {
+        val processingTimeExecutor = new ProcessingTimeExecutor(ProcessingTimeTrigger(100), clock) {
           override def notifyBatchFallingBehind(realElapsedTimeMs: Long): Unit = {
             batchFallingBehindCalled = true
           }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
index 79016b5..4db605e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/sources/StreamingDataSourceV2Suite.scala
@@ -24,8 +24,7 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.{DataFrame, SQLContext}
 import org.apache.spark.sql.execution.datasources.DataSource
-import org.apache.spark.sql.execution.streaming.{RateStreamOffset, Sink, StreamingQueryWrapper}
-import org.apache.spark.sql.execution.streaming.continuous.ContinuousTrigger
+import org.apache.spark.sql.execution.streaming.{ContinuousTrigger, RateStreamOffset, Sink, StreamingQueryWrapper}
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.sources.{DataSourceRegister, StreamSinkProvider}
 import org.apache.spark.sql.sources.v2._

