[jira] [Updated] (SPARK-40377) Allow customize maxBroadcastTableBytes and maxBroadcastRows
[ https://issues.apache.org/jira/browse/SPARK-40377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

LLiu updated SPARK-40377:
-------------------------
Description:

Recently, we encountered several driver OOM problems. Some large tables were compressed with Snappy and then used in broadcast joins, but the actual (uncompressed) data volume was far larger than expected, which caused the driver to OOM.

The values of maxBroadcastTableBytes and maxBroadcastRows are hardcoded to 8 GB and 512,000,000 rows respectively. It may be worth making these values configurable, so that smaller limits can be set for different scenarios and broadcast joins of overly large tables can be prohibited, avoiding driver OOM.

was:

Recently, we encountered some driver OOM problems. Some tables with large data volume were compressed using Snappy and then broadcast join was performed, but the actual data volume was too large, which resulted in driver OOM. The values of maxBroadcastTableBytes and maxBroadcastRows are hardcoded, 8GB and 51200 respectively. Maybe we can allow customization of these values, configure smaller values according to different scenarios, and prohibit broadcast joins for tables with large data volumes to avoid driver OOM.

> Allow customize maxBroadcastTableBytes and maxBroadcastRows
> ------------------------------------------------------------
>
>                 Key: SPARK-40377
>                 URL: https://issues.apache.org/jira/browse/SPARK-40377
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.4.0
>            Reporter: LLiu
>            Priority: Major
>         Attachments: 截屏2022-09-07 20.40.06.png, 截屏2022-09-07 20.40.16.png
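As a rough illustration of the proposal, the sketch below reads the two limits from the session configuration instead of hardcoding them. The config keys spark.sql.broadcast.maxTableBytes and spark.sql.broadcast.maxTableRows, the BroadcastLimits object, and its check method are all hypothetical and do not exist in Spark; the defaults simply mirror the hardcoded 8 GB / 512,000,000-row limits mentioned above.

{code:java}
import org.apache.spark.sql.SparkSession

object BroadcastLimits {
  // Defaults mirroring the limits that are currently hardcoded (8 GB, 512 million rows).
  private val DefaultMaxTableBytes: Long = 8L << 30
  private val DefaultMaxTableRows: Long = 512000000L

  // Read soft limits from the (hypothetical) session configs, falling back to the defaults.
  def check(spark: SparkSession, sizeInBytes: Long, numRows: Long): Unit = {
    val maxBytes = spark.conf.getOption("spark.sql.broadcast.maxTableBytes")
      .map(_.toLong).getOrElse(DefaultMaxTableBytes)
    val maxRows = spark.conf.getOption("spark.sql.broadcast.maxTableRows")
      .map(_.toLong).getOrElse(DefaultMaxTableRows)
    require(numRows < maxRows,
      s"Cannot broadcast a table with $numRows rows (configured limit: $maxRows)")
    require(sizeInBytes < maxBytes,
      s"Cannot broadcast a table of $sizeInBytes bytes (configured limit: $maxBytes)")
  }
}
{code}

With something like this, a job that must never collect huge broadcast tables on the driver could set the (hypothetical) key to a smaller value, e.g. spark.conf.set("spark.sql.broadcast.maxTableRows", "100000000"), and fail fast instead of OOMing.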
[jira] [Updated] (SPARK-40377) Allow customize maxBroadcastTableBytes and maxBroadcastRows
[ https://issues.apache.org/jira/browse/SPARK-40377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

LLiu updated SPARK-40377:
-------------------------
Attachment: 截屏2022-09-07 20.40.06.png
            截屏2022-09-07 20.40.16.png
[jira] [Created] (SPARK-40377) Allow customize maxBroadcastTableBytes and maxBroadcastRows
LLiu created SPARK-40377:
-------------------------
             Summary: Allow customize maxBroadcastTableBytes and maxBroadcastRows
                 Key: SPARK-40377
                 URL: https://issues.apache.org/jira/browse/SPARK-40377
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.4.0
            Reporter: LLiu

Recently, we encountered several driver OOM problems. Some large tables were compressed with Snappy and then used in broadcast joins, but the actual data volume was far larger than expected, which caused the driver to OOM.

The values of maxBroadcastTableBytes and maxBroadcastRows are hardcoded to 8 GB and 512,000,000 rows respectively. It may be worth making these values configurable, so that smaller limits can be set for different scenarios and broadcast joins of overly large tables can be prohibited, avoiding driver OOM.
[jira] [Commented] (SPARK-38033) The structured streaming processing cannot be started because the commitId and offsetId are inconsistent
[ https://issues.apache.org/jira/browse/SPARK-38033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17488650#comment-17488650 ]

LLiu commented on SPARK-38033:
------------------------------

Hi [~kabhwan], this is a good idea (y). Thanks for your suggestion; I'll try to provide a better error message.

> The structured streaming processing cannot be started because the commitId and offsetId are inconsistent
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-38033
>                 URL: https://issues.apache.org/jira/browse/SPARK-38033
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.4.6, 3.0.3
>            Reporter: LLiu
>            Priority: Major
>
> Streaming processing could not start due to an unexpected machine shutdown. The exception is as follows:
> {code:java}
> ERROR 22/01/12 02:48:36 MicroBatchExecution: Query streaming_4a026335eafd4bb498ee51752b49f7fb [id = 647ba9e4-16d2-4972-9824-6f9179588806, runId = 92385d5b-f31f-40d0-9ac7-bb7d9796d774] terminated with error
> java.lang.IllegalStateException: batch 113258 doesn't exist
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
>     at scala.Option.getOrElse(Option.scala:121)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:255)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:169)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>     at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
>     at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>     at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
>     at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
>     at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
> {code}
> I checked the checkpoint files on HDFS and found that the latest offset is 113259 but the latest commit is 113257:
> {code:java}
> commits
> /tmp/streaming_/commits/113253
> /tmp/streaming_/commits/113254
> /tmp/streaming_/commits/113255
> /tmp/streaming_/commits/113256
> /tmp/streaming_/commits/113257
>
> offsets
> /tmp/streaming_/offsets/113253
> /tmp/streaming_/offsets/113254
> /tmp/streaming_/offsets/113255
> /tmp/streaming_/offsets/113256
> /tmp/streaming_/offsets/113257
> /tmp/streaming_/offsets/113259
> {code}
> Finally, I deleted the offset file /tmp/streaming_/offsets/113259 and the program started normally. I think there is a problem here and we should try to handle this exception or at least give some guidance in the log.
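Following up on the comment above about providing a better error message, here is a rough, hypothetical sketch of the kind of message that could replace the bare "batch N doesn't exist" exception. The helper name and its parameters are made up for illustration; this is not the actual Spark change.

{code:java}
// Hypothetical sketch only: a clearer error for the offset/commit inconsistency described above.
def missingOffsetBatchError(missingBatchId: Long,
                            latestOffsetBatchId: Long,
                            latestCommittedBatchId: Option[Long]): IllegalStateException =
  new IllegalStateException(
    s"Offset log entry for batch $missingBatchId does not exist, while the latest offset batch is " +
    s"$latestOffsetBatchId and the latest committed batch is ${latestCommittedBatchId.getOrElse("none")}. " +
    "The offsets/ and commits/ directories of the checkpoint are inconsistent, possibly because the " +
    "driver stopped while writing the offset log. Inspect the checkpoint location before restarting the query.")
{code}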
[jira] [Updated] (SPARK-38033) The structured streaming processing cannot be started because the commitId and offsetId are inconsistent
[ https://issues.apache.org/jira/browse/SPARK-38033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

LLiu updated SPARK-38033:
-------------------------
Affects Version/s: 3.0.3

> The structured streaming processing cannot be started because the commitId and offsetId are inconsistent
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-38033
>                 URL: https://issues.apache.org/jira/browse/SPARK-38033
>             Project: Spark
>          Issue Type: Improvement
>          Components: Structured Streaming
>    Affects Versions: 2.4.6, 3.0.3
>            Reporter: LLiu
>            Priority: Major
[jira] [Comment Edited] (SPARK-38033) The structured streaming processing cannot be started because the commitId and offsetId are inconsistent
[ https://issues.apache.org/jira/browse/SPARK-38033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17482851#comment-17482851 ]

LLiu edited comment on SPARK-38033 at 1/27/22, 6:35 AM:
--------------------------------------------------------

Yes, I found the same code in Spark 3+, but I have not verified the behavior on Spark 3+.

{code:java}
// Spark 2.4.6 - MicroBatchExecution.populateStartOffsets
/* Initialize committed offsets to a committed batch, which at this
 * point is the second latest batch id in the offset log. */
if (latestBatchId != 0) {
  val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse {
    throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
  }
  committedOffsets = secondLatestBatchId.toStreamProgress(sources)
}

// Spark 3.0.3 - MicroBatchExecution.populateStartOffsets
/* Initialize committed offsets to a committed batch, which at this
 * point is the second latest batch id in the offset log. */
if (latestBatchId != 0) {
  val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse {
    throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
  }
  committedOffsets = secondLatestOffsets.toStreamProgress(sources)
}
{code}

I have an idea: use the latest commitId instead of {latestBatchId - 1} when offset {latestBatchId - 1} does not exist.

was (Author: JIRAUSER284215): the same comment without the final sentence proposing the latest-commitId fallback.
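To make the proposed fallback concrete, here is a rough sketch (not actual Spark code) of how populateStartOffsets might fall back to the newest commit-log entry when offset batch {latestBatchId - 1} is missing. It assumes the same variables as the quoted snippet (offsetLog, commitLog, committedOffsets, sources, latestBatchId) are in scope.

{code:java}
// Sketch of the fallback idea above; assumes the surrounding populateStartOffsets context.
if (latestBatchId != 0) {
  val secondLatestOffsets = offsetLog.get(latestBatchId - 1)
    .orElse {
      // Offset entry (latestBatchId - 1) is missing: fall back to the offsets of the
      // newest committed batch recorded in the commit log, if any.
      commitLog.getLatest().flatMap { case (latestCommittedBatchId, _) =>
        offsetLog.get(latestCommittedBatchId)
      }
    }
    .getOrElse {
      throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
    }
  committedOffsets = secondLatestOffsets.toStreamProgress(sources)
}
{code}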
[jira] [Comment Edited] (SPARK-38033) The structured streaming processing cannot be started because the commitId and offsetId are inconsistent
[ https://issues.apache.org/jira/browse/SPARK-38033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17482851#comment-17482851 ]

LLiu edited comment on SPARK-38033 at 1/27/22, 6:24 AM:
--------------------------------------------------------

This edit kept the opening sentence and the Spark 2.4.6 / 3.0.3 populateStartOffsets comparison shown in the 6:35 AM edit above, and removed the closing "For this problem, I have two ideas..." paragraph that appears in the original comment below.
[jira] [Updated] (SPARK-38033) The structured streaming processing cannot be started because the commitId and offsetId are inconsistent
[ https://issues.apache.org/jira/browse/SPARK-38033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

LLiu updated SPARK-38033:
-------------------------
Summary: The structured streaming processing cannot be started because the commitId and offsetId are inconsistent
(was: The structured streaming processing cannot be started because the commit and offset are inconsistent)
[jira] [Commented] (SPARK-38033) The structured streaming processing cannot be started because the commit and offset are inconsistent
[ https://issues.apache.org/jira/browse/SPARK-38033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17482851#comment-17482851 ]

LLiu commented on SPARK-38033:
------------------------------

Yes, I found the same code in Spark 3+, but I have not verified the behavior on Spark 3+.

{code:java}
// Spark 2.4.6 - MicroBatchExecution.populateStartOffsets
/* Initialize committed offsets to a committed batch, which at this
 * point is the second latest batch id in the offset log. */
if (latestBatchId != 0) {
  val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse {
    throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
  }
  committedOffsets = secondLatestBatchId.toStreamProgress(sources)
}

// Spark 3.0.3 - MicroBatchExecution.populateStartOffsets
/* Initialize committed offsets to a committed batch, which at this
 * point is the second latest batch id in the offset log. */
if (latestBatchId != 0) {
  val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse {
    throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
  }
  committedOffsets = secondLatestOffsets.toStreamProgress(sources)
}
{code}

For this problem I have two ideas; I wonder whether either is acceptable. First, use the latest commitId instead of offset {latestBatchId - 1}, which is consistent in most cases. Second, explain the resolution in the log, for example that the dangling offset file can be deleted.
[jira] [Created] (SPARK-38033) The structured streaming processing cannot be started because the commit and offset are inconsistent
LLiu created SPARK-38033:
-------------------------
             Summary: The structured streaming processing cannot be started because the commit and offset are inconsistent
                 Key: SPARK-38033
                 URL: https://issues.apache.org/jira/browse/SPARK-38033
             Project: Spark
          Issue Type: Improvement
          Components: Structured Streaming
    Affects Versions: 2.4.6
            Reporter: LLiu

Streaming processing could not start after an unexpected machine shutdown. The exception is as follows:

{code:java}
ERROR 22/01/12 02:48:36 MicroBatchExecution: Query streaming_4a026335eafd4bb498ee51752b49f7fb [id = 647ba9e4-16d2-4972-9824-6f9179588806, runId = 92385d5b-f31f-40d0-9ac7-bb7d9796d774] terminated with error
java.lang.IllegalStateException: batch 113258 doesn't exist
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
    at scala.Option.getOrElse(Option.scala:121)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:255)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:169)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
{code}

I checked the checkpoint files on HDFS and found that the latest offset is 113259 but the latest commit is 113257:

{code:java}
commits
/tmp/streaming_/commits/113253
/tmp/streaming_/commits/113254
/tmp/streaming_/commits/113255
/tmp/streaming_/commits/113256
/tmp/streaming_/commits/113257

offsets
/tmp/streaming_/offsets/113253
/tmp/streaming_/offsets/113254
/tmp/streaming_/offsets/113255
/tmp/streaming_/offsets/113256
/tmp/streaming_/offsets/113257
/tmp/streaming_/offsets/113259
{code}

Finally, I deleted the offset file /tmp/streaming_/offsets/113259 and the program started normally. I think there is a problem here, and we should try to handle this exception or at least give some guidance in the log.
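As a standalone illustration of the kind of consistency check discussed above, the sketch below lists the offsets/ and commits/ subdirectories of a checkpoint and reports when the offset log has run ahead of the commit log by more than one batch, as it had in this incident. The object name and the default checkpoint path are placeholders; this is not Spark code.

{code:java}
// Standalone sketch (not Spark code): detect a checkpoint whose offset log is more than one
// batch ahead of its commit log, as in the incident described above.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

object CheckpointSanityCheck {
  // Highest numeric batch id found in a metadata log directory, ignoring .tmp/compaction files.
  def latestBatchId(fs: FileSystem, dir: Path): Option[Long] =
    if (!fs.exists(dir)) None
    else fs.listStatus(dir).toSeq
      .map(_.getPath.getName)
      .filter(name => name.nonEmpty && name.forall(_.isDigit))
      .map(_.toLong)
      .sorted
      .lastOption

  def main(args: Array[String]): Unit = {
    val checkpoint = new Path(args.headOption.getOrElse("/tmp/streaming_checkpoint"))
    val fs = checkpoint.getFileSystem(new Configuration())
    val latestOffset = latestBatchId(fs, new Path(checkpoint, "offsets"))
    val latestCommit = latestBatchId(fs, new Path(checkpoint, "commits"))
    println(s"latest offset batch = $latestOffset, latest commit batch = $latestCommit")
    // In the incident above, offsets ended at 113259 while commits ended at 113257.
    if (latestOffset.exists(o => latestCommit.exists(c => o > c + 1)))
      println("WARNING: the offset log is more than one batch ahead of the commit log")
  }
}
{code}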