[ https://issues.apache.org/jira/browse/SPARK-24315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16616622#comment-16616622 ]
Joey commented on SPARK-24315:
------------------------------

[~mgaido], can you please explain why this is not a bug? What could be the cause of this?

> Multiple streaming jobs detected error causing job failure
> ----------------------------------------------------------
>
>                 Key: SPARK-24315
>                 URL: https://issues.apache.org/jira/browse/SPARK-24315
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.3.0
>            Reporter: Marco Gaido
>            Priority: Major
>
> We are running a simple structured streaming job. It reads data from Kafka
> and writes it to HDFS. Unfortunately at startup, the application fails with
> the following error. After some restarts the application finally starts
> successfully.
> {code}
> org.apache.spark.sql.streaming.StreamingQueryException: assertion failed: Concurrent update to the log. Multiple streaming jobs detected for 1
> === Streaming Query ===
> ....
>         at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
>         at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
> Caused by: java.lang.AssertionError: assertion failed: Concurrent update to the log. Multiple streaming jobs detected for 1
>         at scala.Predef$.assert(Predef.scala:170)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply$mcV$sp(MicroBatchExecution.scala:339)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch$1.apply(MicroBatchExecution.scala:338)
>         at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$constructNextBatch(MicroBatchExecution.scala:338)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:128)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
>         at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
>         at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
>         at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
>         at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
>         ... 1 more
> {code}
> We have not set any value for `spark.streaming.concurrentJobs`. Our code
> looks like:
> {code}
>       // read from kafka
>       .withWatermark("timestamp", "30 minutes")
>       .groupBy(window($"timestamp", "1 hour", "30 minutes"), ...).count()
>       // simple select of some fields with casts
>       .coalesce(1)
>       .writeStream
>       .trigger(Trigger.ProcessingTime("2 minutes"))
>       .option("checkpointLocation", checkpointDir)
>       // write to HDFS
>       .start()
>       .awaitTermination()
> {code}
> This may also be related to the presence of some data in the kafka queue to
> process, so the time for the first batch may be longer than usual (as it is
> quite common I think).