spark's behavior about failed tasks
Hello there, I have a spark running in a 20 node cluster. The job is logically simple, just a mapPartition and then sum. The return value of the mapPartitions is an integer for each partition. The tasks got some random failure (which could be caused by a 3rh party key-value store connections. The cause is irrelevant to my question). In more details, Description: 1. spark 1.1.1. 2. 4096 tasks total. 3. 66 failed tasks. Issue: Spark seems rerunning all the 4096 tasks instead of the 66 failed tasks. It current runs at 469/4096 (stage2). Is this behavior normal? Thanks for your help! -- View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/spark-s-behavior-about-failed-tasks-tp24232.html Sent from the Apache Spark User List mailing list archive at Nabble.com. - To unsubscribe, e-mail: user-unsubscr...@spark.apache.org For additional commands, e-mail: user-h...@spark.apache.org
Spark's Behavior 2
Hi TD, I have sent more informations now using 8 workers. The gap has been 27 sec now. Have you seen? Thanks BR -- Informativa sulla Privacy: http://www.unibs.it/node/8155
Re: Spark's behavior
Ok Andrew, Thanks I sent informations of test with 8 worker and the gap is grown up. On May 4, 2014, at 2:31, Andrew Ash and...@andrewash.com wrote: From the logs, I see that the print() starts printing stuff 10 seconds after the context is started. And that 10 seconds is taken by the initial empty job (50 map + 20 reduce tasks) that spark streaming starts to ensure all the executors have started. Somehow the first empty task takes 7-8 seconds to complete. See if this can be reproduced by running a simple, empty job in spark shell (in the same cluster) and see if the first task takes 7-8 seconds. Either way, I didnt see the 30 second gap, but a 10 second gap. And that does not seem to be a persistent problem as after that 10 seconds, the data is being received and processed. TD -- Informativa sulla Privacy: http://www.unibs.it/node/8155
Re: Spark's behavior
Hi TD, Thanks for reply This last experiment I did with one computer, like local, but I think that time gap grow up when I add more computer. I will do again now with 8 worker and 1 word source and I will see what’s go on. I will control the time too, like suggested by Andrew. On May 3, 2014, at 1:19, Tathagata Das tathagata.das1...@gmail.com wrote: From the logs, I see that the print() starts printing stuff 10 seconds after the context is started. And that 10 seconds is taken by the initial empty job (50 map + 20 reduce tasks) that spark streaming starts to ensure all the executors have started. Somehow the first empty task takes 7-8 seconds to complete. See if this can be reproduced by running a simple, empty job in spark shell (in the same cluster) and see if the first task takes 7-8 seconds. Either way, I didnt see the 30 second gap, but a 10 second gap. And that does not seem to be a persistent problem as after that 10 seconds, the data is being received and processed. TD On Fri, May 2, 2014 at 2:14 PM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi TD, I got the another information today using Spark 1.0 RC3 and the situation remain the same: PastedGraphic-1.png The lines begin after 17 sec: 14/05/02 21:52:25 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140502215225-0005/0 on hostPort computer8.ant-net:57229 with 2 cores, 2.0 GB RAM 14/05/02 21:52:25 INFO AppClient$ClientActor: Executor updated: app-20140502215225-0005/0 is now RUNNING 14/05/02 21:52:25 INFO ReceiverTracker: ReceiverTracker started 14/05/02 21:52:26 INFO ForEachDStream: metadataCleanupDelay = -1 14/05/02 21:52:26 INFO SocketInputDStream: metadataCleanupDelay = -1 14/05/02 21:52:26 INFO SocketInputDStream: Slide time = 1000 ms 14/05/02 21:52:26 INFO SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1) 14/05/02 21:52:26 INFO SocketInputDStream: Checkpoint interval = null 14/05/02 21:52:26 INFO SocketInputDStream: Remember duration = 1000 ms 14/05/02 21:52:26 INFO SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@5433868e 14/05/02 21:52:26 INFO ForEachDStream: Slide time = 1000 ms 14/05/02 21:52:26 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1) 14/05/02 21:52:26 INFO ForEachDStream: Checkpoint interval = null 14/05/02 21:52:26 INFO ForEachDStream: Remember duration = 1000 ms 14/05/02 21:52:26 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1ebdbc05 14/05/02 21:52:26 INFO SparkContext: Starting job: collect at ReceiverTracker.scala:270 14/05/02 21:52:26 INFO RecurringTimer: Started timer for JobGenerator at time 1399060346000 14/05/02 21:52:26 INFO JobGenerator: Started JobGenerator at 1399060346000 ms 14/05/02 21:52:26 INFO JobScheduler: Started JobScheduler 14/05/02 21:52:26 INFO DAGScheduler: Registering RDD 3 (reduceByKey at ReceiverTracker.scala:270) 14/05/02 21:52:26 INFO ReceiverTracker: Stream 0 received 0 blocks 14/05/02 21:52:26 INFO DAGScheduler: Got job 0 (collect at ReceiverTracker.scala:270) with 20 output partitions (allowLocal=false) 14/05/02 21:52:26 INFO DAGScheduler: Final stage: Stage 0(collect at ReceiverTracker.scala:270) 14/05/02 21:52:26 INFO DAGScheduler: Parents of final stage: List(Stage 1) 14/05/02 21:52:26 INFO JobScheduler: Added jobs for time 1399060346000 ms 14/05/02 21:52:26 INFO JobScheduler: Starting job streaming job 1399060346000 ms.0 from job set of time 1399060346000 ms 14/05/02 21:52:26 INFO JobGenerator: Checkpointing graph for time 1399060346000 ms ---14/05/02 21:52:26 INFO DStreamGraph: Updating checkpoint data for time 1399060346000 ms Time: 1399060346000 ms --- 14/05/02 21:52:26 INFO JobScheduler: Finished job streaming job 1399060346000 ms.0 from job set of time 1399060346000 ms 14/05/02 21:52:26 INFO JobScheduler: Total delay: 0.325 s for time 1399060346000 ms (execution: 0.024 s) 14/05/02 21:52:42 INFO JobScheduler: Added jobs for time 1399060362000 ms 14/05/02 21:52:42 INFO JobGenerator: Checkpointing graph for time 1399060362000 ms 14/05/02 21:52:42 INFO DStreamGraph: Updating checkpoint data for time 1399060362000 ms 14/05/02 21:52:42 INFO DStreamGraph: Updated checkpoint data for time 1399060362000 ms 14/05/02 21:52:42 INFO JobScheduler: Starting job streaming job 1399060362000 ms.0 from job set of time 1399060362000 ms 14/05/02 21:52:42 INFO SparkContext: Starting job: take at DStream.scala:593 14/05/02 21:52:42 INFO DAGScheduler: Got job 2 (take at DStream.scala:593) with 1 output partitions (allowLocal=true) 14/05/02 21:52:42 INFO DAGScheduler: Final stage: Stage 3(take at DStream.scala:593) 14/05/02 21:52:42 INFO DAGScheduler: Parents of final stage: List() 14/05/02 21:52:42 INFO
Re: Spark's behavior
Hi Eduardo, Yep those machines look pretty well synchronized at this point. Just wanted to throw that out there and eliminate it as a possible source of confusion. Good luck on continuing the debugging! Andrew On Sat, May 3, 2014 at 11:59 AM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi TD, I did a test with 8 workers and 1 word source, the time gap was 27 sec, how can see in the log files(in attach). Hi Andrew, I configured the ntp, all machines are synchronized. root@computer8:/opt/unibs_test/spark-1.0RC3# for num in {1,8,10,11,13,15,16,18,22}; do ssh computer$num date; done Sat May 3 20:57:41 CEST 2014 Sat May 3 20:57:41 CEST 2014 Sat May 3 20:57:41 CEST 2014 Sat May 3 20:57:42 CEST 2014 Sat May 3 20:57:42 CEST 2014 Sat May 3 20:57:42 CEST 2014 Sat May 3 20:57:42 CEST 2014 Sat May 3 20:57:42 CEST 2014 Sat May 3 20:57:42 CEST 2014 Informativa sulla Privacy: http://www.unibs.it/node/8155 On May 3, 2014, at 15:46, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi TD, Thanks for reply This last experiment I did with one computer, like local, but I think that time gap grow up when I add more computer. I will do again now with 8 worker and 1 word source and I will see what’s go on. I will control the time too, like suggested by Andrew. On May 3, 2014, at 1:19, Tathagata Das tathagata.das1...@gmail.com wrote: From the logs, I see that the print() starts printing stuff 10 seconds after the context is started. And that 10 seconds is taken by the initial empty job (50 map + 20 reduce tasks) that spark streaming starts to ensure all the executors have started. Somehow the first empty task takes 7-8 seconds to complete. See if this can be reproduced by running a simple, empty job in spark shell (in the same cluster) and see if the first task takes 7-8 seconds. Either way, I didnt see the 30 second gap, but a 10 second gap. And that does not seem to be a persistent problem as after that 10 seconds, the data is being received and processed. TD On Fri, May 2, 2014 at 2:14 PM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi TD, I got the another information today using Spark 1.0 RC3 and the situation remain the same: PastedGraphic-1.png The lines begin after 17 sec: 14/05/02 21:52:25 INFO SparkDeploySchedulerBackend: Granted executor ID app-20140502215225-0005/0 on hostPort computer8.ant-net:57229 with 2 cores, 2.0 GB RAM 14/05/02 21:52:25 INFO AppClient$ClientActor: Executor updated: app-20140502215225-0005/0 is now RUNNING 14/05/02 21:52:25 INFO ReceiverTracker: ReceiverTracker started 14/05/02 21:52:26 INFO ForEachDStream: metadataCleanupDelay = -1 14/05/02 21:52:26 INFO SocketInputDStream: metadataCleanupDelay = -1 14/05/02 21:52:26 INFO SocketInputDStream: Slide time = 1000 ms 14/05/02 21:52:26 INFO SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1) 14/05/02 21:52:26 INFO SocketInputDStream: Checkpoint interval = null 14/05/02 21:52:26 INFO SocketInputDStream: Remember duration = 1000 ms 14/05/02 21:52:26 INFO SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@5433868e 14/05/02 21:52:26 INFO ForEachDStream: Slide time = 1000 ms 14/05/02 21:52:26 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1) 14/05/02 21:52:26 INFO ForEachDStream: Checkpoint interval = null 14/05/02 21:52:26 INFO ForEachDStream: Remember duration = 1000 ms 14/05/02 21:52:26 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@1ebdbc05 14/05/02 21:52:26 INFO SparkContext: Starting job: collect at ReceiverTracker.scala:270 14/05/02 21:52:26 INFO RecurringTimer: Started timer for JobGenerator at time 1399060346000 14/05/02 21:52:26 INFO JobGenerator: Started JobGenerator at 1399060346000 ms 14/05/02 21:52:26 INFO JobScheduler: Started JobScheduler 14/05/02 21:52:26 INFO DAGScheduler: Registering RDD 3 (reduceByKey at ReceiverTracker.scala:270) 14/05/02 21:52:26 INFO ReceiverTracker: Stream 0 received 0 blocks 14/05/02 21:52:26 INFO DAGScheduler: Got job 0 (collect at ReceiverTracker.scala:270) with 20 output partitions (allowLocal=false) 14/05/02 21:52:26 INFO DAGScheduler: Final stage: Stage 0(collect at ReceiverTracker.scala:270) 14/05/02 21:52:26 INFO DAGScheduler: Parents of final stage: List(Stage 1) 14/05/02 21:52:26 INFO JobScheduler: Added jobs for time 1399060346000 ms 14/05/02 21:52:26 INFO JobScheduler: Starting job streaming job 1399060346000 ms.0 from job set of time 1399060346000 ms 14/05/02 21:52:26 INFO JobGenerator: Checkpointing graph for time 1399060346000 ms ---14/05/02 21:52:26 INFO DStreamGraph: Updating checkpoint data for time 1399060346000 ms Time: 1399060346000 ms --- 14/05/02 21:52:26 INFO JobScheduler: Finished job streaming job 1399060346000
Spark's behavior
Hi TD, In my tests with spark streaming, I'm using JavaNetworkWordCount(modified) code and a program that I wrote that sends words to the Spark worker, I use TCP as transport. I verified that after starting Spark, it connects to my source which actually starts sending, but the first word count is advertised approximately 30 seconds after the context creation. So I'm wondering where is stored the 30 seconds data already sent by the source. Is this a normal spark’s behaviour? I saw the same behaviour using the shipped JavaNetworkWordCount application. Many thanks. -- Informativa sulla Privacy: http://www.unibs.it/node/8155
Re: Spark's behavior
Hi TD, We are not using stream context with master local, we have 1 Master and 8 Workers and 1 word source. The command line that we are using is: bin/run-example org.apache.spark.streaming.examples.JavaNetworkWordCount spark://192.168.0.13:7077 On Apr 30, 2014, at 0:09, Tathagata Das tathagata.das1...@gmail.com wrote: Is you batch size 30 seconds by any chance? Assuming not, please check whether you are creating the streaming context with master local[n] where n 2. With local or local[1], the system only has one processing slot, which is occupied by the receiver leaving no room for processing the received data. It could be that after 30 seconds, the server disconnects, the receiver terminates, releasing the single slot for the processing to proceed. TD On Tue, Apr 29, 2014 at 2:28 PM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi TD, In my tests with spark streaming, I'm using JavaNetworkWordCount(modified) code and a program that I wrote that sends words to the Spark worker, I use TCP as transport. I verified that after starting Spark, it connects to my source which actually starts sending, but the first word count is advertised approximately 30 seconds after the context creation. So I'm wondering where is stored the 30 seconds data already sent by the source. Is this a normal spark’s behaviour? I saw the same behaviour using the shipped JavaNetworkWordCount application. Many thanks. -- Informativa sulla Privacy: http://www.unibs.it/node/8155 -- Informativa sulla Privacy: http://www.unibs.it/node/8155
Re: Spark's behavior
Strange! Can you just do lines.print() to print the raw data instead of doing word count. Beyond that we can do two things. 1. Can see the Spark stage UI to see whether there are stages running during the 30 second period you referred to? 2. If you upgrade to using Spark master branch (or Spark 1.0 RC3, see different thread by Patrick), it has a streaming UI, which shows the number of records received, the state of the receiver, etc. That may be more useful in debugging whats going on . TD On Tue, Apr 29, 2014 at 3:31 PM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi TD, We are not using stream context with master local, we have 1 Master and 8 Workers and 1 word source. The command line that we are using is: bin/run-example org.apache.spark.streaming.examples.JavaNetworkWordCount spark://192.168.0.13:7077 On Apr 30, 2014, at 0:09, Tathagata Das tathagata.das1...@gmail.com wrote: Is you batch size 30 seconds by any chance? Assuming not, please check whether you are creating the streaming context with master local[n] where n 2. With local or local[1], the system only has one processing slot, which is occupied by the receiver leaving no room for processing the received data. It could be that after 30 seconds, the server disconnects, the receiver terminates, releasing the single slot for the processing to proceed. TD On Tue, Apr 29, 2014 at 2:28 PM, Eduardo Costa Alfaia e.costaalf...@unibs.it wrote: Hi TD, In my tests with spark streaming, I'm using JavaNetworkWordCount(modified) code and a program that I wrote that sends words to the Spark worker, I use TCP as transport. I verified that after starting Spark, it connects to my source which actually starts sending, but the first word count is advertised approximately 30 seconds after the context creation. So I'm wondering where is stored the 30 seconds data already sent by the source. Is this a normal spark’s behaviour? I saw the same behaviour using the shipped JavaNetworkWordCount application. Many thanks. -- Informativa sulla Privacy: http://www.unibs.it/node/8155 Informativa sulla Privacy: http://www.unibs.it/node/8155