Whether I use 1 or 2 machines, the results are the same. Below are the results I got using 1 and 2 receivers with 2 machines:
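To make the two runs easier to compare, the per-batch delays in that grep output can be averaged with a small awk filter. This is only a sketch: it assumes the "Total delay: <seconds> s" format shown in the logs, and a hypothetical `benchmark.log` file holding the captured output (not part of the benchmark itself):

```shell
# Average the "Total delay" values out of the captured benchmark output.
# Assumes each matching line contains "Total delay: <seconds> s".
grep -o 'Total delay: [0-9.]* s' benchmark.log \
  | awk '{ sum += $3; n++ } END { if (n) printf "avg delay: %.3f s over %d batches\n", sum/n, n }'
```

The same filter can be appended directly to the sbt pipeline instead of going through a file.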
2 machines, 1 receiver:

sbt/sbt "run-main Benchmark 1 machine1 9999 1000" 2>&1 | grep -i "Total delay\|record"

15/04/13 16:41:34 INFO JobScheduler: Total delay: 0.156 s for time 1428939694000 ms (execution: 0.142 s)
Received 92910 records
15/04/13 16:41:35 INFO JobScheduler: Total delay: 0.155 s for time 1428939695000 ms (execution: 0.142 s)
Received 92910 records
15/04/13 16:41:36 INFO JobScheduler: Total delay: 0.132 s for time 1428939696000 ms (execution: 0.119 s)
Received 92910 records
15/04/13 16:41:37 INFO JobScheduler: Total delay: 0.172 s for time 1428939697000 ms (execution: 0.161 s)
Received 92910 records
15/04/13 16:41:38 INFO JobScheduler: Total delay: 0.152 s for time 1428939698000 ms (execution: 0.140 s)
Received 92910 records
15/04/13 16:41:39 INFO JobScheduler: Total delay: 0.162 s for time 1428939699000 ms (execution: 0.149 s)
Received 92910 records
15/04/13 16:41:40 INFO JobScheduler: Total delay: 0.156 s for time 1428939700000 ms (execution: 0.143 s)
Received 92910 records
15/04/13 16:41:41 INFO JobScheduler: Total delay: 0.148 s for time 1428939701000 ms (execution: 0.135 s)
Received 92910 records
15/04/13 16:41:42 INFO JobScheduler: Total delay: 0.149 s for time 1428939702000 ms (execution: 0.135 s)
Received 92910 records
15/04/13 16:41:43 INFO JobScheduler: Total delay: 0.153 s for time 1428939703000 ms (execution: 0.136 s)
Received 92910 records
15/04/13 16:41:44 INFO JobScheduler: Total delay: 0.118 s for time 1428939704000 ms (execution: 0.111 s)
Received 92910 records
15/04/13 16:41:45 INFO JobScheduler: Total delay: 0.155 s for time 1428939705000 ms (execution: 0.143 s)
Received 92910 records
15/04/13 16:41:46 INFO JobScheduler: Total delay: 0.138 s for time 1428939706000 ms (execution: 0.126 s)
Received 92910 records
15/04/13 16:41:47 INFO JobScheduler: Total delay: 0.154 s for time 1428939707000 ms (execution: 0.142 s)
Received 92910 records
15/04/13 16:41:48 INFO JobScheduler: Total delay: 0.172 s for time 1428939708000 ms (execution: 0.160 s)
Received 92910 records
15/04/13 16:41:49 INFO JobScheduler: Total delay: 0.144 s for time 1428939709000 ms (execution: 0.133 s)

Receiver Statistics [2015/04/13 16:53:54]:
Receiver - Status - Location - Records in last batch - Minimum rate [records/sec] - Median rate [records/sec] - Maximum rate [records/sec] - Last Error
Receiver-0---10-10-100-

2 machines, 2 receivers:

sbt/sbt "run-main Benchmark 2 machine1 9999 1000" 2>&1 | grep -i "Total delay\|record"

Received 92910 records
15/04/13 16:43:13 INFO JobScheduler: Total delay: 0.153 s for time 1428939793000 ms (execution: 0.142 s)
Received 92910 records
15/04/13 16:43:14 INFO JobScheduler: Total delay: 0.144 s for time 1428939794000 ms (execution: 0.136 s)
Received 92910 records
15/04/13 16:43:15 INFO JobScheduler: Total delay: 0.145 s for time 1428939795000 ms (execution: 0.132 s)
Received 92910 records
15/04/13 16:43:16 INFO JobScheduler: Total delay: 0.144 s for time 1428939796000 ms (execution: 0.134 s)
Received 92910 records
15/04/13 16:43:17 INFO JobScheduler: Total delay: 0.148 s for time 1428939797000 ms (execution: 0.142 s)
Received 92910 records
15/04/13 16:43:18 INFO JobScheduler: Total delay: 0.136 s for time 1428939798000 ms (execution: 0.123 s)
Received 92910 records
15/04/13 16:43:19 INFO JobScheduler: Total delay: 0.155 s for time 1428939799000 ms (execution: 0.145 s)
Received 92910 records
15/04/13 16:43:20 INFO JobScheduler: Total delay: 0.160 s for time 1428939800000 ms (execution: 0.152 s)
Received 83619 records
15/04/13 16:43:21 INFO JobScheduler: Total delay: 0.141 s for time 1428939801000 ms (execution: 0.131 s)
Received 102201 records
15/04/13 16:43:22 INFO JobScheduler: Total delay: 0.208 s for time 1428939802000 ms (execution: 0.197 s)
Received 83619 records
15/04/13 16:43:23 INFO JobScheduler: Total delay: 0.160 s for time 1428939803000 ms (execution: 0.147 s)
Received 92910 records
15/04/13 16:43:24 INFO JobScheduler: Total delay: 0.197 s for time 1428939804000 ms (execution: 0.185 s)
Received 92910 records
15/04/13 16:43:25 INFO JobScheduler: Total delay: 0.200 s for time 1428939805000 ms (execution: 0.189 s)
Received 92910 records
15/04/13 16:43:26 INFO JobScheduler: Total delay: 0.181 s for time 1428939806000 ms (execution: 0.173 s)
Received 92910 records
15/04/13 16:43:27 INFO JobScheduler: Total delay: 0.189 s for time 1428939807000 ms (execution: 0.178 s)

Receiver Statistics [2015/04/13 16:49:36]:
Receiver - Status - Location - Records in last batch - Minimum rate [records/sec] - Median rate [records/sec] - Maximum rate [records/sec] - Last Error
Receiver-0---10-10-10-9-
Receiver-1--0000-

On Thu, Apr 9, 2015 at 7:55 PM, Tathagata Das <t...@databricks.com> wrote:

> Are you running # of receivers = # machines?
>
> TD
>
> On Thu, Apr 9, 2015 at 9:56 AM, Saiph Kappa <saiph.ka...@gmail.com> wrote:
>
>> Sorry, I was getting those errors because my workload was not sustainable.
>>
>> However, I noticed that, by just running the spark-streaming-benchmark (
>> https://github.com/tdas/spark-streaming-benchmark/blob/master/Benchmark.scala
>> ), I get no difference in the execution time, number of processed records,
>> or delay, whether I'm using 1 machine or 2 machines with the setup
>> described before (using spark standalone). Is this normal?
>>
>> On Fri, Mar 27, 2015 at 5:32 PM, Tathagata Das <t...@databricks.com> wrote:
>>
>>> If it is deterministically reproducible, could you generate full DEBUG
>>> level logs from the driver and the workers and give them to me? Basically I
>>> want to trace through what is happening to the block that is not being
>>> found.
>>> And can you tell me which cluster manager you are using? Spark Standalone,
>>> Mesos, or YARN?
>>>
>>> On Fri, Mar 27, 2015 at 10:09 AM, Saiph Kappa <saiph.ka...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>
>>>> I am just running this simple example with
>>>> machineA: 1 master + 1 worker
>>>> machineB: 1 worker
>>>>
>>>> «
>>>> val ssc = new StreamingContext(sparkConf, Duration(1000))
>>>>
>>>> val rawStreams = (1 to numStreams).map(_ =>
>>>>   ssc.rawSocketStream[String](host, port, StorageLevel.MEMORY_ONLY_SER)).toArray
>>>> val union = ssc.union(rawStreams)
>>>>
>>>> union.filter(line => Random.nextInt(1) == 0).map(line => {
>>>>   var sum = BigInt(0)
>>>>   line.toCharArray.foreach(chr => sum += chr.toInt)
>>>>   fib2(sum)
>>>>   sum
>>>> }).reduceByWindow(_ + _, Seconds(1), Seconds(1)).map(s => s"### result: $s").print()
>>>> »
>>>>
>>>> And I'm getting the following exceptions:
>>>>
>>>> Log from machineB
>>>> «
>>>> 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 132
>>>> 15/03/27 16:21:35 INFO Executor: Running task 0.0 in stage 27.0 (TID 132)
>>>> 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 134
>>>> 15/03/27 16:21:35 INFO Executor: Running task 2.0 in stage 27.0 (TID 134)
>>>> 15/03/27 16:21:35 INFO TorrentBroadcast: Started reading broadcast variable 24
>>>> 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 136
>>>> 15/03/27 16:21:35 INFO Executor: Running task 4.0 in stage 27.0 (TID 136)
>>>> 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 138
>>>> 15/03/27 16:21:35 INFO Executor: Running task 6.0 in stage 27.0 (TID 138)
>>>> 15/03/27 16:21:35 INFO CoarseGrainedExecutorBackend: Got assigned task 140
>>>> 15/03/27 16:21:35 INFO Executor: Running task 8.0 in stage 27.0 (TID 140)
>>>> 15/03/27 16:21:35 INFO MemoryStore: ensureFreeSpace(1886) called with curMem=47117, maxMem=280248975
>>>> 15/03/27 16:21:35 INFO MemoryStore: Block broadcast_24_piece0 stored as bytes in memory (estimated size 1886.0 B, free 267.2 MB)
>>>> 15/03/27 16:21:35 INFO BlockManagerMaster: Updated info of block broadcast_24_piece0
>>>> 15/03/27 16:21:35 INFO TorrentBroadcast: Reading broadcast variable 24 took 19 ms
>>>> 15/03/27 16:21:35 INFO MemoryStore: ensureFreeSpace(3104) called with curMem=49003, maxMem=280248975
>>>> 15/03/27 16:21:35 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 3.0 KB, free 267.2 MB)
>>>> 15/03/27 16:21:35 ERROR Executor: Exception in task 8.0 in stage 27.0 (TID 140)
>>>> java.lang.Exception: Could not compute split, block input-0-1427473262420 not found
>>>>     at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>     at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>     at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>     at java.lang.Thread.run(Thread.java:701)
>>>> 15/03/27 16:21:35 ERROR Executor: Exception in task 6.0 in stage 27.0 (TID 138)
>>>> java.lang.Exception: Could not compute split, block input-0-1427473262418 not found
>>>>     at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>     at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>     at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>     at java.lang.Thread.run(Thread.java:701)
>>>> »
>>>>
>>>> Log from machineA
>>>> «
>>>> 15/03/27 16:21:35 INFO TorrentBroadcast: Reading broadcast variable 24 took 15 ms
>>>> 15/03/27 16:21:35 INFO MemoryStore: ensureFreeSpace(3104) called with curMem=269989249, maxMem=280248975
>>>> 15/03/27 16:21:35 INFO MemoryStore: Block broadcast_24 stored as values in memory (estimated size 3.0 KB, free 9.8 MB)
>>>> 15/03/27 16:21:35 ERROR Executor: Exception in task 3.0 in stage 27.0 (TID 135)
>>>> java.lang.Exception: Could not compute split, block input-0-1427473262415 not found
>>>>     at org.apache.spark.rdd.BlockRDD.compute(BlockRDD.scala:51)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>     at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:87)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>     at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>     at org.apache.spark.rdd.MappedRDD.compute(MappedRDD.scala:31)
>>>>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:280)
>>>>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:247)
>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>>>>     at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>>>>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:200)
>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>     at java.lang.Thread.run(Thread.java:745)
>>>> »
>>>>
>>>> It seems that blocks are not being shared between machines. Am I doing something wrong?
>>>>
>>>> Thanks,
>>>> Sergio
>>>
>>
>