Can you show us the output of DStream#print() if you have it?
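
For context on why I'm asking: as I read the WARN KafkaRDD lines in your log, the beginning offset equals the ending offset for every partition of toto, which would mean each batch has nothing to read. Here is a minimal sketch of that arithmetic. PartitionRange is a hypothetical stand-in, not Spark's API, and the offset value 42 is made up for illustration.

```scala
// Hypothetical stand-in for one partition's offset range in a batch.
// Not Spark's own class; it only carries the values the log refers to.
final case class PartitionRange(
    topic: String,
    partition: Int,
    fromOffset: Long,
    untilOffset: Long) {
  // Number of messages a batch would read from this partition.
  def messageCount: Long = untilOffset - fromOffset
  // True when the beginning offset is the same as the ending offset.
  def isEmpty: Boolean = messageCount == 0
}

object EmptyBatchSketch {
  // Partitions that would contribute no records to the batch.
  def emptyPartitions(ranges: Seq[PartitionRange]): Seq[Int] =
    ranges.filter(_.isEmpty).map(_.partition)

  def main(args: Array[String]): Unit = {
    // Ten partitions, as in the "toto" topic; offsets are assumed values.
    val ranges = (0 until 10).map(p => PartitionRange("toto", p, 42L, 42L))
    println(s"empty partitions: ${emptyPartitions(ranges).mkString(", ")}")
    println(s"total messages in batch: ${ranges.map(_.messageCount).sum}")
  }
}
```

If that reading is right, print() would show only the "Time: ... ms" header for each batch, which is why the actual output would help.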

Thanks

On Tue, Mar 31, 2015 at 2:55 AM, Nicolas Phung <nicolas.ph...@gmail.com>
wrote:

> Hello,
>
> @Akhil Das I'm trying to use the experimental API
> https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala
> I'm reusing the same code snippet to initialize my topicSet.
>
> @Cody Koeninger I don't see any previous error messages (see the full log
> at the end). To create the topics, I ran the following:
>
> "kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 
> --partitions 10 --topic toto"
>
> "kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 
> --partitions 1 --topic toto-single"
>
> I'm launching my Spark Streaming job in local mode.
>
> @Ted Yu There's no "Couldn't connect to leader for topic" message in the
> log. Here's the full version:
>
> "spark-submit --conf config.resource=application-integration.conf --class
> nextgen.Main assembly-0.1-SNAPSHOT.jar
>
> 15/03/31 10:47:12 INFO SecurityManager: Changing view acls to: nphung
> 15/03/31 10:47:12 INFO SecurityManager: Changing modify acls to: nphung
> 15/03/31 10:47:12 INFO SecurityManager: SecurityManager: authentication 
> disabled; ui acls disabled; users with view permissions: Set(nphung); users 
> with modify permissions: Set(nphung)
> 15/03/31 10:47:13 INFO Slf4jLogger: Slf4jLogger started
> 15/03/31 10:47:13 INFO Remoting: Starting remoting
> 15/03/31 10:47:13 INFO Remoting: Remoting started; listening on addresses 
> :[akka.tcp://sparkDriver@int.local:44180]
> 15/03/31 10:47:13 INFO Remoting: Remoting now listens on addresses: 
> [akka.tcp://sparkDriver@int.local:44180]
> 15/03/31 10:47:13 INFO Utils: Successfully started service 'sparkDriver' on 
> port 44180.
> 15/03/31 10:47:13 INFO SparkEnv: Registering MapOutputTracker
> 15/03/31 10:47:13 INFO SparkEnv: Registering BlockManagerMaster
> 15/03/31 10:47:13 INFO DiskBlockManager: Created local directory at 
> /tmp/spark-local-20150331104713-2238
> 15/03/31 10:47:13 INFO MemoryStore: MemoryStore started with capacity 265.1 MB
> 15/03/31 10:47:15 INFO HttpFileServer: HTTP File server directory is 
> /tmp/spark-2c8e34a0-bec3-4f1e-9fe7-83e08efc4f53
> 15/03/31 10:47:15 INFO HttpServer: Starting HTTP Server
> 15/03/31 10:47:15 INFO Utils: Successfully started service 'HTTP file server' 
> on port 50204.
> 15/03/31 10:47:15 INFO Utils: Successfully started service 'SparkUI' on port 
> 4040.
> 15/03/31 10:47:15 INFO SparkUI: Started SparkUI at http://int.local:4040
> 15/03/31 10:47:16 INFO SparkContext: Added JAR 
> file:/home/nphung/assembly-0.1-SNAPSHOT.jar at 
> http://10.153.165.98:50204/jars/assembly-0.1-SNAPSHOT.jar with timestamp 
> 1427791636151
> 15/03/31 10:47:16 INFO AkkaUtils: Connecting to HeartbeatReceiver: 
> akka.tcp://sparkDriver@int.local:44180/user/HeartbeatReceiver
> 15/03/31 10:47:16 INFO NettyBlockTransferService: Server created on 40630
> 15/03/31 10:47:16 INFO BlockManagerMaster: Trying to register BlockManager
> 15/03/31 10:47:16 INFO BlockManagerMasterActor: Registering block manager 
> localhost:40630 with 265.1 MB RAM, BlockManagerId(<driver>, localhost, 40630)
> 15/03/31 10:47:16 INFO BlockManagerMaster: Registered BlockManager
> 15/03/31 10:47:17 INFO EventLoggingListener: Logging events to 
> hdfs://int.local:8020/user/spark/applicationHistory/local-1427791636195
> 15/03/31 10:47:17 INFO VerifiableProperties: Verifying properties
> 15/03/31 10:47:17 INFO VerifiableProperties: Property group.id is overridden 
> to
> 15/03/31 10:47:17 INFO VerifiableProperties: Property zookeeper.connect is 
> overridden to
> 15/03/31 10:47:17 INFO ForEachDStream: metadataCleanupDelay = -1
> 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1
> 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Slide time = 2000 ms
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Storage level = 
> StorageLevel(false, false, false, false, 1)
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Checkpoint interval = null
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Remember duration = 2000 ms
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Initialized and validated 
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1daf3b44
> 15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms
> 15/03/31 10:47:17 INFO MappedDStream: Storage level = StorageLevel(false, 
> false, false, false, 1)
> 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null
> 15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms
> 15/03/31 10:47:17 INFO MappedDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.MappedDStream@7fd8c559
> 15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms
> 15/03/31 10:47:17 INFO MappedDStream: Storage level = StorageLevel(false, 
> false, false, false, 1)
> 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null
> 15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms
> 15/03/31 10:47:17 INFO MappedDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.MappedDStream@44c13103
> 15/03/31 10:47:17 INFO ForEachDStream: Slide time = 2000 ms
> 15/03/31 10:47:17 INFO ForEachDStream: Storage level = StorageLevel(false, 
> false, false, false, 1)
> 15/03/31 10:47:17 INFO ForEachDStream: Checkpoint interval = null
> 15/03/31 10:47:17 INFO ForEachDStream: Remember duration = 2000 ms
> 15/03/31 10:47:17 INFO ForEachDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.ForEachDStream@8f2098e
> 15/03/31 10:47:17 INFO ForEachDStream: metadataCleanupDelay = -1
> 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1
> 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1
> 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Slide time = 2000 ms
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Storage level = 
> StorageLevel(false, false, false, false, 1)
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Checkpoint interval = null
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Remember duration = 2000 ms
> 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Initialized and validated 
> org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1daf3b44
> 15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms
> 15/03/31 10:47:17 INFO MappedDStream: Storage level = StorageLevel(false, 
> false, false, false, 1)
> 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null
> 15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms
> 15/03/31 10:47:17 INFO MappedDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.MappedDStream@7fd8c559
> 15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms
> 15/03/31 10:47:17 INFO MappedDStream: Storage level = StorageLevel(false, 
> false, false, false, 1)
> 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null
> 15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms
> 15/03/31 10:47:17 INFO MappedDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.MappedDStream@44c13103
> 15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms
> 15/03/31 10:47:17 INFO MappedDStream: Storage level = StorageLevel(false, 
> false, false, false, 1)
> 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null
> 15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms
> 15/03/31 10:47:17 INFO MappedDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.MappedDStream@6c6366cf
> 15/03/31 10:47:17 INFO ForEachDStream: Slide time = 2000 ms
> 15/03/31 10:47:17 INFO ForEachDStream: Storage level = StorageLevel(false, 
> false, false, false, 1)
> 15/03/31 10:47:17 INFO ForEachDStream: Checkpoint interval = null
> 15/03/31 10:47:17 INFO ForEachDStream: Remember duration = 2000 ms
> 15/03/31 10:47:17 INFO ForEachDStream: Initialized and validated 
> org.apache.spark.streaming.dstream.ForEachDStream@55a88417
> 15/03/31 10:47:17 INFO RecurringTimer: Started timer for JobGenerator at time 
> 1427791638000
> 15/03/31 10:47:17 INFO JobGenerator: Started JobGenerator at 1427791638000 ms
> 15/03/31 10:47:17 INFO JobScheduler: Started JobScheduler
> 15/03/31 10:47:18 INFO VerifiableProperties: Verifying properties
> 15/03/31 10:47:18 INFO VerifiableProperties: Property group.id is overridden 
> to
> 15/03/31 10:47:18 INFO VerifiableProperties: Property zookeeper.connect is 
> overridden to
> 15/03/31 10:47:18 INFO JobScheduler: Added jobs for time 1427791638000 ms
> 15/03/31 10:47:18 INFO JobScheduler: Starting job streaming job 1427791638000 
> ms.0 from job set of time 1427791638000 ms
> 15/03/31 10:47:18 INFO SparkContext: Starting job: print at Main.scala:59
> 15/03/31 10:47:18 INFO DAGScheduler: Got job 0 (print at Main.scala:59) with 
> 1 output partitions (allowLocal=true)
> 15/03/31 10:47:18 INFO DAGScheduler: Final stage: Stage 0(print at 
> Main.scala:59)
> 15/03/31 10:47:18 INFO DAGScheduler: Parents of final stage: List()
> 15/03/31 10:47:18 INFO DAGScheduler: Missing parents: List()
> 15/03/31 10:47:18 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[2] at map 
> at Main.scala:50), which has no missing parents
> 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(3568) called with 
> curMem=0, maxMem=278019440
> 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_0 stored as values in 
> memory (estimated size 3.5 KB, free 265.1 MB)
> 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(1946) called with 
> curMem=3568, maxMem=278019440
> 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes 
> in memory (estimated size 1946.0 B, free 265.1 MB)
> 15/03/31 10:47:18 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory 
> on localhost:40630 (size: 1946.0 B, free: 265.1 MB)
> 15/03/31 10:47:18 INFO BlockManagerMaster: Updated info of block 
> broadcast_0_piece0
> 15/03/31 10:47:18 INFO SparkContext: Created broadcast 0 from broadcast at 
> DAGScheduler.scala:838
> 15/03/31 10:47:18 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 
> (MappedRDD[2] at map at Main.scala:50)
> 15/03/31 10:47:18 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
> 15/03/31 10:47:18 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, 
> localhost, ANY, 1266 bytes)
> 15/03/31 10:47:18 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
> 15/03/31 10:47:18 INFO Executor: Fetching 
> http://10.153.165.98:50204/jars/assembly-0.1-SNAPSHOT.jar with timestamp 
> 1427791636151
> 15/03/31 10:47:18 INFO Utils: Fetching 
> http://10.153.165.98:50204/jars/assembly-0.1-SNAPSHOT.jar to 
> /tmp/fetchFileTemp4524847716079927807.tmp
> 15/03/31 10:47:18 INFO Executor: Adding 
> file:/tmp/spark-d295d121-f359-4c7d-8725-e1f958ebff5a/assembly-0.1-SNAPSHOT.jar
>  to class loader
> 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 6
> 15/03/31 10:47:18 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 570 
> bytes result sent to driver
> 15/03/31 10:47:18 INFO DAGScheduler: Stage 0 (print at Main.scala:59) 
> finished in 0,371 s
> 15/03/31 10:47:18 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) 
> in 350 ms on localhost (1/1)
> 15/03/31 10:47:18 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks 
> have all completed, from pool
> 15/03/31 10:47:18 INFO DAGScheduler: Job 0 finished: print at Main.scala:59, 
> took 0,717929 s
> 15/03/31 10:47:18 INFO SparkContext: Starting job: print at Main.scala:59
> 15/03/31 10:47:18 INFO DAGScheduler: Got job 1 (print at Main.scala:59) with 
> 4 output partitions (allowLocal=true)
> 15/03/31 10:47:18 INFO DAGScheduler: Final stage: Stage 1(print at 
> Main.scala:59)
> 15/03/31 10:47:18 INFO DAGScheduler: Parents of final stage: List()
> 15/03/31 10:47:18 INFO DAGScheduler: Missing parents: List()
> 15/03/31 10:47:18 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[2] at map 
> at Main.scala:50), which has no missing parents
> 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(3568) called with 
> curMem=5514, maxMem=278019440
> 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_1 stored as values in 
> memory (estimated size 3.5 KB, free 265.1 MB)
> 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(1946) called with 
> curMem=9082, maxMem=278019440
> 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes 
> in memory (estimated size 1946.0 B, free 265.1 MB)
> 15/03/31 10:47:18 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory 
> on localhost:40630 (size: 1946.0 B, free: 265.1 MB)
> 15/03/31 10:47:18 INFO BlockManagerMaster: Updated info of block 
> broadcast_1_piece0
> 15/03/31 10:47:18 INFO SparkContext: Created broadcast 1 from broadcast at 
> DAGScheduler.scala:838
> 15/03/31 10:47:18 INFO DAGScheduler: Submitting 4 missing tasks from Stage 1 
> (MappedRDD[2] at map at Main.scala:50)
> 15/03/31 10:47:18 INFO TaskSchedulerImpl: Adding task set 1.0 with 4 tasks
> 15/03/31 10:47:18 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, 
> localhost, ANY, 1266 bytes)
> 15/03/31 10:47:18 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 2, 
> localhost, ANY, 1266 bytes)
> 15/03/31 10:47:18 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 3, 
> localhost, ANY, 1266 bytes)
> 15/03/31 10:47:18 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID 4, 
> localhost, ANY, 1266 bytes)
> 15/03/31 10:47:18 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
> 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 3
> 15/03/31 10:47:18 INFO Executor: Running task 1.0 in stage 1.0 (TID 2)
> 15/03/31 10:47:18 INFO Executor: Running task 2.0 in stage 1.0 (TID 3)
> 15/03/31 10:47:18 INFO Executor: Running task 3.0 in stage 1.0 (TID 4)
> 15/03/31 10:47:18 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 570 
> bytes result sent to driver
> 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 9
> 15/03/31 10:47:18 INFO Executor: Finished task 1.0 in stage 1.0 (TID 2). 570 
> bytes result sent to driver
> 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 8
> 15/03/31 10:47:18 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) 
> in 21 ms on localhost (1/4)
> 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 0
> 15/03/31 10:47:18 INFO Executor: Finished task 3.0 in stage 1.0 (TID 4). 570 
> bytes result sent to driver
> 15/03/31 10:47:18 INFO Executor: Finished task 2.0 in stage 1.0 (TID 3). 570 
> bytes result sent to driver
> 15/03/31 10:47:18 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 2) 
> in 31 ms on localhost (2/4)
> 15/03/31 10:47:18 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID 4) 
> in 39 ms on localhost (3/4)
> 15/03/31 10:47:18 INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 3) 
> in 43 ms on localhost (4/4)
> 15/03/31 10:47:18 INFO DAGScheduler: Stage 1 (print at Main.scala:59) 
> finished in 0,048 s
> 15/03/31 10:47:18 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks 
> have all completed, from pool
> 15/03/31 10:47:18 INFO DAGScheduler: Job 1 finished: print at Main.scala:59, 
> took 0,073960 s
> 15/03/31 10:47:18 INFO SparkContext: Starting job: print at Main.scala:59
> 15/03/31 10:47:18 INFO DAGScheduler: Got job 2 (print at Main.scala:59) with 
> 5 output partitions (allowLocal=true)
> 15/03/31 10:47:18 INFO DAGScheduler: Final stage: Stage 2(print at 
> Main.scala:59)
> 15/03/31 10:47:18 INFO DAGScheduler: Parents of final stage: List()
> 15/03/31 10:47:18 INFO DAGScheduler: Missing parents: List()
> 15/03/31 10:47:18 INFO DAGScheduler: Submitting Stage 2 (MappedRDD[2] at map 
> at Main.scala:50), which has no missing parents
> 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(3568) called with 
> curMem=11028, maxMem=278019440
> 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_2 stored as values in 
> memory (estimated size 3.5 KB, free 265.1 MB)
> 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(1946) called with 
> curMem=14596, maxMem=278019440
> 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes 
> in memory (estimated size 1946.0 B, free 265.1 MB)
> 15/03/31 10:47:18 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory 
> on localhost:40630 (size: 1946.0 B, free: 265.1 MB)
> 15/03/31 10:47:18 INFO BlockManagerMaster: Updated info of block 
> broadcast_2_piece0
> 15/03/31 10:47:18 INFO SparkContext: Created broadcast 2 from broadcast at 
> DAGScheduler.scala:838
> 15/03/31 10:47:18 INFO DAGScheduler: Submitting 5 missing tasks from Stage 2 
> (MappedRDD[2] at map at Main.scala:50)
> 15/03/31 10:47:18 INFO TaskSchedulerImpl: Adding task set 2.0 with 5 tasks
> 15/03/31 10:47:18 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 5, 
> localhost, ANY, 1266 bytes)
> 15/03/31 10:47:18 INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID 6, 
> localhost, ANY, 1266 bytes)
> 15/03/31 10:47:18 INFO TaskSetManager: Starting task 2.0 in stage 2.0 (TID 7, 
> localhost, ANY, 1266 bytes)
> 15/03/31 10:47:18 INFO TaskSetManager: Starting task 3.0 in stage 2.0 (TID 8, 
> localhost, ANY, 1266 bytes)
> 15/03/31 10:47:18 INFO Executor: Running task 0.0 in stage 2.0 (TID 5)
> 15/03/31 10:47:18 INFO Executor: Running task 2.0 in stage 2.0 (TID 7)
> 15/03/31 10:47:18 INFO Executor: Running task 1.0 in stage 2.0 (TID 6)
> 15/03/31 10:47:18 INFO Executor: Running task 3.0 in stage 2.0 (TID 8)
> 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 2
> 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 5
> 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 7
> 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 1
> 15/03/31 10:47:18 INFO Executor: Finished task 0.0 in stage 2.0 (TID 5). 570 
> bytes result sent to driver
> 15/03/31 10:47:18 INFO Executor: Finished task 1.0 in stage 2.0 (TID 6). 570 
> bytes result sent to driver
> 15/03/31 10:47:18 INFO TaskSetManager: Starting task 4.0 in stage 2.0 (TID 9, 
> localhost, ANY, 1266 bytes)
> 15/03/31 10:47:18 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 5) 
> in 12 ms on localhost (1/5)
> 15/03/31 10:47:18 INFO Executor: Running task 4.0 in stage 2.0 (TID 9)
> 15/03/31 10:47:18 INFO Executor: Finished task 3.0 in stage 2.0 (TID 8). 570 
> bytes result sent to driver
> 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 4
> 15/03/31 10:47:18 INFO Executor: Finished task 2.0 in stage 2.0 (TID 7). 570 
> bytes result sent to driver
> 15/03/31 10:47:18 INFO TaskSetManager: Finished task 1.0 in stage 2.0 (TID 6) 
> in 18 ms on localhost (2/5)
> 15/03/31 10:47:18 INFO Executor: Finished task 4.0 in stage 2.0 (TID 9). 570 
> bytes result sent to driver
> 15/03/31 10:47:18 INFO TaskSetManager: Finished task 3.0 in stage 2.0 (TID 8) 
> in 23 ms on localhost (3/5)
> 15/03/31 10:47:18 INFO TaskSetManager: Finished task 2.0 in stage 2.0 (TID 7) 
> in 27 ms on localhost (4/5)
> 15/03/31 10:47:18 INFO TaskSetManager: Finished task 4.0 in stage 2.0 (TID 9) 
> in 21 ms on localhost (5/5)
> 15/03/31 10:47:18 INFO DAGScheduler: Stage 2 (print at Main.scala:59) 
> finished in 0,033 s
> 15/03/31 10:47:18 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks 
> have all completed, from pool
> -------------------------------------------
> Time: 1427791638000 ms
> -------------------------------------------
> 15/03/31 10:47:18 INFO DAGScheduler: Job 2 finished: print at Main.scala:59, 
> took 0,057381 s
>
> 15/03/31 10:47:18 INFO JobScheduler: Finished job streaming job 1427791638000 
> ms.0 from job set of time 1427791638000 ms
> 15/03/31 10:47:18 INFO JobScheduler: Starting job streaming job 1427791638000 
> ms.1 from job set of time 1427791638000 ms
> 15/03/31 10:47:18 INFO SparkContext: Starting job: foreachRDD at Main.scala:70
> 15/03/31 10:47:18 INFO DAGScheduler: Got job 3 (foreachRDD at Main.scala:70) 
> with 10 output partitions (allowLocal=false)
> 15/03/31 10:47:18 INFO DAGScheduler: Final stage: Stage 3(foreachRDD at 
> Main.scala:70)
> 15/03/31 10:47:18 INFO DAGScheduler: Parents of final stage: List()
> 15/03/31 10:47:18 INFO DAGScheduler: Missing parents: List()
> 15/03/31 10:47:18 INFO DAGScheduler: Submitting Stage 3 (MappedRDD[3] at map 
> at Main.scala:62), which has no missing parents
> 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(5064) called with 
> curMem=16542, maxMem=278019440
> 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_3 stored as values in 
> memory (estimated size 4.9 KB, free 265.1 MB)
> 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(2791) called with 
> curMem=21606, maxMem=278019440
> 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes 
> in memory (estimated size 2.7 KB, free 265.1 MB)
> 15/03/31 10:47:18 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory 
> on localhost:40630 (size: 2.7 KB, free: 265.1 MB)
> 15/03/31 10:47:18 INFO BlockManagerMaster: Updated info of block 
> broadcast_3_piece0
> 15/03/31 10:47:19 INFO SparkContext: Created broadcast 3 from broadcast at 
> DAGScheduler.scala:838
> 15/03/31 10:47:19 INFO DAGScheduler: Submitting 10 missing tasks from Stage 3 
> (MappedRDD[3] at map at Main.scala:62)
> 15/03/31 10:47:19 INFO TaskSchedulerImpl: Adding task set 3.0 with 10 tasks
> 15/03/31 10:47:19 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 
> 10, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:19 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID 
> 11, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:19 INFO TaskSetManager: Starting task 2.0 in stage 3.0 (TID 
> 12, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:19 INFO TaskSetManager: Starting task 3.0 in stage 3.0 (TID 
> 13, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:19 INFO Executor: Running task 0.0 in stage 3.0 (TID 10)
> 15/03/31 10:47:19 INFO Executor: Running task 2.0 in stage 3.0 (TID 12)
> 15/03/31 10:47:19 INFO Executor: Running task 1.0 in stage 3.0 (TID 11)
> 15/03/31 10:47:19 INFO Executor: Running task 3.0 in stage 3.0 (TID 13)
> 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 6
> 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 3
> 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 9
> 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 0
> 15/03/31 10:47:19 INFO BlockManager: Removing broadcast 2
> 15/03/31 10:47:19 INFO BlockManager: Removing block broadcast_2_piece0
> 15/03/31 10:47:19 INFO MemoryStore: Block broadcast_2_piece0 of size 1946 
> dropped from memory (free 277996989)
> 15/03/31 10:47:19 INFO BlockManagerInfo: Removed broadcast_2_piece0 on 
> localhost:40630 in memory (size: 1946.0 B, free: 265.1 MB)
> 15/03/31 10:47:19 INFO BlockManagerMaster: Updated info of block 
> broadcast_2_piece0
> 15/03/31 10:47:19 INFO BlockManager: Removing block broadcast_2
> 15/03/31 10:47:19 INFO MemoryStore: Block broadcast_2 of size 3568 dropped 
> from memory (free 278000557)
> 15/03/31 10:47:19 INFO ContextCleaner: Cleaned broadcast 2
> 15/03/31 10:47:19 INFO BlockManager: Removing broadcast 1
> 15/03/31 10:47:19 INFO BlockManager: Removing block broadcast_1
> 15/03/31 10:47:19 INFO MemoryStore: Block broadcast_1 of size 3568 dropped 
> from memory (free 278004125)
> 15/03/31 10:47:19 INFO BlockManager: Removing block broadcast_1_piece0
> 15/03/31 10:47:19 INFO MemoryStore: Block broadcast_1_piece0 of size 1946 
> dropped from memory (free 278006071)
> 15/03/31 10:47:19 INFO BlockManagerInfo: Removed broadcast_1_piece0 on 
> localhost:40630 in memory (size: 1946.0 B, free: 265.1 MB)
> 15/03/31 10:47:19 INFO BlockManagerMaster: Updated info of block 
> broadcast_1_piece0
> 15/03/31 10:47:19 INFO ContextCleaner: Cleaned broadcast 1
> 15/03/31 10:47:19 INFO BlockManager: Removing broadcast 0
> 15/03/31 10:47:19 INFO BlockManager: Removing block broadcast_0
> 15/03/31 10:47:19 INFO MemoryStore: Block broadcast_0 of size 3568 dropped 
> from memory (free 278009639)
> 15/03/31 10:47:19 INFO BlockManager: Removing block broadcast_0_piece0
> 15/03/31 10:47:19 INFO MemoryStore: Block broadcast_0_piece0 of size 1946 
> dropped from memory (free 278011585)
> 15/03/31 10:47:19 INFO BlockManagerInfo: Removed broadcast_0_piece0 on 
> localhost:40630 in memory (size: 1946.0 B, free: 265.1 MB)
> 15/03/31 10:47:19 INFO BlockManagerMaster: Updated info of block 
> broadcast_0_piece0
> 15/03/31 10:47:19 INFO ContextCleaner: Cleaned broadcast 0
> 15/03/31 10:47:19 INFO Executor: Finished task 3.0 in stage 3.0 (TID 13). 569 
> bytes result sent to driver
> 15/03/31 10:47:19 INFO Executor: Finished task 0.0 in stage 3.0 (TID 10). 569 
> bytes result sent to driver
> 15/03/31 10:47:19 INFO TaskSetManager: Starting task 4.0 in stage 3.0 (TID 
> 14, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:19 INFO Executor: Running task 4.0 in stage 3.0 (TID 14)
> 15/03/31 10:47:19 INFO TaskSetManager: Finished task 3.0 in stage 3.0 (TID 
> 13) in 303 ms on localhost (1/10)
> 15/03/31 10:47:19 INFO Executor: Finished task 1.0 in stage 3.0 (TID 11). 569 
> bytes result sent to driver
> 15/03/31 10:47:19 INFO Executor: Finished task 2.0 in stage 3.0 (TID 12). 569 
> bytes result sent to driver
> 15/03/31 10:47:19 INFO TaskSetManager: Starting task 5.0 in stage 3.0 (TID 
> 15, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 8
> 15/03/31 10:47:19 INFO Executor: Running task 5.0 in stage 3.0 (TID 15)
> 15/03/31 10:47:19 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 
> 10) in 312 ms on localhost (2/10)
> 15/03/31 10:47:19 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID 
> 11) in 314 ms on localhost (3/10)
> 15/03/31 10:47:19 INFO TaskSetManager: Starting task 6.0 in stage 3.0 (TID 
> 16, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:19 INFO TaskSetManager: Starting task 7.0 in stage 3.0 (TID 
> 17, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:19 INFO Executor: Running task 6.0 in stage 3.0 (TID 16)
> 15/03/31 10:47:19 INFO Executor: Running task 7.0 in stage 3.0 (TID 17)
> 15/03/31 10:47:19 INFO TaskSetManager: Finished task 2.0 in stage 3.0 (TID 
> 12) in 317 ms on localhost (4/10)
> 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 5
> 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 1
> 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 2
> 15/03/31 10:47:19 INFO Executor: Finished task 5.0 in stage 3.0 (TID 15). 569 
> bytes result sent to driver
> 15/03/31 10:47:19 INFO TaskSetManager: Starting task 8.0 in stage 3.0 (TID 
> 18, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:19 INFO Executor: Finished task 7.0 in stage 3.0 (TID 17). 569 
> bytes result sent to driver
> 15/03/31 10:47:19 INFO TaskSetManager: Starting task 9.0 in stage 3.0 (TID 
> 19, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:19 INFO Executor: Finished task 4.0 in stage 3.0 (TID 14). 569 
> bytes result sent to driver
> 15/03/31 10:47:19 INFO Executor: Running task 9.0 in stage 3.0 (TID 19)
> 15/03/31 10:47:19 INFO TaskSetManager: Finished task 7.0 in stage 3.0 (TID 
> 17) in 40 ms on localhost (5/10)
> 15/03/31 10:47:19 INFO Executor: Finished task 6.0 in stage 3.0 (TID 16). 569 
> bytes result sent to driver
> 15/03/31 10:47:19 INFO Executor: Running task 8.0 in stage 3.0 (TID 18)
> 15/03/31 10:47:19 INFO TaskSetManager: Finished task 5.0 in stage 3.0 (TID 
> 15) in 53 ms on localhost (6/10)
> 15/03/31 10:47:19 INFO TaskSetManager: Finished task 4.0 in stage 3.0 (TID 
> 14) in 59 ms on localhost (7/10)
> 15/03/31 10:47:19 INFO TaskSetManager: Finished task 6.0 in stage 3.0 (TID 
> 16) in 50 ms on localhost (8/10)
> 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 4
> 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 7
> 15/03/31 10:47:19 INFO Executor: Finished task 8.0 in stage 3.0 (TID 18). 569 
> bytes result sent to driver
> 15/03/31 10:47:19 INFO Executor: Finished task 9.0 in stage 3.0 (TID 19). 569 
> bytes result sent to driver
> 15/03/31 10:47:19 INFO TaskSetManager: Finished task 8.0 in stage 3.0 (TID 
> 18) in 49 ms on localhost (9/10)
> 15/03/31 10:47:19 INFO TaskSetManager: Finished task 9.0 in stage 3.0 (TID 
> 19) in 50 ms on localhost (10/10)
> 15/03/31 10:47:19 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks 
> have all completed, from pool
> 15/03/31 10:47:19 INFO DAGScheduler: Stage 3 (foreachRDD at Main.scala:70) 
> finished in 0,410 s
> 15/03/31 10:47:19 INFO DAGScheduler: Job 3 finished: foreachRDD at 
> Main.scala:70, took 0,438664 s
> 15/03/31 10:47:19 INFO JobScheduler: Finished job streaming job 1427791638000 
> ms.1 from job set of time 1427791638000 ms
> 15/03/31 10:47:19 INFO JobScheduler: Total delay: 1,420 s for time 
> 1427791638000 ms (execution: 1,372 s)
> 15/03/31 10:47:19 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
> 15/03/31 10:47:20 INFO JobScheduler: Starting job streaming job 1427791640000 
> ms.0 from job set of time 1427791640000 ms
> 15/03/31 10:47:20 INFO JobScheduler: Added jobs for time 1427791640000 ms
> 15/03/31 10:47:20 INFO SparkContext: Starting job: print at Main.scala:59
> 15/03/31 10:47:20 INFO DAGScheduler: Got job 4 (print at Main.scala:59) with 
> 1 output partitions (allowLocal=true)
> 15/03/31 10:47:20 INFO DAGScheduler: Final stage: Stage 4(print at 
> Main.scala:59)
> 15/03/31 10:47:20 INFO DAGScheduler: Parents of final stage: List()
> 15/03/31 10:47:20 INFO DAGScheduler: Missing parents: List()
> 15/03/31 10:47:20 INFO DAGScheduler: Submitting Stage 4 (MappedRDD[6] at map 
> at Main.scala:50), which has no missing parents
> 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(3568) called with 
> curMem=7855, maxMem=278019440
> 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_4 stored as values in 
> memory (estimated size 3.5 KB, free 265.1 MB)
> 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(1945) called with 
> curMem=11423, maxMem=278019440
> 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes 
> in memory (estimated size 1945.0 B, free 265.1 MB)
> 15/03/31 10:47:20 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory 
> on localhost:40630 (size: 1945.0 B, free: 265.1 MB)
> 15/03/31 10:47:20 INFO BlockManagerMaster: Updated info of block 
> broadcast_4_piece0
> 15/03/31 10:47:20 INFO SparkContext: Created broadcast 4 from getCallSite at 
> DStream.scala:294
> 15/03/31 10:47:20 INFO DAGScheduler: Submitting 1 missing tasks from Stage 4 
> (MappedRDD[6] at map at Main.scala:50)
> 15/03/31 10:47:20 INFO TaskSchedulerImpl: Adding task set 4.0 with 1 tasks
> 15/03/31 10:47:20 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 
> 20, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:20 INFO Executor: Running task 0.0 in stage 4.0 (TID 20)
> 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 6
> 15/03/31 10:47:20 INFO Executor: Finished task 0.0 in stage 4.0 (TID 20). 570 
> bytes result sent to driver
> 15/03/31 10:47:20 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 
> 20) in 6 ms on localhost (1/1)
> 15/03/31 10:47:20 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks 
> have all completed, from pool
> 15/03/31 10:47:20 INFO DAGScheduler: Stage 4 (print at Main.scala:59) 
> finished in 0,007 s
> 15/03/31 10:47:20 INFO DAGScheduler: Job 4 finished: print at Main.scala:59, 
> took 0,021293 s
> 15/03/31 10:47:20 INFO SparkContext: Starting job: print at Main.scala:59
> 15/03/31 10:47:20 INFO DAGScheduler: Got job 5 (print at Main.scala:59) with 
> 4 output partitions (allowLocal=true)
> 15/03/31 10:47:20 INFO DAGScheduler: Final stage: Stage 5(print at 
> Main.scala:59)
> 15/03/31 10:47:20 INFO DAGScheduler: Parents of final stage: List()
> 15/03/31 10:47:20 INFO DAGScheduler: Missing parents: List()
> 15/03/31 10:47:20 INFO DAGScheduler: Submitting Stage 5 (MappedRDD[6] at map 
> at Main.scala:50), which has no missing parents
> 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(3568) called with 
> curMem=13368, maxMem=278019440
> 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_5 stored as values in 
> memory (estimated size 3.5 KB, free 265.1 MB)
> 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(1945) called with 
> curMem=16936, maxMem=278019440
> 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes 
> in memory (estimated size 1945.0 B, free 265.1 MB)
> 15/03/31 10:47:20 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory 
> on localhost:40630 (size: 1945.0 B, free: 265.1 MB)
> 15/03/31 10:47:20 INFO BlockManagerMaster: Updated info of block 
> broadcast_5_piece0
> 15/03/31 10:47:20 INFO SparkContext: Created broadcast 5 from broadcast at 
> DAGScheduler.scala:838
> 15/03/31 10:47:20 INFO DAGScheduler: Submitting 4 missing tasks from Stage 5 
> (MappedRDD[6] at map at Main.scala:50)
> 15/03/31 10:47:20 INFO TaskSchedulerImpl: Adding task set 5.0 with 4 tasks
> 15/03/31 10:47:20 INFO TaskSetManager: Starting task 0.0 in stage 5.0 (TID 
> 21, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:20 INFO TaskSetManager: Starting task 1.0 in stage 5.0 (TID 
> 22, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:20 INFO TaskSetManager: Starting task 2.0 in stage 5.0 (TID 
> 23, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:20 INFO TaskSetManager: Starting task 3.0 in stage 5.0 (TID 
> 24, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:20 INFO Executor: Running task 0.0 in stage 5.0 (TID 21)
> 15/03/31 10:47:20 INFO Executor: Running task 2.0 in stage 5.0 (TID 23)
> 15/03/31 10:47:20 INFO Executor: Running task 3.0 in stage 5.0 (TID 24)
> 15/03/31 10:47:20 INFO Executor: Running task 1.0 in stage 5.0 (TID 22)
> 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 0
> 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 8
> 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 9
> 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 3
> 15/03/31 10:47:20 INFO Executor: Finished task 2.0 in stage 5.0 (TID 23). 570 
> bytes result sent to driver
> 15/03/31 10:47:20 INFO Executor: Finished task 3.0 in stage 5.0 (TID 24). 570 
> bytes result sent to driver
> 15/03/31 10:47:20 INFO Executor: Finished task 0.0 in stage 5.0 (TID 21). 570 
> bytes result sent to driver
> 15/03/31 10:47:20 INFO TaskSetManager: Finished task 2.0 in stage 5.0 (TID 
> 23) in 7 ms on localhost (1/4)
> 15/03/31 10:47:20 INFO Executor: Finished task 1.0 in stage 5.0 (TID 22). 570 
> bytes result sent to driver
> 15/03/31 10:47:20 INFO TaskSetManager: Finished task 3.0 in stage 5.0 (TID 
> 24) in 10 ms on localhost (2/4)
> 15/03/31 10:47:20 INFO TaskSetManager: Finished task 0.0 in stage 5.0 (TID 
> 21) in 14 ms on localhost (3/4)
> 15/03/31 10:47:20 INFO TaskSetManager: Finished task 1.0 in stage 5.0 (TID 
> 22) in 16 ms on localhost (4/4)
> 15/03/31 10:47:20 INFO TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks 
> have all completed, from pool
> 15/03/31 10:47:20 INFO DAGScheduler: Stage 5 (print at Main.scala:59) 
> finished in 0,020 s
> 15/03/31 10:47:20 INFO DAGScheduler: Job 5 finished: print at Main.scala:59, 
> took 0,034514 s
> 15/03/31 10:47:20 INFO SparkContext: Starting job: print at Main.scala:59
> 15/03/31 10:47:20 INFO DAGScheduler: Got job 6 (print at Main.scala:59) with 
> 5 output partitions (allowLocal=true)
> 15/03/31 10:47:20 INFO DAGScheduler: Final stage: Stage 6(print at 
> Main.scala:59)
> 15/03/31 10:47:20 INFO DAGScheduler: Parents of final stage: List()
> 15/03/31 10:47:20 INFO DAGScheduler: Missing parents: List()
> 15/03/31 10:47:20 INFO DAGScheduler: Submitting Stage 6 (MappedRDD[6] at map 
> at Main.scala:50), which has no missing parents
> 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(3568) called with 
> curMem=18881, maxMem=278019440
> 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_6 stored as values in 
> memory (estimated size 3.5 KB, free 265.1 MB)
> 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(1945) called with 
> curMem=22449, maxMem=278019440
> 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes 
> in memory (estimated size 1945.0 B, free 265.1 MB)
> 15/03/31 10:47:20 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory 
> on localhost:40630 (size: 1945.0 B, free: 265.1 MB)
> 15/03/31 10:47:20 INFO BlockManagerMaster: Updated info of block 
> broadcast_6_piece0
> 15/03/31 10:47:20 INFO SparkContext: Created broadcast 6 from broadcast at 
> DAGScheduler.scala:838
> 15/03/31 10:47:20 INFO DAGScheduler: Submitting 5 missing tasks from Stage 6 
> (MappedRDD[6] at map at Main.scala:50)
> 15/03/31 10:47:20 INFO TaskSchedulerImpl: Adding task set 6.0 with 5 tasks
> 15/03/31 10:47:20 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 
> 25, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:20 INFO TaskSetManager: Starting task 1.0 in stage 6.0 (TID 
> 26, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:20 INFO TaskSetManager: Starting task 2.0 in stage 6.0 (TID 
> 27, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:20 INFO TaskSetManager: Starting task 3.0 in stage 6.0 (TID 
> 28, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:20 INFO Executor: Running task 0.0 in stage 6.0 (TID 25)
> 15/03/31 10:47:20 INFO Executor: Running task 2.0 in stage 6.0 (TID 27)
> 15/03/31 10:47:20 INFO Executor: Running task 3.0 in stage 6.0 (TID 28)
> 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 2
> 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 1
> 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 7
> 15/03/31 10:47:20 INFO Executor: Finished task 0.0 in stage 6.0 (TID 25). 570 
> bytes result sent to driver
> 15/03/31 10:47:20 INFO Executor: Running task 1.0 in stage 6.0 (TID 26)
> 15/03/31 10:47:20 INFO Executor: Finished task 3.0 in stage 6.0 (TID 28). 570 
> bytes result sent to driver
> 15/03/31 10:47:20 INFO TaskSetManager: Starting task 4.0 in stage 6.0 (TID 
> 29, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 5
> 15/03/31 10:47:20 INFO Executor: Finished task 2.0 in stage 6.0 (TID 27). 570 
> bytes result sent to driver
> 15/03/31 10:47:20 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 
> 25) in 8 ms on localhost (1/5)
> 15/03/31 10:47:20 INFO Executor: Finished task 1.0 in stage 6.0 (TID 26). 570 
> bytes result sent to driver
> 15/03/31 10:47:20 INFO Executor: Running task 4.0 in stage 6.0 (TID 29)
> 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 4
> 15/03/31 10:47:20 INFO TaskSetManager: Finished task 3.0 in stage 6.0 (TID 
> 28) in 10 ms on localhost (2/5)
> 15/03/31 10:47:20 INFO Executor: Finished task 4.0 in stage 6.0 (TID 29). 570 
> bytes result sent to driver
> 15/03/31 10:47:20 INFO TaskSetManager: Finished task 2.0 in stage 6.0 (TID 
> 27) in 13 ms on localhost (3/5)
> 15/03/31 10:47:20 INFO TaskSetManager: Finished task 1.0 in stage 6.0 (TID 
> 26) in 15 ms on localhost (4/5)
> 15/03/31 10:47:20 INFO TaskSetManager: Finished task 4.0 in stage 6.0 (TID 
> 29) in 12 ms on localhost (5/5)
> 15/03/31 10:47:20 INFO DAGScheduler: Stage 6 (print at Main.scala:59) 
> finished in 0,019 s
> 15/03/31 10:47:20 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks 
> have all completed, from pool
> -------------------------------------------
> Time: 1427791640000 ms
> -------------------------------------------
>
> 15/03/31 10:47:20 INFO DAGScheduler: Job 6 finished: print at Main.scala:59, 
> took 0,034922 s
> 15/03/31 10:47:20 INFO JobScheduler: Finished job streaming job 1427791640000 
> ms.0 from job set of time 1427791640000 ms
> 15/03/31 10:47:20 INFO SparkContext: Starting job: foreachRDD at Main.scala:70
> 15/03/31 10:47:20 INFO DAGScheduler: Got job 7 (foreachRDD at Main.scala:70) 
> with 10 output partitions (allowLocal=false)
> 15/03/31 10:47:20 INFO DAGScheduler: Final stage: Stage 7(foreachRDD at 
> Main.scala:70)
> 15/03/31 10:47:20 INFO DAGScheduler: Parents of final stage: List()
> 15/03/31 10:47:20 INFO JobScheduler: Starting job streaming job 1427791640000 
> ms.1 from job set of time 1427791640000 ms
> 15/03/31 10:47:20 INFO DAGScheduler: Missing parents: List()
> 15/03/31 10:47:20 INFO DAGScheduler: Submitting Stage 7 (MappedRDD[7] at map 
> at Main.scala:62), which has no missing parents
> 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(5064) called with 
> curMem=24394, maxMem=278019440
> 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_7 stored as values in 
> memory (estimated size 4.9 KB, free 265.1 MB)
> 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(2792) called with 
> curMem=29458, maxMem=278019440
> 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes 
> in memory (estimated size 2.7 KB, free 265.1 MB)
> 15/03/31 10:47:20 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory 
> on localhost:40630 (size: 2.7 KB, free: 265.1 MB)
> 15/03/31 10:47:20 INFO BlockManagerMaster: Updated info of block 
> broadcast_7_piece0
> 15/03/31 10:47:20 INFO SparkContext: Created broadcast 7 from getCallSite at 
> DStream.scala:294
> 15/03/31 10:47:20 INFO DAGScheduler: Submitting 10 missing tasks from Stage 7 
> (MappedRDD[7] at map at Main.scala:62)
> 15/03/31 10:47:20 INFO TaskSchedulerImpl: Adding task set 7.0 with 10 tasks
> 15/03/31 10:47:20 INFO TaskSetManager: Starting task 0.0 in stage 7.0 (TID 
> 30, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:20 INFO TaskSetManager: Starting task 1.0 in stage 7.0 (TID 
> 31, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:20 INFO TaskSetManager: Starting task 2.0 in stage 7.0 (TID 
> 32, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:20 INFO TaskSetManager: Starting task 3.0 in stage 7.0 (TID 
> 33, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:20 INFO Executor: Running task 0.0 in stage 7.0 (TID 30)
> 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 6
> 15/03/31 10:47:20 INFO Executor: Running task 1.0 in stage 7.0 (TID 31)
> 15/03/31 10:47:20 INFO Executor: Running task 2.0 in stage 7.0 (TID 32)
> 15/03/31 10:47:20 INFO Executor: Running task 3.0 in stage 7.0 (TID 33)
> 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 9
> 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 0
> 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 3
> 15/03/31 10:47:20 INFO Executor: Finished task 0.0 in stage 7.0 (TID 30). 569 
> bytes result sent to driver
> 15/03/31 10:47:20 INFO Executor: Finished task 2.0 in stage 7.0 (TID 32). 569 
> bytes result sent to driver
> 15/03/31 10:47:20 INFO Executor: Finished task 1.0 in stage 7.0 (TID 31). 569 
> bytes result sent to driver
> 15/03/31 10:47:20 INFO TaskSetManager: Starting task 4.0 in stage 7.0 (TID 
> 34, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:20 INFO Executor: Running task 4.0 in stage 7.0 (TID 34)
> 15/03/31 10:47:20 INFO TaskSetManager: Starting task 5.0 in stage 7.0 (TID 
> 35, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:20 INFO TaskSetManager: Starting task 6.0 in stage 7.0 (TID 
> 36, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 8
> 15/03/31 10:47:20 INFO Executor: Running task 6.0 in stage 7.0 (TID 36)
> 15/03/31 10:47:20 INFO Executor: Finished task 3.0 in stage 7.0 (TID 33). 569 
> bytes result sent to driver
> 15/03/31 10:47:20 INFO Executor: Running task 5.0 in stage 7.0 (TID 35)
> 15/03/31 10:47:20 INFO TaskSetManager: Finished task 0.0 in stage 7.0 (TID 
> 30) in 34 ms on localhost (1/10)
> 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 5
> 15/03/31 10:47:20 INFO TaskSetManager: Finished task 1.0 in stage 7.0 (TID 
> 31) in 42 ms on localhost (2/10)
> 15/03/31 10:47:20 INFO TaskSetManager: Starting task 7.0 in stage 7.0 (TID 
> 37, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 2
> 15/03/31 10:47:20 INFO Executor: Running task 7.0 in stage 7.0 (TID 37)
> 15/03/31 10:47:20 INFO TaskSetManager: Finished task 2.0 in stage 7.0 (TID 
> 32) in 45 ms on localhost (3/10)
> 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 1
> 15/03/31 10:47:20 INFO TaskSetManager: Finished task 3.0 in stage 7.0 (TID 
> 33) in 46 ms on localhost (4/10)
> 15/03/31 10:47:20 INFO Executor: Finished task 4.0 in stage 7.0 (TID 34). 569 
> bytes result sent to driver
> 15/03/31 10:47:20 INFO TaskSetManager: Starting task 8.0 in stage 7.0 (TID 
> 38, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:20 INFO Executor: Running task 8.0 in stage 7.0 (TID 38)
> 15/03/31 10:47:20 INFO TaskSetManager: Finished task 4.0 in stage 7.0 (TID 
> 34) in 31 ms on localhost (5/10)
> 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 7
> 15/03/31 10:47:20 INFO Executor: Finished task 6.0 in stage 7.0 (TID 36). 569 
> bytes result sent to driver
> 15/03/31 10:47:20 INFO TaskSetManager: Starting task 9.0 in stage 7.0 (TID 
> 39, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:20 INFO TaskSetManager: Finished task 6.0 in stage 7.0 (TID 
> 36) in 35 ms on localhost (6/10)
> 15/03/31 10:47:20 INFO Executor: Running task 9.0 in stage 7.0 (TID 39)
> 15/03/31 10:47:20 INFO Executor: Finished task 5.0 in stage 7.0 (TID 35). 569 
> bytes result sent to driver
> 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 4
> 15/03/31 10:47:20 INFO TaskSetManager: Finished task 5.0 in stage 7.0 (TID 
> 35) in 39 ms on localhost (7/10)
> 15/03/31 10:47:20 INFO Executor: Finished task 7.0 in stage 7.0 (TID 37). 569 
> bytes result sent to driver
> 15/03/31 10:47:20 INFO TaskSetManager: Finished task 7.0 in stage 7.0 (TID 
> 37) in 36 ms on localhost (8/10)
> 15/03/31 10:47:20 INFO Executor: Finished task 8.0 in stage 7.0 (TID 38). 569 
> bytes result sent to driver
> 15/03/31 10:47:20 INFO TaskSetManager: Finished task 8.0 in stage 7.0 (TID 
> 38) in 26 ms on localhost (9/10)
> 15/03/31 10:47:20 INFO Executor: Finished task 9.0 in stage 7.0 (TID 39). 569 
> bytes result sent to driver
> 15/03/31 10:47:20 INFO TaskSetManager: Finished task 9.0 in stage 7.0 (TID 
> 39) in 27 ms on localhost (10/10)
> 15/03/31 10:47:20 INFO TaskSchedulerImpl: Removed TaskSet 7.0, whose tasks 
> have all completed, from pool
> 15/03/31 10:47:20 INFO DAGScheduler: Stage 7 (foreachRDD at Main.scala:70) 
> finished in 0,098 s
> 15/03/31 10:47:20 INFO DAGScheduler: Job 7 finished: foreachRDD at 
> Main.scala:70, took 0,111899 s
> 15/03/31 10:47:20 INFO JobScheduler: Finished job streaming job 1427791640000 
> ms.1 from job set of time 1427791640000 ms
> 15/03/31 10:47:20 INFO JobScheduler: Total delay: 0,231 s for time 
> 1427791640000 ms (execution: 0,213 s)
> 15/03/31 10:47:20 INFO MappedRDD: Removing RDD 2 from persistence list
> 15/03/31 10:47:20 INFO BlockManager: Removing RDD 2
> 15/03/31 10:47:20 INFO MappedRDD: Removing RDD 1 from persistence list
> 15/03/31 10:47:20 INFO KafkaRDD: Removing RDD 0 from persistence list
> 15/03/31 10:47:20 INFO BlockManager: Removing RDD 0
> 15/03/31 10:47:20 INFO MappedRDD: Removing RDD 3 from persistence list
> 15/03/31 10:47:20 INFO BlockManager: Removing RDD 3
> 15/03/31 10:47:20 INFO BlockManager: Removing RDD 1
> 15/03/31 10:47:20 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer()
> 15/03/31 10:47:22 INFO JobScheduler: Added jobs for time 1427791642000 ms
> 15/03/31 10:47:22 INFO JobScheduler: Starting job streaming job 1427791642000 
> ms.0 from job set of time 1427791642000 ms
> 15/03/31 10:47:22 INFO SparkContext: Starting job: print at Main.scala:59
> 15/03/31 10:47:22 INFO DAGScheduler: Got job 8 (print at Main.scala:59) with 
> 1 output partitions (allowLocal=true)
> 15/03/31 10:47:22 INFO DAGScheduler: Final stage: Stage 8(print at 
> Main.scala:59)
> 15/03/31 10:47:22 INFO DAGScheduler: Parents of final stage: List()
> 15/03/31 10:47:22 INFO DAGScheduler: Missing parents: List()
> 15/03/31 10:47:22 INFO DAGScheduler: Submitting Stage 8 (MappedRDD[10] at map 
> at Main.scala:50), which has no missing parents
> 15/03/31 10:47:22 INFO MemoryStore: ensureFreeSpace(3568) called with 
> curMem=32250, maxMem=278019440
> 15/03/31 10:47:22 INFO MemoryStore: Block broadcast_8 stored as values in 
> memory (estimated size 3.5 KB, free 265.1 MB)
> 15/03/31 10:47:22 INFO MemoryStore: ensureFreeSpace(1949) called with 
> curMem=35818, maxMem=278019440
> 15/03/31 10:47:22 INFO MemoryStore: Block broadcast_8_piece0 stored as bytes 
> in memory (estimated size 1949.0 B, free 265.1 MB)
> 15/03/31 10:47:22 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory 
> on localhost:40630 (size: 1949.0 B, free: 265.1 MB)
> 15/03/31 10:47:22 INFO BlockManagerMaster: Updated info of block 
> broadcast_8_piece0
> 15/03/31 10:47:22 INFO SparkContext: Created broadcast 8 from broadcast at 
> DAGScheduler.scala:838
> 15/03/31 10:47:22 INFO DAGScheduler: Submitting 1 missing tasks from Stage 8 
> (MappedRDD[10] at map at Main.scala:50)
> 15/03/31 10:47:22 INFO TaskSchedulerImpl: Adding task set 8.0 with 1 tasks
> 15/03/31 10:47:22 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID 
> 40, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:22 INFO Executor: Running task 0.0 in stage 8.0 (TID 40)
> 15/03/31 10:47:22 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 6
> 15/03/31 10:47:22 INFO Executor: Finished task 0.0 in stage 8.0 (TID 40). 570 
> bytes result sent to driver
> 15/03/31 10:47:22 INFO TaskSetManager: Finished task 0.0 in stage 8.0 (TID 
> 40) in 6 ms on localhost (1/1)
> 15/03/31 10:47:22 INFO TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks 
> have all completed, from pool
> 15/03/31 10:47:22 INFO DAGScheduler: Stage 8 (print at Main.scala:59) 
> finished in 0,008 s
> 15/03/31 10:47:22 INFO DAGScheduler: Job 8 finished: print at Main.scala:59, 
> took 0,021082 s
> 15/03/31 10:47:22 INFO SparkContext: Starting job: print at Main.scala:59
> 15/03/31 10:47:22 INFO DAGScheduler: Got job 9 (print at Main.scala:59) with 
> 4 output partitions (allowLocal=true)
> 15/03/31 10:47:22 INFO DAGScheduler: Final stage: Stage 9(print at 
> Main.scala:59)
> 15/03/31 10:47:22 INFO DAGScheduler: Parents of final stage: List()
> 15/03/31 10:47:22 INFO DAGScheduler: Missing parents: List()
> 15/03/31 10:47:22 INFO DAGScheduler: Submitting Stage 9 (MappedRDD[10] at map 
> at Main.scala:50), which has no missing parents
> 15/03/31 10:47:22 INFO MemoryStore: ensureFreeSpace(3568) called with 
> curMem=37767, maxMem=278019440
> 15/03/31 10:47:22 INFO MemoryStore: Block broadcast_9 stored as values in 
> memory (estimated size 3.5 KB, free 265.1 MB)
> 15/03/31 10:47:22 INFO MemoryStore: ensureFreeSpace(1949) called with 
> curMem=41335, maxMem=278019440
> 15/03/31 10:47:22 INFO MemoryStore: Block broadcast_9_piece0 stored as bytes 
> in memory (estimated size 1949.0 B, free 265.1 MB)
> 15/03/31 10:47:22 INFO BlockManagerInfo: Added broadcast_9_piece0 in memory 
> on localhost:40630 (size: 1949.0 B, free: 265.1 MB)
> 15/03/31 10:47:22 INFO BlockManagerMaster: Updated info of block 
> broadcast_9_piece0
> 15/03/31 10:47:22 INFO SparkContext: Created broadcast 9 from broadcast at 
> DAGScheduler.scala:838
> 15/03/31 10:47:22 INFO DAGScheduler: Submitting 4 missing tasks from Stage 9 
> (MappedRDD[10] at map at Main.scala:50)
> 15/03/31 10:47:22 INFO TaskSchedulerImpl: Adding task set 9.0 with 4 tasks
> 15/03/31 10:47:22 INFO TaskSetManager: Starting task 0.0 in stage 9.0 (TID 
> 41, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:22 INFO TaskSetManager: Starting task 1.0 in stage 9.0 (TID 
> 42, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:22 INFO TaskSetManager: Starting task 2.0 in stage 9.0 (TID 
> 43, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:22 INFO TaskSetManager: Starting task 3.0 in stage 9.0 (TID 
> 44, localhost, ANY, 1266 bytes)
> 15/03/31 10:47:22 INFO Executor: Running task 2.0 in stage 9.0 (TID 43)
> 15/03/31 10:47:22 INFO Executor: Running task 1.0 in stage 9.0 (TID 42)
> 15/03/31 10:47:22 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 0
> 15/03/31 10:47:22 INFO Executor: Running task 0.0 in stage 9.0 (TID 41)
> 15/03/31 10:47:22 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the 
> same as ending offset skipping toto 3
> 15/03/31 10:47:22 INFO Executor: Finished task 2.0 in stage 9.0 (TID 43). 570 
> bytes result sent to driver
> 15/03/31 10:47:22 INFO Executor: Finished task 0.0 in stage 9.0 (TID 41). 570 
> bytes result sent to driver
> 15/03/31 10:47:22 INFO Executor: Running task 3.0 in stage 9.0 (TID 44)
> 15/03/31 10:47:22 INFO TaskSetManager: Finished task 2.0 in stage 9.0 (TID 
> 43) in 5 ms on localhost (1/4)
> 15/03/31 10:47:22 INFO KafkaRDD: Computing topic toto, partition 9 offsets 0 
> -> 1
> 15/03/31 10:47:22 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID 
> 41) in 9 ms on localhost (2/4)
> 15/03/31 10:47:22 INFO VerifiableProperties: Verifying properties
> 15/03/31 10:47:22 INFO VerifiableProperties: Property group.id is overridden 
> to
> 15/03/31 10:47:22 INFO VerifiableProperties: Property zookeeper.connect is 
> overridden to
> 15/03/31 10:47:22 INFO KafkaRDD: Computing topic toto, partition 8 offsets 0 
> -> 1
> 15/03/31 10:47:22 ERROR TaskContextImpl: Error in TaskCompletionListener
> java.lang.NullPointerException
>    at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
>    at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
>    at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
>    at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
>    at 
> org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
>    at 
> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
>    at 
> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
>    at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>    at 
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
>    at org.apache.spark.scheduler.Task.run(Task.scala:58)
>    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>    at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>    at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>    at java.lang.Thread.run(Thread.java:745)
> 15/03/31 10:47:22 INFO VerifiableProperties: Verifying properties
> 15/03/31 10:47:22 INFO VerifiableProperties: Property group.id is overridden 
> to
> 15/03/31 10:47:22 INFO VerifiableProperties: Property zookeeper.connect is 
> overridden to
> 15/03/31 10:47:22 ERROR TaskContextImpl: Error in TaskCompletionListener
> java.lang.NullPointerException
>    at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
>    at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
>    at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
>    at 
> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
>    at 
> org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
>    at 
> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
>    at 
> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
>    at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>    at 
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
>    at org.apache.spark.scheduler.Task.run(Task.scala:58)
>    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>    at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>    at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>    at java.lang.Thread.run(Thread.java:745)
> 15/03/31 10:47:22 ERROR Executor: Exception in task 1.0 in stage 9.0 (TID 42)
> org.apache.spark.util.TaskCompletionListenerException
>    at 
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
>    at org.apache.spark.scheduler.Task.run(Task.scala:58)
>    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>    at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>    at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>    at java.lang.Thread.run(Thread.java:745)
> 15/03/31 10:47:22 ERROR Executor: Exception in task 3.0 in stage 9.0 (TID 44)
> org.apache.spark.util.TaskCompletionListenerException
>    at 
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
>    at org.apache.spark.scheduler.Task.run(Task.scala:58)
>    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>    at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>    at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>    at java.lang.Thread.run(Thread.java:745)
> 15/03/31 10:47:22 WARN TaskSetManager: Lost task 3.0 in stage 9.0 (TID 44, 
> localhost): org.apache.spark.util.TaskCompletionListenerException
>    at 
> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
>    at org.apache.spark.scheduler.Task.run(Task.scala:58)
>    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>    at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>    at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>    at java.lang.Thread.run(Thread.java:745)
>
> 15/03/31 10:47:22 ERROR TaskSetManager: Task 3 in stage 9.0 failed 1
> times; aborting job"
>
> I don't know if this helps, but in the Spark UI I found an additional
> log entry for the failed tasks:
>
> org.apache.spark.streaming.dstream.DStream.print(DStream.scala:624)
>
> which corresponds to
> https://github.com/cloudera/spark/blob/cdh5.3.2-release/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala
>
> For integration, I'm using the Spark version shipped with Cloudera 5.3.2,
> whereas in development I'm using Spark 1.3.0. Could this cause trouble?
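
One way to check whether mixed versions are in play is to ask the JVM which jar a class was actually loaded from. The following is a minimal sketch using only standard JVM reflection; `ClasspathCheck` and `locationOf` are hypothetical names introduced for illustration, not part of Spark.

```scala
object ClasspathCheck {
  /** Returns the jar or directory a class was loaded from, if the JVM reports one.
    * Yields None when the class is not on the classpath, or when the JVM gives
    * no code source for it.
    */
  def locationOf(className: String): Option[String] =
    for {
      cls      <- scala.util.Try(Class.forName(className)).toOption
      domain   <- Option(cls.getProtectionDomain)
      source   <- Option(domain.getCodeSource)
      location <- Option(source.getLocation)
    } yield location.toString
}
```

Calling `ClasspathCheck.locationOf("org.apache.spark.streaming.kafka.KafkaRDD")` from the driver (the class name is taken from the stack trace above) should show whether that class comes from the application assembly or from the cluster's Spark installation.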
>
> Thanks for your answer.
> Regards,
> Nicolas PHUNG
>
> On Mon, Mar 30, 2015 at 7:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:
>
>> Nicolas:
>> See whether the following exception occurs in the log:
>>           errs => throw new SparkException(
>>             s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
>>               errs.mkString("\n")),
>>
>> Cheers
>>
>> On Mon, Mar 30, 2015 at 9:40 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> This line
>>>
>>> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
>>>
>>> is the attempt to close the underlying kafka simple consumer.
>>>
>>> We can add a null pointer check, but the underlying issue of the
>>> consumer being null probably indicates a problem earlier.  Do you see any
>>> previous error messages?
>>>
>>> Also, can you clarify for the successful and failed cases which topics
>>> you are attempting this on, how many partitions there are, and whether
>>> there are messages in the partitions?  There's an existing jira regarding
>>> empty partitions.
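
A minimal sketch of the kind of null check mentioned above, assuming the consumer field is only assigned once a connection succeeds. `StubConsumer` and `PartitionReader` are hypothetical stand-ins for illustration; this is not the actual KafkaRDD code or the Kafka consumer API.

```scala
object SafeClose {
  // Hypothetical stand-in for the Kafka consumer (not the real API).
  final class StubConsumer {
    var closed: Boolean = false
    def close(): Unit = { closed = true }
  }

  // Hypothetical reader whose consumer stays null until open() succeeds.
  final class PartitionReader(connect: () => StubConsumer) {
    private var consumer: StubConsumer = null

    // If connect() throws, consumer is never assigned and remains null.
    def open(): Unit = { consumer = connect() }

    // Option(null) is None, so closing an unopened reader is a no-op
    // rather than a NullPointerException.
    def close(): Unit = Option(consumer).foreach(_.close())
  }
}
```

A guard like this only avoids the secondary NullPointerException in the completion listener; it does not explain why the consumer was null in the first place.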
>>>
>>>
>>>
>>>
>>> On Mon, Mar 30, 2015 at 11:05 AM, Nicolas Phung <nicolas.ph...@gmail.com> wrote:
>>>
>>>> Hello,
>>>>
>>>> I'm using spark-streaming-kafka 1.3.0 with the new consumer "Approach
>>>> 2: Direct Approach (No Receivers)" (
>>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html).
>>>> I'm using the following code snippet:
>>>>
>>>> // Create direct Kafka stream with brokers and topics
>>>> val messages = KafkaUtils.createDirectStream[String, Array[Byte],
>>>>   StringDecoder, DefaultDecoder](ssc, kafkaParams, topicsSet)
>>>>
>>>> // Get the payloads from Kafka and decode them
>>>> val raw = messages.map(_._2)
>>>> val dStream: DStream[RawScala] = raw.map { byte =>
>>>>   // Avro decoder
>>>>   println("Byte length: " + byte.length)
>>>>   val rawDecoder = new AvroDecoder[Raw](schema = Raw.getClassSchema)
>>>>   RawScala.toScala(rawDecoder.fromBytes(byte))
>>>> }
>>>> // Useful for debugging
>>>> dStream.print()
>>>>
>>>> I launch my Spark Streaming job and everything is fine as long as no
>>>> logs arrive from Kafka. As soon as I send one, I get the following error:
>>>>
>>>> 15/03/30 17:34:40 ERROR TaskContextImpl: Error in TaskCompletionListener
>>>> java.lang.NullPointerException
>>>> at
>>>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
>>>> at
>>>> org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
>>>> at
>>>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
>>>> at
>>>> org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
>>>> at
>>>> org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
>>>> at
>>>> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
>>>> at
>>>> org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
>>>> at
>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>> at
>>>> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:58)
>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 3.0 in stage 28.0
>>>> (TID 94) in 12 ms on localhost (2/4)
>>>> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 2.0 in stage 28.0
>>>> (TID 93) in 13 ms on localhost (3/4)
>>>> 15/03/30 17:34:40 ERROR Executor: Exception in task 0.0 in stage 28.0
>>>> (TID 91)
>>>> org.apache.spark.util.TaskCompletionListenerException
>>>> at
>>>> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:58)
>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>> 15/03/30 17:34:40 WARN TaskSetManager: Lost task 0.0 in stage 28.0 (TID
>>>> 91, localhost): org.apache.spark.util.TaskCompletionListenerException
>>>> at
>>>> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:58)
>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> 15/03/30 17:34:40 ERROR TaskSetManager: Task 0 in stage 28.0 failed 1
>>>> times; aborting job
>>>> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Removed TaskSet 28.0, whose
>>>> tasks have all completed, from pool
>>>> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Cancelling stage 28
>>>> 15/03/30 17:34:40 INFO DAGScheduler: Job 28 failed: print at
>>>> HotFCANextGen.scala:63, took 0,041068 s
>>>> 15/03/30 17:34:40 INFO JobScheduler: Starting job streaming job
>>>> 1427729680000 ms.1 from job set of time 1427729680000 ms
>>>> 15/03/30 17:34:40 ERROR JobScheduler: Error running job streaming job
>>>> 1427729680000 ms.0
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>>> 0 in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage
>>>> 28.0 (TID 91, localhost):
>>>> org.apache.spark.util.TaskCompletionListenerException
>>>> at
>>>> org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
>>>> at org.apache.spark.scheduler.Task.run(Task.scala:58)
>>>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> at
>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> at java.lang.Thread.run(Thread.java:745)
>>>>
>>>> Driver stacktrace:
>>>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>>>> at
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>>>> at
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>>>> at
>>>> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>> at
>>>> org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>>>> at
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>>> at
>>>> org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>>> at scala.Option.foreach(Option.scala:236)
>>>> at
>>>> org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>>>> at
>>>> org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>> at
>>>> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>> at
>>>> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>
>>>> The same code snippet works well if the topic has a single
>>>> partition. The issue happens when the topic has multiple partitions.
>>>> Am I doing something wrong? Can you help me find the right way to write
>>>> this for a Kafka topic with multiple partitions?
>>>>
>>>> Regards,
>>>> Nicolas PHUNG
>>>>
>>>
>>>
>>
>
