Github user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22627#discussion_r223479414
  
    --- Diff: docs/structured-streaming-programming-guide.md ---
    @@ -1989,22 +2026,211 @@ head(sql("select * from aggregates"))
     </div>
     </div>
     
    -##### Using Foreach
    -The `foreach` operation allows arbitrary operations to be computed on the 
output data. As of Spark 2.1, this is available only for Scala and Java. To use 
this, you will have to implement the interface `ForeachWriter`
    
-([Scala](api/scala/index.html#org.apache.spark.sql.ForeachWriter)/[Java](api/java/org/apache/spark/sql/ForeachWriter.html)
 docs),
    -which has methods that get called whenever there is a sequence of rows 
generated as output after a trigger. Note the following important points.
    +##### Using Foreach and ForeachBatch
    +The `foreach` and `foreachBatch` operations allow you to apply arbitrary 
operations and writing 
    +logic on the output of a streaming query. They have slightly different use 
cases - while `foreach` 
    +allows custom write logic on every row, `foreachBatch` allows arbitrary 
operations 
    +and custom logic on the output of each micro-batch. Let's understand their 
usages in more detail.  
    +
    +###### ForeachBatch
    +`foreachBatch(...)` allows you to specify a function that is executed on 
    +the output data of every micro-batch of a streaming query. Since Spark 
2.4, this is supported in Scala, Java and Python. 
    +It takes two parameters: a DataFrame or Dataset that has the output data 
of a micro-batch and the unique ID of the micro-batch.
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) 
=>
    +  // Transform and write batchDF 
    +}.start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +{% highlight java %}
    +streamingDatasetOfString.writeStream.foreachBatch(
    +  new VoidFunction2<Dataset<String>, long> {
    +    void call(Dataset<String> dataset, long batchId) {
    +      // Transform and write batchDF
    +    }    
    +  }
    +).start();
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +{% highlight python %}
    +def foreachBatchFunction(df, epochId):
    +  # Transform and write batchDF
    +  pass
    +  
    +streamingDF.writeStream.foreachBatch(foreachBatchFunction).start()   
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +With foreachBatch, you can do the following.
    +
    +- **Reuse existing batch data sources** - For many storage systems, there 
may not be a streaming sink available yet, 
    +  but there may already exist a data writer for batch queries. Using 
foreachBatch(), you can use the batch
    +  data writers on the output of each micro-batch.
    +- **Write to multiple locations** - If you want to write the output of a 
streaming query to multiple locations, 
    +  then you can simply write the output DataFrame/Dataset multiple times. 
However, each attempt to write can 
    +  cause the output data to be recomputed (including possible re-reading of 
the input data). To avoid recomputations,
    +  you should cache the output DataFrame/Dataset, write it to multiple 
locations, and then uncache it. Here is an outline.  
    +
    +    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: 
Long) =>
    +      batchDF.cache()
    +      batchDF.write.format(...).save(...)  // location 1
    +      batchDF.write.format(...).save(...)  // location 2
    +      batchDF.uncache()
    +    }
    +
    +- **Apply additional DataFrame operations** - Many DataFrame and Dataset 
operations are not supported 
    +  in streaming DataFrames because Spark does not support generating 
incremental plans in those cases. 
    +  Using foreachBatch() you can apply some of these operations on each 
micro-batch output. However, you will have to reason about the end-to-end 
semantics of doing that operation yourself.
    +
    +**Note:**
    +- By default, `foreachBatch` provides only at-least-once write guarantees. 
However, you can use the 
    +  batchId provided to the function as way to deduplicate the output and 
get an exactly-once guarantee.  
    +- `foreachBatch` does not work with the continuous processing mode as it 
fundamentally relies on the
    +  micro-batch execution of a streaming query. If you write data in the 
continuous mode, use `foreach` instead.
    +
    +
    +###### Foreach
    +If `foreachBatch` is not an option (for example, corresponding batch data 
writer does not exist, or 
    +continuous processing mode), then you can express you custom writer logic 
using `foreach`. 
    +Specifically, you can express the data writing logic by dividing it into 
three methods: `open`, `process`, and `close`.
    +Since Spark 2.4, `foreach` is available in Scala, Java and Python. 
    +
    +<div class="codetabs">
    +<div data-lang="scala"  markdown="1">
    +
    +In Scala, you have to extend the class `ForeachWriter` 
([docs](api/scala/index.html#org.apache.spark.sql.ForeachWriter)).
    +
    +{% highlight scala %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    def open(partitionId: Long, version: Long): Boolean = {
    +      // Open connection
    +    }
    +
    +    def process(record: String) = {
    +      // Write string to connection
    +    }
    +
    +    def close(errorOrNull: Throwable): Unit = {
    +      // Close the connection
    +    }
    +  }
    +).start()
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="java"  markdown="1">
    +
    +In Java, you have to extend the class `ForeachWriter` 
([docs](api/java/org/apache/spark/sql/ForeachWriter.html)).
    +{% highlight java %}
    +streamingDF.writeStream.foreach(
    +  new ForeachWriter[String] {
    +
    +    boolean open(long partitionId, long version) {
    +      // Open connection
    +    }
    +
    +    void process(String record) {
    +      // Write string to connection
    +    }
    +
    +    void close(Throwable errorOrNull) {
    +      // Close the connection
    +    }
    +  }
    +).start();
    +
    +{% endhighlight %}
    +
    +</div>
    +<div data-lang="python"  markdown="1">
    +
    +In Python, you can invoke foreach in two ways: in a function or in an 
object. 
    +The function offers a simple way to express your processing logic but does 
not allow you to 
    +deduplicate generated data when failures cause reprocessing of some input 
data. 
    +For that situation you must specify the processing logic in an object.
    +
    +1. The function takes a row as input.
    +
    +  {% highlight python %}
    +      def processRow(row):
    +          // Write row to storage
    +          pass
    +      
    +      query = streamingDF.writeStream.foreach(processRow).start()  
    +  {% endhighlight %}
    +
    +2. The object has a process method and optional open and close methods: 
    +
    +  {% highlight python %}
    +      class ForeachWriter:
    +          def open(self, partition_id, epoch_id):
    +              // Open connection. This method is optional in Python.
    +      
    +          def process(self, row):
    +              // Write row to connection. This method is NOT optional in 
Python.
    +      
    +          def close(self, error):
    +              // Close the connection. This method in optional in Python.
    +      
    +      query = streamingDF.writeStream.foreach(ForeachWriter()).start()
    +  {% endhighlight %}
    +
    +</div>
    +<div data-lang="r"  markdown="1">
    +R is not yet supported.
    +</div>
    +</div>
    +
    +
    +**Execution semantics**
    +When the streaming query is started, Spark calls the function or the 
object’s methods in the following way:
    +
    +- A single copy of this object is responsible for all the data generated 
by a single task in a query. 
    +  In other words, one instance is responsible for processing one partition 
of the data generated in a distributed manner.
    +
    +- This object must be serializable, because each task will get a fresh 
serialized-deserialized copy 
    +  of the provided object. Hence, it is strongly recommended that any 
initialization for writing data 
    +  (for example. opening a connection or starting a transaction) is done 
after the open() method has 
    +  been called, which signifies that the task is ready to generate data.
    +
    +- The lifecycle of the methods are as follows:
    +
    +  - For each partition with partition_id:
     
    -- The writer must be serializable, as it will be serialized and sent to 
the executors for execution.
    +    - For each batch/epoch of streaming data with epoch_id:
     
    -- All the three methods, `open`, `process` and `close` will be called on 
the executors.
    +      - Method open(partitionId, epochId) is called.
     
    -- The writer must do all the initialization (e.g. opening connections, 
starting a transaction, etc.) only when the `open` method is called. Be aware 
that, if there is any initialization in the class as soon as the object is 
created, then that initialization will happen in the driver (because that is 
where the instance is being created), which may not be what you intend.
    +      - If open(...) returns true, for each row in the partition and 
batch/epoch, method process(row) is called.
     
    -- `version` and `partition` are two parameters in `open` that uniquely 
represent a set of rows that needs to be pushed out. `version` is a 
monotonically increasing id that increases with every trigger. `partition` is 
an id that represents a partition of the output, since the output is 
distributed and will be processed on multiple executors.
    +      - Method close(error) is called with error (if any) seen while 
processing rows.
     
    -- `open` can use the `version` and `partition` to choose whether it needs 
to write the sequence of rows. Accordingly, it can return `true` (proceed with 
writing), or `false` (no need to write). If `false` is returned, then `process` 
will not be called on any row. For example, after a partial failure, some of 
the output partitions of the failed trigger may have already been committed to 
a database. Based on metadata stored in the database, the writer can identify 
partitions that have already been committed and accordingly return false to 
skip committing them again. 
    +- The close() method (if it exists) is called if an open() method exists 
and returns successfully (irrespective of the return value), except if the JVM 
or Python process crashes in the middle.
     
    -- Whenever `open` is called, `close` will also be called (unless the JVM 
exits due to some error). This is true even if `open` returns false. If there 
is any error in processing and writing the data, `close` will be called with 
the error. It is your responsibility to clean up state (e.g. connections, 
transactions, etc.) that have been created in `open` such that there are no 
resource leaks.
    +- **Note:** The partitionId and epochId in the open() method can be used 
to deduplicate generated data 
    +  when failures cause reprocessing of some input data. This depends on the 
execution mode of the query. 
    +  If the streaming query is being executed in the micro-batch mode, then 
every partition represented 
    +  by a unique tuple (partition_id, epoch_id) is guaranteed to have the 
same data. 
    +  Hence, (partition_id, epoch_id) can be used to deduplicate and/or 
transactionally commit 
    +  data and achieve exactly-once guarantees. However, if the streaming 
query is being executed 
    +  in the continuous mode, then this guarantee does not hold and therefore 
should not be used for deduplication.
    --- End diff --
    
    I agree with @HeartSaVioR 
    I continuous processing, when a epoch is reprocessed, the engine and offset 
tracking will ensure that the starting offset of that epoch is same as what was 
recorded with the previous epoch's offset, but the ending offset is not 
guaranteed to be the same as what was processed before the failure. It may so 
happen that the epoch E of partition P processed offsets X to Y (and the output 
of partition P was written), but the query failed before Y was recorded (as 
other partitions may not have completed epoch E). So after restarting, it may 
so happens that the re-executed epoch E may process offsets X to Y + Z before 
the epoch is incremented. 


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org

Reply via email to