[ https://issues.apache.org/jira/browse/SPARK-38033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17486839#comment-17486839 ]
Jungtaek Lim commented on SPARK-38033:
--------------------------------------

This is actually an ambiguous situation for Spark to self-heal. There is neither an 'offset' nor a 'commit' for batch 113258, which would mean the query never ran batch 113258; but there is an 'offset' for batch 113259, which means the query did execute batch 113258. The two logs contradict each other, so Spark cannot tell which state is the real one.

Spark writes the offset for batch N in the planning phase (before executing batch N) to ensure its fault-tolerance semantics. When the query crashes in any way after the offset for batch N has been written, Spark restarts the query by reading back the offset for batch N, so the offsets are consistent for batch N and the output for batch N is also consistent (provided the query is deterministic).

If Spark simply ignored batch 113259 and automatically re-planned batch 113258, the offset of batch 113258 would quite likely differ from the original (probably lost) one, and hence produce different output for batch 113258. If your query has deduplication logic keyed on the batch ID (in foreachBatch), that could lead to data loss; a sketch of this pattern follows below.

Since you lost both the offset and the commit, the only way to recover is to remove the offset for batch 113259 and start over; if your query aims for "end-to-end exactly-once" semantics, you may also want to manually remove the output of batch 113258, if possible, before restarting.

That said, the amount of work needed in this scenario depends on the fault-tolerance semantics you require, and in some cases you may have to do manual work to restore state, including the output storage. That makes me a bit skeptical about self-healing. Instead, as an improvement, we could probably do some simple analysis and give a better error message when the offsets and commits are inconsistent.
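For illustration, batch-ID deduplication in foreachBatch often looks roughly like the following. This is a minimal, hedged sketch, not code from the reporter's job: the object name, the in-memory lastCommittedBatchId tracking (which would live in durable storage next to the sink in practice), and the /tmp/sink path are all invented here.

{code:scala}
import org.apache.spark.sql.{DataFrame, SparkSession}

object DedupSinkSketch {
  // In a real job this would be read from / written to durable storage
  // alongside the sink output, not kept in memory.
  @volatile private var lastCommittedBatchId: Long = -1L

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("dedup-sketch")
      .getOrCreate()

    val stream = spark.readStream.format("rate").option("rowsPerSecond", "1").load()

    // A typed val sidesteps the Scala/Java overload ambiguity of foreachBatch.
    val writeBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
      if (batchId > lastCommittedBatchId) {
        batch.write.mode("append").parquet("/tmp/sink")
        lastCommittedBatchId = batchId
      }
      // else: this batchId was already written, so skip it. If Spark silently
      // re-planned batch 113258 with a different offset range, a skip like
      // this is exactly where the replayed data would be lost.
    }

    stream.writeStream
      .option("checkpointLocation", "/tmp/streaming_xxxxxxxx")
      .foreachBatch(writeBatch)
      .start()
      .awaitTermination()
  }
}
{code}

Because the sink trusts the batch ID to decide whether data was already written, a re-planned batch 113258 carrying different offsets under the same ID would be skipped entirely, which is the data-loss scenario described above.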
> The structured streaming processing cannot be started because the commitId
> and offsetId are inconsistent
> --------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-38033
>                 URL: https://issues.apache.org/jira/browse/SPARK-38033
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.4.6
>            Reporter: LeeeeLiu
>            Priority: Major
>
> Streaming processing could not start due to an unexpected machine shutdown.
> The exception is as follows:
>
> {code:java}
> ERROR 22/01/12 02:48:36 MicroBatchExecution: Query streaming_4a026335eafd4bb498ee51752b49f7fb [id = 647ba9e4-16d2-4972-9824-6f9179588806, runId = 92385d5b-f31f-40d0-9ac7-bb7d9796d774] terminated with error
> java.lang.IllegalStateException: batch 113258 doesn't exist
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
> at scala.Option.getOrElse(Option.scala:121)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:255)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:169)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
> at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
> at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
> at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
> at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
> at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
> at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
> {code}
> I checked the checkpoint files on HDFS and found that the latest offset is 113259, but the latest commit is 113257:
>
> {code:java}
> commits
> /tmp/streaming_xxxxxxxx/commits/113253
> /tmp/streaming_xxxxxxxx/commits/113254
> /tmp/streaming_xxxxxxxx/commits/113255
> /tmp/streaming_xxxxxxxx/commits/113256
> /tmp/streaming_xxxxxxxx/commits/113257
> offsets
> /tmp/streaming_xxxxxxxx/offsets/113253
> /tmp/streaming_xxxxxxxx/offsets/113254
> /tmp/streaming_xxxxxxxx/offsets/113255
> /tmp/streaming_xxxxxxxx/offsets/113256
> /tmp/streaming_xxxxxxxx/offsets/113257
> /tmp/streaming_xxxxxxxx/offsets/113259
> {code}
> Finally, I deleted the offset file “/tmp/streaming_xxxxxxxx/offsets/113259” and the program started normally. I think there is a problem here, and we should either handle this exception or give some resolution guidance in the log.
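To make the suggested improvement concrete, the "simple analysis" mentioned above could look roughly like the sketch below: it compares the newest batch IDs in offsets/ and commits/ and prints an actionable message instead of the bare IllegalStateException. This is a hedged illustration, not Spark's actual implementation; the CheckpointDoctor name and the recovery wording are invented here, and it assumes the standard offsets/ and commits/ layout shown in the listing above.

{code:scala}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object CheckpointDoctor {
  def main(args: Array[String]): Unit = {
    // Usage: CheckpointDoctor <checkpoint-dir>, e.g. /tmp/streaming_xxxxxxxx
    val checkpoint = new Path(args(0))
    val fs = FileSystem.get(checkpoint.toUri, new Configuration())

    // Batch log files are named by their batch ID; skip .crc and temp files.
    def batchIds(dir: String): Seq[Long] =
      fs.listStatus(new Path(checkpoint, dir))
        .map(_.getPath.getName)
        .filter(name => name.nonEmpty && name.forall(_.isDigit))
        .map(_.toLong)
        .sorted

    val offsets = batchIds("offsets")
    val commits = batchIds("commits")

    (offsets.lastOption, commits.lastOption) match {
      // Normal states are offset == commit (idle) or offset == commit + 1
      // (batch planned but not yet committed); a larger gap is the
      // inconsistency described in this ticket.
      case (Some(o), Some(c)) if o > c + 1 =>
        println(
          s"Inconsistent checkpoint: latest offset is batch $o but latest commit is batch $c, " +
          s"so the offset for batch ${c + 1} is missing and that batch cannot be re-planned. " +
          s"To recover, delete $checkpoint/offsets/$o and restart the query; for end-to-end " +
          s"exactly-once semantics, also clean up any output already written for batch ${c + 1}.")
      case _ =>
        println("offsets/ and commits/ look consistent.")
    }
  }
}
{code}

Run against the checkpoint from this report, such a check would flag the gap at batch 113258 and point at offsets/113259, which is exactly the file the reporter ended up deleting by hand.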