Kai-Michael Roesner created SPARK-42102:
-------------------------------------------

             Summary: Using checkpoints in Spark Structured Streaming with the 
foreachBatch sink
                 Key: SPARK-42102
                 URL: https://issues.apache.org/jira/browse/SPARK-42102
             Project: Spark
          Issue Type: Question
          Components: PySpark, Structured Streaming
    Affects Versions: 3.3.1
            Reporter: Kai-Michael Roesner


I want to build a fault-tolerant, recoverable Spark job (using Structured 
Streaming in PySpark) that reads a data stream from Kafka and uses the 
[{{foreachBatch}}|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch]
 sink to implement a stateful transformation before writing the resulting data 
to the actual sink.

The basic structure of my Spark job is like this:
{code}
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# KAFKA_SQL, KAFKA_CLNT, and KAFKA_OPTIONS are placeholders for my environment
counter = 0

def batch_handler(df, batch_id):
  # Stand-in for the stateful transformation + write to the actual sink
  global counter
  counter += 1
  df.withColumn('counter', lit(counter)).show(truncate=30)

spark = (SparkSession.builder
  .appName('test.stateful.checkpoint')
  .config('spark.jars.packages', f'{KAFKA_SQL},{KAFKA_CLNT}')
  .getOrCreate())

source = (spark.readStream
  .format('kafka')
  .options(**KAFKA_OPTIONS)
  .option('subscribe', 'topic-spark-stateful')
  .option('startingOffsets', 'earliest')
  .option('includeHeaders', 'true')
  .load())

(source
  .selectExpr('CAST(value AS STRING) AS data', 'CAST(timestamp AS STRING) AS time')
  .writeStream
  .option('checkpointLocation', './checkpoints/stateful')
  .foreachBatch(batch_handler)
  .start()
  .awaitTermination())
{code}

where the simplified {{batch_handler}} function is a stand-in for the stateful transformation plus the writer to the actual data sink. For simplicity I am also using a local folder as the checkpoint location.

This works fine as far as checkpointing of Kafka offsets is concerned. But how can I include the state of my custom batch handler ({{counter}} in my simplified example) in the checkpoints, such that the job can pick up where it left off after a crash?
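
The only workaround I can see is to persist the handler's state myself, keyed by the {{batch_id}} that Spark passes into {{foreachBatch}} (the programming guide notes that the batch id can be used to deduplicate writes). A minimal sketch, assuming a durable side file next to the checkpoint folder ({{handler_state.json}} and its layout are my own invention, not a Spark API):
{code}
import json
import os

from pyspark.sql.functions import lit

# Illustrative side file; not managed by Spark's checkpointing
STATE_PATH = './checkpoints/stateful/handler_state.json'

def load_state():
  # Restore (counter, last_batch_id) from a previous run, if any
  if os.path.exists(STATE_PATH):
    with open(STATE_PATH) as f:
      state = json.load(f)
      return state['counter'], state['last_batch_id']
  return 0, -1

counter, last_batch_id = load_state()

def batch_handler(df, batch_id):
  global counter, last_batch_id
  if batch_id <= last_batch_id:
    return  # batch was already handled before the crash; skip it
  counter += 1
  df.withColumn('counter', lit(counter)).show(truncate=30)
  # Persist the state only after the write above has succeeded
  with open(STATE_PATH, 'w') as f:
    json.dump({'counter': counter, 'last_batch_id': batch_id}, f)
  last_batch_id = batch_id
{code}
But this amounts to hand-rolling part of the checkpointing machinery, which is why I am asking.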

The [Spark Structured Streaming Guide|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing] doesn't say anything on this topic. With the [{{foreach}}|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreach] sink I can pass a custom row handler object, but its contract seems to consist only of the {{open}}, {{process}}, and {{close}} methods.
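
For reference, the row-handler contract described in the guide looks like this ({{RowHandler}} is my own illustrative name):
{code}
class RowHandler:
  def open(self, partition_id, epoch_id):
    # Called once per partition and epoch; there is no hook here to
    # restore custom handler state from a checkpoint
    return True  # True = do process the rows of this partition/epoch

  def process(self, row):
    print(row)

  def close(self, error):
    # Called when the partition is done (or failed); again there is no
    # hook to export state so that Spark could checkpoint it
    pass

(source
  .writeStream
  .foreach(RowHandler())
  .start())
{code}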

Would it make sense to create a "Request" or even "Feature" ticket to enhance this contract with methods for restoring state from a checkpoint and for exporting state to support checkpointing?

PS: I have also posted this on [Stack Overflow|https://stackoverflow.com/questions/74864425]. If anyone cares to answer or comment, I'd be happy to upvote their post.


