Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/21571#discussion_r195917011
  
    --- Diff: python/pyspark/sql/streaming.py ---
    @@ -1016,6 +1018,35 @@ def func_with_open_process_close(partition_id, 
iterator):
             self._jwrite.foreach(jForeachWriter)
             return self
     
    +    @since(2.4)
    +    def foreachBatch(self, func):
    +        """
    +        Sets the output of the streaming query to be processed using the 
provided
    +        function. This is supported only the in the micro-batch execution 
modes (that is, when the
    +        trigger is not continuous). In every micro-batch, the provided 
function will be called in
    +        every micro-batch with (i) the output rows as a DataFrame and (ii) 
the batch identifier.
    +        The batchId can be used deduplicate and transactionally write the 
output
    +        (that is, the provided Dataset) to external systems. The output 
DataFrame is guaranteed
    +        to exactly same for the same batchId (assuming all operations are 
deterministic in the
    +        query).
    +
    +        .. note:: Evolving.
    +
    +        >>> def func(batch_df, batch_id):
    +        ...     batch_df.collect()
    +        ...
    +        >>> writer = sdf.writeStream.foreach(func)
    +        """
    +
    +        from pyspark.java_gateway import ensure_callback_server_started
    +        gw = self._spark._sc._gateway
    +        java_import(gw.jvm, 
"org.apache.spark.sql.execution.streaming.sources.*")
    +
    +        wrapped_func = ForeachBatchFunction(self._spark, func)
    +        gw.jvm.PythonForeachBatchHelper.callForeachBatch(self._jwrite, 
wrapped_func)
    +        ensure_callback_server_started(gw)
    --- End diff --
    
    This is not possible because the callback from JVM ForeachBatch sink to 
Python is made ONLY after the query is started.  And the query cannot be 
started until this foreach() method finishes and start() is called.


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to