[jira] [Updated] (SPARK-40377) Allow customize maxBroadcastTableBytes and maxBroadcastRows

2022-09-08 Thread LeeeeLiu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

LLiu updated SPARK-40377:
-
Description: 
Recently we encountered driver OOM problems. Some large tables were 
compressed with Snappy and then broadcast joined, but the actual 
(decompressed) data volume was too large, which caused the driver to OOM.

The values of maxBroadcastTableBytes and maxBroadcastRows are hardcoded to 
8GB and 51200 respectively. Perhaps we could allow these values to be 
customized, so that smaller values can be configured for different scenarios 
and broadcast joins can be prohibited for some large tables to avoid driver 
OOM.
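The proposed guard can be sketched as follows. This is an illustrative model 
only, not Spark source: the function name and defaults are hypothetical, and 
the limits mirror the hardcoded values quoted above.

```python
# Hypothetical sketch of the proposed configurable broadcast guard.
# These defaults mirror the hardcoded limits mentioned in the description.
DEFAULT_MAX_BROADCAST_TABLE_BYTES = 8 * 1024 ** 3  # the hardcoded 8GB limit
DEFAULT_MAX_BROADCAST_ROWS = 51200                 # the hardcoded row limit

def can_broadcast(table_bytes, table_rows,
                  max_bytes=DEFAULT_MAX_BROADCAST_TABLE_BYTES,
                  max_rows=DEFAULT_MAX_BROADCAST_ROWS):
    """Allow a broadcast join only if the table fits under both limits.

    Passing smaller max_bytes/max_rows models the proposal: operators could
    tighten the limits per scenario to keep oversized tables off the driver.
    """
    return table_bytes <= max_bytes and table_rows <= max_rows
```

With customizable limits, a deployment that hits driver OOM could lower 
max_bytes below the table's decompressed size and force a non-broadcast join.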

  was:
Recently we encountered driver OOM problems. Some tables with large data 
volumes were compressed with Snappy and then broadcast joined, but the actual 
(decompressed) data volume was too large, which caused the driver to OOM.

The values of maxBroadcastTableBytes and maxBroadcastRows are hardcoded to 
8GB and 51200 respectively. Perhaps we could allow these values to be 
customized, so that smaller values can be configured for different scenarios 
and broadcast joins can be prohibited for tables with large data volumes to 
avoid driver OOM.


> Allow customize maxBroadcastTableBytes and maxBroadcastRows
> ---
>
> Key: SPARK-40377
> URL: https://issues.apache.org/jira/browse/SPARK-40377
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: LLiu
>Priority: Major
> Attachments: 截屏2022-09-07 20.40.06.png, 截屏2022-09-07 20.40.16.png
>
>
> Recently we encountered driver OOM problems. Some large tables were 
> compressed with Snappy and then broadcast joined, but the actual 
> (decompressed) data volume was too large, which caused the driver to OOM.
> The values of maxBroadcastTableBytes and maxBroadcastRows are hardcoded to 
> 8GB and 51200 respectively. Perhaps we could allow these values to be 
> customized, so that smaller values can be configured for different scenarios 
> and broadcast joins can be prohibited for some large tables to avoid driver 
> OOM.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-40377) Allow customize maxBroadcastTableBytes and maxBroadcastRows

2022-09-07 Thread LeeeeLiu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-40377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

LLiu updated SPARK-40377:
-
Attachment: 截屏2022-09-07 20.40.06.png
截屏2022-09-07 20.40.16.png

> Allow customize maxBroadcastTableBytes and maxBroadcastRows
> ---
>
> Key: SPARK-40377
> URL: https://issues.apache.org/jira/browse/SPARK-40377
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.4.0
>Reporter: LLiu
>Priority: Major
> Attachments: 截屏2022-09-07 20.40.06.png, 截屏2022-09-07 20.40.16.png
>
>
> Recently we encountered driver OOM problems. Some tables with large data 
> volumes were compressed with Snappy and then broadcast joined, but the 
> actual (decompressed) data volume was too large, which caused the driver to 
> OOM.
> The values of maxBroadcastTableBytes and maxBroadcastRows are hardcoded to 
> 8GB and 51200 respectively. Perhaps we could allow these values to be 
> customized, so that smaller values can be configured for different scenarios 
> and broadcast joins can be prohibited for tables with large data volumes to 
> avoid driver OOM.






[jira] [Created] (SPARK-40377) Allow customize maxBroadcastTableBytes and maxBroadcastRows

2022-09-07 Thread LeeeeLiu (Jira)
LLiu created SPARK-40377:


 Summary: Allow customize maxBroadcastTableBytes and 
maxBroadcastRows
 Key: SPARK-40377
 URL: https://issues.apache.org/jira/browse/SPARK-40377
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.4.0
Reporter: LLiu


Recently we encountered driver OOM problems. Some tables with large data 
volumes were compressed with Snappy and then broadcast joined, but the actual 
(decompressed) data volume was too large, which caused the driver to OOM.

The values of maxBroadcastTableBytes and maxBroadcastRows are hardcoded to 
8GB and 51200 respectively. Perhaps we could allow these values to be 
customized, so that smaller values can be configured for different scenarios 
and broadcast joins can be prohibited for tables with large data volumes to 
avoid driver OOM.






[jira] [Commented] (SPARK-38033) The structured streaming processing cannot be started because the commitId and offsetId are inconsistent

2022-02-07 Thread LeeeeLiu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17488650#comment-17488650
 ] 

LLiu commented on SPARK-38033:
--

Hi [~kabhwan], this is a good idea. Thanks for your suggestion; I'll try to 
provide a better error message.

> The structured streaming processing cannot be started because the commitId 
> and offsetId are inconsistent
> 
>
> Key: SPARK-38033
> URL: https://issues.apache.org/jira/browse/SPARK-38033
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.6, 3.0.3
>Reporter: LLiu
>Priority: Major
>
> Streaming Processing could not start due to an unexpected machine shutdown.
> The exception is as follows
>  
> {code:java}
> ERROR 22/01/12 02:48:36 MicroBatchExecution: Query 
> streaming_4a026335eafd4bb498ee51752b49f7fb [id = 
> 647ba9e4-16d2-4972-9824-6f9179588806, runId = 
> 92385d5b-f31f-40d0-9ac7-bb7d9796d774] terminated with error
> java.lang.IllegalStateException: batch 113258 doesn't exist
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
>         at scala.Option.getOrElse(Option.scala:121)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:255)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:169)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
> {code}
> I checked the checkpoint files on HDFS and found that the latest offset is 
> 113259 but the latest commit is 113257:
> {code:java}
> commits
> /tmp/streaming_/commits/113253
> /tmp/streaming_/commits/113254
> /tmp/streaming_/commits/113255
> /tmp/streaming_/commits/113256
> /tmp/streaming_/commits/113257
> offset
> /tmp/streaming_/offsets/113253
> /tmp/streaming_/offsets/113254
> /tmp/streaming_/offsets/113255
> /tmp/streaming_/offsets/113256
> /tmp/streaming_/offsets/113257
> /tmp/streaming_/offsets/113259{code}
> Finally, I deleted the offset file “/tmp/streaming_/offsets/113259” and the 
> program started normally. I think there is a problem here, and we should 
> handle this exception or suggest a resolution in the log.
>  
>  
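The inconsistency reported above can be illustrated with a small sketch. This 
is not Spark code: the function is hypothetical and simply models why 
populateStartOffsets fails when the offset log has a gap (batch \{latest - 1} 
missing while \{latest} exists).

```python
# Hypothetical sketch of the reported failure mode: the offset log ends at
# 113259 but 113258 is missing, so looking up the second-latest batch fails.
def find_dangling_offset(offset_ids):
    """Return the latest offset id if its predecessor is missing, else None.

    A non-None result models the broken checkpoint state: recovery would
    need the missing predecessor, so startup throws IllegalStateException.
    """
    latest = max(offset_ids)
    if latest != 0 and (latest - 1) not in offset_ids:
        return latest
    return None
```

Applied to the listing above, the set of offset ids {113253..113257, 113259} 
yields 113259, the dangling offset file the reporter had to delete by hand.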



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-38033) The structured streaming processing cannot be started because the commitId and offsetId are inconsistent

2022-02-07 Thread LeeeeLiu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

LLiu updated SPARK-38033:
-
Affects Version/s: 3.0.3

> The structured streaming processing cannot be started because the commitId 
> and offsetId are inconsistent
> 
>
> Key: SPARK-38033
> URL: https://issues.apache.org/jira/browse/SPARK-38033
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.6, 3.0.3
>Reporter: LLiu
>Priority: Major
>
> Streaming Processing could not start due to an unexpected machine shutdown.
> The exception is as follows
>  
> {code:java}
> ERROR 22/01/12 02:48:36 MicroBatchExecution: Query 
> streaming_4a026335eafd4bb498ee51752b49f7fb [id = 
> 647ba9e4-16d2-4972-9824-6f9179588806, runId = 
> 92385d5b-f31f-40d0-9ac7-bb7d9796d774] terminated with error
> java.lang.IllegalStateException: batch 113258 doesn't exist
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
>         at scala.Option.getOrElse(Option.scala:121)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:255)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:169)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
> {code}
> I checked the checkpoint files on HDFS and found that the latest offset is 
> 113259 but the latest commit is 113257:
> {code:java}
> commits
> /tmp/streaming_/commits/113253
> /tmp/streaming_/commits/113254
> /tmp/streaming_/commits/113255
> /tmp/streaming_/commits/113256
> /tmp/streaming_/commits/113257
> offset
> /tmp/streaming_/offsets/113253
> /tmp/streaming_/offsets/113254
> /tmp/streaming_/offsets/113255
> /tmp/streaming_/offsets/113256
> /tmp/streaming_/offsets/113257
> /tmp/streaming_/offsets/113259{code}
> Finally, I deleted the offset file “/tmp/streaming_/offsets/113259” and the 
> program started normally. I think there is a problem here, and we should 
> handle this exception or suggest a resolution in the log.
>  
>  






[jira] [Comment Edited] (SPARK-38033) The structured streaming processing cannot be started because the commitId and offsetId are inconsistent

2022-01-26 Thread LeeeeLiu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17482851#comment-17482851
 ] 

LLiu edited comment on SPARK-38033 at 1/27/22, 6:35 AM:


Yes, I found the same code in Spark 3+, but I haven't verified the behavior there.

 
{code:java}
// spark 2.4.6
// MicroBatchExecution class
// populateStartOffsets method

/* Initialize committed offsets to a committed batch, which at this
 * is the second latest batch id in the offset log. */
if (latestBatchId != 0) {
  val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse {
throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
  }
  committedOffsets = secondLatestBatchId.toStreamProgress(sources)
}

// spark 3.0.3
// MicroBatchExecution class
// populateStartOffsets method

/* Initialize committed offsets to a committed batch, which at this
 * is the second latest batch id in the offset log. */
if (latestBatchId != 0) {
  val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse {
throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
  }
  committedOffsets = secondLatestOffsets.toStreamProgress(sources)
}{code}
I have an idea: when offset \{latestBatchId - 1} does not exist, use the 
latest commitId instead of \{latestBatchId - 1}.
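That fallback idea can be sketched as follows. This is only an illustration 
of the proposal in this comment, not committed Spark behavior; the function 
name is hypothetical.

```python
# Hypothetical sketch of the proposed fallback: if offset {latest - 1} is
# missing from the offset log, recover from the newest committed batch
# instead of throwing IllegalStateException.
def resolve_committed_batch(offset_ids, commit_ids):
    """Return the batch id to treat as committed on restart."""
    latest = max(offset_ids)
    if latest == 0:
        return None  # first batch, nothing committed yet
    if (latest - 1) in offset_ids:
        return latest - 1  # normal path: second-latest offset exists
    # proposed fallback: fall back to the newest commit, which is
    # consistent with the offset log in most cases
    return max(commit_ids) if commit_ids else None
```

For the checkpoint state from this issue (offsets {113253..113257, 113259}, 
commits {113253..113257}) this would resume from batch 113257 instead of 
failing on the missing offset 113258.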

 


was (Author: JIRAUSER284215):
Yes, I found the same code in Spark 3+, but I haven't verified the behavior there.

 
{code:java}
// spark 2.4.6
// MicroBatchExecution class
// populateStartOffsets method

/* Initialize committed offsets to a committed batch, which at this
 * is the second latest batch id in the offset log. */
if (latestBatchId != 0) {
  val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse {
throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
  }
  committedOffsets = secondLatestBatchId.toStreamProgress(sources)
}

// spark 3.0.3
// MicroBatchExecution class
// populateStartOffsets method

/* Initialize committed offsets to a committed batch, which at this
 * is the second latest batch id in the offset log. */
if (latestBatchId != 0) {
  val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse {
throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
  }
  committedOffsets = secondLatestOffsets.toStreamProgress(sources)
}{code}
 

 

> The structured streaming processing cannot be started because the commitId 
> and offsetId are inconsistent
> 
>
> Key: SPARK-38033
> URL: https://issues.apache.org/jira/browse/SPARK-38033
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.6
>Reporter: LLiu
>Priority: Major
>
> Streaming Processing could not start due to an unexpected machine shutdown.
> The exception is as follows
>  
> {code:java}
> ERROR 22/01/12 02:48:36 MicroBatchExecution: Query 
> streaming_4a026335eafd4bb498ee51752b49f7fb [id = 
> 647ba9e4-16d2-4972-9824-6f9179588806, runId = 
> 92385d5b-f31f-40d0-9ac7-bb7d9796d774] terminated with error
> java.lang.IllegalStateException: batch 113258 doesn't exist
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
>         at scala.Option.getOrElse(Option.scala:121)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:255)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:169)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>         at 
> 

[jira] [Comment Edited] (SPARK-38033) The structured streaming processing cannot be started because the commitId and offsetId are inconsistent

2022-01-26 Thread LeeeeLiu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17482851#comment-17482851
 ] 

LLiu edited comment on SPARK-38033 at 1/27/22, 6:24 AM:


Yes, I found the same code in Spark 3+, but I haven't verified the behavior there.

 
{code:java}
// spark 2.4.6
// MicroBatchExecution class
// populateStartOffsets method

/* Initialize committed offsets to a committed batch, which at this
 * is the second latest batch id in the offset log. */
if (latestBatchId != 0) {
  val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse {
throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
  }
  committedOffsets = secondLatestBatchId.toStreamProgress(sources)
}

// spark 3.0.3
// MicroBatchExecution class
// populateStartOffsets method

/* Initialize committed offsets to a committed batch, which at this
 * is the second latest batch id in the offset log. */
if (latestBatchId != 0) {
  val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse {
throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
  }
  committedOffsets = secondLatestOffsets.toStreamProgress(sources)
}{code}
 

 


was (Author: JIRAUSER284215):
Yes, I found the same code in Spark 3+, but I haven't verified the behavior there.

 
{code:java}
// spark 2.4.6
// MicroBatchExecution class
// populateStartOffsets method

/* Initialize committed offsets to a committed batch, which at this
 * is the second latest batch id in the offset log. */
if (latestBatchId != 0) {
  val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse {
throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
  }
  committedOffsets = secondLatestBatchId.toStreamProgress(sources)
}

// spark 3.0.3
// MicroBatchExecution class
// populateStartOffsets method

/* Initialize committed offsets to a committed batch, which at this
 * is the second latest batch id in the offset log. */
if (latestBatchId != 0) {
  val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse {
throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
  }
  committedOffsets = secondLatestOffsets.toStreamProgress(sources)
}{code}
For this problem I have two ideas; I wonder whether either is OK.

First, use the latest commitId instead of offset \{latestBatchId - 1}, which 
is consistent in most cases.

Second, explain the remediation in the log, such as telling the user to 
delete the wrong offset file.

 

> The structured streaming processing cannot be started because the commitId 
> and offsetId are inconsistent
> 
>
> Key: SPARK-38033
> URL: https://issues.apache.org/jira/browse/SPARK-38033
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.6
>Reporter: LLiu
>Priority: Major
>
> Streaming Processing could not start due to an unexpected machine shutdown.
> The exception is as follows
>  
> {code:java}
> ERROR 22/01/12 02:48:36 MicroBatchExecution: Query 
> streaming_4a026335eafd4bb498ee51752b49f7fb [id = 
> 647ba9e4-16d2-4972-9824-6f9179588806, runId = 
> 92385d5b-f31f-40d0-9ac7-bb7d9796d774] terminated with error
> java.lang.IllegalStateException: batch 113258 doesn't exist
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
>         at scala.Option.getOrElse(Option.scala:121)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:255)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:169)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
>         at 
> 

[jira] [Updated] (SPARK-38033) The structured streaming processing cannot be started because the commitId and offsetId are inconsistent

2022-01-26 Thread LeeeeLiu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-38033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

LLiu updated SPARK-38033:
-
Summary: The structured streaming processing cannot be started because the 
commitId and offsetId are inconsistent  (was: The structured streaming 
processing cannot be started because the commit and offset are inconsistent)

> The structured streaming processing cannot be started because the commitId 
> and offsetId are inconsistent
> 
>
> Key: SPARK-38033
> URL: https://issues.apache.org/jira/browse/SPARK-38033
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.6
>Reporter: LLiu
>Priority: Major
>
> Streaming Processing could not start due to an unexpected machine shutdown.
> The exception is as follows
>  
> {code:java}
> ERROR 22/01/12 02:48:36 MicroBatchExecution: Query 
> streaming_4a026335eafd4bb498ee51752b49f7fb [id = 
> 647ba9e4-16d2-4972-9824-6f9179588806, runId = 
> 92385d5b-f31f-40d0-9ac7-bb7d9796d774] terminated with error
> java.lang.IllegalStateException: batch 113258 doesn't exist
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
>         at scala.Option.getOrElse(Option.scala:121)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:255)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:169)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
> {code}
> I checked the checkpoint files on HDFS and found that the latest offset is 
> 113259 but the latest commit is 113257:
> {code:java}
> commits
> /tmp/streaming_/commits/113253
> /tmp/streaming_/commits/113254
> /tmp/streaming_/commits/113255
> /tmp/streaming_/commits/113256
> /tmp/streaming_/commits/113257
> offset
> /tmp/streaming_/offsets/113253
> /tmp/streaming_/offsets/113254
> /tmp/streaming_/offsets/113255
> /tmp/streaming_/offsets/113256
> /tmp/streaming_/offsets/113257
> /tmp/streaming_/offsets/113259{code}
> Finally, I deleted the offset file “/tmp/streaming_/offsets/113259” and the 
> program started normally. I think there is a problem here, and we should 
> handle this exception or suggest a resolution in the log.
>  
>  






[jira] [Commented] (SPARK-38033) The structured streaming processing cannot be started because the commit and offset are inconsistent

2022-01-26 Thread LeeeeLiu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17482851#comment-17482851
 ] 

LLiu commented on SPARK-38033:
--

Yes, I found the same code in Spark 3+, but I haven't verified the behavior there.

 
{code:java}
// spark 2.4.6
// MicroBatchExecution class
// populateStartOffsets method

/* Initialize committed offsets to a committed batch, which at this
 * is the second latest batch id in the offset log. */
if (latestBatchId != 0) {
  val secondLatestBatchId = offsetLog.get(latestBatchId - 1).getOrElse {
throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
  }
  committedOffsets = secondLatestBatchId.toStreamProgress(sources)
}

// spark 3.0.3
// MicroBatchExecution class
// populateStartOffsets method

/* Initialize committed offsets to a committed batch, which at this
 * is the second latest batch id in the offset log. */
if (latestBatchId != 0) {
  val secondLatestOffsets = offsetLog.get(latestBatchId - 1).getOrElse {
throw new IllegalStateException(s"batch ${latestBatchId - 1} doesn't exist")
  }
  committedOffsets = secondLatestOffsets.toStreamProgress(sources)
}{code}
For this problem I have two ideas; I wonder whether either is OK.

First, use the latest commitId instead of offset \{latestBatchId - 1}, which 
is consistent in most cases.

Second, explain the remediation in the log, such as telling the user to 
delete the wrong offset file.

 

> The structured streaming processing cannot be started because the commit and 
> offset are inconsistent
> 
>
> Key: SPARK-38033
> URL: https://issues.apache.org/jira/browse/SPARK-38033
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.4.6
>Reporter: LLiu
>Priority: Major
>
> Streaming Processing could not start due to an unexpected machine shutdown.
> The exception is as follows
>  
> {code:java}
> ERROR 22/01/12 02:48:36 MicroBatchExecution: Query 
> streaming_4a026335eafd4bb498ee51752b49f7fb [id = 
> 647ba9e4-16d2-4972-9824-6f9179588806, runId = 
> 92385d5b-f31f-40d0-9ac7-bb7d9796d774] terminated with error
> java.lang.IllegalStateException: batch 113258 doesn't exist
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
>         at scala.Option.getOrElse(Option.scala:121)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:255)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:169)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
>         at 
> org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>         at 
> org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
>         at 
> org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
> {code}
> I checked the checkpoint files on HDFS and found that the latest offset is 
> 113259 but the latest commit is 113257:
> {code:java}
> commits
> /tmp/streaming_/commits/113253
> /tmp/streaming_/commits/113254
> /tmp/streaming_/commits/113255
> /tmp/streaming_/commits/113256
> /tmp/streaming_/commits/113257
> offset
> /tmp/streaming_/offsets/113253
> /tmp/streaming_/offsets/113254
> /tmp/streaming_/offsets/113255
> /tmp/streaming_/offsets/113256
> /tmp/streaming_/offsets/113257
> /tmp/streaming_/offsets/113259{code}
> Finally, I deleted offsets 

[jira] [Created] (SPARK-38033) The structured streaming processing cannot be started because the commit and offset are inconsistent

2022-01-26 Thread LeeeeLiu (Jira)
LLiu created SPARK-38033:


 Summary: The structured streaming processing cannot be started 
because the commit and offset are inconsistent
 Key: SPARK-38033
 URL: https://issues.apache.org/jira/browse/SPARK-38033
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 2.4.6
Reporter: LLiu


Streaming Processing could not start due to an unexpected machine shutdown.

The exception is as follows

 
{code:java}
ERROR 22/01/12 02:48:36 MicroBatchExecution: Query 
streaming_4a026335eafd4bb498ee51752b49f7fb [id = 
647ba9e4-16d2-4972-9824-6f9179588806, runId = 
92385d5b-f31f-40d0-9ac7-bb7d9796d774] terminated with error
java.lang.IllegalStateException: batch 113258 doesn't exist
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$4.apply(MicroBatchExecution.scala:256)
        at scala.Option.getOrElse(Option.scala:121)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$populateStartOffsets(MicroBatchExecution.scala:255)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:169)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:166)
        at 
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:351)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:166)
        at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
        at 
org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
        at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)

{code}
I checked the checkpoint files on HDFS and found that the latest offset is 
113259 but the latest commit is 113257:
{code:java}
commits
/tmp/streaming_/commits/113253
/tmp/streaming_/commits/113254
/tmp/streaming_/commits/113255
/tmp/streaming_/commits/113256
/tmp/streaming_/commits/113257

offset
/tmp/streaming_/offsets/113253
/tmp/streaming_/offsets/113254
/tmp/streaming_/offsets/113255
/tmp/streaming_/offsets/113256
/tmp/streaming_/offsets/113257
/tmp/streaming_/offsets/113259{code}
Finally, I deleted the offset file “/tmp/streaming_/offsets/113259” and the 
program started normally. I think there is a problem here, and we should 
handle this exception or suggest a resolution in the log.

 

 


