Can you show us the output of DStream#print(), if you have it? Thanks
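A note on the repeated `WARN KafkaRDD` lines in the log quoted below. The topic and partition (for example `toto 6`) are filled in, but `${part.fromOffset}` is printed literally. That pattern fits a warning built from two concatenated Scala string literals where only the second one has the `s` interpolator. The sketch below reproduces the effect with plain Scala. `Part` is a hypothetical stand-in for the partition metadata, not Spark's own class, and the offsets are made up. Whatever the formatting, each warning states that the beginning and ending offsets of that partition are equal, so the partition is skipped for that batch. That would be consistent with the empty `Time: 1427791638000 ms` block that `print()` emits further down.

```scala
// Hypothetical stand-in for the partition metadata being logged; the field
// names mirror the placeholders visible in the WARN message.
final case class Part(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

object InterpolationDemo {
  // Only the second literal carries the `s` prefix, so `${part.fromOffset}`
  // in the first literal is emitted verbatim, as seen in the log.
  def buggyMessage(part: Part): String =
    "Beginning offset ${part.fromOffset} is the same as ending offset " +
      s"skipping ${part.topic} ${part.partition}"

  // With `s` on both literals, every placeholder is substituted.
  def fixedMessage(part: Part): String =
    s"Beginning offset ${part.fromOffset} is the same as ending offset " +
      s"skipping ${part.topic} ${part.partition}"

  def main(args: Array[String]): Unit = {
    // Illustrative empty partition: start and end offsets coincide.
    val part = Part("toto", 6, 42L, 42L)
    println(buggyMessage(part))
    println(fixedMessage(part))
  }
}
```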
On Tue, Mar 31, 2015 at 2:55 AM, Nicolas Phung <nicolas.ph...@gmail.com> wrote:

> Hello,
>
> @Akhil Das I'm trying to use the experimental API shown in
> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
> and I'm reusing the same code snippet to initialize my topicSet.
>
> @Cody Koeninger I don't see any earlier error messages (see the full log
> at the end). To create the topics, I run the following:
>
> "kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic toto"
>
> "kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic toto-single"
>
> I'm launching my Spark Streaming job in local mode.
>
> @Ted Yu There's no "Couldn't connect to leader for topic" message in the log. Here's the full version:
>
> "spark-submit --conf config.resource=application-integration.conf --class nextgen.Main assembly-0.1-SNAPSHOT.jar"
>
> 15/03/31 10:47:12 INFO SecurityManager: Changing view acls to: nphung > 15/03/31 10:47:12 INFO SecurityManager: Changing modify acls to: nphung > 15/03/31 10:47:12 INFO SecurityManager: SecurityManager: authentication > disabled; ui acls disabled; users with view permissions: Set(nphung); users > with modify permissions: Set(nphung) > 15/03/31 10:47:13 INFO Slf4jLogger: Slf4jLogger started > 15/03/31 10:47:13 INFO Remoting: Starting remoting > 15/03/31 10:47:13 INFO Remoting: Remoting started; listening on addresses > :[akka.tcp://sparkDriver@int.local:44180] > 15/03/31 10:47:13 INFO Remoting: Remoting now listens on addresses: > [akka.tcp://sparkDriver@int.local:44180] > 15/03/31 10:47:13 INFO Utils: Successfully started service 'sparkDriver' on > port 44180. > 15/03/31 10:47:13 INFO SparkEnv: Registering MapOutputTracker > 15/03/31 10:47:13 INFO SparkEnv: Registering BlockManagerMaster > 15/03/31 10:47:13 INFO DiskBlockManager: Created local directory at > /tmp/spark-local-20150331104713-2238 > 15/03/31 10:47:13 INFO MemoryStore: MemoryStore started with capacity 265.1 MB > 15/03/31 10:47:15 INFO HttpFileServer: HTTP File server directory is > /tmp/spark-2c8e34a0-bec3-4f1e-9fe7-83e08efc4f53 > 15/03/31 10:47:15 INFO HttpServer: Starting HTTP Server > 15/03/31 10:47:15 INFO Utils: Successfully started service 'HTTP file server' > on port 50204. > 15/03/31 10:47:15 INFO Utils: Successfully started service 'SparkUI' on port > 4040. 
> 15/03/31 10:47:15 INFO SparkUI: Started SparkUI at http://int.local:4040 > 15/03/31 10:47:16 INFO SparkContext: Added JAR > file:/home/nphung/assembly-0.1-SNAPSHOT.jar at > http://10.153.165.98:50204/jars/assembly-0.1-SNAPSHOT.jar with timestamp > 1427791636151 > 15/03/31 10:47:16 INFO AkkaUtils: Connecting to HeartbeatReceiver: > akka.tcp://sparkDriver@int.local:44180/user/HeartbeatReceiver > 15/03/31 10:47:16 INFO NettyBlockTransferService: Server created on 40630 > 15/03/31 10:47:16 INFO BlockManagerMaster: Trying to register BlockManager > 15/03/31 10:47:16 INFO BlockManagerMasterActor: Registering block manager > localhost:40630 with 265.1 MB RAM, BlockManagerId(<driver>, localhost, 40630) > 15/03/31 10:47:16 INFO BlockManagerMaster: Registered BlockManager > 15/03/31 10:47:17 INFO EventLoggingListener: Logging events to > hdfs://int.local:8020/user/spark/applicationHistory/local-1427791636195 > 15/03/31 10:47:17 INFO VerifiableProperties: Verifying properties > 15/03/31 10:47:17 INFO VerifiableProperties: Property group.id is overridden > to > 15/03/31 10:47:17 INFO VerifiableProperties: Property zookeeper.connect is > overridden to > 15/03/31 10:47:17 INFO ForEachDStream: metadataCleanupDelay = -1 > 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1 > 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1 > 15/03/31 10:47:17 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1 > 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Slide time = 2000 ms > 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Storage level = > StorageLevel(false, false, false, false, 1) > 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Checkpoint interval = null > 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Remember duration = 2000 ms > 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Initialized and validated > org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1daf3b44 > 15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms > 15/03/31 10:47:17 
INFO MappedDStream: Storage level = StorageLevel(false, > false, false, false, 1) > 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null > 15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms > 15/03/31 10:47:17 INFO MappedDStream: Initialized and validated > org.apache.spark.streaming.dstream.MappedDStream@7fd8c559 > 15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms > 15/03/31 10:47:17 INFO MappedDStream: Storage level = StorageLevel(false, > false, false, false, 1) > 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null > 15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms > 15/03/31 10:47:17 INFO MappedDStream: Initialized and validated > org.apache.spark.streaming.dstream.MappedDStream@44c13103 > 15/03/31 10:47:17 INFO ForEachDStream: Slide time = 2000 ms > 15/03/31 10:47:17 INFO ForEachDStream: Storage level = StorageLevel(false, > false, false, false, 1) > 15/03/31 10:47:17 INFO ForEachDStream: Checkpoint interval = null > 15/03/31 10:47:17 INFO ForEachDStream: Remember duration = 2000 ms > 15/03/31 10:47:17 INFO ForEachDStream: Initialized and validated > org.apache.spark.streaming.dstream.ForEachDStream@8f2098e > 15/03/31 10:47:17 INFO ForEachDStream: metadataCleanupDelay = -1 > 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1 > 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1 > 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1 > 15/03/31 10:47:17 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1 > 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Slide time = 2000 ms > 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Storage level = > StorageLevel(false, false, false, false, 1) > 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Checkpoint interval = null > 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Remember duration = 2000 ms > 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Initialized and validated > 
org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1daf3b44 > 15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms > 15/03/31 10:47:17 INFO MappedDStream: Storage level = StorageLevel(false, > false, false, false, 1) > 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null > 15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms > 15/03/31 10:47:17 INFO MappedDStream: Initialized and validated > org.apache.spark.streaming.dstream.MappedDStream@7fd8c559 > 15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms > 15/03/31 10:47:17 INFO MappedDStream: Storage level = StorageLevel(false, > false, false, false, 1) > 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null > 15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms > 15/03/31 10:47:17 INFO MappedDStream: Initialized and validated > org.apache.spark.streaming.dstream.MappedDStream@44c13103 > 15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms > 15/03/31 10:47:17 INFO MappedDStream: Storage level = StorageLevel(false, > false, false, false, 1) > 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null > 15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms > 15/03/31 10:47:17 INFO MappedDStream: Initialized and validated > org.apache.spark.streaming.dstream.MappedDStream@6c6366cf > 15/03/31 10:47:17 INFO ForEachDStream: Slide time = 2000 ms > 15/03/31 10:47:17 INFO ForEachDStream: Storage level = StorageLevel(false, > false, false, false, 1) > 15/03/31 10:47:17 INFO ForEachDStream: Checkpoint interval = null > 15/03/31 10:47:17 INFO ForEachDStream: Remember duration = 2000 ms > 15/03/31 10:47:17 INFO ForEachDStream: Initialized and validated > org.apache.spark.streaming.dstream.ForEachDStream@55a88417 > 15/03/31 10:47:17 INFO RecurringTimer: Started timer for JobGenerator at time > 1427791638000 > 15/03/31 10:47:17 INFO JobGenerator: Started JobGenerator at 1427791638000 ms > 15/03/31 10:47:17 INFO JobScheduler: Started 
JobScheduler > 15/03/31 10:47:18 INFO VerifiableProperties: Verifying properties > 15/03/31 10:47:18 INFO VerifiableProperties: Property group.id is overridden > to > 15/03/31 10:47:18 INFO VerifiableProperties: Property zookeeper.connect is > overridden to > 15/03/31 10:47:18 INFO JobScheduler: Added jobs for time 1427791638000 ms > 15/03/31 10:47:18 INFO JobScheduler: Starting job streaming job 1427791638000 > ms.0 from job set of time 1427791638000 ms > 15/03/31 10:47:18 INFO SparkContext: Starting job: print at Main.scala:59 > 15/03/31 10:47:18 INFO DAGScheduler: Got job 0 (print at Main.scala:59) with > 1 output partitions (allowLocal=true) > 15/03/31 10:47:18 INFO DAGScheduler: Final stage: Stage 0(print at > Main.scala:59) > 15/03/31 10:47:18 INFO DAGScheduler: Parents of final stage: List() > 15/03/31 10:47:18 INFO DAGScheduler: Missing parents: List() > 15/03/31 10:47:18 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[2] at map > at Main.scala:50), which has no missing parents > 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(3568) called with > curMem=0, maxMem=278019440 > 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_0 stored as values in > memory (estimated size 3.5 KB, free 265.1 MB) > 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(1946) called with > curMem=3568, maxMem=278019440 > 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes > in memory (estimated size 1946.0 B, free 265.1 MB) > 15/03/31 10:47:18 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory > on localhost:40630 (size: 1946.0 B, free: 265.1 MB) > 15/03/31 10:47:18 INFO BlockManagerMaster: Updated info of block > broadcast_0_piece0 > 15/03/31 10:47:18 INFO SparkContext: Created broadcast 0 from broadcast at > DAGScheduler.scala:838 > 15/03/31 10:47:18 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 > (MappedRDD[2] at map at Main.scala:50) > 15/03/31 10:47:18 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks > 15/03/31 
10:47:18 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, > localhost, ANY, 1266 bytes) > 15/03/31 10:47:18 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) > 15/03/31 10:47:18 INFO Executor: Fetching > http://10.153.165.98:50204/jars/assembly-0.1-SNAPSHOT.jar with timestamp > 1427791636151 > 15/03/31 10:47:18 INFO Utils: Fetching > http://10.153.165.98:50204/jars/assembly-0.1-SNAPSHOT.jar to > /tmp/fetchFileTemp4524847716079927807.tmp > 15/03/31 10:47:18 INFO Executor: Adding > file:/tmp/spark-d295d121-f359-4c7d-8725-e1f958ebff5a/assembly-0.1-SNAPSHOT.jar > to class loader > 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 6 > 15/03/31 10:47:18 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 570 > bytes result sent to driver > 15/03/31 10:47:18 INFO DAGScheduler: Stage 0 (print at Main.scala:59) > finished in 0,371 s > 15/03/31 10:47:18 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) > in 350 ms on localhost (1/1) > 15/03/31 10:47:18 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks > have all completed, from pool > 15/03/31 10:47:18 INFO DAGScheduler: Job 0 finished: print at Main.scala:59, > took 0,717929 s > 15/03/31 10:47:18 INFO SparkContext: Starting job: print at Main.scala:59 > 15/03/31 10:47:18 INFO DAGScheduler: Got job 1 (print at Main.scala:59) with > 4 output partitions (allowLocal=true) > 15/03/31 10:47:18 INFO DAGScheduler: Final stage: Stage 1(print at > Main.scala:59) > 15/03/31 10:47:18 INFO DAGScheduler: Parents of final stage: List() > 15/03/31 10:47:18 INFO DAGScheduler: Missing parents: List() > 15/03/31 10:47:18 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[2] at map > at Main.scala:50), which has no missing parents > 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(3568) called with > curMem=5514, maxMem=278019440 > 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_1 stored as values in > memory (estimated size 3.5 KB, free 
265.1 MB) > 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(1946) called with > curMem=9082, maxMem=278019440 > 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes > in memory (estimated size 1946.0 B, free 265.1 MB) > 15/03/31 10:47:18 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory > on localhost:40630 (size: 1946.0 B, free: 265.1 MB) > 15/03/31 10:47:18 INFO BlockManagerMaster: Updated info of block > broadcast_1_piece0 > 15/03/31 10:47:18 INFO SparkContext: Created broadcast 1 from broadcast at > DAGScheduler.scala:838 > 15/03/31 10:47:18 INFO DAGScheduler: Submitting 4 missing tasks from Stage 1 > (MappedRDD[2] at map at Main.scala:50) > 15/03/31 10:47:18 INFO TaskSchedulerImpl: Adding task set 1.0 with 4 tasks > 15/03/31 10:47:18 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, > localhost, ANY, 1266 bytes) > 15/03/31 10:47:18 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 2, > localhost, ANY, 1266 bytes) > 15/03/31 10:47:18 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 3, > localhost, ANY, 1266 bytes) > 15/03/31 10:47:18 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID 4, > localhost, ANY, 1266 bytes) > 15/03/31 10:47:18 INFO Executor: Running task 0.0 in stage 1.0 (TID 1) > 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 3 > 15/03/31 10:47:18 INFO Executor: Running task 1.0 in stage 1.0 (TID 2) > 15/03/31 10:47:18 INFO Executor: Running task 2.0 in stage 1.0 (TID 3) > 15/03/31 10:47:18 INFO Executor: Running task 3.0 in stage 1.0 (TID 4) > 15/03/31 10:47:18 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 570 > bytes result sent to driver > 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 9 > 15/03/31 10:47:18 INFO Executor: Finished task 1.0 in stage 1.0 (TID 2). 
570 > bytes result sent to driver > 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 8 > 15/03/31 10:47:18 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) > in 21 ms on localhost (1/4) > 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 0 > 15/03/31 10:47:18 INFO Executor: Finished task 3.0 in stage 1.0 (TID 4). 570 > bytes result sent to driver > 15/03/31 10:47:18 INFO Executor: Finished task 2.0 in stage 1.0 (TID 3). 570 > bytes result sent to driver > 15/03/31 10:47:18 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 2) > in 31 ms on localhost (2/4) > 15/03/31 10:47:18 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID 4) > in 39 ms on localhost (3/4) > 15/03/31 10:47:18 INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 3) > in 43 ms on localhost (4/4) > 15/03/31 10:47:18 INFO DAGScheduler: Stage 1 (print at Main.scala:59) > finished in 0,048 s > 15/03/31 10:47:18 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks > have all completed, from pool > 15/03/31 10:47:18 INFO DAGScheduler: Job 1 finished: print at Main.scala:59, > took 0,073960 s > 15/03/31 10:47:18 INFO SparkContext: Starting job: print at Main.scala:59 > 15/03/31 10:47:18 INFO DAGScheduler: Got job 2 (print at Main.scala:59) with > 5 output partitions (allowLocal=true) > 15/03/31 10:47:18 INFO DAGScheduler: Final stage: Stage 2(print at > Main.scala:59) > 15/03/31 10:47:18 INFO DAGScheduler: Parents of final stage: List() > 15/03/31 10:47:18 INFO DAGScheduler: Missing parents: List() > 15/03/31 10:47:18 INFO DAGScheduler: Submitting Stage 2 (MappedRDD[2] at map > at Main.scala:50), which has no missing parents > 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(3568) called with > curMem=11028, maxMem=278019440 > 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_2 stored as values in > memory (estimated size 3.5 KB, free 265.1 MB) 
> 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(1946) called with > curMem=14596, maxMem=278019440 > 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes > in memory (estimated size 1946.0 B, free 265.1 MB) > 15/03/31 10:47:18 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory > on localhost:40630 (size: 1946.0 B, free: 265.1 MB) > 15/03/31 10:47:18 INFO BlockManagerMaster: Updated info of block > broadcast_2_piece0 > 15/03/31 10:47:18 INFO SparkContext: Created broadcast 2 from broadcast at > DAGScheduler.scala:838 > 15/03/31 10:47:18 INFO DAGScheduler: Submitting 5 missing tasks from Stage 2 > (MappedRDD[2] at map at Main.scala:50) > 15/03/31 10:47:18 INFO TaskSchedulerImpl: Adding task set 2.0 with 5 tasks > 15/03/31 10:47:18 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 5, > localhost, ANY, 1266 bytes) > 15/03/31 10:47:18 INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID 6, > localhost, ANY, 1266 bytes) > 15/03/31 10:47:18 INFO TaskSetManager: Starting task 2.0 in stage 2.0 (TID 7, > localhost, ANY, 1266 bytes) > 15/03/31 10:47:18 INFO TaskSetManager: Starting task 3.0 in stage 2.0 (TID 8, > localhost, ANY, 1266 bytes) > 15/03/31 10:47:18 INFO Executor: Running task 0.0 in stage 2.0 (TID 5) > 15/03/31 10:47:18 INFO Executor: Running task 2.0 in stage 2.0 (TID 7) > 15/03/31 10:47:18 INFO Executor: Running task 1.0 in stage 2.0 (TID 6) > 15/03/31 10:47:18 INFO Executor: Running task 3.0 in stage 2.0 (TID 8) > 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 2 > 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 5 > 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 7 > 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 1 > 15/03/31 10:47:18 INFO Executor: 
Finished task 0.0 in stage 2.0 (TID 5). 570 > bytes result sent to driver > 15/03/31 10:47:18 INFO Executor: Finished task 1.0 in stage 2.0 (TID 6). 570 > bytes result sent to driver > 15/03/31 10:47:18 INFO TaskSetManager: Starting task 4.0 in stage 2.0 (TID 9, > localhost, ANY, 1266 bytes) > 15/03/31 10:47:18 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 5) > in 12 ms on localhost (1/5) > 15/03/31 10:47:18 INFO Executor: Running task 4.0 in stage 2.0 (TID 9) > 15/03/31 10:47:18 INFO Executor: Finished task 3.0 in stage 2.0 (TID 8). 570 > bytes result sent to driver > 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 4 > 15/03/31 10:47:18 INFO Executor: Finished task 2.0 in stage 2.0 (TID 7). 570 > bytes result sent to driver > 15/03/31 10:47:18 INFO TaskSetManager: Finished task 1.0 in stage 2.0 (TID 6) > in 18 ms on localhost (2/5) > 15/03/31 10:47:18 INFO Executor: Finished task 4.0 in stage 2.0 (TID 9). 570 > bytes result sent to driver > 15/03/31 10:47:18 INFO TaskSetManager: Finished task 3.0 in stage 2.0 (TID 8) > in 23 ms on localhost (3/5) > 15/03/31 10:47:18 INFO TaskSetManager: Finished task 2.0 in stage 2.0 (TID 7) > in 27 ms on localhost (4/5) > 15/03/31 10:47:18 INFO TaskSetManager: Finished task 4.0 in stage 2.0 (TID 9) > in 21 ms on localhost (5/5) > 15/03/31 10:47:18 INFO DAGScheduler: Stage 2 (print at Main.scala:59) > finished in 0,033 s > 15/03/31 10:47:18 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks > have all completed, from pool > ------------------------------------------- > Time: 1427791638000 ms > ------------------------------------------- > 15/03/31 10:47:18 INFO DAGScheduler: Job 2 finished: print at Main.scala:59, > took 0,057381 s > > 15/03/31 10:47:18 INFO JobScheduler: Finished job streaming job 1427791638000 > ms.0 from job set of time 1427791638000 ms > 15/03/31 10:47:18 INFO JobScheduler: Starting job streaming job 1427791638000 > ms.1 from 
job set of time 1427791638000 ms > 15/03/31 10:47:18 INFO SparkContext: Starting job: foreachRDD at Main.scala:70 > 15/03/31 10:47:18 INFO DAGScheduler: Got job 3 (foreachRDD at Main.scala:70) > with 10 output partitions (allowLocal=false) > 15/03/31 10:47:18 INFO DAGScheduler: Final stage: Stage 3(foreachRDD at > Main.scala:70) > 15/03/31 10:47:18 INFO DAGScheduler: Parents of final stage: List() > 15/03/31 10:47:18 INFO DAGScheduler: Missing parents: List() > 15/03/31 10:47:18 INFO DAGScheduler: Submitting Stage 3 (MappedRDD[3] at map > at Main.scala:62), which has no missing parents > 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(5064) called with > curMem=16542, maxMem=278019440 > 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_3 stored as values in > memory (estimated size 4.9 KB, free 265.1 MB) > 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(2791) called with > curMem=21606, maxMem=278019440 > 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes > in memory (estimated size 2.7 KB, free 265.1 MB) > 15/03/31 10:47:18 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory > on localhost:40630 (size: 2.7 KB, free: 265.1 MB) > 15/03/31 10:47:18 INFO BlockManagerMaster: Updated info of block > broadcast_3_piece0 > 15/03/31 10:47:19 INFO SparkContext: Created broadcast 3 from broadcast at > DAGScheduler.scala:838 > 15/03/31 10:47:19 INFO DAGScheduler: Submitting 10 missing tasks from Stage 3 > (MappedRDD[3] at map at Main.scala:62) > 15/03/31 10:47:19 INFO TaskSchedulerImpl: Adding task set 3.0 with 10 tasks > 15/03/31 10:47:19 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID > 10, localhost, ANY, 1266 bytes) > 15/03/31 10:47:19 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID > 11, localhost, ANY, 1266 bytes) > 15/03/31 10:47:19 INFO TaskSetManager: Starting task 2.0 in stage 3.0 (TID > 12, localhost, ANY, 1266 bytes) > 15/03/31 10:47:19 INFO TaskSetManager: Starting task 3.0 in stage 3.0 (TID > 13, 
localhost, ANY, 1266 bytes) > 15/03/31 10:47:19 INFO Executor: Running task 0.0 in stage 3.0 (TID 10) > 15/03/31 10:47:19 INFO Executor: Running task 2.0 in stage 3.0 (TID 12) > 15/03/31 10:47:19 INFO Executor: Running task 1.0 in stage 3.0 (TID 11) > 15/03/31 10:47:19 INFO Executor: Running task 3.0 in stage 3.0 (TID 13) > 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 6 > 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 3 > 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 9 > 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 0 > 15/03/31 10:47:19 INFO BlockManager: Removing broadcast 2 > 15/03/31 10:47:19 INFO BlockManager: Removing block broadcast_2_piece0 > 15/03/31 10:47:19 INFO MemoryStore: Block broadcast_2_piece0 of size 1946 > dropped from memory (free 277996989) > 15/03/31 10:47:19 INFO BlockManagerInfo: Removed broadcast_2_piece0 on > localhost:40630 in memory (size: 1946.0 B, free: 265.1 MB) > 15/03/31 10:47:19 INFO BlockManagerMaster: Updated info of block > broadcast_2_piece0 > 15/03/31 10:47:19 INFO BlockManager: Removing block broadcast_2 > 15/03/31 10:47:19 INFO MemoryStore: Block broadcast_2 of size 3568 dropped > from memory (free 278000557) > 15/03/31 10:47:19 INFO ContextCleaner: Cleaned broadcast 2 > 15/03/31 10:47:19 INFO BlockManager: Removing broadcast 1 > 15/03/31 10:47:19 INFO BlockManager: Removing block broadcast_1 > 15/03/31 10:47:19 INFO MemoryStore: Block broadcast_1 of size 3568 dropped > from memory (free 278004125) > 15/03/31 10:47:19 INFO BlockManager: Removing block broadcast_1_piece0 > 15/03/31 10:47:19 INFO MemoryStore: Block broadcast_1_piece0 of size 1946 > dropped from memory (free 278006071) > 15/03/31 10:47:19 INFO BlockManagerInfo: Removed broadcast_1_piece0 on > 
localhost:40630 in memory (size: 1946.0 B, free: 265.1 MB) > 15/03/31 10:47:19 INFO BlockManagerMaster: Updated info of block > broadcast_1_piece0 > 15/03/31 10:47:19 INFO ContextCleaner: Cleaned broadcast 1 > 15/03/31 10:47:19 INFO BlockManager: Removing broadcast 0 > 15/03/31 10:47:19 INFO BlockManager: Removing block broadcast_0 > 15/03/31 10:47:19 INFO MemoryStore: Block broadcast_0 of size 3568 dropped > from memory (free 278009639) > 15/03/31 10:47:19 INFO BlockManager: Removing block broadcast_0_piece0 > 15/03/31 10:47:19 INFO MemoryStore: Block broadcast_0_piece0 of size 1946 > dropped from memory (free 278011585) > 15/03/31 10:47:19 INFO BlockManagerInfo: Removed broadcast_0_piece0 on > localhost:40630 in memory (size: 1946.0 B, free: 265.1 MB) > 15/03/31 10:47:19 INFO BlockManagerMaster: Updated info of block > broadcast_0_piece0 > 15/03/31 10:47:19 INFO ContextCleaner: Cleaned broadcast 0 > 15/03/31 10:47:19 INFO Executor: Finished task 3.0 in stage 3.0 (TID 13). 569 > bytes result sent to driver > 15/03/31 10:47:19 INFO Executor: Finished task 0.0 in stage 3.0 (TID 10). 569 > bytes result sent to driver > 15/03/31 10:47:19 INFO TaskSetManager: Starting task 4.0 in stage 3.0 (TID > 14, localhost, ANY, 1266 bytes) > 15/03/31 10:47:19 INFO Executor: Running task 4.0 in stage 3.0 (TID 14) > 15/03/31 10:47:19 INFO TaskSetManager: Finished task 3.0 in stage 3.0 (TID > 13) in 303 ms on localhost (1/10) > 15/03/31 10:47:19 INFO Executor: Finished task 1.0 in stage 3.0 (TID 11). 569 > bytes result sent to driver > 15/03/31 10:47:19 INFO Executor: Finished task 2.0 in stage 3.0 (TID 12). 
569 > bytes result sent to driver > 15/03/31 10:47:19 INFO TaskSetManager: Starting task 5.0 in stage 3.0 (TID > 15, localhost, ANY, 1266 bytes) > 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 8 > 15/03/31 10:47:19 INFO Executor: Running task 5.0 in stage 3.0 (TID 15) > 15/03/31 10:47:19 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID > 10) in 312 ms on localhost (2/10) > 15/03/31 10:47:19 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID > 11) in 314 ms on localhost (3/10) > 15/03/31 10:47:19 INFO TaskSetManager: Starting task 6.0 in stage 3.0 (TID > 16, localhost, ANY, 1266 bytes) > 15/03/31 10:47:19 INFO TaskSetManager: Starting task 7.0 in stage 3.0 (TID > 17, localhost, ANY, 1266 bytes) > 15/03/31 10:47:19 INFO Executor: Running task 6.0 in stage 3.0 (TID 16) > 15/03/31 10:47:19 INFO Executor: Running task 7.0 in stage 3.0 (TID 17) > 15/03/31 10:47:19 INFO TaskSetManager: Finished task 2.0 in stage 3.0 (TID > 12) in 317 ms on localhost (4/10) > 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 5 > 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 1 > 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 2 > 15/03/31 10:47:19 INFO Executor: Finished task 5.0 in stage 3.0 (TID 15). 569 > bytes result sent to driver > 15/03/31 10:47:19 INFO TaskSetManager: Starting task 8.0 in stage 3.0 (TID > 18, localhost, ANY, 1266 bytes) > 15/03/31 10:47:19 INFO Executor: Finished task 7.0 in stage 3.0 (TID 17). 569 > bytes result sent to driver > 15/03/31 10:47:19 INFO TaskSetManager: Starting task 9.0 in stage 3.0 (TID > 19, localhost, ANY, 1266 bytes) > 15/03/31 10:47:19 INFO Executor: Finished task 4.0 in stage 3.0 (TID 14). 
569 > bytes result sent to driver > 15/03/31 10:47:19 INFO Executor: Running task 9.0 in stage 3.0 (TID 19) > 15/03/31 10:47:19 INFO TaskSetManager: Finished task 7.0 in stage 3.0 (TID > 17) in 40 ms on localhost (5/10) > 15/03/31 10:47:19 INFO Executor: Finished task 6.0 in stage 3.0 (TID 16). 569 > bytes result sent to driver > 15/03/31 10:47:19 INFO Executor: Running task 8.0 in stage 3.0 (TID 18) > 15/03/31 10:47:19 INFO TaskSetManager: Finished task 5.0 in stage 3.0 (TID > 15) in 53 ms on localhost (6/10) > 15/03/31 10:47:19 INFO TaskSetManager: Finished task 4.0 in stage 3.0 (TID > 14) in 59 ms on localhost (7/10) > 15/03/31 10:47:19 INFO TaskSetManager: Finished task 6.0 in stage 3.0 (TID > 16) in 50 ms on localhost (8/10) > 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 4 > 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 7 > 15/03/31 10:47:19 INFO Executor: Finished task 8.0 in stage 3.0 (TID 18). 569 > bytes result sent to driver > 15/03/31 10:47:19 INFO Executor: Finished task 9.0 in stage 3.0 (TID 19). 
569 > bytes result sent to driver > 15/03/31 10:47:19 INFO TaskSetManager: Finished task 8.0 in stage 3.0 (TID > 18) in 49 ms on localhost (9/10) > 15/03/31 10:47:19 INFO TaskSetManager: Finished task 9.0 in stage 3.0 (TID > 19) in 50 ms on localhost (10/10) > 15/03/31 10:47:19 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks > have all completed, from pool > 15/03/31 10:47:19 INFO DAGScheduler: Stage 3 (foreachRDD at Main.scala:70) > finished in 0,410 s > 15/03/31 10:47:19 INFO DAGScheduler: Job 3 finished: foreachRDD at > Main.scala:70, took 0,438664 s > 15/03/31 10:47:19 INFO JobScheduler: Finished job streaming job 1427791638000 > ms.1 from job set of time 1427791638000 ms > 15/03/31 10:47:19 INFO JobScheduler: Total delay: 1,420 s for time > 1427791638000 ms (execution: 1,372 s) > 15/03/31 10:47:19 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer() > 15/03/31 10:47:20 INFO JobScheduler: Starting job streaming job 1427791640000 > ms.0 from job set of time 1427791640000 ms > 15/03/31 10:47:20 INFO JobScheduler: Added jobs for time 1427791640000 ms > 15/03/31 10:47:20 INFO SparkContext: Starting job: print at Main.scala:59 > 15/03/31 10:47:20 INFO DAGScheduler: Got job 4 (print at Main.scala:59) with > 1 output partitions (allowLocal=true) > 15/03/31 10:47:20 INFO DAGScheduler: Final stage: Stage 4(print at > Main.scala:59) > 15/03/31 10:47:20 INFO DAGScheduler: Parents of final stage: List() > 15/03/31 10:47:20 INFO DAGScheduler: Missing parents: List() > 15/03/31 10:47:20 INFO DAGScheduler: Submitting Stage 4 (MappedRDD[6] at map > at Main.scala:50), which has no missing parents > 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(3568) called with > curMem=7855, maxMem=278019440 > 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_4 stored as values in > memory (estimated size 3.5 KB, free 265.1 MB) > 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(1945) called with > curMem=11423, maxMem=278019440 > 15/03/31 10:47:20 INFO MemoryStore: 
Block broadcast_4_piece0 stored as bytes > in memory (estimated size 1945.0 B, free 265.1 MB) > 15/03/31 10:47:20 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory > on localhost:40630 (size: 1945.0 B, free: 265.1 MB) > 15/03/31 10:47:20 INFO BlockManagerMaster: Updated info of block > broadcast_4_piece0 > 15/03/31 10:47:20 INFO SparkContext: Created broadcast 4 from getCallSite at > DStream.scala:294 > 15/03/31 10:47:20 INFO DAGScheduler: Submitting 1 missing tasks from Stage 4 > (MappedRDD[6] at map at Main.scala:50) > 15/03/31 10:47:20 INFO TaskSchedulerImpl: Adding task set 4.0 with 1 tasks > 15/03/31 10:47:20 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID > 20, localhost, ANY, 1266 bytes) > 15/03/31 10:47:20 INFO Executor: Running task 0.0 in stage 4.0 (TID 20) > 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 6 > 15/03/31 10:47:20 INFO Executor: Finished task 0.0 in stage 4.0 (TID 20). 570 > bytes result sent to driver > 15/03/31 10:47:20 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID > 20) in 6 ms on localhost (1/1) > 15/03/31 10:47:20 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks > have all completed, from pool > 15/03/31 10:47:20 INFO DAGScheduler: Stage 4 (print at Main.scala:59) > finished in 0,007 s > 15/03/31 10:47:20 INFO DAGScheduler: Job 4 finished: print at Main.scala:59, > took 0,021293 s > 15/03/31 10:47:20 INFO SparkContext: Starting job: print at Main.scala:59 > 15/03/31 10:47:20 INFO DAGScheduler: Got job 5 (print at Main.scala:59) with > 4 output partitions (allowLocal=true) > 15/03/31 10:47:20 INFO DAGScheduler: Final stage: Stage 5(print at > Main.scala:59) > 15/03/31 10:47:20 INFO DAGScheduler: Parents of final stage: List() > 15/03/31 10:47:20 INFO DAGScheduler: Missing parents: List() > 15/03/31 10:47:20 INFO DAGScheduler: Submitting Stage 5 (MappedRDD[6] at map > at Main.scala:50), which has no missing parents > 15/03/31 10:47:20 
INFO MemoryStore: ensureFreeSpace(3568) called with > curMem=13368, maxMem=278019440 > 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_5 stored as values in > memory (estimated size 3.5 KB, free 265.1 MB) > 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(1945) called with > curMem=16936, maxMem=278019440 > 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes > in memory (estimated size 1945.0 B, free 265.1 MB) > 15/03/31 10:47:20 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory > on localhost:40630 (size: 1945.0 B, free: 265.1 MB) > 15/03/31 10:47:20 INFO BlockManagerMaster: Updated info of block > broadcast_5_piece0 > 15/03/31 10:47:20 INFO SparkContext: Created broadcast 5 from broadcast at > DAGScheduler.scala:838 > 15/03/31 10:47:20 INFO DAGScheduler: Submitting 4 missing tasks from Stage 5 > (MappedRDD[6] at map at Main.scala:50) > 15/03/31 10:47:20 INFO TaskSchedulerImpl: Adding task set 5.0 with 4 tasks > 15/03/31 10:47:20 INFO TaskSetManager: Starting task 0.0 in stage 5.0 (TID > 21, localhost, ANY, 1266 bytes) > 15/03/31 10:47:20 INFO TaskSetManager: Starting task 1.0 in stage 5.0 (TID > 22, localhost, ANY, 1266 bytes) > 15/03/31 10:47:20 INFO TaskSetManager: Starting task 2.0 in stage 5.0 (TID > 23, localhost, ANY, 1266 bytes) > 15/03/31 10:47:20 INFO TaskSetManager: Starting task 3.0 in stage 5.0 (TID > 24, localhost, ANY, 1266 bytes) > 15/03/31 10:47:20 INFO Executor: Running task 0.0 in stage 5.0 (TID 21) > 15/03/31 10:47:20 INFO Executor: Running task 2.0 in stage 5.0 (TID 23) > 15/03/31 10:47:20 INFO Executor: Running task 3.0 in stage 5.0 (TID 24) > 15/03/31 10:47:20 INFO Executor: Running task 1.0 in stage 5.0 (TID 22) > 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 0 > 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 8 > 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset 
${part.fromOffset} is the > same as ending offset skipping toto 9 > 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 3 > 15/03/31 10:47:20 INFO Executor: Finished task 2.0 in stage 5.0 (TID 23). 570 > bytes result sent to driver > 15/03/31 10:47:20 INFO Executor: Finished task 3.0 in stage 5.0 (TID 24). 570 > bytes result sent to driver > 15/03/31 10:47:20 INFO Executor: Finished task 0.0 in stage 5.0 (TID 21). 570 > bytes result sent to driver > 15/03/31 10:47:20 INFO TaskSetManager: Finished task 2.0 in stage 5.0 (TID > 23) in 7 ms on localhost (1/4) > 15/03/31 10:47:20 INFO Executor: Finished task 1.0 in stage 5.0 (TID 22). 570 > bytes result sent to driver > 15/03/31 10:47:20 INFO TaskSetManager: Finished task 3.0 in stage 5.0 (TID > 24) in 10 ms on localhost (2/4) > 15/03/31 10:47:20 INFO TaskSetManager: Finished task 0.0 in stage 5.0 (TID > 21) in 14 ms on localhost (3/4) > 15/03/31 10:47:20 INFO TaskSetManager: Finished task 1.0 in stage 5.0 (TID > 22) in 16 ms on localhost (4/4) > 15/03/31 10:47:20 INFO TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks > have all completed, from pool > 15/03/31 10:47:20 INFO DAGScheduler: Stage 5 (print at Main.scala:59) > finished in 0,020 s > 15/03/31 10:47:20 INFO DAGScheduler: Job 5 finished: print at Main.scala:59, > took 0,034514 s > 15/03/31 10:47:20 INFO SparkContext: Starting job: print at Main.scala:59 > 15/03/31 10:47:20 INFO DAGScheduler: Got job 6 (print at Main.scala:59) with > 5 output partitions (allowLocal=true) > 15/03/31 10:47:20 INFO DAGScheduler: Final stage: Stage 6(print at > Main.scala:59) > 15/03/31 10:47:20 INFO DAGScheduler: Parents of final stage: List() > 15/03/31 10:47:20 INFO DAGScheduler: Missing parents: List() > 15/03/31 10:47:20 INFO DAGScheduler: Submitting Stage 6 (MappedRDD[6] at map > at Main.scala:50), which has no missing parents > 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(3568) called with > curMem=18881, 
maxMem=278019440 > 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_6 stored as values in > memory (estimated size 3.5 KB, free 265.1 MB) > 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(1945) called with > curMem=22449, maxMem=278019440 > 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes > in memory (estimated size 1945.0 B, free 265.1 MB) > 15/03/31 10:47:20 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory > on localhost:40630 (size: 1945.0 B, free: 265.1 MB) > 15/03/31 10:47:20 INFO BlockManagerMaster: Updated info of block > broadcast_6_piece0 > 15/03/31 10:47:20 INFO SparkContext: Created broadcast 6 from broadcast at > DAGScheduler.scala:838 > 15/03/31 10:47:20 INFO DAGScheduler: Submitting 5 missing tasks from Stage 6 > (MappedRDD[6] at map at Main.scala:50) > 15/03/31 10:47:20 INFO TaskSchedulerImpl: Adding task set 6.0 with 5 tasks > 15/03/31 10:47:20 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID > 25, localhost, ANY, 1266 bytes) > 15/03/31 10:47:20 INFO TaskSetManager: Starting task 1.0 in stage 6.0 (TID > 26, localhost, ANY, 1266 bytes) > 15/03/31 10:47:20 INFO TaskSetManager: Starting task 2.0 in stage 6.0 (TID > 27, localhost, ANY, 1266 bytes) > 15/03/31 10:47:20 INFO TaskSetManager: Starting task 3.0 in stage 6.0 (TID > 28, localhost, ANY, 1266 bytes) > 15/03/31 10:47:20 INFO Executor: Running task 0.0 in stage 6.0 (TID 25) > 15/03/31 10:47:20 INFO Executor: Running task 2.0 in stage 6.0 (TID 27) > 15/03/31 10:47:20 INFO Executor: Running task 3.0 in stage 6.0 (TID 28) > 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 2 > 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 1 > 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 7 > 15/03/31 10:47:20 INFO Executor: Finished task 0.0 in stage 6.0 (TID 25). 
570 > bytes result sent to driver > 15/03/31 10:47:20 INFO Executor: Running task 1.0 in stage 6.0 (TID 26) > 15/03/31 10:47:20 INFO Executor: Finished task 3.0 in stage 6.0 (TID 28). 570 > bytes result sent to driver > 15/03/31 10:47:20 INFO TaskSetManager: Starting task 4.0 in stage 6.0 (TID > 29, localhost, ANY, 1266 bytes) > 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 5 > 15/03/31 10:47:20 INFO Executor: Finished task 2.0 in stage 6.0 (TID 27). 570 > bytes result sent to driver > 15/03/31 10:47:20 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID > 25) in 8 ms on localhost (1/5) > 15/03/31 10:47:20 INFO Executor: Finished task 1.0 in stage 6.0 (TID 26). 570 > bytes result sent to driver > 15/03/31 10:47:20 INFO Executor: Running task 4.0 in stage 6.0 (TID 29) > 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 4 > 15/03/31 10:47:20 INFO TaskSetManager: Finished task 3.0 in stage 6.0 (TID > 28) in 10 ms on localhost (2/5) > 15/03/31 10:47:20 INFO Executor: Finished task 4.0 in stage 6.0 (TID 29). 
570 > bytes result sent to driver > 15/03/31 10:47:20 INFO TaskSetManager: Finished task 2.0 in stage 6.0 (TID > 27) in 13 ms on localhost (3/5) > 15/03/31 10:47:20 INFO TaskSetManager: Finished task 1.0 in stage 6.0 (TID > 26) in 15 ms on localhost (4/5) > 15/03/31 10:47:20 INFO TaskSetManager: Finished task 4.0 in stage 6.0 (TID > 29) in 12 ms on localhost (5/5) > 15/03/31 10:47:20 INFO DAGScheduler: Stage 6 (print at Main.scala:59) > finished in 0,019 s > 15/03/31 10:47:20 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks > have all completed, from pool > ------------------------------------------- > Time: 1427791640000 ms > ------------------------------------------- > > 15/03/31 10:47:20 INFO DAGScheduler: Job 6 finished: print at Main.scala:59, > took 0,034922 s > 15/03/31 10:47:20 INFO JobScheduler: Finished job streaming job 1427791640000 > ms.0 from job set of time 1427791640000 ms > 15/03/31 10:47:20 INFO SparkContext: Starting job: foreachRDD at Main.scala:70 > 15/03/31 10:47:20 INFO DAGScheduler: Got job 7 (foreachRDD at Main.scala:70) > with 10 output partitions (allowLocal=false) > 15/03/31 10:47:20 INFO DAGScheduler: Final stage: Stage 7(foreachRDD at > Main.scala:70) > 15/03/31 10:47:20 INFO DAGScheduler: Parents of final stage: List() > 15/03/31 10:47:20 INFO JobScheduler: Starting job streaming job 1427791640000 > ms.1 from job set of time 1427791640000 ms > 15/03/31 10:47:20 INFO DAGScheduler: Missing parents: List() > 15/03/31 10:47:20 INFO DAGScheduler: Submitting Stage 7 (MappedRDD[7] at map > at Main.scala:62), which has no missing parents > 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(5064) called with > curMem=24394, maxMem=278019440 > 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_7 stored as values in > memory (estimated size 4.9 KB, free 265.1 MB) > 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(2792) called with > curMem=29458, maxMem=278019440 > 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_7_piece0 stored 
as bytes > in memory (estimated size 2.7 KB, free 265.1 MB) > 15/03/31 10:47:20 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory > on localhost:40630 (size: 2.7 KB, free: 265.1 MB) > 15/03/31 10:47:20 INFO BlockManagerMaster: Updated info of block > broadcast_7_piece0 > 15/03/31 10:47:20 INFO SparkContext: Created broadcast 7 from getCallSite at > DStream.scala:294 > 15/03/31 10:47:20 INFO DAGScheduler: Submitting 10 missing tasks from Stage 7 > (MappedRDD[7] at map at Main.scala:62) > 15/03/31 10:47:20 INFO TaskSchedulerImpl: Adding task set 7.0 with 10 tasks > 15/03/31 10:47:20 INFO TaskSetManager: Starting task 0.0 in stage 7.0 (TID > 30, localhost, ANY, 1266 bytes) > 15/03/31 10:47:20 INFO TaskSetManager: Starting task 1.0 in stage 7.0 (TID > 31, localhost, ANY, 1266 bytes) > 15/03/31 10:47:20 INFO TaskSetManager: Starting task 2.0 in stage 7.0 (TID > 32, localhost, ANY, 1266 bytes) > 15/03/31 10:47:20 INFO TaskSetManager: Starting task 3.0 in stage 7.0 (TID > 33, localhost, ANY, 1266 bytes) > 15/03/31 10:47:20 INFO Executor: Running task 0.0 in stage 7.0 (TID 30) > 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 6 > 15/03/31 10:47:20 INFO Executor: Running task 1.0 in stage 7.0 (TID 31) > 15/03/31 10:47:20 INFO Executor: Running task 2.0 in stage 7.0 (TID 32) > 15/03/31 10:47:20 INFO Executor: Running task 3.0 in stage 7.0 (TID 33) > 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 9 > 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 0 > 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 3 > 15/03/31 10:47:20 INFO Executor: Finished task 0.0 in stage 7.0 (TID 30). 569 > bytes result sent to driver > 15/03/31 10:47:20 INFO Executor: Finished task 2.0 in stage 7.0 (TID 32). 
569 > bytes result sent to driver > 15/03/31 10:47:20 INFO Executor: Finished task 1.0 in stage 7.0 (TID 31). 569 > bytes result sent to driver > 15/03/31 10:47:20 INFO TaskSetManager: Starting task 4.0 in stage 7.0 (TID > 34, localhost, ANY, 1266 bytes) > 15/03/31 10:47:20 INFO Executor: Running task 4.0 in stage 7.0 (TID 34) > 15/03/31 10:47:20 INFO TaskSetManager: Starting task 5.0 in stage 7.0 (TID > 35, localhost, ANY, 1266 bytes) > 15/03/31 10:47:20 INFO TaskSetManager: Starting task 6.0 in stage 7.0 (TID > 36, localhost, ANY, 1266 bytes) > 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 8 > 15/03/31 10:47:20 INFO Executor: Running task 6.0 in stage 7.0 (TID 36) > 15/03/31 10:47:20 INFO Executor: Finished task 3.0 in stage 7.0 (TID 33). 569 > bytes result sent to driver > 15/03/31 10:47:20 INFO Executor: Running task 5.0 in stage 7.0 (TID 35) > 15/03/31 10:47:20 INFO TaskSetManager: Finished task 0.0 in stage 7.0 (TID > 30) in 34 ms on localhost (1/10) > 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 5 > 15/03/31 10:47:20 INFO TaskSetManager: Finished task 1.0 in stage 7.0 (TID > 31) in 42 ms on localhost (2/10) > 15/03/31 10:47:20 INFO TaskSetManager: Starting task 7.0 in stage 7.0 (TID > 37, localhost, ANY, 1266 bytes) > 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 2 > 15/03/31 10:47:20 INFO Executor: Running task 7.0 in stage 7.0 (TID 37) > 15/03/31 10:47:20 INFO TaskSetManager: Finished task 2.0 in stage 7.0 (TID > 32) in 45 ms on localhost (3/10) > 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 1 > 15/03/31 10:47:20 INFO TaskSetManager: Finished task 3.0 in stage 7.0 (TID > 33) in 46 ms on localhost (4/10) > 15/03/31 10:47:20 INFO Executor: Finished task 4.0 in stage 7.0 (TID 34). 
569 > bytes result sent to driver > 15/03/31 10:47:20 INFO TaskSetManager: Starting task 8.0 in stage 7.0 (TID > 38, localhost, ANY, 1266 bytes) > 15/03/31 10:47:20 INFO Executor: Running task 8.0 in stage 7.0 (TID 38) > 15/03/31 10:47:20 INFO TaskSetManager: Finished task 4.0 in stage 7.0 (TID > 34) in 31 ms on localhost (5/10) > 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 7 > 15/03/31 10:47:20 INFO Executor: Finished task 6.0 in stage 7.0 (TID 36). 569 > bytes result sent to driver > 15/03/31 10:47:20 INFO TaskSetManager: Starting task 9.0 in stage 7.0 (TID > 39, localhost, ANY, 1266 bytes) > 15/03/31 10:47:20 INFO TaskSetManager: Finished task 6.0 in stage 7.0 (TID > 36) in 35 ms on localhost (6/10) > 15/03/31 10:47:20 INFO Executor: Running task 9.0 in stage 7.0 (TID 39) > 15/03/31 10:47:20 INFO Executor: Finished task 5.0 in stage 7.0 (TID 35). 569 > bytes result sent to driver > 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 4 > 15/03/31 10:47:20 INFO TaskSetManager: Finished task 5.0 in stage 7.0 (TID > 35) in 39 ms on localhost (7/10) > 15/03/31 10:47:20 INFO Executor: Finished task 7.0 in stage 7.0 (TID 37). 569 > bytes result sent to driver > 15/03/31 10:47:20 INFO TaskSetManager: Finished task 7.0 in stage 7.0 (TID > 37) in 36 ms on localhost (8/10) > 15/03/31 10:47:20 INFO Executor: Finished task 8.0 in stage 7.0 (TID 38). 569 > bytes result sent to driver > 15/03/31 10:47:20 INFO TaskSetManager: Finished task 8.0 in stage 7.0 (TID > 38) in 26 ms on localhost (9/10) > 15/03/31 10:47:20 INFO Executor: Finished task 9.0 in stage 7.0 (TID 39). 
569 > bytes result sent to driver > 15/03/31 10:47:20 INFO TaskSetManager: Finished task 9.0 in stage 7.0 (TID > 39) in 27 ms on localhost (10/10) > 15/03/31 10:47:20 INFO TaskSchedulerImpl: Removed TaskSet 7.0, whose tasks > have all completed, from pool > 15/03/31 10:47:20 INFO DAGScheduler: Stage 7 (foreachRDD at Main.scala:70) > finished in 0,098 s > 15/03/31 10:47:20 INFO DAGScheduler: Job 7 finished: foreachRDD at > Main.scala:70, took 0,111899 s > 15/03/31 10:47:20 INFO JobScheduler: Finished job streaming job 1427791640000 > ms.1 from job set of time 1427791640000 ms > 15/03/31 10:47:20 INFO JobScheduler: Total delay: 0,231 s for time > 1427791640000 ms (execution: 0,213 s) > 15/03/31 10:47:20 INFO MappedRDD: Removing RDD 2 from persistence list > 15/03/31 10:47:20 INFO BlockManager: Removing RDD 2 > 15/03/31 10:47:20 INFO MappedRDD: Removing RDD 1 from persistence list > 15/03/31 10:47:20 INFO KafkaRDD: Removing RDD 0 from persistence list > 15/03/31 10:47:20 INFO BlockManager: Removing RDD 0 > 15/03/31 10:47:20 INFO MappedRDD: Removing RDD 3 from persistence list > 15/03/31 10:47:20 INFO BlockManager: Removing RDD 3 > 15/03/31 10:47:20 INFO BlockManager: Removing RDD 1 > 15/03/31 10:47:20 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer() > 15/03/31 10:47:22 INFO JobScheduler: Added jobs for time 1427791642000 ms > 15/03/31 10:47:22 INFO JobScheduler: Starting job streaming job 1427791642000 > ms.0 from job set of time 1427791642000 ms > 15/03/31 10:47:22 INFO SparkContext: Starting job: print at Main.scala:59 > 15/03/31 10:47:22 INFO DAGScheduler: Got job 8 (print at Main.scala:59) with > 1 output partitions (allowLocal=true) > 15/03/31 10:47:22 INFO DAGScheduler: Final stage: Stage 8(print at > Main.scala:59) > 15/03/31 10:47:22 INFO DAGScheduler: Parents of final stage: List() > 15/03/31 10:47:22 INFO DAGScheduler: Missing parents: List() > 15/03/31 10:47:22 INFO DAGScheduler: Submitting Stage 8 (MappedRDD[10] at map > at Main.scala:50), which 
has no missing parents > 15/03/31 10:47:22 INFO MemoryStore: ensureFreeSpace(3568) called with > curMem=32250, maxMem=278019440 > 15/03/31 10:47:22 INFO MemoryStore: Block broadcast_8 stored as values in > memory (estimated size 3.5 KB, free 265.1 MB) > 15/03/31 10:47:22 INFO MemoryStore: ensureFreeSpace(1949) called with > curMem=35818, maxMem=278019440 > 15/03/31 10:47:22 INFO MemoryStore: Block broadcast_8_piece0 stored as bytes > in memory (estimated size 1949.0 B, free 265.1 MB) > 15/03/31 10:47:22 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory > on localhost:40630 (size: 1949.0 B, free: 265.1 MB) > 15/03/31 10:47:22 INFO BlockManagerMaster: Updated info of block > broadcast_8_piece0 > 15/03/31 10:47:22 INFO SparkContext: Created broadcast 8 from broadcast at > DAGScheduler.scala:838 > 15/03/31 10:47:22 INFO DAGScheduler: Submitting 1 missing tasks from Stage 8 > (MappedRDD[10] at map at Main.scala:50) > 15/03/31 10:47:22 INFO TaskSchedulerImpl: Adding task set 8.0 with 1 tasks > 15/03/31 10:47:22 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID > 40, localhost, ANY, 1266 bytes) > 15/03/31 10:47:22 INFO Executor: Running task 0.0 in stage 8.0 (TID 40) > 15/03/31 10:47:22 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 6 > 15/03/31 10:47:22 INFO Executor: Finished task 0.0 in stage 8.0 (TID 40). 
570 > bytes result sent to driver > 15/03/31 10:47:22 INFO TaskSetManager: Finished task 0.0 in stage 8.0 (TID > 40) in 6 ms on localhost (1/1) > 15/03/31 10:47:22 INFO TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks > have all completed, from pool > 15/03/31 10:47:22 INFO DAGScheduler: Stage 8 (print at Main.scala:59) > finished in 0,008 s > 15/03/31 10:47:22 INFO DAGScheduler: Job 8 finished: print at Main.scala:59, > took 0,021082 s > 15/03/31 10:47:22 INFO SparkContext: Starting job: print at Main.scala:59 > 15/03/31 10:47:22 INFO DAGScheduler: Got job 9 (print at Main.scala:59) with > 4 output partitions (allowLocal=true) > 15/03/31 10:47:22 INFO DAGScheduler: Final stage: Stage 9(print at > Main.scala:59) > 15/03/31 10:47:22 INFO DAGScheduler: Parents of final stage: List() > 15/03/31 10:47:22 INFO DAGScheduler: Missing parents: List() > 15/03/31 10:47:22 INFO DAGScheduler: Submitting Stage 9 (MappedRDD[10] at map > at Main.scala:50), which has no missing parents > 15/03/31 10:47:22 INFO MemoryStore: ensureFreeSpace(3568) called with > curMem=37767, maxMem=278019440 > 15/03/31 10:47:22 INFO MemoryStore: Block broadcast_9 stored as values in > memory (estimated size 3.5 KB, free 265.1 MB) > 15/03/31 10:47:22 INFO MemoryStore: ensureFreeSpace(1949) called with > curMem=41335, maxMem=278019440 > 15/03/31 10:47:22 INFO MemoryStore: Block broadcast_9_piece0 stored as bytes > in memory (estimated size 1949.0 B, free 265.1 MB) > 15/03/31 10:47:22 INFO BlockManagerInfo: Added broadcast_9_piece0 in memory > on localhost:40630 (size: 1949.0 B, free: 265.1 MB) > 15/03/31 10:47:22 INFO BlockManagerMaster: Updated info of block > broadcast_9_piece0 > 15/03/31 10:47:22 INFO SparkContext: Created broadcast 9 from broadcast at > DAGScheduler.scala:838 > 15/03/31 10:47:22 INFO DAGScheduler: Submitting 4 missing tasks from Stage 9 > (MappedRDD[10] at map at Main.scala:50) > 15/03/31 10:47:22 INFO TaskSchedulerImpl: Adding task set 9.0 with 4 tasks > 15/03/31 10:47:22 INFO 
TaskSetManager: Starting task 0.0 in stage 9.0 (TID > 41, localhost, ANY, 1266 bytes) > 15/03/31 10:47:22 INFO TaskSetManager: Starting task 1.0 in stage 9.0 (TID > 42, localhost, ANY, 1266 bytes) > 15/03/31 10:47:22 INFO TaskSetManager: Starting task 2.0 in stage 9.0 (TID > 43, localhost, ANY, 1266 bytes) > 15/03/31 10:47:22 INFO TaskSetManager: Starting task 3.0 in stage 9.0 (TID > 44, localhost, ANY, 1266 bytes) > 15/03/31 10:47:22 INFO Executor: Running task 2.0 in stage 9.0 (TID 43) > 15/03/31 10:47:22 INFO Executor: Running task 1.0 in stage 9.0 (TID 42) > 15/03/31 10:47:22 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 0 > 15/03/31 10:47:22 INFO Executor: Running task 0.0 in stage 9.0 (TID 41) > 15/03/31 10:47:22 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the > same as ending offset skipping toto 3 > 15/03/31 10:47:22 INFO Executor: Finished task 2.0 in stage 9.0 (TID 43). 570 > bytes result sent to driver > 15/03/31 10:47:22 INFO Executor: Finished task 0.0 in stage 9.0 (TID 41). 
570 > bytes result sent to driver > 15/03/31 10:47:22 INFO Executor: Running task 3.0 in stage 9.0 (TID 44) > 15/03/31 10:47:22 INFO TaskSetManager: Finished task 2.0 in stage 9.0 (TID > 43) in 5 ms on localhost (1/4) > 15/03/31 10:47:22 INFO KafkaRDD: Computing topic toto, partition 9 offsets 0 > -> 1 > 15/03/31 10:47:22 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID > 41) in 9 ms on localhost (2/4) > 15/03/31 10:47:22 INFO VerifiableProperties: Verifying properties > 15/03/31 10:47:22 INFO VerifiableProperties: Property group.id is overridden > to > 15/03/31 10:47:22 INFO VerifiableProperties: Property zookeeper.connect is > overridden to > 15/03/31 10:47:22 INFO KafkaRDD: Computing topic toto, partition 8 offsets 0 > -> 1 > 15/03/31 10:47:22 ERROR TaskContextImpl: Error in TaskCompletionListener > java.lang.NullPointerException > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158) > at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101) > at > org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49) > at > org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68) > at > org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:58) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 15/03/31 10:47:22 INFO VerifiableProperties: Verifying properties > 15/03/31 10:47:22 INFO VerifiableProperties: Property group.id is overridden > to > 15/03/31 10:47:22 INFO VerifiableProperties: Property zookeeper.connect is > overridden to > 15/03/31 10:47:22 ERROR TaskContextImpl: Error in TaskCompletionListener > java.lang.NullPointerException > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158) > at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101) > at > org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101) > at > org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49) > at > org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68) > at > org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) > at > org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66) > at org.apache.spark.scheduler.Task.run(Task.scala:58) > at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > 15/03/31 10:47:22 ERROR Executor: Exception in task 1.0 in stage 9.0 (TID 42) > org.apache.spark.util.TaskCompletionListenerException > at > org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76) > at 
org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 15/03/31 10:47:22 ERROR Executor: Exception in task 3.0 in stage 9.0 (TID 44)
> org.apache.spark.util.TaskCompletionListenerException
> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
> 15/03/31 10:47:22 WARN TaskSetManager: Lost task 3.0 in stage 9.0 (TID 44, localhost): org.apache.spark.util.TaskCompletionListenerException
> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
> at org.apache.spark.scheduler.Task.run(Task.scala:58)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> at java.lang.Thread.run(Thread.java:745)
>
> 15/03/31 10:47:22 ERROR TaskSetManager: Task 3 in stage 9.0 failed 1 times; aborting job"
>
> I don't know if this helps, but in the Spark UI I found some additional
> logs for the failed tasks:
>
> org.apache.spark.streaming.dstream.DStream.print(DStream.scala:624)
>
> which corresponds to
> https://github.com/cloudera/spark/blob/cdh5.3.2-release/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
>
> For integration, I'm using the Spark version shipped with Cloudera 5.3.2,
> whereas in development I'm using Spark 1.3.0. Could this cause problems?
>
> Thanks for your answer.
> Regards,
> Nicolas PHUNG
>
> On Mon, Mar 30, 2015 at 7:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Nicolas:
>> Check whether the following exception occurred anywhere in the log:
>>     errs => throw new SparkException(
>>       s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
>>         errs.mkString("\n")),
>>
>> Cheers
>>
>> On Mon, Mar 30, 2015 at 9:40 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> This line
>>>
>>> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
>>>
>>> is the attempt to close the underlying Kafka simple consumer.
>>>
>>> We can add a null pointer check, but the underlying issue of the
>>> consumer being null probably indicates a problem earlier. Do you see any
>>> previous error messages?
>>>
>>> Also, can you clarify for the successful and failed cases which topics
>>> you are attempting this on, how many partitions there are, and whether
>>> there are messages in the partitions? There's an existing JIRA regarding
>>> empty partitions.
>>>
>>> On Mon, Mar 30, 2015 at 11:05 AM, Nicolas Phung <nicolas.ph...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm using spark-streaming-kafka 1.3.0 with the new consumer, "Approach
>>>> 2: Direct Approach (No Receivers)"
>>>> (http://spark.apache.org/docs/latest/streaming-kafka-integration.html).
>>>> I'm using the following code snippet:
>>>>
>>>>   import kafka.serializer.{DefaultDecoder, StringDecoder}
>>>>   import org.apache.spark.streaming.dstream.DStream
>>>>   import org.apache.spark.streaming.kafka.KafkaUtils
>>>>   // Raw, RawScala and AvroDecoder are application classes (not shown);
>>>>   // ssc, kafkaParams and topicsSet are defined elsewhere.
>>>>
>>>>   // Create direct kafka stream with brokers and topics
>>>>   val messages = KafkaUtils.createDirectStream[String, Array[Byte],
>>>>     StringDecoder, DefaultDecoder](ssc, kafkaParams, topicsSet)
>>>>
>>>>   // Keep only the message values (Avro-encoded bytes)
>>>>   val raw = messages.map(_._2)
>>>>   val dStream: DStream[RawScala] = raw.map { bytes =>
>>>>     // Decode each Avro payload into the application's Scala type
>>>>     println("Byte length: " + bytes.length)
>>>>     val rawDecoder = new AvroDecoder[Raw](schema = Raw.getClassSchema)
>>>>     RawScala.toScala(rawDecoder.fromBytes(bytes))
>>>>   }
>>>>   // Useful for debug
>>>>   dStream.print()
>>>>
>>>> I launch my Spark Streaming job and everything is fine as long as no
>>>> logs arrive from Kafka. When I send a log, I get the following error:
>>>>
>>>> 15/03/30 17:34:40 ERROR TaskContextImpl: Error in TaskCompletionListener
>>>> java.lang.NullPointerException
>>>> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
>>>> at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
>>>> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
>>>> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
>>>> at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
>>>> at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
>>>> at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
>>>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:58)
>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 3.0 in stage 28.0 (TID 94) in 12 ms on localhost (2/4)
>>>> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 2.0 in stage 28.0 (TID 93) in 13 ms on localhost (3/4)
>>>> 15/03/30 17:34:40 ERROR Executor: Exception in task 0.0 in stage 28.0 (TID 91)
>>>> org.apache.spark.util.TaskCompletionListenerException
>>>> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:58)
>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>> 15/03/30 17:34:40 WARN TaskSetManager: Lost task 0.0 in stage 28.0 (TID 91, localhost): org.apache.spark.util.TaskCompletionListenerException
>>>> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:58)
>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> 15/03/30 17:34:40 ERROR TaskSetManager: Task 0 in stage 28.0 failed 1 times; aborting job
>>>> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Removed TaskSet 28.0, whose tasks have all completed, from pool
>>>> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Cancelling stage 28
>>>> 15/03/30 17:34:40 INFO DAGScheduler: Job 28 failed: print at HotFCANextGen.scala:63, took 0,041068 s
>>>> 15/03/30 17:34:40 INFO JobScheduler: Starting job streaming job 1427729680000 ms.1 from job set of time 1427729680000 ms
>>>> 15/03/30 17:34:40 ERROR JobScheduler: Error running job streaming job 1427729680000 ms.0
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage 28.0 (TID 91, localhost): org.apache.spark.util.TaskCompletionListenerException
>>>> at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:58)
>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> Driver stacktrace:
>>>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>>>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>>> at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>>> at scala.Option.foreach(Option.scala:236)
>>>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>>>> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> The same code works well when the topic has a single partition. The
>>>> issue only happens when the topic has multiple partitions. Am I doing
>>>> something wrong? Can you help me find the right way to write this for a
>>>> Kafka topic with multiple partitions?
>>>>
>>>> Regards,
>>>> Nicolas PHUNG