This is an automated email from the ASF dual-hosted git repository.

hvanhovell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5a27dc9b617 [SPARK-44761][CONNECT] Support 
DataStreamWriter.foreachBatch(VoidFunction2)
5a27dc9b617 is described below

commit 5a27dc9b6174fedefd08bcfe8a5b42bdfde2b7f6
Author: Herman van Hovell <her...@databricks.com>
AuthorDate: Fri Aug 11 14:36:22 2023 +0200

    [SPARK-44761][CONNECT] Support DataStreamWriter.foreachBatch(VoidFunction2)
    
    ### What changes were proposed in this pull request?
    This PR adds the `DatastreamWriter.foreachBatch(VoidFunction2).
    
    ### Why are the changes needed?
    To increase binary compatibility with the APIs in `sql/core`.
    
    ### Does this PR introduce _any_ user-facing change?
    Yes. It adds a new method to DatastreamWriter.
    
    ### How was this patch tested?
    I modified an existing code path.
    
    Closes #42430 from hvanhovell/SPARK-44761.
    
    Lead-authored-by: Herman van Hovell <her...@databricks.com>
    Co-authored-by: Hyukjin Kwon <gurwls...@gmail.com>
    Signed-off-by: Herman van Hovell <her...@databricks.com>
---
 .../spark/sql/streaming/DataStreamWriter.scala     | 22 ++++++++++++++++++++--
 .../CheckConnectJvmClientCompatibility.scala       |  3 ---
 .../sql/streaming/ClientStreamingQuerySuite.scala  |  9 ++++++---
 .../apache/spark/sql/connect/common/UdfUtils.scala |  3 +++
 4 files changed, 29 insertions(+), 8 deletions(-)

diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index b9aa1f5bc58..54eb6e76140 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -25,13 +25,13 @@ import scala.collection.JavaConverters._
 import com.google.protobuf.ByteString
 
 import org.apache.spark.annotation.Evolving
+import org.apache.spark.api.java.function.VoidFunction2
 import org.apache.spark.connect.proto
 import org.apache.spark.connect.proto.Command
 import org.apache.spark.connect.proto.WriteStreamOperationStart
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, ForeachWriter}
-import org.apache.spark.sql.connect.common.DataTypeProtoConverter
-import org.apache.spark.sql.connect.common.ForeachWriterPacket
+import org.apache.spark.sql.connect.common.{DataTypeProtoConverter, 
ForeachWriterPacket, UdfUtils}
 import org.apache.spark.sql.execution.streaming.AvailableNowTrigger
 import org.apache.spark.sql.execution.streaming.ContinuousTrigger
 import org.apache.spark.sql.execution.streaming.OneTimeTrigger
@@ -247,6 +247,24 @@ final class DataStreamWriter[T] private[sql] (ds: 
Dataset[T]) extends Logging {
     this
   }
 
+  /**
+   * :: Experimental ::
+   *
+   * (Java-specific) Sets the output of the streaming query to be processed 
using the provided
+   * function. This is supported only in the micro-batch execution modes (that 
is, when the
+   * trigger is not continuous). In every micro-batch, the provided function 
will be called in
+   * every micro-batch with (i) the output rows as a Dataset and (ii) the 
batch identifier. The
+   * batchId can be used to deduplicate and transactionally write the output 
(that is, the
+   * provided Dataset) to external systems. The output Dataset is guaranteed 
to be exactly the
+   * same for the same batchId (assuming all operations are deterministic in 
the query).
+   *
+   * @since 3.5.0
+   */
+  @Evolving
+  def foreachBatch(function: VoidFunction2[Dataset[T], java.lang.Long]): 
DataStreamWriter[T] = {
+    foreachBatch(UdfUtils.foreachBatchFuncToScalaFunc(function))
+  }
+
   /**
    * Starts the execution of the streaming query, which will continually 
output results to the
    * given path as new data arrives. The returned [[StreamingQuery]] object 
can be used to
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
index 3fc02d7c397..04b162eceec 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/client/CheckConnectJvmClientCompatibility.scala
@@ -234,9 +234,6 @@ object CheckConnectJvmClientCompatibility {
       // DataStreamWriter
       ProblemFilters.exclude[MissingClassProblem](
         "org.apache.spark.sql.streaming.DataStreamWriter$"),
-      ProblemFilters.exclude[Problem](
-        "org.apache.spark.sql.streaming.DataStreamWriter.foreachBatch" // 
TODO(SPARK-42944)
-      ),
       ProblemFilters.exclude[Problem](
         "org.apache.spark.sql.streaming.DataStreamWriter.SOURCE*" // These are 
constant vals.
       ),
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
index ab92431bc11..944a999a860 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/streaming/ClientStreamingQuerySuite.scala
@@ -27,6 +27,7 @@ import org.scalatest.concurrent.Eventually.eventually
 import org.scalatest.concurrent.Futures.timeout
 import org.scalatest.time.SpanSugar._
 
+import org.apache.spark.api.java.function.VoidFunction2
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{DataFrame, ForeachWriter, Row, SparkSession, 
SQLHelper}
 import org.apache.spark.sql.connect.client.util.QueryTest
@@ -412,11 +413,13 @@ class EventCollector extends StreamingQueryListener {
   }
 }
 
-class ForeachBatchFn(val viewName: String) extends ((DataFrame, Long) => Unit) 
with Serializable {
-  override def apply(df: DataFrame, batchId: Long): Unit = {
+class ForeachBatchFn(val viewName: String)
+    extends VoidFunction2[DataFrame, java.lang.Long]
+    with Serializable {
+  override def call(df: DataFrame, batchId: java.lang.Long): Unit = {
     val count = df.count()
     df.sparkSession
-      .createDataFrame(Seq((batchId, count)))
+      .createDataFrame(Seq((batchId.toLong, count)))
       .createOrReplaceGlobalTempView(viewName)
   }
 }
diff --git 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
 
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
index ceacc595d15..16d5823f4a4 100644
--- 
a/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
+++ 
b/connector/connect/common/src/main/scala/org/apache/spark/sql/connect/common/UdfUtils.scala
@@ -59,6 +59,9 @@ private[sql] object UdfUtils extends Serializable {
   def foreachPartitionFuncToScalaFunc[T](f: ForeachPartitionFunction[T]): 
Iterator[T] => Unit =
     x => f.call(x.asJava)
 
+  def foreachBatchFuncToScalaFunc[D](f: VoidFunction2[D, java.lang.Long]): (D, 
Long) => Unit =
+    (d, i) => f.call(d, i)
+
   def flatMapFuncToScalaFunc[T, U](f: FlatMapFunction[T, U]): T => 
TraversableOnce[U] = x =>
     f.call(x).asScala
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to