Kai-Michael Roesner created SPARK-42102:
-------------------------------------------

             Summary: Using checkpoints in Spark Structured Streaming with the foreachBatch sink
                 Key: SPARK-42102
                 URL: https://issues.apache.org/jira/browse/SPARK-42102
             Project: Spark
          Issue Type: Question
          Components: PySpark, Structured Streaming
    Affects Versions: 3.3.1
            Reporter: Kai-Michael Roesner


I want to build a fault-tolerant, recoverable Spark job (using Structured Streaming in PySpark) that reads a data stream from Kafka and uses the [{{foreachBatch}}|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreachbatch] sink to implement a stateful transformation before writing the resulting data to the actual sink. The basic structure of my Spark job is like this:
{code}
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

# KAFKA_SQL, KAFKA_CLNT and KAFKA_OPTIONS are not shown here

# Custom state that lives only in driver memory
counter = 0

def batch_handler(df, batch_id):
    global counter
    counter += 1
    df.withColumn('counter', lit(counter)).show(truncate=30)

spark = (SparkSession.builder
    .appName('test.stateful.checkpoint')
    .config('spark.jars.packages', f'{KAFKA_SQL},{KAFKA_CLNT}')
    .getOrCreate())

source = (spark.readStream
    .format('kafka')
    .options(**KAFKA_OPTIONS)
    .option('subscribe', 'topic-spark-stateful')
    .option('startingOffsets', 'earliest')
    .option('includeHeaders', 'true')
    .load())

(source
    .selectExpr('CAST(value AS STRING) AS data', 'CAST(timestamp AS STRING) AS time')
    .writeStream
    .option('checkpointLocation', './checkpoints/stateful')
    .foreachBatch(batch_handler)
    .start()
    .awaitTermination())
{code}
where the simplified {{batch_handler}} function is a stand-in for the stateful transformation plus the writer to the actual data sink. Also, for simplicity, I am using a local folder as the checkpoint location.

This works fine as far as checkpointing of Kafka offsets is concerned. But how can I include the state of my custom batch handler ({{counter}} in my simplified example) in the checkpoints such that the job can pick up where it left off after a crash?

The [Spark Structured Streaming Guide|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing] doesn't say anything on the topic. With the [{{foreach}}|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#foreach] sink I can pass a custom row handler object, but this seems to support only the {{open}}, {{process}}, and {{close}} methods.

Would it make sense to create a "Request" or even "Feature" ticket to enhance this with methods for restoring state from a checkpoint and exporting state to support checkpointing?

PS: I have posted this on [Stack Overflow|https://stackoverflow.com/questions/74864425], too. If anyone cares to answer or comment I'd be happy to upvote their post.
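
To make the proposed enhancement a bit more concrete, here is a minimal sketch of what such a row handler could look like. The {{open}}, {{process}}, and {{close}} methods follow the shape the {{foreach}} sink accepts today. The {{export_state}} and {{restore_state}} methods, the {{CountingRowHandler}} class, and the {{simulate_restart}} helper are purely hypothetical names I made up for illustration; Spark does not call them. The sketch runs without Spark and only simulates a crash and restart by hand.

```python
import json


class CountingRowHandler:
    """Row handler in the open/process/close shape used by the foreach sink.

    export_state() and restore_state() are HYPOTHETICAL hooks that Spark
    does not provide or call; they only illustrate the proposal.
    """

    def __init__(self):
        self.counter = 0

    def open(self, partition_id, epoch_id):
        # Returning True tells the sink to go on and process the rows.
        return True

    def process(self, row):
        # Stand-in for the stateful transformation: count the rows seen.
        self.counter += 1

    def close(self, error):
        # error is None when the rows were processed without failure.
        pass

    def export_state(self):
        # HYPOTHETICAL: state that a checkpoint would need to persist.
        return {'counter': self.counter}

    def restore_state(self, state):
        # HYPOTHETICAL: state handed back when recovering from a checkpoint.
        self.counter = state.get('counter', 0)


def simulate_restart():
    """Drive two handlers by hand to mimic a crash between two epochs."""
    first = CountingRowHandler()
    if first.open(partition_id=0, epoch_id=0):
        for row in ['a', 'b', 'c']:
            first.process(row)
    first.close(None)
    snapshot = json.dumps(first.export_state())  # would go into the checkpoint

    # "Crash": the first handler is gone, a fresh one restores the snapshot.
    second = CountingRowHandler()
    second.restore_state(json.loads(snapshot))
    if second.open(partition_id=0, epoch_id=1):
        for row in ['d', 'e']:
            second.process(row)
    second.close(None)
    return snapshot, second.counter
```

Note that this glosses over the fact that in a real job the handler object is serialized and shipped to the executors, so any such state would exist per task rather than once on the driver. Where exactly the exported state would be stored and merged is left open here.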