[ https://issues.apache.org/jira/browse/SPARK-38033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486839#comment-17486839 ]

Jungtaek Lim commented on SPARK-38033:
--------------------------------------

This is actually an ambiguous situation for Spark to self-heal.

There is neither an 'offset' nor a 'commit' for batch 113258, which means the 
query hasn't executed batch 113258; yet there is an 'offset' for batch 113259, 
which implies the query has executed batch 113258. The checkpoint contradicts 
itself.

Spark writes the offset for batch N in the planning phase (before executing 
batch N) to ensure the fault-tolerance semantics. When the query crashes in 
any way after writing the offset for batch N, Spark restarts the query by 
reading back the offset for batch N, so that the offset is consistent for 
batch N and the output is also consistent for batch N (assuming the query is 
deterministic).
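
Roughly, the per-batch protocol looks like the sketch below. This is not the 
actual MicroBatchExecution code; the in-memory map and set stand in for the 
HDFS-backed offsets/ and commits/ logs under the checkpoint directory.

{code:scala}
import scala.collection.mutable

// Minimal sketch of the micro-batch commit protocol (not Spark's actual
// MicroBatchExecution code). The map and set stand in for the HDFS-backed
// offsets/ and commits/ logs under the checkpoint directory.
object CommitProtocolSketch {
  val offsetLog = mutable.Map[Long, String]() // batchId -> planned offsets
  val commitLog = mutable.Set[Long]()         // batchIds that finished

  def runOneBatch(batchId: Long, plannedOffsets: String): Unit = {
    offsetLog(batchId) = plannedOffsets // 1. write-ahead log BEFORE executing
    // 2. execute the batch and write its output to the sink (elided)
    commitLog += batchId                // 3. mark the batch as committed
  }

  // On restart: if the offset log has batch N but the commit log does not,
  // re-run batch N with exactly the same planned offsets, so the output of
  // batch N is reproducible (for a deterministic query).
  def batchToRerun: Option[Long] =
    offsetLog.keys.toSeq.sorted.lastOption.filterNot(commitLog.contains)
}
{code}

In this report the invariant is broken: the offset log jumps from 113257 to 
113259, so neither branch of that restart logic applies to batch 113258.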

If Spark simply ignored batch 113259 and automatically re-planned batch 
113258, the offset of batch 113258 would quite likely not be the same as the 
original (probably lost) one, and hence the output for batch 113258 would 
differ. If your query has deduplication logic keyed on the batch ID (in 
foreachBatch), that could lead to data loss.
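
For illustration, such a dedup sink might look like the sketch below 
(`alreadyProcessed` and `markProcessed` are hypothetical helpers; a real sink 
would persist the seen batch IDs transactionally alongside the output). If 
batch 113258 were re-planned with different data but the same batch ID, a 
sink like this would skip the write.

{code:scala}
import org.apache.spark.sql.DataFrame

// Sketch of batch-ID deduplication in foreachBatch. `alreadyProcessed` and
// `markProcessed` are hypothetical helpers; a real sink would persist the
// seen batch IDs transactionally alongside the output.
object DedupSinkSketch {
  private val seen = scala.collection.mutable.Set[Long]()
  def alreadyProcessed(batchId: Long): Boolean = seen.contains(batchId)
  def markProcessed(batchId: Long): Unit = seen += batchId

  // A typed val avoids the Scala/Java overload ambiguity of foreachBatch.
  val writeBatch: (DataFrame, Long) => Unit = (batchDF, batchId) => {
    if (!alreadyProcessed(batchId)) { // skip a batch ID we have seen before
      batchDF.write.mode("append").parquet("/tmp/out") // write the output
      markProcessed(batchId)
    }
  }

  def start(streamingDF: DataFrame): Unit = {
    streamingDF.writeStream.foreachBatch(writeBatch).start()
  }
}
{code}

A re-planned batch 113258 carrying different records would be dropped here, 
because ID 113258 was already marked as processed.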

Since you lost both the offset and the commit for batch 113258, the only way 
to recover is to remove the offset for batch 113259 and start over. If your 
query aims for "end-to-end exactly-once" semantics, you may also want to 
manually remove the output of batch 113258, if possible, before restarting.

That said, the amount of work needed in this scenario depends on the 
fault-tolerance semantics, and in some cases you may have to do manual work 
to restore the state, including the output storage. That makes me a bit 
skeptical about self-healing.

Instead, as an improvement, we could probably do some simple analysis and 
give a better error message when there is an inconsistency between the offset 
and commit logs.
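
The check could be as simple as comparing the latest entries of the two logs, 
since the offset log should never be more than one batch ahead of the commit 
log. A sketch (the two batch IDs would be read from the checkpoint's offsets/ 
and commits/ directories):

{code:scala}
// Sketch of the suggested startup sanity check: the latest offset batch ID
// must be at most one ahead of the latest committed batch ID. The two IDs
// would be read from the checkpoint's offsets/ and commits/ directories.
object ConsistencyCheck {
  def check(latestOffsetBatchId: Long, latestCommitBatchId: Long): Unit = {
    if (latestOffsetBatchId > latestCommitBatchId + 1) {
      throw new IllegalStateException(
        s"Offset log contains batch $latestOffsetBatchId but the latest " +
        s"committed batch is $latestCommitBatchId; offsets for batches " +
        s"${latestCommitBatchId + 1}..${latestOffsetBatchId - 1} are " +
        "missing. The checkpoint may be corrupted; manual recovery is " +
        "required.")
    }
  }
}
{code}

For the state in this report, check(113259, 113257) would fail with a message 
naming the missing batch 113258, instead of the bare "batch 113258 doesn't 
exist".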

> The structured streaming processing cannot be started because the commitId 
> and offsetId are inconsistent
> --------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-38033
>                 URL: https://issues.apache.org/jira/browse/SPARK-38033
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.4.6
>            Reporter: LeeeeLiu
>            Priority: Major
>
> The streaming query could not start after an unexpected machine shutdown.
> The exception is as follows:
>  
> {code:java}
> ERROR 22/01/12 02:48:36 MicroBatchExecution: Query 
> streaming_4a026335eafd4bb498ee51752b49f7fb [id = 
> 647ba9e4-16d2-4972-9824-6f9179588806, runId = 
> 92385d5b-f31f-40d0-9ac7-bb7d9796d774] terminated with error
> java.lang.IllegalStateException: batch 113258 doesn't exist
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
>         at scala.Option.getOrElse(Option.scala:121)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:255)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:169)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
> {code}
> I checked the checkpoint files on HDFS and found that the latest offset is 
> 113259, but the latest commit is 113257. See the following listing:
> {code:java}
> commits
> /tmp/streaming_xxxxxxxx/commits/113253
> /tmp/streaming_xxxxxxxx/commits/113254
> /tmp/streaming_xxxxxxxx/commits/113255
> /tmp/streaming_xxxxxxxx/commits/113256
> /tmp/streaming_xxxxxxxx/commits/113257
> offset
> /tmp/streaming_xxxxxxxx/offsets/113253
> /tmp/streaming_xxxxxxxx/offsets/113254
> /tmp/streaming_xxxxxxxx/offsets/113255
> /tmp/streaming_xxxxxxxx/offsets/113256
> /tmp/streaming_xxxxxxxx/offsets/113257
> /tmp/streaming_xxxxxxxx/offsets/113259
> {code}
> Finally, I deleted the offset file "/tmp/streaming_xxxxxxxx/offsets/113259" 
> and the query started normally. I think there is a problem here: we should 
> either handle this exception or suggest a resolution in the log.
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
