Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21571#discussion_r195878970
  
    --- Diff: python/pyspark/sql/streaming.py ---
    @@ -1016,6 +1018,35 @@ def func_with_open_process_close(partition_id, iterator):
             self._jwrite.foreach(jForeachWriter)
             return self
     
    +    @since(2.4)
    +    def foreachBatch(self, func):
    +        """
    +        Sets the output of the streaming query to be processed using the provided
    +        function. This is supported only in the micro-batch execution modes (that is, when the
    +        trigger is not continuous). In every micro-batch, the provided function will be called
    +        with (i) the output rows as a DataFrame and (ii) the batch identifier. The batchId can
    +        be used to deduplicate and transactionally write the output (that is, the provided
    +        Dataset) to external systems. The output DataFrame is guaranteed to be exactly the same
    +        for the same batchId (assuming all operations are deterministic in the query).
    +
    +        .. note:: Evolving.
    +
    +        >>> def func(batch_df, batch_id):
    +        ...     batch_df.collect()
    +        ...
    +        >>> writer = sdf.writeStream.foreachBatch(func)
    +        """
    +
    +        from pyspark.java_gateway import ensure_callback_server_started
    +        gw = self._spark._sc._gateway
    +        java_import(gw.jvm, "org.apache.spark.sql.execution.streaming.sources.*")
    +
    +        wrapped_func = ForeachBatchFunction(self._spark, func)
    +        gw.jvm.PythonForeachBatchHelper.callForeachBatch(self._jwrite, wrapped_func)
    +        ensure_callback_server_started(gw)
    --- End diff --
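
    (Not part of the diff: an illustrative sketch of the docstring's claim that the batchId
    can be used to make the per-batch write idempotent. `sdf` is the streaming DataFrame from
    the docstring's example; the output path layout is hypothetical.)

        def write_batch(batch_df, batch_id):
            # Write each micro-batch to a path keyed by its batch_id. If the batch is
            # retried, overwrite mode makes the write idempotent instead of duplicating data.
            (batch_df.write
                .mode("overwrite")
                .parquet("/tmp/output/batch_id=%d" % batch_id))

        query = sdf.writeStream.foreachBatch(write_batch).start()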
    
    this should be above the `callForeachBatch` call, otherwise there is a race where the
    streaming query calls this Python func before the callback server is started.
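
    A minimal sketch of the reordering suggested here, reusing the names from the diff above
    (`ForeachBatchFunction` and `PythonForeachBatchHelper` are assumed to be available exactly
    as shown there; the trailing `return self` mirrors `foreach()` above):

        def foreachBatch(self, func):
            from py4j.java_gateway import java_import
            from pyspark.java_gateway import ensure_callback_server_started

            gw = self._spark._sc._gateway
            # Start the callback server before handing the wrapped function to the JVM,
            # so the streaming query cannot call back into Python before the server is up.
            ensure_callback_server_started(gw)

            java_import(gw.jvm, "org.apache.spark.sql.execution.streaming.sources.*")
            wrapped_func = ForeachBatchFunction(self._spark, func)
            gw.jvm.PythonForeachBatchHelper.callForeachBatch(self._jwrite, wrapped_func)
            return self  # mirrors foreach() above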

