Adrian Jones created SPARK-31931:
------------------------------------

             Summary: When using GCS as checkpoint location for Structured 
Streaming aggregation pipeline, the Spark writing job is aborted
                 Key: SPARK-31931
                 URL: https://issues.apache.org/jira/browse/SPARK-31931
             Project: Spark
          Issue Type: Bug
          Components: Structured Streaming
    Affects Versions: 2.4.5
         Environment: GCP Dataproc 1.5 Debian 10 (Hadoop 2.10.0, Spark 2.4.5, 
Cloud Storage Connector hadoop2.2.1.3, Scala 2.12.10)
            Reporter: Adrian Jones


Structured Streaming checkpointing does not work with Google Cloud Storage when the streaming pipeline includes aggregations.

Using GCS as the checkpoint store works fine when there are no aggregations in the pipeline; however, once an aggregation (e.g. a groupBy) is introduced, the error below is thrown.

The error is thrown only when aggregating with checkpointLocation pointed at GCS; the exact same code works fine when checkpointLocation points to HDFS.
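
For reference, here is a minimal sketch of the kind of pipeline that hits this. The rate source, window aggregation, console sink, and bucket name are illustrative assumptions, not the exact failing job; the essential ingredients are any streaming aggregation plus a gs:// checkpointLocation.

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object GcsCheckpointRepro {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("gcs-checkpoint-repro")
      .getOrCreate()
    import spark.implicits._

    // Any streaming source will do; the built-in rate source keeps the repro self-contained.
    val events = spark.readStream
      .format("rate")
      .option("rowsPerSecond", 10)
      .load()

    // The aggregation (groupBy/count) is what forces the state store to write
    // checkpoint/state files; without it the query runs fine against GCS.
    val counts = events
      .withWatermark("timestamp", "1 minute")
      .groupBy(window($"timestamp", "1 minute"))
      .count()

    val query = counts.writeStream
      .outputMode("update")
      .format("console")
      // Pointing this at a gs:// path triggers the failure; an hdfs:// path works.
      .option("checkpointLocation", "gs://some-bucket/checkpoints/repro") // hypothetical bucket
      .start()

    query.awaitTermination()
  }
}
{code}

Swapping the gs:// checkpoint path for an hdfs:// one is the only change needed to get the working behavior described above.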

Is GCS expected to function as a checkpoint location for aggregated pipelines? Is work currently in progress to enable this, or is it on a roadmap?

{code}
org.apache.spark.sql.streaming.StreamingQueryException: Query [id = 612a550b-992b-41cb-82f9-a95c12c51379, runId = 90a8e64a-5f64-4bd0-90e7-5df14630c577] terminated with exception: Writing job aborted.
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:302)
  at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:193)
Caused by: org.apache.spark.SparkException: Writing job aborted.
  at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:92)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:155)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:247)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:296)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3389)
  at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2788)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:3370)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:80)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3370)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2788)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$15(MicroBatchExecution.scala:540)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:80)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runBatch$14(MicroBatchExecution.scala:536)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runBatch(MicroBatchExecution.scala:535)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$2(MicroBatchExecution.scala:198)
  at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken(ProgressReporter.scala:351)
  at org.apache.spark.sql.execution.streaming.ProgressReporter.reportTimeTaken$(ProgressReporter.scala:349)
  at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$runActivatedStream$1(MicroBatchExecution.scala:166)
  at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
  at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:160)
  at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:281)
  ... 1 more
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 1.0 failed 4 times, most recent failure: Lost task 2.3 in stage 1.0 (TID 12, spark-structured-streaming-w-0.c.pso-wmt-sandbox.internal, executor 1): org.apache.spark.util.TaskCompletionListenerException: null
  at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
  at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:117)
  at org.apache.spark.scheduler.Task.run(Task.scala:139)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
  at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1892)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1880)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1879)
  at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
  at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
  at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1879)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:927)
  at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:927)
  at scala.Option.foreach(Option.scala:407)
  at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2113)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2062)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2051)
  at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
  at org.apache.spark.sql.execution.datasources.v2.WriteToDataSourceV2Exec.doExecute(WriteToDataSourceV2Exec.scala:64)
  ... 34 more
Caused by: org.apache.spark.util.TaskCompletionListenerException: null
  at org.apache.spark.TaskContextImpl.invokeListeners(TaskContextImpl.scala:138)
  at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:117)
  at org.apache.spark.scheduler.Task.run(Task.scala:139)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:748)
{code}


