[ 
https://issues.apache.org/jira/browse/SPARK-13693?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15258856#comment-15258856
 ] 

Josh Rosen commented on SPARK-13693:
------------------------------------

Yep, there's definitely a race condition here:

{code}
16/04/26 11:14:04.451 block-manager-slave-async-thread-pool-0 INFO BlockManager: Removing RDD 89
16/04/26 11:14:04.451 JobGenerator INFO JobGenerator: Checkpointing graph for time 7000 ms
16/04/26 11:14:04.451 JobGenerator INFO DStreamGraph: Updating checkpoint data for time 7000 ms
16/04/26 11:14:04.451 JobGenerator INFO DStreamGraph: Updated checkpoint data for time 7000 ms
16/04/26 11:14:04.452 JobGenerator INFO CheckpointWriter: Submitted checkpoint of time 7000 ms writer queue
16/04/26 11:14:04.452 pool-1787-thread-1 INFO CheckpointWriter: Saving checkpoint for time 7000 ms to file 'file:/home/jenkins/workspace/spark-master-test-sbt-hadoop-2.6/streaming/checkpoint/spark-dcb9046e-7dc0-440b-a406-f02883f3ae3b/checkpoint-7000'
16/04/26 11:14:14.452 pool-1-thread-1-ScalaTest-running-MapWithStateSuite INFO CheckpointWriter: CheckpointWriter executor terminated ? false, waited for 10000 ms.
16/04/26 11:14:14.452 pool-1-thread-1-ScalaTest-running-MapWithStateSuite INFO JobGenerator: Stopped JobGenerator
16/04/26 11:14:14.452 pool-1-thread-1-ScalaTest-running-MapWithStateSuite INFO JobScheduler: Stopped JobScheduler
16/04/26 11:14:14.453 pool-1-thread-1-ScalaTest-running-MapWithStateSuite INFO StreamingContext: StreamingContext stopped successfully
16/04/26 11:14:14.453 pool-1-thread-1-ScalaTest-running-MapWithStateSuite INFO MapWithStateSuite: 

===== FINISHED o.a.s.streaming.MapWithStateSuite: 'mapWithState - basic operations with advanced API' =====

16/04/26 11:14:14.781 pool-1787-thread-1 WARN CheckpointWriter: Error in attempt 1 of writing checkpoint to file:/home/jenkins/workspace/spark-master-test-sbt-hadoop-2.6/streaming/checkpoint/spark-dcb9046e-7dc0-440b-a406-f02883f3ae3b/checkpoint-7000
java.io.IOException: java.lang.InterruptedException
        at org.apache.hadoop.util.Shell.runCommand(Shell.java:541)
        at org.apache.hadoop.util.Shell.run(Shell.java:455)
        at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:715)
        at org.apache.hadoop.util.Shell.execCommand(Shell.java:808)
        at org.apache.hadoop.util.Shell.execCommand(Shell.java:791)
        at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:656)
        at org.apache.hadoop.fs.FilterFileSystem.setPermission(FilterFileSystem.java:490)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:462)
        at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:428)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:908)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:889)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:786)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:775)
        at org.apache.spark.streaming.CheckpointWriter$CheckpointWriteHandler.run(Checkpoint.scala:224)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
16/04/26 11:14:14.781 pool-1787-thread-1 WARN CheckpointWriter: Could not write checkpoint for time 7000 ms to file file:/home/jenkins/workspace/spark-master-test-sbt-hadoop-2.6/streaming/checkpoint/spark-dcb9046e-7dc0-440b-a406-f02883f3ae3b/checkpoint-7000'
16/04/26 11:14:14.786 dispatcher-event-loop-7 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
{code}

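It looks like the CheckpointWriter doesn't finish writing before we return from 
shutdown. In the log above, the write starts at 11:14:04.452, stop() waits the 
full 10 seconds without the executor terminating ("terminated ? false, waited 
for 10000 ms"), and the writer thread then fails with an InterruptedException 
inside FileSystem.create, after the test has already been reported as finished. 
There is code to wait, but the wait is capped at 10 seconds, which evidently 
isn't enough here. From CheckpointWriter: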

{code}
def stop(): Unit = synchronized {
  if (stopped) return

  executor.shutdown()
  val startTime = System.currentTimeMillis()
  val terminated = executor.awaitTermination(10, java.util.concurrent.TimeUnit.SECONDS)
  if (!terminated) {
    executor.shutdownNow()
  }
  val endTime = System.currentTimeMillis()
  logInfo("CheckpointWriter executor terminated ? " + terminated +
    ", waited for " + (endTime - startTime) + " ms.")
  stopped = true
}
{code}
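
To illustrate the sequence, here is a minimal standalone sketch of the same shutdown() / awaitTermination() / shutdownNow() pattern using only java.util.concurrent. It is not Spark code: the names ShutdownRaceSketch, run, taskMs and waitMs are made up for this example, and Thread.sleep merely stands in for a checkpoint write that is slow or blocked. It shows that when the in-flight task outlives the wait, shutdownNow() interrupts it, which would be consistent with the InterruptedException in the log.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical sketch, not part of Spark.
object ShutdownRaceSketch {
  // Returns (terminated, interrupted): whether the executor drained within
  // waitMs, and whether the in-flight task was interrupted.
  def run(taskMs: Long, waitMs: Long): (Boolean, Boolean) = {
    val executor = Executors.newFixedThreadPool(1)
    val started = new CountDownLatch(1)
    val finished = new CountDownLatch(1)
    val interrupted = new AtomicBoolean(false)

    executor.execute(new Runnable {
      override def run(): Unit = {
        started.countDown()
        try {
          Thread.sleep(taskMs) // stand-in for a slow or blocked write
        } catch {
          case _: InterruptedException => interrupted.set(true)
        } finally {
          finished.countDown()
        }
      }
    })
    started.await() // make sure the task is in flight before stopping

    // Same sequence as the stop() method quoted above.
    executor.shutdown()
    val terminated = executor.awaitTermination(waitMs, TimeUnit.MILLISECONDS)
    if (!terminated) {
      executor.shutdownNow() // interrupts the still-running task
    }
    finished.await()
    (terminated, interrupted.get())
  }
}
```

Calling ShutdownRaceSketch.run(5000L, 100L) models the failing case (the task outlives the wait and gets interrupted), while run(10L, 2000L) models the expected case where the write finishes well inside the timeout.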

I wonder if the shutdown process is causing the CheckpointWriter thread to 
block (for example, on a lock) in a way that prevents it from finishing in 
time. This is a unit test with tiny data, so the checkpoint write itself should 
take nowhere near 10 seconds.
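
One way to check this hypothesis would be to dump the writer thread's stack when awaitTermination times out, before shutdownNow() is called. The helper below is only a sketch of that idea: ThreadDumpSketch and dump are hypothetical names, nothing like this exists in CheckpointWriter as quoted, and the thread-name prefix to pass (e.g. "pool-1787-thread", taken from the log above) would differ from run to run.

```scala
import scala.collection.JavaConverters._

// Hypothetical debugging helper, not part of Spark.
object ThreadDumpSketch {
  // Formats the name, state and stack of every live thread whose name
  // starts with `prefix`, one string per thread.
  def dump(prefix: String): Seq[String] =
    Thread.getAllStackTraces.asScala.toSeq.collect {
      case (thread, frames) if thread.getName.startsWith(prefix) =>
        s"${thread.getName} (${thread.getState})\n" +
          frames.map("    at " + _).mkString("\n")
    }
}
```

Logging the result of dump(...) in the `!terminated` branch would show whether the writer is sitting in FileSystem.create, waiting on a monitor, or somewhere else entirely.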

> Flaky test: o.a.s.streaming.MapWithStateSuite
> ---------------------------------------------
>
>                 Key: SPARK-13693
>                 URL: https://issues.apache.org/jira/browse/SPARK-13693
>             Project: Spark
>          Issue Type: Test
>          Components: Tests
>            Reporter: Shixiong Zhu
>            Assignee: Shixiong Zhu
>            Priority: Minor
>             Fix For: 2.0.0
>
>
> Fixed the following flaky test:
> https://amplab.cs.berkeley.edu/jenkins/job/spark-master-test-sbt-hadoop-2.7/256/testReport/junit/org.apache.spark.streaming/MapWithStateSuite/_It_is_not_a_test_/
> {code}
> sbt.ForkMain$ForkError: java.io.IOException: Failed to delete: /home/jenkins/workspace/spark-master-test-sbt-hadoop-2.7/streaming/checkpoint/spark-e97794a8-b940-4b21-8685-bf1221f9444d
>       at org.apache.spark.util.Utils$.deleteRecursively(Utils.scala:934)
>       at org.apache.spark.streaming.MapWithStateSuite$$anonfun$2.apply$mcV$sp(MapWithStateSuite.scala:47)
>       at org.apache.spark.streaming.MapWithStateSuite$$anonfun$2.apply(MapWithStateSuite.scala:45)
>       at org.apache.spark.streaming.MapWithStateSuite$$anonfun$2.apply(MapWithStateSuite.scala:45)
> {code}


