This is an automated email from the ASF dual-hosted git repository. hvanhovell pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new 4eea89d3396 [SPARK-44692][CONNECT][SQL] Move Trigger(s) to sql/api 4eea89d3396 is described below commit 4eea89d339649152a1afcd8b7a32020454e71d42 Author: Herman van Hovell <her...@databricks.com> AuthorDate: Tue Aug 8 00:42:13 2023 +0200 [SPARK-44692][CONNECT][SQL] Move Trigger(s) to sql/api ### What changes were proposed in this pull request? This PR moves `Triggers.scala` and `Trigger.scala` from `sql/core` to `sql/api`, and it removes the duplicates from the connect scala client. ### Why are the changes needed? Not really needed, just some deduplication. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. Closes #42368 from hvanhovell/SPARK-44692. Authored-by: Herman van Hovell <her...@databricks.com> Signed-off-by: Herman van Hovell <her...@databricks.com> --- .../org/apache/spark/sql/streaming/Trigger.java | 180 --------------------- dev/checkstyle-suppressions.xml | 4 +- project/MimaExcludes.scala | 4 +- .../org/apache/spark/sql/streaming/Trigger.java | 0 .../spark/sql/execution/streaming/Triggers.scala | 6 +- .../spark/sql/execution/streaming/Triggers.scala | 113 ------------- 6 files changed, 6 insertions(+), 301 deletions(-) diff --git a/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java b/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java deleted file mode 100644 index 27ffe67d990..00000000000 --- a/connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java +++ /dev/null @@ -1,180 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.streaming; - -import java.util.concurrent.TimeUnit; - -import scala.concurrent.duration.Duration; - -import org.apache.spark.annotation.Evolving; -import org.apache.spark.sql.execution.streaming.AvailableNowTrigger$; -import org.apache.spark.sql.execution.streaming.ContinuousTrigger; -import org.apache.spark.sql.execution.streaming.OneTimeTrigger$; -import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger; - -/** - * Policy used to indicate how often results should be produced by a [[StreamingQuery]]. - * - * @since 3.5.0 - */ -@Evolving -public class Trigger { - // This is a copy of the same class in sql/core/.../streaming/Trigger.java - - /** - * A trigger policy that runs a query periodically based on an interval in processing time. - * If `interval` is 0, the query will run as fast as possible. - * - * @since 3.5.0 - */ - public static Trigger ProcessingTime(long intervalMs) { - return ProcessingTimeTrigger.create(intervalMs, TimeUnit.MILLISECONDS); - } - - /** - * (Java-friendly) - * A trigger policy that runs a query periodically based on an interval in processing time. - * If `interval` is 0, the query will run as fast as possible. - * - * {{{ - * import java.util.concurrent.TimeUnit - * df.writeStream().trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS)) - * }}} - * - * @since 3.5.0 - */ - public static Trigger ProcessingTime(long interval, TimeUnit timeUnit) { - return ProcessingTimeTrigger.create(interval, timeUnit); - } - - /** - * (Scala-friendly) - * A trigger policy that runs a query periodically based on an interval in processing time. - * If `duration` is 0, the query will run as fast as possible. - * - * {{{ - * import scala.concurrent.duration._ - * df.writeStream.trigger(Trigger.ProcessingTime(10.seconds)) - * }}} - * @since 3.5.0 - */ - public static Trigger ProcessingTime(Duration interval) { - return ProcessingTimeTrigger.apply(interval); - } - - /** - * A trigger policy that runs a query periodically based on an interval in processing time. - * If `interval` is effectively 0, the query will run as fast as possible. - * - * {{{ - * df.writeStream.trigger(Trigger.ProcessingTime("10 seconds")) - * }}} - * @since 3.5.0 - */ - public static Trigger ProcessingTime(String interval) { - return ProcessingTimeTrigger.apply(interval); - } - - /** - * A trigger that processes all available data in a single batch then terminates the query. - * - * @since 3.5.0 - * @deprecated This is deprecated as of Spark 3.4.0. Use {@link #AvailableNow()} to leverage - * better guarantee of processing, fine-grained scale of batches, and better gradual - * processing of watermark advancement including no-data batch. - * See the NOTES in {@link #AvailableNow()} for details. - */ - @Deprecated - public static Trigger Once() { - return OneTimeTrigger$.MODULE$; - } - - /** - * A trigger that processes all available data at the start of the query in one or multiple - * batches, then terminates the query. - * - * Users are encouraged to set the source options to control the size of the batch as similar as - * controlling the size of the batch in {@link #ProcessingTime(long)} trigger. - * - * NOTES: - * - This trigger provides a strong guarantee of processing: regardless of how many batches were - * left over in previous run, it ensures all available data at the time of execution gets - * processed before termination. All uncommitted batches will be processed first. - * - Watermark gets advanced per each batch, and no-data batch gets executed before termination - * if the last batch advances the watermark. This helps to maintain smaller and predictable - * state size and smaller latency on the output of stateful operators. - * - * @since 3.5.0 - */ - public static Trigger AvailableNow() { - return AvailableNowTrigger$.MODULE$; - } - - /** - * A trigger that continuously processes streaming data, asynchronously checkpointing at - * the specified interval. - * - * @since 3.5.0 - */ - public static Trigger Continuous(long intervalMs) { - return ContinuousTrigger.apply(intervalMs); - } - - /** - * A trigger that continuously processes streaming data, asynchronously checkpointing at - * the specified interval. - * - * {{{ - * import java.util.concurrent.TimeUnit - * df.writeStream.trigger(Trigger.Continuous(10, TimeUnit.SECONDS)) - * }}} - * - * @since 3.5.0 - */ - public static Trigger Continuous(long interval, TimeUnit timeUnit) { - return ContinuousTrigger.create(interval, timeUnit); - } - - /** - * (Scala-friendly) - * A trigger that continuously processes streaming data, asynchronously checkpointing at - * the specified interval. - * - * {{{ - * import scala.concurrent.duration._ - * df.writeStream.trigger(Trigger.Continuous(10.seconds)) - * }}} - * @since 3.5.0 - */ - public static Trigger Continuous(Duration interval) { - return ContinuousTrigger.apply(interval); - } - - /** - * A trigger that continuously processes streaming data, asynchronously checkpointing at - * the specified interval. - * - * {{{ - * df.writeStream.trigger(Trigger.Continuous("10 seconds")) - * }}} - * @since 3.5.0 - */ - public static Trigger Continuous(String interval) { - return ContinuousTrigger.apply(interval); - } -} diff --git a/dev/checkstyle-suppressions.xml b/dev/checkstyle-suppressions.xml index 44876fe6912..8ba1ff1b3b1 100644 --- a/dev/checkstyle-suppressions.xml +++ b/dev/checkstyle-suppressions.xml @@ -57,9 +57,7 @@ <suppress checks="MethodName" files="sql/api/src/main/java/org/apache/spark/sql/streaming/GroupStateTimeout.java"/> <suppress checks="MethodName" - files="sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java"/> - <suppress checks="MethodName" - files="connector/connect/client/jvm/src/main/java/org/apache/spark/sql/streaming/Trigger.java"/> + files="sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java"/> <suppress checks="LineLength" files="src/main/java/org/apache/spark/sql/api/java/*"/> </suppressions> diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala index 9e5eb66ce94..8da132f5de3 100644 --- a/project/MimaExcludes.scala +++ b/project/MimaExcludes.scala @@ -77,7 +77,9 @@ object MimaExcludes { ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.GroupStateTimeout"), ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.OutputMode"), // [SPARK-44198][CORE] Support propagation of the log level to the executors - ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages$SparkAppConfig$") + ProblemFilters.exclude[MissingTypesProblem]("org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages$SparkAppConfig$"), + // [SPARK-44692][CONNECT][SQL] Move Trigger(s) to sql/api + ProblemFilters.exclude[MissingClassProblem]("org.apache.spark.sql.streaming.Trigger") ) // Default exclude rules diff --git a/sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java b/sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java similarity index 100% rename from sql/core/src/main/java/org/apache/spark/sql/streaming/Trigger.java rename to sql/api/src/main/java/org/apache/spark/sql/streaming/Trigger.java diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/api/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala similarity index 96% rename from connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala rename to sql/api/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala index ad19ad17805..37c5b314978 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala @@ -28,8 +28,6 @@ import org.apache.spark.sql.streaming.Trigger import org.apache.spark.unsafe.types.UTF8String private object Triggers { - // This is a copy of the same class in sql/core/...execution/streaming/Triggers.scala - def validate(intervalMs: Long): Unit = { require(intervalMs >= 0, "the interval of trigger should not be negative") } @@ -87,8 +85,8 @@ object ProcessingTimeTrigger { } /** - * A [[Trigger]] that continuously processes streaming data, asynchronously checkpointing at the - * specified interval. + * A [[Trigger]] that continuously processes streaming data, asynchronously checkpointing at + * the specified interval. */ case class ContinuousTrigger(intervalMs: Long) extends Trigger { Triggers.validate(intervalMs) diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala deleted file mode 100644 index e6d1381b2b6..00000000000 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/Triggers.scala +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.execution.streaming - -import java.util.concurrent.TimeUnit - -import scala.concurrent.duration.Duration - -import org.apache.spark.sql.catalyst.util.DateTimeConstants.MICROS_PER_DAY -import org.apache.spark.sql.catalyst.util.DateTimeUtils.microsToMillis -import org.apache.spark.sql.catalyst.util.IntervalUtils -import org.apache.spark.sql.streaming.Trigger -import org.apache.spark.unsafe.types.UTF8String - -private object Triggers { - def validate(intervalMs: Long): Unit = { - require(intervalMs >= 0, "the interval of trigger should not be negative") - } - - def convert(interval: String): Long = { - val cal = IntervalUtils.stringToInterval(UTF8String.fromString(interval)) - if (cal.months != 0) { - throw new IllegalArgumentException(s"Doesn't support month or year interval: $interval") - } - val microsInDays = Math.multiplyExact(cal.days, MICROS_PER_DAY) - microsToMillis(Math.addExact(cal.microseconds, microsInDays)) - } - - def convert(interval: Duration): Long = interval.toMillis - - def convert(interval: Long, unit: TimeUnit): Long = unit.toMillis(interval) -} - -/** - * A [[Trigger]] that processes all available data in one batch then terminates the query. - */ -case object OneTimeTrigger extends Trigger - -/** - * A [[Trigger]] that processes all available data in multiple batches then terminates the query. - */ -case object AvailableNowTrigger extends Trigger - -/** - * A [[Trigger]] that runs a query periodically based on the processing time. If `interval` is 0, - * the query will run as fast as possible. - */ -case class ProcessingTimeTrigger(intervalMs: Long) extends Trigger { - Triggers.validate(intervalMs) -} - -object ProcessingTimeTrigger { - import Triggers._ - - def apply(interval: String): ProcessingTimeTrigger = { - ProcessingTimeTrigger(convert(interval)) - } - - def apply(interval: Duration): ProcessingTimeTrigger = { - ProcessingTimeTrigger(convert(interval)) - } - - def create(interval: String): ProcessingTimeTrigger = { - apply(interval) - } - - def create(interval: Long, unit: TimeUnit): ProcessingTimeTrigger = { - ProcessingTimeTrigger(convert(interval, unit)) - } -} - -/** - * A [[Trigger]] that continuously processes streaming data, asynchronously checkpointing at - * the specified interval. - */ -case class ContinuousTrigger(intervalMs: Long) extends Trigger { - Triggers.validate(intervalMs) -} - -object ContinuousTrigger { - import Triggers._ - - def apply(interval: String): ContinuousTrigger = { - ContinuousTrigger(convert(interval)) - } - - def apply(interval: Duration): ContinuousTrigger = { - ContinuousTrigger(convert(interval)) - } - - def create(interval: String): ContinuousTrigger = { - apply(interval) - } - - def create(interval: Long, unit: TimeUnit): ContinuousTrigger = { - ContinuousTrigger(convert(interval, unit)) - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org