[jira] [Commented] (SPARK-38033) The structured streaming processing cannot be started because the commit and offset are inconsistent

2022-01-26 Thread LeeeeLiu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482851#comment-17482851
 ] 

LLiu commented on SPARK-38033:
--

Yes, I found the same code in Spark 3+, but I have not verified the behavior on Spark 3+ yet.

 
{code:java}
// spark 2.4.6
// MicroBatchExecution class
// populateStartOffsets method

/* Initialize committed offsets to a committed batch, which at this
 * is the second latest batch id in the offset log. */
if (latestBatchId != 0) {
  val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse {
    throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
  }
  committedOffsets = secondLatestBatchId.toStreamProgress(sources)
}

// spark 3.0.3
// MicroBatchExecution class
// populateStartOffsets method

/* Initialize committed offsets to a committed batch, which at this
 * is the second latest batch id in the offset log. */
if (latestBatchId != 0) {
  val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse {
    throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
  }
  committedOffsets = secondLatestOffsets.toStreamProgress(sources)
}{code}
For this problem, I have two ideas; I wonder whether either of them would be acceptable.

First, use the latest batch id from the commit log instead of offset \{latestBatchId - 1}; the two are consistent in most cases.
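
To illustrate the first idea, here is a rough sketch (only an illustration, not a tested patch) of how populateStartOffsets could fall back to the commit log when the offset for batch latestBatchId - 1 is missing. It assumes commitLog.getLatest() from HDFSMetadataLog is usable here, returning the latest committed batch id and its metadata:
{code:java}
// Sketch only, based on the 2.4.6 snippet above.
if (latestBatchId != 0) {
  val secondLatestOffsets = offsetLog.get(latestBatchId - 1).orElse {
    // If the offset for latestBatchId - 1 is missing, fall back to the offset
    // of the latest batch recorded in the commit log.
    commitLog.getLatest().flatMap { case (latestCommittedBatchId, _) =>
      offsetLog.get(latestCommittedBatchId)
    }
  }.getOrElse {
    throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
  }
  committedOffsets = secondLatestOffsets.toStreamProgress(sources)
}{code}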

Second, explain the recovery steps in the error log, for example telling the user that the dangling offset file can be deleted.
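
For the second idea, the message could look something like this (again only a sketch; the wording is just an example):
{code:java}
// Sketch of a more descriptive error than the bare "batch N doesn't exist".
throw new IllegalStateException(
  s"Batch ${latestBatchId - 1} does not exist in the offset log, but batch " +
  s"$latestBatchId does, so the offset log and the commit log are inconsistent " +
  s"(for example after an unclean shutdown). One possible recovery is to remove " +
  s"the dangling offset file for batch $latestBatchId under <checkpoint>/offsets " +
  s"and restart the query.")
{code}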

 


[jira] [Commented] (SPARK-38033) The structured streaming processing cannot be started because the commit and offset are inconsistent

2022-01-26 Thread Hyukjin Kwon (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17482829#comment-17482829
 ] 

Hyukjin Kwon commented on SPARK-38033:
--

Spark 2.4 is EOL, so bug fixes are unlikely to land there. Does the bug persist in 
Spark 3+ too?

> The structured streaming processing cannot be started because the commit and 
> offset are inconsistent
> 
>
> Key: SPARK-38033
> URL: https://issues.apache.org/jira/browse/SPARK-38033
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.6
>Reporter: LLiu
>Priority: Major
>
> The streaming query could not be restarted after an unexpected machine shutdown.
> The exception is as follows:
>  
> {code:java}
> ERROR 22/01/12 02:48:36 MicroBatchExecution: Query 
> streaming_4a026335eafd4bb498ee51752b49f7fb [id = 
> 647ba9e4-16d2-4972-9824-6f9179588806, runId = 
> 92385d5b-f31f-40d0-9ac7-bb7d9796d774] terminated with error
> java.lang.IllegalStateException: batch 113258 doesn't exist
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
>         at scala.Option.getOrElse(Option.scala:121)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:255)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:169)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
> {code}
> I checked the checkpoint files on HDFS and found that the latest offset is 113259, 
> but the latest commit is 113257, as shown below:
> {code:java}
> commits
> /tmp/streaming_/commits/113253
> /tmp/streaming_/commits/113254
> /tmp/streaming_/commits/113255
> /tmp/streaming_/commits/113256
> /tmp/streaming_/commits/113257
> offset
> /tmp/streaming_/offsets/113253
> /tmp/streaming_/offsets/113254
> /tmp/streaming_/offsets/113255
> /tmp/streaming_/offsets/113256
> /tmp/streaming_/offsets/113257
> /tmp/streaming_/offsets/113259{code}
> Finally, I deleted offsets “/tmp/streaming_/offsets/113259” and the 
> program started normally. I think there is a problem here and we should try 
> to handle this exception or give some resolution in the log.
>  
>  


