This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch branch-3.5
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.5 by this push:
     new d8f02274c38 [SPARK-44692][CONNECT][SQL] Move Trigger(s) to sql/api
d8f02274c38 is described below

commit d8f02274c38c027e2d56f5158ce63f6e74255d2d
Author: Herman van Hovell <her...@databricks.com>
AuthorDate: Tue Aug 8 00:42:13 2023 +0200

    [SPARK-44692][CONNECT][SQL] Move Trigger(s) to sql/api
    
    This PR moves `Triggers.scala` and `Trigger.scala` from `sql/core` to `sql/api`, and it removes the duplicates from the connect scala client.
    
    Not really needed, just some deduplication.
    
    No user-facing changes.
    
    Verified with existing tests.
    
    Closes #42368 from hvanhovell/SPARK-44692.
    
    Authored-by: Herman van Hovell <her...@databricks.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
    (cherry picked from commit 4eea89d339649152a1afcd8b7a32020454e71d42)
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../org/apache/spark/sql/streaming/Trigger.java    | 180 ---------------------
 dev/checkstyle-suppressions.xml                    |   4 +-
 project/MimaExcludes.scala                         |   4 +-
 .../org/apache/spark/sql/streaming/Trigger.java    |   0
 .../spark/sql/execution/streaming/Triggers.scala   |   6 +-
 .../spark/sql/execution/streaming/Triggers.scala   | 113 -------------
 6 files changed, 6 insertions(+), 301 deletions(-)

diff --git a/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java b/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java
deleted file mode 100644
index 27ffe67d990..00000000000
--- a/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java
+++ /dev/null
@@ -1,180 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.streaming;
-
-import java.util.concurrent.TimeUnit;
-
-import scala.concurrent.duration.Duration;
-
-import org.apache.spark.annotation.Evolving;
-import org.apache.spark.sql.execution.streaming.AvailableNowTrigger$;
-import org.apache.spark.sql.execution.streaming.ContinuousTrigger;
-import org.apache.spark.sql.execution.streaming.OneTimeTrigger$;
-import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger;
-
-/**
- * Policy used to indicate how often results should be produced by a [[StreamingQuery]].
- *
- * @since 3.5.0
- */
-@Evolving
-public class Trigger {
-  // This is a copy of the same class in sql/core/.../streaming/Trigger.java
-
-  /**
-   * A trigger policy that runs a query periodically based on an interval in processing time.
-   * If `interval` is 0, the query will run as fast as possible.
-   *
-   * @since 3.5.0
-   */
-  public static Trigger ProcessingTime(long intervalMs) {
-      return ProcessingTimeTrigger.create(intervalMs, TimeUnit.MILLISECONDS);
-  }
-
-  /**
-   * (Java-friendly)
-   * A trigger policy that runs a query periodically based on an interval in processing time.
-   * If `interval` is 0, the query will run as fast as possible.
-   *
-   * {{{
-   *    import java.util.concurrent.TimeUnit
-   *    df.writeStream().trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
-   * }}}
-   *
-   * @since 3.5.0
-   */
-  public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) {
-      return ProcessingTimeTrigger.create(interval, timeUnit);
-  }
-
-  /**
-   * (Scala-friendly)
-   * A trigger policy that runs a query periodically based on an interval in processing time.
-   * If `duration` is 0, the query will run as fast as possible.
-   *
-   * {{{
-   *    import scala.concurrent.duration._
-   *    df.writeStream.trigger(Trigger.ProcessingTime(10.seconds))
-   * }}}
-   * @since 3.5.0
-   */
-  public static Trigger ProcessingTime(Duration interval) {
-      return ProcessingTimeTrigger.apply(interval);
-  }
-
-  /**
-   * A trigger policy that runs a query periodically based on an interval in processing time.
-   * If `interval` is effectively 0, the query will run as fast as possible.
-   *
-   * {{{
-   *    df.writeStream.trigger(Trigger.ProcessingTime("10 seconds"))
-   * }}}
-   * @since 3.5.0
-   */
-  public static Trigger ProcessingTime(String interval) {
-      return ProcessingTimeTrigger.apply(interval);
-  }
-
-  /**
-   * A trigger that processes all available data in a single batch then terminates the query.
-   *
-   * @since 3.5.0
-   * @deprecated This is deprecated as of Spark 3.4.0. Use {@link #AvailableNow()} to leverage
-   *             better guarantee of processing, fine-grained scale of batches, and better gradual
-   *             processing of watermark advancement including no-data batch.
-   *             See the NOTES in {@link #AvailableNow()} for details.
-   */
-  @Deprecated
-  public static Trigger Once() {
-    return OneTimeTrigger$.MODULE$;
-  }
-
-  /**
-   * A trigger that processes all available data at the start of the query in one or multiple
-   * batches, then terminates the query.
-   *
-   * Users are encouraged to set the source options to control the size of the batch as similar as
-   * controlling the size of the batch in {@link #ProcessingTime(long)} trigger.
-   *
-   * NOTES:
-   * - This trigger provides a strong guarantee of processing: regardless of how many batches were
-   *   left over in previous run, it ensures all available data at the time of execution gets
-   *   processed before termination. All uncommitted batches will be processed first.
-   * - Watermark gets advanced per each batch, and no-data batch gets executed before termination
-   *   if the last batch advances the watermark. This helps to maintain smaller and predictable
-   *   state size and smaller latency on the output of stateful operators.
-   *
-   * @since 3.5.0
-   */
-  public static Trigger AvailableNow() {
-    return AvailableNowTrigger$.MODULE$;
-  }
-
-  /**
-   * A trigger that continuously processes streaming data, asynchronously checkpointing at
-   * the specified interval.
-   *
-   * @since 3.5.0
-   */
-  public static Trigger Continuous(long intervalMs) {
-    return ContinuousTrigger.apply(intervalMs);
-  }
-
-  /**
-   * A trigger that continuously processes streaming data, asynchronously checkpointing at
-   * the specified interval.
-   *
-   * {{{
-   *    import java.util.concurrent.TimeUnit
-   *    df.writeStream.trigger(Trigger.Continuous(10, TimeUnit.SECONDS))
-   * }}}
-   *
-   * @since 3.5.0
-   */
-  public static Trigger Continuous(long interval, TimeUnit timeUnit) {
-    return ContinuousTrigger.create(interval, timeUnit);
-  }
-
-  /**
-   * (Scala-friendly)
-   * A trigger that continuously processes streaming data, asynchronously checkpointing at
-   * the specified interval.
-   *
-   * {{{
-   *    import scala.concurrent.duration._
-   *    df.writeStream.trigger(Trigger.Continuous(10.seconds))
-   * }}}
-   * @since 3.5.0
-   */
-  public static Trigger Continuous(Duration interval) {
-    return ContinuousTrigger.apply(interval);
-  }
-
-  /**
-   * A trigger that continuously processes streaming data, asynchronously checkpointing at
-   * the specified interval.
-   *
-   * {{{
-   *    df.writeStream.trigger(Trigger.Continuous("10 seconds"))
-   * }}}
-   * @since 3.5.0
-   */
-  public static Trigger Continuous(String interval) {
-    return ContinuousTrigger.apply(interval);
-  }
-}
diff --git a/dev/checkstyle-suppressions.xml b/dev/checkstyle-suppressions.xml
index 44876fe6912..8ba1ff1b3b1 100644
--- a/dev/checkstyle-suppressions.xml
+++ b/dev/checkstyle-suppressions.xml
@@ -57,9 +57,7 @@
     <suppress checks="MethodName"
               files="sql/api/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java"/>
     <suppress checks="MethodName"
-              files="sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java"/>
-    <suppress checks="MethodName"
-              files="connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java"/>
+              files="sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java"/>
     <suppress checks="LineLength"
               files="src/main/java/org/apache/spark/sql/api/java/*"/>
 </suppressions>
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index c2ccb680cbd..3848caacbd2 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -71,7 +71,9 @@ object MimaExcludes {
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.RowFactory"),
     // [SPARK-44535][CONNECT][SQL] Move required Streaming API to sql/api
     ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.GroupStateTimeout"),
-    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.OutputMode")
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.OutputMode"),
+    // [SPARK-44692][CONNECT][SQL] Move Trigger(s) to sql/api
+    ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.Trigger")
   )
 
   // Default exclude rules
diff --git a/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java b/sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java
similarity index 100%
rename from sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java
rename to sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java
diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/api/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
similarity index 96%
rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
rename to sql/api/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
index ad19ad17805..37c5b314978 100644
--- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
@@ -28,8 +28,6 @@ import org.apache.spark.sql.streaming.Trigger
 import org.apache.spark.unsafe.types.UTF8String
 
 private object Triggers {
-  // This is a copy of the same class in sql/core/...execution/streaming/Triggers.scala
-
   def validate(intervalMs: Long): Unit = {
     require(intervalMs >= 0, "the interval of trigger should not be negative")
   }
@@ -87,8 +85,8 @@ object ProcessingTimeTrigger {
 }
 
 /**
- * A [[Trigger]] that continuously processes streaming data, asynchronously checkpointing at the
- * specified interval.
+ * A [[Trigger]] that continuously processes streaming data, asynchronously checkpointing at
+ * the specified interval.
  */
 case class ContinuousTrigger(intervalMs: Long) extends Trigger {
   Triggers.validate(intervalMs)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
deleted file mode 100644
index e6d1381b2b6..00000000000
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.streaming
-
-import java.util.concurrent.TimeUnit
-
-import scala.concurrent.duration.Duration
-
-import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY
-import org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToMillis
-import org.apache.spark.sql.catalyst.util.IntervalUtils
-import org.apache.spark.sql.streaming.Trigger
-import org.apache.spark.unsafe.types.UTF8String
-
-private object Triggers {
-  def validate(intervalMs: Long): Unit = {
-    require(intervalMs >= 0, "the interval of trigger should not be negative")
-  }
-
-  def convert(interval: String): Long = {
-    val cal = IntervalUtils.stringToInterval(UTF8String.fromString(interval))
-    if (cal.months != 0) {
-      throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval")
-    }
-    val microsInDays = Math.multiplyExact(cal.days, MICROS_PER_DAY)
-    microsToMillis(Math.addExact(cal.microseconds, microsInDays))
-  }
-
-  def convert(interval: Duration): Long = interval.toMillis
-
-  def convert(interval: Long, unit: TimeUnit): Long = unit.toMillis(interval)
-}
-
-/**
- * A [[Trigger]] that processes all available data in one batch then terminates the query.
- */
-case object OneTimeTrigger extends Trigger
-
-/**
- * A [[Trigger]] that processes all available data in multiple batches then terminates the query.
- */
-case object AvailableNowTrigger extends Trigger
-
-/**
- * A [[Trigger]] that runs a query periodically based on the processing time. If `interval` is 0,
- * the query will run as fast as possible.
- */
-case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger {
-  Triggers.validate(intervalMs)
-}
-
-object ProcessingTimeTrigger {
-  import Triggers._
-
-  def apply(interval: String): ProcessingTimeTrigger = {
-    ProcessingTimeTrigger(convert(interval))
-  }
-
-  def apply(interval: Duration): ProcessingTimeTrigger = {
-    ProcessingTimeTrigger(convert(interval))
-  }
-
-  def create(interval: String): ProcessingTimeTrigger = {
-    apply(interval)
-  }
-
-  def create(interval: Long, unit: TimeUnit): ProcessingTimeTrigger = {
-    ProcessingTimeTrigger(convert(interval, unit))
-  }
-}
-
-/**
- * A [[Trigger]] that continuously processes streaming data, asynchronously checkpointing at
- * the specified interval.
- */
-case class ContinuousTrigger(intervalMs: Long) extends Trigger {
-  Triggers.validate(intervalMs)
-}
-
-object ContinuousTrigger {
-  import Triggers._
-
-  def apply(interval: String): ContinuousTrigger = {
-    ContinuousTrigger(convert(interval))
-  }
-
-  def apply(interval: Duration): ContinuousTrigger = {
-    ContinuousTrigger(convert(interval))
-  }
-
-  def create(interval: String): ContinuousTrigger = {
-    apply(interval)
-  }
-
-  def create(interval: Long, unit: TimeUnit): ContinuousTrigger = {
-    ContinuousTrigger(convert(interval, unit))
-  }
-}
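
For readers following along, below is a minimal, illustrative sketch of the streaming Trigger API that this change consolidates into sql/api. The local SparkSession, rate source, console sink, and timeout are assumptions made only for the example; they are not part of this commit.

import java.util.concurrent.TimeUnit

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object TriggerExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("trigger-example")
      .getOrCreate()

    // The built-in rate source emits rows continuously, which is convenient for a demo.
    val df = spark.readStream.format("rate").option("rowsPerSecond", "5").load()

    val query = df.writeStream
      .format("console")
      // Any of the factory methods on the relocated Trigger class can be used here, e.g.:
      //   Trigger.ProcessingTime(10, TimeUnit.SECONDS)  // micro-batch every 10 seconds
      //   Trigger.ProcessingTime("10 seconds")          // string form, parsed by Triggers.convert
      //   Trigger.AvailableNow()                        // drain available data, then stop
      //   Trigger.Continuous("1 second")                // continuous mode, 1 second checkpoints
      .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
      .start()

    query.awaitTermination(30000) // stop the demo after roughly 30 seconds
    spark.stop()
  }
}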


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
