Hello,

@Akhil Das I'm trying to use the experimental API https://github.com/apache/spark/blob/master/examples/scala-2.10/src/main/scala/org/apache/spark/examples/streaming/DirectKafkaWordCount.scala. I'm reusing the same code snippet to initialize my topicSet.
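For readers following along, here is a minimal sketch of how a topic set like the one mentioned above can be initialized, in the style of the DirectKafkaWordCount example. It is an illustration under assumptions, not the actual Main.scala: the object name `TopicSetSketch`, the helper names, the extra trimming step, and the broker address `localhost:9092` are all hypothetical. The topic names are the ones created by the kafka-topics commands in this message.

```scala
// Sketch only: builds the two values the direct Kafka API expects.
// The broker address is an assumed placeholder, not taken from the real config.
object TopicSetSketch {
  // Split a comma-separated topic list into a Set, dropping blank entries.
  def parseTopics(csv: String): Set[String] =
    csv.split(",").map(_.trim).filter(_.nonEmpty).toSet

  // The direct API locates brokers through the "metadata.broker.list" key.
  def kafkaParams(brokers: String): Map[String, String] =
    Map("metadata.broker.list" -> brokers)

  def main(args: Array[String]): Unit = {
    val topicsSet = parseTopics("toto,toto-single")
    val params = kafkaParams("localhost:9092") // assumed broker address
    println(topicsSet)
    println(params)
    // With spark-streaming-kafka on the classpath, these two values would be
    // passed to the direct stream constructor, for example:
    //   KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    //     ssc, params, topicsSet)
  }
}
```

Printing the set and the parameter map before creating the stream is a cheap way to confirm that the topic names and broker list are what the job is expected to read.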
@Cody Koeninger I don't see any previous error messages (see the full log at the end). To create the topics, I'm running the following:

"kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 10 --topic toto"
"kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic toto-single"

I'm launching my Spark Streaming job in local mode.

@Ted Yu There is no "Couldn't connect to leader for topic" message in the log. Here is the full version:

"spark-submit --conf config.resource=application-integration.conf --class nextgen.Main assembly-0.1-SNAPSHOT.jar
15/03/31 10:47:12 INFO SecurityManager: Changing view acls to: nphung 15/03/31 10:47:12 INFO SecurityManager: Changing modify acls to: nphung 15/03/31 10:47:12 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(nphung); users with modify permissions: Set(nphung) 15/03/31 10:47:13 INFO Slf4jLogger: Slf4jLogger started 15/03/31 10:47:13 INFO Remoting: Starting remoting 15/03/31 10:47:13 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@int.local:44180] 15/03/31 10:47:13 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@int.local:44180] 15/03/31 10:47:13 INFO Utils: Successfully started service 'sparkDriver' on port 44180. 15/03/31 10:47:13 INFO SparkEnv: Registering MapOutputTracker 15/03/31 10:47:13 INFO SparkEnv: Registering BlockManagerMaster 15/03/31 10:47:13 INFO DiskBlockManager: Created local directory at /tmp/spark-local-20150331104713-2238 15/03/31 10:47:13 INFO MemoryStore: MemoryStore started with capacity 265.1 MB 15/03/31 10:47:15 INFO HttpFileServer: HTTP File server directory is /tmp/spark-2c8e34a0-bec3-4f1e-9fe7-83e08efc4f53 15/03/31 10:47:15 INFO HttpServer: Starting HTTP Server 15/03/31 10:47:15 INFO Utils: Successfully started service 'HTTP file server' on port 50204. 
15/03/31 10:47:15 INFO Utils: Successfully started service 'SparkUI' on port 4040. 15/03/31 10:47:15 INFO SparkUI: Started SparkUI at http://int.local:4040 15/03/31 10:47:16 INFO SparkContext: Added JAR file:/home/nphung/assembly-0.1-SNAPSHOT.jar at http://10.153.165.98:50204/jars/assembly-0.1-SNAPSHOT.jar with timestamp 1427791636151 15/03/31 10:47:16 INFO AkkaUtils: Connecting to HeartbeatReceiver: akka.tcp://sparkDriver@int.local:44180/user/HeartbeatReceiver 15/03/31 10:47:16 INFO NettyBlockTransferService: Server created on 40630 15/03/31 10:47:16 INFO BlockManagerMaster: Trying to register BlockManager 15/03/31 10:47:16 INFO BlockManagerMasterActor: Registering block manager localhost:40630 with 265.1 MB RAM, BlockManagerId(<driver>, localhost, 40630) 15/03/31 10:47:16 INFO BlockManagerMaster: Registered BlockManager 15/03/31 10:47:17 INFO EventLoggingListener: Logging events to hdfs://int.local:8020/user/spark/applicationHistory/local-1427791636195 15/03/31 10:47:17 INFO VerifiableProperties: Verifying properties 15/03/31 10:47:17 INFO VerifiableProperties: Property group.id is overridden to 15/03/31 10:47:17 INFO VerifiableProperties: Property zookeeper.connect is overridden to 15/03/31 10:47:17 INFO ForEachDStream: metadataCleanupDelay = -1 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1 15/03/31 10:47:17 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Slide time = 2000 ms 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Checkpoint interval = null 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Remember duration = 2000 ms 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1daf3b44 15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms 
15/03/31 10:47:17 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null 15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms 15/03/31 10:47:17 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@7fd8c559 15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms 15/03/31 10:47:17 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null 15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms 15/03/31 10:47:17 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@44c13103 15/03/31 10:47:17 INFO ForEachDStream: Slide time = 2000 ms 15/03/31 10:47:17 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/03/31 10:47:17 INFO ForEachDStream: Checkpoint interval = null 15/03/31 10:47:17 INFO ForEachDStream: Remember duration = 2000 ms 15/03/31 10:47:17 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@8f2098e 15/03/31 10:47:17 INFO ForEachDStream: metadataCleanupDelay = -1 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1 15/03/31 10:47:17 INFO MappedDStream: metadataCleanupDelay = -1 15/03/31 10:47:17 INFO DirectKafkaInputDStream: metadataCleanupDelay = -1 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Slide time = 2000 ms 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Checkpoint interval = null 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Remember duration = 2000 ms 15/03/31 10:47:17 INFO DirectKafkaInputDStream: Initialized and validated org.apache.spark.streaming.kafka.DirectKafkaInputDStream@1daf3b44 15/03/31 10:47:17 INFO 
MappedDStream: Slide time = 2000 ms 15/03/31 10:47:17 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null 15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms 15/03/31 10:47:17 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@7fd8c559 15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms 15/03/31 10:47:17 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null 15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms 15/03/31 10:47:17 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@44c13103 15/03/31 10:47:17 INFO MappedDStream: Slide time = 2000 ms 15/03/31 10:47:17 INFO MappedDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/03/31 10:47:17 INFO MappedDStream: Checkpoint interval = null 15/03/31 10:47:17 INFO MappedDStream: Remember duration = 2000 ms 15/03/31 10:47:17 INFO MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@6c6366cf 15/03/31 10:47:17 INFO ForEachDStream: Slide time = 2000 ms 15/03/31 10:47:17 INFO ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1) 15/03/31 10:47:17 INFO ForEachDStream: Checkpoint interval = null 15/03/31 10:47:17 INFO ForEachDStream: Remember duration = 2000 ms 15/03/31 10:47:17 INFO ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@55a88417 15/03/31 10:47:17 INFO RecurringTimer: Started timer for JobGenerator at time 1427791638000 15/03/31 10:47:17 INFO JobGenerator: Started JobGenerator at 1427791638000 ms 15/03/31 10:47:17 INFO JobScheduler: Started JobScheduler 15/03/31 10:47:18 INFO VerifiableProperties: Verifying properties 15/03/31 10:47:18 INFO VerifiableProperties: Property group.id is overridden to 
15/03/31 10:47:18 INFO VerifiableProperties: Property zookeeper.connect is overridden to 15/03/31 10:47:18 INFO JobScheduler: Added jobs for time 1427791638000 ms 15/03/31 10:47:18 INFO JobScheduler: Starting job streaming job 1427791638000 ms.0 from job set of time 1427791638000 ms 15/03/31 10:47:18 INFO SparkContext: Starting job: print at Main.scala:59 15/03/31 10:47:18 INFO DAGScheduler: Got job 0 (print at Main.scala:59) with 1 output partitions (allowLocal=true) 15/03/31 10:47:18 INFO DAGScheduler: Final stage: Stage 0(print at Main.scala:59) 15/03/31 10:47:18 INFO DAGScheduler: Parents of final stage: List() 15/03/31 10:47:18 INFO DAGScheduler: Missing parents: List() 15/03/31 10:47:18 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[2] at map at Main.scala:50), which has no missing parents 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(3568) called with curMem=0, maxMem=278019440 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 3.5 KB, free 265.1 MB) 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(1946) called with curMem=3568, maxMem=278019440 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1946.0 B, free 265.1 MB) 15/03/31 10:47:18 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:40630 (size: 1946.0 B, free: 265.1 MB) 15/03/31 10:47:18 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/03/31 10:47:18 INFO SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:838 15/03/31 10:47:18 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MappedRDD[2] at map at Main.scala:50) 15/03/31 10:47:18 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 15/03/31 10:47:18 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, ANY, 1266 bytes) 15/03/31 10:47:18 INFO Executor: Running task 0.0 in stage 0.0 (TID 0) 15/03/31 10:47:18 INFO Executor: Fetching 
http://10.153.165.98:50204/jars/assembly-0.1-SNAPSHOT.jar with timestamp 1427791636151 15/03/31 10:47:18 INFO Utils: Fetching http://10.153.165.98:50204/jars/assembly-0.1-SNAPSHOT.jar to /tmp/fetchFileTemp4524847716079927807.tmp 15/03/31 10:47:18 INFO Executor: Adding file:/tmp/spark-d295d121-f359-4c7d-8725-e1f958ebff5a/assembly-0.1-SNAPSHOT.jar to class loader 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 6 15/03/31 10:47:18 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 570 bytes result sent to driver 15/03/31 10:47:18 INFO DAGScheduler: Stage 0 (print at Main.scala:59) finished in 0,371 s 15/03/31 10:47:18 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 350 ms on localhost (1/1) 15/03/31 10:47:18 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 15/03/31 10:47:18 INFO DAGScheduler: Job 0 finished: print at Main.scala:59, took 0,717929 s 15/03/31 10:47:18 INFO SparkContext: Starting job: print at Main.scala:59 15/03/31 10:47:18 INFO DAGScheduler: Got job 1 (print at Main.scala:59) with 4 output partitions (allowLocal=true) 15/03/31 10:47:18 INFO DAGScheduler: Final stage: Stage 1(print at Main.scala:59) 15/03/31 10:47:18 INFO DAGScheduler: Parents of final stage: List() 15/03/31 10:47:18 INFO DAGScheduler: Missing parents: List() 15/03/31 10:47:18 INFO DAGScheduler: Submitting Stage 1 (MappedRDD[2] at map at Main.scala:50), which has no missing parents 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(3568) called with curMem=5514, maxMem=278019440 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 3.5 KB, free 265.1 MB) 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(1946) called with curMem=9082, maxMem=278019440 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1946.0 B, free 265.1 MB) 15/03/31 10:47:18 INFO BlockManagerInfo: Added 
broadcast_1_piece0 in memory on localhost:40630 (size: 1946.0 B, free: 265.1 MB) 15/03/31 10:47:18 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0 15/03/31 10:47:18 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:838 15/03/31 10:47:18 INFO DAGScheduler: Submitting 4 missing tasks from Stage 1 (MappedRDD[2] at map at Main.scala:50) 15/03/31 10:47:18 INFO TaskSchedulerImpl: Adding task set 1.0 with 4 tasks 15/03/31 10:47:18 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, ANY, 1266 bytes) 15/03/31 10:47:18 INFO TaskSetManager: Starting task 1.0 in stage 1.0 (TID 2, localhost, ANY, 1266 bytes) 15/03/31 10:47:18 INFO TaskSetManager: Starting task 2.0 in stage 1.0 (TID 3, localhost, ANY, 1266 bytes) 15/03/31 10:47:18 INFO TaskSetManager: Starting task 3.0 in stage 1.0 (TID 4, localhost, ANY, 1266 bytes) 15/03/31 10:47:18 INFO Executor: Running task 0.0 in stage 1.0 (TID 1) 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 3 15/03/31 10:47:18 INFO Executor: Running task 1.0 in stage 1.0 (TID 2) 15/03/31 10:47:18 INFO Executor: Running task 2.0 in stage 1.0 (TID 3) 15/03/31 10:47:18 INFO Executor: Running task 3.0 in stage 1.0 (TID 4) 15/03/31 10:47:18 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 570 bytes result sent to driver 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 9 15/03/31 10:47:18 INFO Executor: Finished task 1.0 in stage 1.0 (TID 2). 
570 bytes result sent to driver 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 8 15/03/31 10:47:18 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 21 ms on localhost (1/4) 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 0 15/03/31 10:47:18 INFO Executor: Finished task 3.0 in stage 1.0 (TID 4). 570 bytes result sent to driver 15/03/31 10:47:18 INFO Executor: Finished task 2.0 in stage 1.0 (TID 3). 570 bytes result sent to driver 15/03/31 10:47:18 INFO TaskSetManager: Finished task 1.0 in stage 1.0 (TID 2) in 31 ms on localhost (2/4) 15/03/31 10:47:18 INFO TaskSetManager: Finished task 3.0 in stage 1.0 (TID 4) in 39 ms on localhost (3/4) 15/03/31 10:47:18 INFO TaskSetManager: Finished task 2.0 in stage 1.0 (TID 3) in 43 ms on localhost (4/4) 15/03/31 10:47:18 INFO DAGScheduler: Stage 1 (print at Main.scala:59) finished in 0,048 s 15/03/31 10:47:18 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 15/03/31 10:47:18 INFO DAGScheduler: Job 1 finished: print at Main.scala:59, took 0,073960 s 15/03/31 10:47:18 INFO SparkContext: Starting job: print at Main.scala:59 15/03/31 10:47:18 INFO DAGScheduler: Got job 2 (print at Main.scala:59) with 5 output partitions (allowLocal=true) 15/03/31 10:47:18 INFO DAGScheduler: Final stage: Stage 2(print at Main.scala:59) 15/03/31 10:47:18 INFO DAGScheduler: Parents of final stage: List() 15/03/31 10:47:18 INFO DAGScheduler: Missing parents: List() 15/03/31 10:47:18 INFO DAGScheduler: Submitting Stage 2 (MappedRDD[2] at map at Main.scala:50), which has no missing parents 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(3568) called with curMem=11028, maxMem=278019440 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 3.5 KB, free 265.1 MB) 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(1946) called with 
curMem=14596, maxMem=278019440 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1946.0 B, free 265.1 MB) 15/03/31 10:47:18 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:40630 (size: 1946.0 B, free: 265.1 MB) 15/03/31 10:47:18 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0 15/03/31 10:47:18 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:838 15/03/31 10:47:18 INFO DAGScheduler: Submitting 5 missing tasks from Stage 2 (MappedRDD[2] at map at Main.scala:50) 15/03/31 10:47:18 INFO TaskSchedulerImpl: Adding task set 2.0 with 5 tasks 15/03/31 10:47:18 INFO TaskSetManager: Starting task 0.0 in stage 2.0 (TID 5, localhost, ANY, 1266 bytes) 15/03/31 10:47:18 INFO TaskSetManager: Starting task 1.0 in stage 2.0 (TID 6, localhost, ANY, 1266 bytes) 15/03/31 10:47:18 INFO TaskSetManager: Starting task 2.0 in stage 2.0 (TID 7, localhost, ANY, 1266 bytes) 15/03/31 10:47:18 INFO TaskSetManager: Starting task 3.0 in stage 2.0 (TID 8, localhost, ANY, 1266 bytes) 15/03/31 10:47:18 INFO Executor: Running task 0.0 in stage 2.0 (TID 5) 15/03/31 10:47:18 INFO Executor: Running task 2.0 in stage 2.0 (TID 7) 15/03/31 10:47:18 INFO Executor: Running task 1.0 in stage 2.0 (TID 6) 15/03/31 10:47:18 INFO Executor: Running task 3.0 in stage 2.0 (TID 8) 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 2 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 5 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 7 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 1 15/03/31 10:47:18 INFO Executor: Finished task 0.0 in stage 2.0 (TID 5). 
570 bytes result sent to driver 15/03/31 10:47:18 INFO Executor: Finished task 1.0 in stage 2.0 (TID 6). 570 bytes result sent to driver 15/03/31 10:47:18 INFO TaskSetManager: Starting task 4.0 in stage 2.0 (TID 9, localhost, ANY, 1266 bytes) 15/03/31 10:47:18 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 5) in 12 ms on localhost (1/5) 15/03/31 10:47:18 INFO Executor: Running task 4.0 in stage 2.0 (TID 9) 15/03/31 10:47:18 INFO Executor: Finished task 3.0 in stage 2.0 (TID 8). 570 bytes result sent to driver 15/03/31 10:47:18 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 4 15/03/31 10:47:18 INFO Executor: Finished task 2.0 in stage 2.0 (TID 7). 570 bytes result sent to driver 15/03/31 10:47:18 INFO TaskSetManager: Finished task 1.0 in stage 2.0 (TID 6) in 18 ms on localhost (2/5) 15/03/31 10:47:18 INFO Executor: Finished task 4.0 in stage 2.0 (TID 9). 570 bytes result sent to driver 15/03/31 10:47:18 INFO TaskSetManager: Finished task 3.0 in stage 2.0 (TID 8) in 23 ms on localhost (3/5) 15/03/31 10:47:18 INFO TaskSetManager: Finished task 2.0 in stage 2.0 (TID 7) in 27 ms on localhost (4/5) 15/03/31 10:47:18 INFO TaskSetManager: Finished task 4.0 in stage 2.0 (TID 9) in 21 ms on localhost (5/5) 15/03/31 10:47:18 INFO DAGScheduler: Stage 2 (print at Main.scala:59) finished in 0,033 s 15/03/31 10:47:18 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool ------------------------------------------- Time: 1427791638000 ms ------------------------------------------- 15/03/31 10:47:18 INFO DAGScheduler: Job 2 finished: print at Main.scala:59, took 0,057381 s 15/03/31 10:47:18 INFO JobScheduler: Finished job streaming job 1427791638000 ms.0 from job set of time 1427791638000 ms 15/03/31 10:47:18 INFO JobScheduler: Starting job streaming job 1427791638000 ms.1 from job set of time 1427791638000 ms 15/03/31 10:47:18 INFO SparkContext: Starting job: foreachRDD at Main.scala:70 
15/03/31 10:47:18 INFO DAGScheduler: Got job 3 (foreachRDD at Main.scala:70) with 10 output partitions (allowLocal=false) 15/03/31 10:47:18 INFO DAGScheduler: Final stage: Stage 3(foreachRDD at Main.scala:70) 15/03/31 10:47:18 INFO DAGScheduler: Parents of final stage: List() 15/03/31 10:47:18 INFO DAGScheduler: Missing parents: List() 15/03/31 10:47:18 INFO DAGScheduler: Submitting Stage 3 (MappedRDD[3] at map at Main.scala:62), which has no missing parents 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(5064) called with curMem=16542, maxMem=278019440 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_3 stored as values in memory (estimated size 4.9 KB, free 265.1 MB) 15/03/31 10:47:18 INFO MemoryStore: ensureFreeSpace(2791) called with curMem=21606, maxMem=278019440 15/03/31 10:47:18 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory (estimated size 2.7 KB, free 265.1 MB) 15/03/31 10:47:18 INFO BlockManagerInfo: Added broadcast_3_piece0 in memory on localhost:40630 (size: 2.7 KB, free: 265.1 MB) 15/03/31 10:47:18 INFO BlockManagerMaster: Updated info of block broadcast_3_piece0 15/03/31 10:47:19 INFO SparkContext: Created broadcast 3 from broadcast at DAGScheduler.scala:838 15/03/31 10:47:19 INFO DAGScheduler: Submitting 10 missing tasks from Stage 3 (MappedRDD[3] at map at Main.scala:62) 15/03/31 10:47:19 INFO TaskSchedulerImpl: Adding task set 3.0 with 10 tasks 15/03/31 10:47:19 INFO TaskSetManager: Starting task 0.0 in stage 3.0 (TID 10, localhost, ANY, 1266 bytes) 15/03/31 10:47:19 INFO TaskSetManager: Starting task 1.0 in stage 3.0 (TID 11, localhost, ANY, 1266 bytes) 15/03/31 10:47:19 INFO TaskSetManager: Starting task 2.0 in stage 3.0 (TID 12, localhost, ANY, 1266 bytes) 15/03/31 10:47:19 INFO TaskSetManager: Starting task 3.0 in stage 3.0 (TID 13, localhost, ANY, 1266 bytes) 15/03/31 10:47:19 INFO Executor: Running task 0.0 in stage 3.0 (TID 10) 15/03/31 10:47:19 INFO Executor: Running task 2.0 in stage 3.0 (TID 12) 15/03/31 
10:47:19 INFO Executor: Running task 1.0 in stage 3.0 (TID 11) 15/03/31 10:47:19 INFO Executor: Running task 3.0 in stage 3.0 (TID 13) 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 6 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 3 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 9 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 0 15/03/31 10:47:19 INFO BlockManager: Removing broadcast 2 15/03/31 10:47:19 INFO BlockManager: Removing block broadcast_2_piece0 15/03/31 10:47:19 INFO MemoryStore: Block broadcast_2_piece0 of size 1946 dropped from memory (free 277996989) 15/03/31 10:47:19 INFO BlockManagerInfo: Removed broadcast_2_piece0 on localhost:40630 in memory (size: 1946.0 B, free: 265.1 MB) 15/03/31 10:47:19 INFO BlockManagerMaster: Updated info of block broadcast_2_piece0 15/03/31 10:47:19 INFO BlockManager: Removing block broadcast_2 15/03/31 10:47:19 INFO MemoryStore: Block broadcast_2 of size 3568 dropped from memory (free 278000557) 15/03/31 10:47:19 INFO ContextCleaner: Cleaned broadcast 2 15/03/31 10:47:19 INFO BlockManager: Removing broadcast 1 15/03/31 10:47:19 INFO BlockManager: Removing block broadcast_1 15/03/31 10:47:19 INFO MemoryStore: Block broadcast_1 of size 3568 dropped from memory (free 278004125) 15/03/31 10:47:19 INFO BlockManager: Removing block broadcast_1_piece0 15/03/31 10:47:19 INFO MemoryStore: Block broadcast_1_piece0 of size 1946 dropped from memory (free 278006071) 15/03/31 10:47:19 INFO BlockManagerInfo: Removed broadcast_1_piece0 on localhost:40630 in memory (size: 1946.0 B, free: 265.1 MB) 15/03/31 10:47:19 INFO BlockManagerMaster: Updated info of block broadcast_1_piece0 15/03/31 10:47:19 INFO ContextCleaner: Cleaned broadcast 1 15/03/31 10:47:19 INFO BlockManager: Removing 
broadcast 0 15/03/31 10:47:19 INFO BlockManager: Removing block broadcast_0 15/03/31 10:47:19 INFO MemoryStore: Block broadcast_0 of size 3568 dropped from memory (free 278009639) 15/03/31 10:47:19 INFO BlockManager: Removing block broadcast_0_piece0 15/03/31 10:47:19 INFO MemoryStore: Block broadcast_0_piece0 of size 1946 dropped from memory (free 278011585) 15/03/31 10:47:19 INFO BlockManagerInfo: Removed broadcast_0_piece0 on localhost:40630 in memory (size: 1946.0 B, free: 265.1 MB) 15/03/31 10:47:19 INFO BlockManagerMaster: Updated info of block broadcast_0_piece0 15/03/31 10:47:19 INFO ContextCleaner: Cleaned broadcast 0 15/03/31 10:47:19 INFO Executor: Finished task 3.0 in stage 3.0 (TID 13). 569 bytes result sent to driver 15/03/31 10:47:19 INFO Executor: Finished task 0.0 in stage 3.0 (TID 10). 569 bytes result sent to driver 15/03/31 10:47:19 INFO TaskSetManager: Starting task 4.0 in stage 3.0 (TID 14, localhost, ANY, 1266 bytes) 15/03/31 10:47:19 INFO Executor: Running task 4.0 in stage 3.0 (TID 14) 15/03/31 10:47:19 INFO TaskSetManager: Finished task 3.0 in stage 3.0 (TID 13) in 303 ms on localhost (1/10) 15/03/31 10:47:19 INFO Executor: Finished task 1.0 in stage 3.0 (TID 11). 569 bytes result sent to driver 15/03/31 10:47:19 INFO Executor: Finished task 2.0 in stage 3.0 (TID 12). 
569 bytes result sent to driver 15/03/31 10:47:19 INFO TaskSetManager: Starting task 5.0 in stage 3.0 (TID 15, localhost, ANY, 1266 bytes) 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 8 15/03/31 10:47:19 INFO Executor: Running task 5.0 in stage 3.0 (TID 15) 15/03/31 10:47:19 INFO TaskSetManager: Finished task 0.0 in stage 3.0 (TID 10) in 312 ms on localhost (2/10) 15/03/31 10:47:19 INFO TaskSetManager: Finished task 1.0 in stage 3.0 (TID 11) in 314 ms on localhost (3/10) 15/03/31 10:47:19 INFO TaskSetManager: Starting task 6.0 in stage 3.0 (TID 16, localhost, ANY, 1266 bytes) 15/03/31 10:47:19 INFO TaskSetManager: Starting task 7.0 in stage 3.0 (TID 17, localhost, ANY, 1266 bytes) 15/03/31 10:47:19 INFO Executor: Running task 6.0 in stage 3.0 (TID 16) 15/03/31 10:47:19 INFO Executor: Running task 7.0 in stage 3.0 (TID 17) 15/03/31 10:47:19 INFO TaskSetManager: Finished task 2.0 in stage 3.0 (TID 12) in 317 ms on localhost (4/10) 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 5 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 1 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 2 15/03/31 10:47:19 INFO Executor: Finished task 5.0 in stage 3.0 (TID 15). 569 bytes result sent to driver 15/03/31 10:47:19 INFO TaskSetManager: Starting task 8.0 in stage 3.0 (TID 18, localhost, ANY, 1266 bytes) 15/03/31 10:47:19 INFO Executor: Finished task 7.0 in stage 3.0 (TID 17). 569 bytes result sent to driver 15/03/31 10:47:19 INFO TaskSetManager: Starting task 9.0 in stage 3.0 (TID 19, localhost, ANY, 1266 bytes) 15/03/31 10:47:19 INFO Executor: Finished task 4.0 in stage 3.0 (TID 14). 
569 bytes result sent to driver 15/03/31 10:47:19 INFO Executor: Running task 9.0 in stage 3.0 (TID 19) 15/03/31 10:47:19 INFO TaskSetManager: Finished task 7.0 in stage 3.0 (TID 17) in 40 ms on localhost (5/10) 15/03/31 10:47:19 INFO Executor: Finished task 6.0 in stage 3.0 (TID 16). 569 bytes result sent to driver 15/03/31 10:47:19 INFO Executor: Running task 8.0 in stage 3.0 (TID 18) 15/03/31 10:47:19 INFO TaskSetManager: Finished task 5.0 in stage 3.0 (TID 15) in 53 ms on localhost (6/10) 15/03/31 10:47:19 INFO TaskSetManager: Finished task 4.0 in stage 3.0 (TID 14) in 59 ms on localhost (7/10) 15/03/31 10:47:19 INFO TaskSetManager: Finished task 6.0 in stage 3.0 (TID 16) in 50 ms on localhost (8/10) 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 4 15/03/31 10:47:19 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 7 15/03/31 10:47:19 INFO Executor: Finished task 8.0 in stage 3.0 (TID 18). 569 bytes result sent to driver 15/03/31 10:47:19 INFO Executor: Finished task 9.0 in stage 3.0 (TID 19). 
569 bytes result sent to driver 15/03/31 10:47:19 INFO TaskSetManager: Finished task 8.0 in stage 3.0 (TID 18) in 49 ms on localhost (9/10) 15/03/31 10:47:19 INFO TaskSetManager: Finished task 9.0 in stage 3.0 (TID 19) in 50 ms on localhost (10/10) 15/03/31 10:47:19 INFO TaskSchedulerImpl: Removed TaskSet 3.0, whose tasks have all completed, from pool 15/03/31 10:47:19 INFO DAGScheduler: Stage 3 (foreachRDD at Main.scala:70) finished in 0,410 s 15/03/31 10:47:19 INFO DAGScheduler: Job 3 finished: foreachRDD at Main.scala:70, took 0,438664 s 15/03/31 10:47:19 INFO JobScheduler: Finished job streaming job 1427791638000 ms.1 from job set of time 1427791638000 ms 15/03/31 10:47:19 INFO JobScheduler: Total delay: 1,420 s for time 1427791638000 ms (execution: 1,372 s) 15/03/31 10:47:19 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer() 15/03/31 10:47:20 INFO JobScheduler: Starting job streaming job 1427791640000 ms.0 from job set of time 1427791640000 ms 15/03/31 10:47:20 INFO JobScheduler: Added jobs for time 1427791640000 ms 15/03/31 10:47:20 INFO SparkContext: Starting job: print at Main.scala:59 15/03/31 10:47:20 INFO DAGScheduler: Got job 4 (print at Main.scala:59) with 1 output partitions (allowLocal=true) 15/03/31 10:47:20 INFO DAGScheduler: Final stage: Stage 4(print at Main.scala:59) 15/03/31 10:47:20 INFO DAGScheduler: Parents of final stage: List() 15/03/31 10:47:20 INFO DAGScheduler: Missing parents: List() 15/03/31 10:47:20 INFO DAGScheduler: Submitting Stage 4 (MappedRDD[6] at map at Main.scala:50), which has no missing parents 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(3568) called with curMem=7855, maxMem=278019440 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_4 stored as values in memory (estimated size 3.5 KB, free 265.1 MB) 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(1945) called with curMem=11423, maxMem=278019440 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_4_piece0 stored as bytes in memory (estimated size 1945.0 
B, free 265.1 MB) 15/03/31 10:47:20 INFO BlockManagerInfo: Added broadcast_4_piece0 in memory on localhost:40630 (size: 1945.0 B, free: 265.1 MB) 15/03/31 10:47:20 INFO BlockManagerMaster: Updated info of block broadcast_4_piece0 15/03/31 10:47:20 INFO SparkContext: Created broadcast 4 from getCallSite at DStream.scala:294 15/03/31 10:47:20 INFO DAGScheduler: Submitting 1 missing tasks from Stage 4 (MappedRDD[6] at map at Main.scala:50) 15/03/31 10:47:20 INFO TaskSchedulerImpl: Adding task set 4.0 with 1 tasks 15/03/31 10:47:20 INFO TaskSetManager: Starting task 0.0 in stage 4.0 (TID 20, localhost, ANY, 1266 bytes) 15/03/31 10:47:20 INFO Executor: Running task 0.0 in stage 4.0 (TID 20) 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 6 15/03/31 10:47:20 INFO Executor: Finished task 0.0 in stage 4.0 (TID 20). 570 bytes result sent to driver 15/03/31 10:47:20 INFO TaskSetManager: Finished task 0.0 in stage 4.0 (TID 20) in 6 ms on localhost (1/1) 15/03/31 10:47:20 INFO TaskSchedulerImpl: Removed TaskSet 4.0, whose tasks have all completed, from pool 15/03/31 10:47:20 INFO DAGScheduler: Stage 4 (print at Main.scala:59) finished in 0,007 s 15/03/31 10:47:20 INFO DAGScheduler: Job 4 finished: print at Main.scala:59, took 0,021293 s 15/03/31 10:47:20 INFO SparkContext: Starting job: print at Main.scala:59 15/03/31 10:47:20 INFO DAGScheduler: Got job 5 (print at Main.scala:59) with 4 output partitions (allowLocal=true) 15/03/31 10:47:20 INFO DAGScheduler: Final stage: Stage 5(print at Main.scala:59) 15/03/31 10:47:20 INFO DAGScheduler: Parents of final stage: List() 15/03/31 10:47:20 INFO DAGScheduler: Missing parents: List() 15/03/31 10:47:20 INFO DAGScheduler: Submitting Stage 5 (MappedRDD[6] at map at Main.scala:50), which has no missing parents 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(3568) called with curMem=13368, maxMem=278019440 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_5 stored as 
values in memory (estimated size 3.5 KB, free 265.1 MB) 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(1945) called with curMem=16936, maxMem=278019440 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 1945.0 B, free 265.1 MB) 15/03/31 10:47:20 INFO BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:40630 (size: 1945.0 B, free: 265.1 MB) 15/03/31 10:47:20 INFO BlockManagerMaster: Updated info of block broadcast_5_piece0 15/03/31 10:47:20 INFO SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:838 15/03/31 10:47:20 INFO DAGScheduler: Submitting 4 missing tasks from Stage 5 (MappedRDD[6] at map at Main.scala:50) 15/03/31 10:47:20 INFO TaskSchedulerImpl: Adding task set 5.0 with 4 tasks 15/03/31 10:47:20 INFO TaskSetManager: Starting task 0.0 in stage 5.0 (TID 21, localhost, ANY, 1266 bytes) 15/03/31 10:47:20 INFO TaskSetManager: Starting task 1.0 in stage 5.0 (TID 22, localhost, ANY, 1266 bytes) 15/03/31 10:47:20 INFO TaskSetManager: Starting task 2.0 in stage 5.0 (TID 23, localhost, ANY, 1266 bytes) 15/03/31 10:47:20 INFO TaskSetManager: Starting task 3.0 in stage 5.0 (TID 24, localhost, ANY, 1266 bytes) 15/03/31 10:47:20 INFO Executor: Running task 0.0 in stage 5.0 (TID 21) 15/03/31 10:47:20 INFO Executor: Running task 2.0 in stage 5.0 (TID 23) 15/03/31 10:47:20 INFO Executor: Running task 3.0 in stage 5.0 (TID 24) 15/03/31 10:47:20 INFO Executor: Running task 1.0 in stage 5.0 (TID 22) 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 0 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 8 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 9 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 3 15/03/31 10:47:20 INFO Executor: Finished 
task 2.0 in stage 5.0 (TID 23). 570 bytes result sent to driver 15/03/31 10:47:20 INFO Executor: Finished task 3.0 in stage 5.0 (TID 24). 570 bytes result sent to driver 15/03/31 10:47:20 INFO Executor: Finished task 0.0 in stage 5.0 (TID 21). 570 bytes result sent to driver 15/03/31 10:47:20 INFO TaskSetManager: Finished task 2.0 in stage 5.0 (TID 23) in 7 ms on localhost (1/4) 15/03/31 10:47:20 INFO Executor: Finished task 1.0 in stage 5.0 (TID 22). 570 bytes result sent to driver 15/03/31 10:47:20 INFO TaskSetManager: Finished task 3.0 in stage 5.0 (TID 24) in 10 ms on localhost (2/4) 15/03/31 10:47:20 INFO TaskSetManager: Finished task 0.0 in stage 5.0 (TID 21) in 14 ms on localhost (3/4) 15/03/31 10:47:20 INFO TaskSetManager: Finished task 1.0 in stage 5.0 (TID 22) in 16 ms on localhost (4/4) 15/03/31 10:47:20 INFO TaskSchedulerImpl: Removed TaskSet 5.0, whose tasks have all completed, from pool 15/03/31 10:47:20 INFO DAGScheduler: Stage 5 (print at Main.scala:59) finished in 0,020 s 15/03/31 10:47:20 INFO DAGScheduler: Job 5 finished: print at Main.scala:59, took 0,034514 s 15/03/31 10:47:20 INFO SparkContext: Starting job: print at Main.scala:59 15/03/31 10:47:20 INFO DAGScheduler: Got job 6 (print at Main.scala:59) with 5 output partitions (allowLocal=true) 15/03/31 10:47:20 INFO DAGScheduler: Final stage: Stage 6(print at Main.scala:59) 15/03/31 10:47:20 INFO DAGScheduler: Parents of final stage: List() 15/03/31 10:47:20 INFO DAGScheduler: Missing parents: List() 15/03/31 10:47:20 INFO DAGScheduler: Submitting Stage 6 (MappedRDD[6] at map at Main.scala:50), which has no missing parents 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(3568) called with curMem=18881, maxMem=278019440 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 3.5 KB, free 265.1 MB) 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(1945) called with curMem=22449, maxMem=278019440 15/03/31 10:47:20 INFO MemoryStore: Block 
broadcast_6_piece0 stored as bytes in memory (estimated size 1945.0 B, free 265.1 MB) 15/03/31 10:47:20 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:40630 (size: 1945.0 B, free: 265.1 MB) 15/03/31 10:47:20 INFO BlockManagerMaster: Updated info of block broadcast_6_piece0 15/03/31 10:47:20 INFO SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:838 15/03/31 10:47:20 INFO DAGScheduler: Submitting 5 missing tasks from Stage 6 (MappedRDD[6] at map at Main.scala:50) 15/03/31 10:47:20 INFO TaskSchedulerImpl: Adding task set 6.0 with 5 tasks 15/03/31 10:47:20 INFO TaskSetManager: Starting task 0.0 in stage 6.0 (TID 25, localhost, ANY, 1266 bytes) 15/03/31 10:47:20 INFO TaskSetManager: Starting task 1.0 in stage 6.0 (TID 26, localhost, ANY, 1266 bytes) 15/03/31 10:47:20 INFO TaskSetManager: Starting task 2.0 in stage 6.0 (TID 27, localhost, ANY, 1266 bytes) 15/03/31 10:47:20 INFO TaskSetManager: Starting task 3.0 in stage 6.0 (TID 28, localhost, ANY, 1266 bytes) 15/03/31 10:47:20 INFO Executor: Running task 0.0 in stage 6.0 (TID 25) 15/03/31 10:47:20 INFO Executor: Running task 2.0 in stage 6.0 (TID 27) 15/03/31 10:47:20 INFO Executor: Running task 3.0 in stage 6.0 (TID 28) 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 2 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 1 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 7 15/03/31 10:47:20 INFO Executor: Finished task 0.0 in stage 6.0 (TID 25). 570 bytes result sent to driver 15/03/31 10:47:20 INFO Executor: Running task 1.0 in stage 6.0 (TID 26) 15/03/31 10:47:20 INFO Executor: Finished task 3.0 in stage 6.0 (TID 28). 
570 bytes result sent to driver 15/03/31 10:47:20 INFO TaskSetManager: Starting task 4.0 in stage 6.0 (TID 29, localhost, ANY, 1266 bytes) 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 5 15/03/31 10:47:20 INFO Executor: Finished task 2.0 in stage 6.0 (TID 27). 570 bytes result sent to driver 15/03/31 10:47:20 INFO TaskSetManager: Finished task 0.0 in stage 6.0 (TID 25) in 8 ms on localhost (1/5) 15/03/31 10:47:20 INFO Executor: Finished task 1.0 in stage 6.0 (TID 26). 570 bytes result sent to driver 15/03/31 10:47:20 INFO Executor: Running task 4.0 in stage 6.0 (TID 29) 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 4 15/03/31 10:47:20 INFO TaskSetManager: Finished task 3.0 in stage 6.0 (TID 28) in 10 ms on localhost (2/5) 15/03/31 10:47:20 INFO Executor: Finished task 4.0 in stage 6.0 (TID 29). 570 bytes result sent to driver 15/03/31 10:47:20 INFO TaskSetManager: Finished task 2.0 in stage 6.0 (TID 27) in 13 ms on localhost (3/5) 15/03/31 10:47:20 INFO TaskSetManager: Finished task 1.0 in stage 6.0 (TID 26) in 15 ms on localhost (4/5) 15/03/31 10:47:20 INFO TaskSetManager: Finished task 4.0 in stage 6.0 (TID 29) in 12 ms on localhost (5/5) 15/03/31 10:47:20 INFO DAGScheduler: Stage 6 (print at Main.scala:59) finished in 0,019 s 15/03/31 10:47:20 INFO TaskSchedulerImpl: Removed TaskSet 6.0, whose tasks have all completed, from pool ------------------------------------------- Time: 1427791640000 ms ------------------------------------------- 15/03/31 10:47:20 INFO DAGScheduler: Job 6 finished: print at Main.scala:59, took 0,034922 s 15/03/31 10:47:20 INFO JobScheduler: Finished job streaming job 1427791640000 ms.0 from job set of time 1427791640000 ms 15/03/31 10:47:20 INFO SparkContext: Starting job: foreachRDD at Main.scala:70 15/03/31 10:47:20 INFO DAGScheduler: Got job 7 (foreachRDD at Main.scala:70) with 10 output partitions 
(allowLocal=false) 15/03/31 10:47:20 INFO DAGScheduler: Final stage: Stage 7(foreachRDD at Main.scala:70) 15/03/31 10:47:20 INFO DAGScheduler: Parents of final stage: List() 15/03/31 10:47:20 INFO JobScheduler: Starting job streaming job 1427791640000 ms.1 from job set of time 1427791640000 ms 15/03/31 10:47:20 INFO DAGScheduler: Missing parents: List() 15/03/31 10:47:20 INFO DAGScheduler: Submitting Stage 7 (MappedRDD[7] at map at Main.scala:62), which has no missing parents 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(5064) called with curMem=24394, maxMem=278019440 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 4.9 KB, free 265.1 MB) 15/03/31 10:47:20 INFO MemoryStore: ensureFreeSpace(2792) called with curMem=29458, maxMem=278019440 15/03/31 10:47:20 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 2.7 KB, free 265.1 MB) 15/03/31 10:47:20 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on localhost:40630 (size: 2.7 KB, free: 265.1 MB) 15/03/31 10:47:20 INFO BlockManagerMaster: Updated info of block broadcast_7_piece0 15/03/31 10:47:20 INFO SparkContext: Created broadcast 7 from getCallSite at DStream.scala:294 15/03/31 10:47:20 INFO DAGScheduler: Submitting 10 missing tasks from Stage 7 (MappedRDD[7] at map at Main.scala:62) 15/03/31 10:47:20 INFO TaskSchedulerImpl: Adding task set 7.0 with 10 tasks 15/03/31 10:47:20 INFO TaskSetManager: Starting task 0.0 in stage 7.0 (TID 30, localhost, ANY, 1266 bytes) 15/03/31 10:47:20 INFO TaskSetManager: Starting task 1.0 in stage 7.0 (TID 31, localhost, ANY, 1266 bytes) 15/03/31 10:47:20 INFO TaskSetManager: Starting task 2.0 in stage 7.0 (TID 32, localhost, ANY, 1266 bytes) 15/03/31 10:47:20 INFO TaskSetManager: Starting task 3.0 in stage 7.0 (TID 33, localhost, ANY, 1266 bytes) 15/03/31 10:47:20 INFO Executor: Running task 0.0 in stage 7.0 (TID 30) 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is 
the same as ending offset skipping toto 6 15/03/31 10:47:20 INFO Executor: Running task 1.0 in stage 7.0 (TID 31) 15/03/31 10:47:20 INFO Executor: Running task 2.0 in stage 7.0 (TID 32) 15/03/31 10:47:20 INFO Executor: Running task 3.0 in stage 7.0 (TID 33) 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 9 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 0 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 3 15/03/31 10:47:20 INFO Executor: Finished task 0.0 in stage 7.0 (TID 30). 569 bytes result sent to driver 15/03/31 10:47:20 INFO Executor: Finished task 2.0 in stage 7.0 (TID 32). 569 bytes result sent to driver 15/03/31 10:47:20 INFO Executor: Finished task 1.0 in stage 7.0 (TID 31). 569 bytes result sent to driver 15/03/31 10:47:20 INFO TaskSetManager: Starting task 4.0 in stage 7.0 (TID 34, localhost, ANY, 1266 bytes) 15/03/31 10:47:20 INFO Executor: Running task 4.0 in stage 7.0 (TID 34) 15/03/31 10:47:20 INFO TaskSetManager: Starting task 5.0 in stage 7.0 (TID 35, localhost, ANY, 1266 bytes) 15/03/31 10:47:20 INFO TaskSetManager: Starting task 6.0 in stage 7.0 (TID 36, localhost, ANY, 1266 bytes) 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 8 15/03/31 10:47:20 INFO Executor: Running task 6.0 in stage 7.0 (TID 36) 15/03/31 10:47:20 INFO Executor: Finished task 3.0 in stage 7.0 (TID 33). 
569 bytes result sent to driver 15/03/31 10:47:20 INFO Executor: Running task 5.0 in stage 7.0 (TID 35) 15/03/31 10:47:20 INFO TaskSetManager: Finished task 0.0 in stage 7.0 (TID 30) in 34 ms on localhost (1/10) 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 5 15/03/31 10:47:20 INFO TaskSetManager: Finished task 1.0 in stage 7.0 (TID 31) in 42 ms on localhost (2/10) 15/03/31 10:47:20 INFO TaskSetManager: Starting task 7.0 in stage 7.0 (TID 37, localhost, ANY, 1266 bytes) 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 2 15/03/31 10:47:20 INFO Executor: Running task 7.0 in stage 7.0 (TID 37) 15/03/31 10:47:20 INFO TaskSetManager: Finished task 2.0 in stage 7.0 (TID 32) in 45 ms on localhost (3/10) 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 1 15/03/31 10:47:20 INFO TaskSetManager: Finished task 3.0 in stage 7.0 (TID 33) in 46 ms on localhost (4/10) 15/03/31 10:47:20 INFO Executor: Finished task 4.0 in stage 7.0 (TID 34). 569 bytes result sent to driver 15/03/31 10:47:20 INFO TaskSetManager: Starting task 8.0 in stage 7.0 (TID 38, localhost, ANY, 1266 bytes) 15/03/31 10:47:20 INFO Executor: Running task 8.0 in stage 7.0 (TID 38) 15/03/31 10:47:20 INFO TaskSetManager: Finished task 4.0 in stage 7.0 (TID 34) in 31 ms on localhost (5/10) 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 7 15/03/31 10:47:20 INFO Executor: Finished task 6.0 in stage 7.0 (TID 36). 
569 bytes result sent to driver 15/03/31 10:47:20 INFO TaskSetManager: Starting task 9.0 in stage 7.0 (TID 39, localhost, ANY, 1266 bytes) 15/03/31 10:47:20 INFO TaskSetManager: Finished task 6.0 in stage 7.0 (TID 36) in 35 ms on localhost (6/10) 15/03/31 10:47:20 INFO Executor: Running task 9.0 in stage 7.0 (TID 39) 15/03/31 10:47:20 INFO Executor: Finished task 5.0 in stage 7.0 (TID 35). 569 bytes result sent to driver 15/03/31 10:47:20 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 4 15/03/31 10:47:20 INFO TaskSetManager: Finished task 5.0 in stage 7.0 (TID 35) in 39 ms on localhost (7/10) 15/03/31 10:47:20 INFO Executor: Finished task 7.0 in stage 7.0 (TID 37). 569 bytes result sent to driver 15/03/31 10:47:20 INFO TaskSetManager: Finished task 7.0 in stage 7.0 (TID 37) in 36 ms on localhost (8/10) 15/03/31 10:47:20 INFO Executor: Finished task 8.0 in stage 7.0 (TID 38). 569 bytes result sent to driver 15/03/31 10:47:20 INFO TaskSetManager: Finished task 8.0 in stage 7.0 (TID 38) in 26 ms on localhost (9/10) 15/03/31 10:47:20 INFO Executor: Finished task 9.0 in stage 7.0 (TID 39). 
569 bytes result sent to driver 15/03/31 10:47:20 INFO TaskSetManager: Finished task 9.0 in stage 7.0 (TID 39) in 27 ms on localhost (10/10) 15/03/31 10:47:20 INFO TaskSchedulerImpl: Removed TaskSet 7.0, whose tasks have all completed, from pool 15/03/31 10:47:20 INFO DAGScheduler: Stage 7 (foreachRDD at Main.scala:70) finished in 0,098 s 15/03/31 10:47:20 INFO DAGScheduler: Job 7 finished: foreachRDD at Main.scala:70, took 0,111899 s 15/03/31 10:47:20 INFO JobScheduler: Finished job streaming job 1427791640000 ms.1 from job set of time 1427791640000 ms 15/03/31 10:47:20 INFO JobScheduler: Total delay: 0,231 s for time 1427791640000 ms (execution: 0,213 s) 15/03/31 10:47:20 INFO MappedRDD: Removing RDD 2 from persistence list 15/03/31 10:47:20 INFO BlockManager: Removing RDD 2 15/03/31 10:47:20 INFO MappedRDD: Removing RDD 1 from persistence list 15/03/31 10:47:20 INFO KafkaRDD: Removing RDD 0 from persistence list 15/03/31 10:47:20 INFO BlockManager: Removing RDD 0 15/03/31 10:47:20 INFO MappedRDD: Removing RDD 3 from persistence list 15/03/31 10:47:20 INFO BlockManager: Removing RDD 3 15/03/31 10:47:20 INFO BlockManager: Removing RDD 1 15/03/31 10:47:20 INFO ReceivedBlockTracker: Deleting batches ArrayBuffer() 15/03/31 10:47:22 INFO JobScheduler: Added jobs for time 1427791642000 ms 15/03/31 10:47:22 INFO JobScheduler: Starting job streaming job 1427791642000 ms.0 from job set of time 1427791642000 ms 15/03/31 10:47:22 INFO SparkContext: Starting job: print at Main.scala:59 15/03/31 10:47:22 INFO DAGScheduler: Got job 8 (print at Main.scala:59) with 1 output partitions (allowLocal=true) 15/03/31 10:47:22 INFO DAGScheduler: Final stage: Stage 8(print at Main.scala:59) 15/03/31 10:47:22 INFO DAGScheduler: Parents of final stage: List() 15/03/31 10:47:22 INFO DAGScheduler: Missing parents: List() 15/03/31 10:47:22 INFO DAGScheduler: Submitting Stage 8 (MappedRDD[10] at map at Main.scala:50), which has no missing parents 15/03/31 10:47:22 INFO MemoryStore: 
ensureFreeSpace(3568) called with curMem=32250, maxMem=278019440 15/03/31 10:47:22 INFO MemoryStore: Block broadcast_8 stored as values in memory (estimated size 3.5 KB, free 265.1 MB) 15/03/31 10:47:22 INFO MemoryStore: ensureFreeSpace(1949) called with curMem=35818, maxMem=278019440 15/03/31 10:47:22 INFO MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 1949.0 B, free 265.1 MB) 15/03/31 10:47:22 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on localhost:40630 (size: 1949.0 B, free: 265.1 MB) 15/03/31 10:47:22 INFO BlockManagerMaster: Updated info of block broadcast_8_piece0 15/03/31 10:47:22 INFO SparkContext: Created broadcast 8 from broadcast at DAGScheduler.scala:838 15/03/31 10:47:22 INFO DAGScheduler: Submitting 1 missing tasks from Stage 8 (MappedRDD[10] at map at Main.scala:50) 15/03/31 10:47:22 INFO TaskSchedulerImpl: Adding task set 8.0 with 1 tasks 15/03/31 10:47:22 INFO TaskSetManager: Starting task 0.0 in stage 8.0 (TID 40, localhost, ANY, 1266 bytes) 15/03/31 10:47:22 INFO Executor: Running task 0.0 in stage 8.0 (TID 40) 15/03/31 10:47:22 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 6 15/03/31 10:47:22 INFO Executor: Finished task 0.0 in stage 8.0 (TID 40). 
570 bytes result sent to driver 15/03/31 10:47:22 INFO TaskSetManager: Finished task 0.0 in stage 8.0 (TID 40) in 6 ms on localhost (1/1) 15/03/31 10:47:22 INFO TaskSchedulerImpl: Removed TaskSet 8.0, whose tasks have all completed, from pool 15/03/31 10:47:22 INFO DAGScheduler: Stage 8 (print at Main.scala:59) finished in 0,008 s 15/03/31 10:47:22 INFO DAGScheduler: Job 8 finished: print at Main.scala:59, took 0,021082 s 15/03/31 10:47:22 INFO SparkContext: Starting job: print at Main.scala:59 15/03/31 10:47:22 INFO DAGScheduler: Got job 9 (print at Main.scala:59) with 4 output partitions (allowLocal=true) 15/03/31 10:47:22 INFO DAGScheduler: Final stage: Stage 9(print at Main.scala:59) 15/03/31 10:47:22 INFO DAGScheduler: Parents of final stage: List() 15/03/31 10:47:22 INFO DAGScheduler: Missing parents: List() 15/03/31 10:47:22 INFO DAGScheduler: Submitting Stage 9 (MappedRDD[10] at map at Main.scala:50), which has no missing parents 15/03/31 10:47:22 INFO MemoryStore: ensureFreeSpace(3568) called with curMem=37767, maxMem=278019440 15/03/31 10:47:22 INFO MemoryStore: Block broadcast_9 stored as values in memory (estimated size 3.5 KB, free 265.1 MB) 15/03/31 10:47:22 INFO MemoryStore: ensureFreeSpace(1949) called with curMem=41335, maxMem=278019440 15/03/31 10:47:22 INFO MemoryStore: Block broadcast_9_piece0 stored as bytes in memory (estimated size 1949.0 B, free 265.1 MB) 15/03/31 10:47:22 INFO BlockManagerInfo: Added broadcast_9_piece0 in memory on localhost:40630 (size: 1949.0 B, free: 265.1 MB) 15/03/31 10:47:22 INFO BlockManagerMaster: Updated info of block broadcast_9_piece0 15/03/31 10:47:22 INFO SparkContext: Created broadcast 9 from broadcast at DAGScheduler.scala:838 15/03/31 10:47:22 INFO DAGScheduler: Submitting 4 missing tasks from Stage 9 (MappedRDD[10] at map at Main.scala:50) 15/03/31 10:47:22 INFO TaskSchedulerImpl: Adding task set 9.0 with 4 tasks 15/03/31 10:47:22 INFO TaskSetManager: Starting task 0.0 in stage 9.0 (TID 41, localhost, ANY, 
1266 bytes) 15/03/31 10:47:22 INFO TaskSetManager: Starting task 1.0 in stage 9.0 (TID 42, localhost, ANY, 1266 bytes) 15/03/31 10:47:22 INFO TaskSetManager: Starting task 2.0 in stage 9.0 (TID 43, localhost, ANY, 1266 bytes) 15/03/31 10:47:22 INFO TaskSetManager: Starting task 3.0 in stage 9.0 (TID 44, localhost, ANY, 1266 bytes) 15/03/31 10:47:22 INFO Executor: Running task 2.0 in stage 9.0 (TID 43) 15/03/31 10:47:22 INFO Executor: Running task 1.0 in stage 9.0 (TID 42) 15/03/31 10:47:22 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 0 15/03/31 10:47:22 INFO Executor: Running task 0.0 in stage 9.0 (TID 41) 15/03/31 10:47:22 WARN KafkaRDD: Beginning offset ${part.fromOffset} is the same as ending offset skipping toto 3 15/03/31 10:47:22 INFO Executor: Finished task 2.0 in stage 9.0 (TID 43). 570 bytes result sent to driver 15/03/31 10:47:22 INFO Executor: Finished task 0.0 in stage 9.0 (TID 41). 570 bytes result sent to driver 15/03/31 10:47:22 INFO Executor: Running task 3.0 in stage 9.0 (TID 44) 15/03/31 10:47:22 INFO TaskSetManager: Finished task 2.0 in stage 9.0 (TID 43) in 5 ms on localhost (1/4) 15/03/31 10:47:22 INFO KafkaRDD: Computing topic toto, partition 9 offsets 0 -> 1 15/03/31 10:47:22 INFO TaskSetManager: Finished task 0.0 in stage 9.0 (TID 41) in 9 ms on localhost (2/4) 15/03/31 10:47:22 INFO VerifiableProperties: Verifying properties 15/03/31 10:47:22 INFO VerifiableProperties: Property group.id is overridden to 15/03/31 10:47:22 INFO VerifiableProperties: Property zookeeper.connect is overridden to 15/03/31 10:47:22 INFO KafkaRDD: Computing topic toto, partition 8 offsets 0 -> 1 15/03/31 10:47:22 ERROR TaskContextImpl: Error in TaskCompletionListener java.lang.NullPointerException at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158) at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63) at 
org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101) at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49) at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68) at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:58) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 15/03/31 10:47:22 INFO VerifiableProperties: Verifying properties 15/03/31 10:47:22 INFO VerifiableProperties: Property group.id is overridden to 15/03/31 10:47:22 INFO VerifiableProperties: Property zookeeper.connect is overridden to 15/03/31 10:47:22 ERROR TaskContextImpl: Error in TaskCompletionListener java.lang.NullPointerException at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158) at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101) at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101) at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49) at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68) at 
org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66) at org.apache.spark.scheduler.Task.run(Task.scala:58) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 15/03/31 10:47:22 ERROR Executor: Exception in task 1.0 in stage 9.0 (TID 42) org.apache.spark.util.TaskCompletionListenerException at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76) at org.apache.spark.scheduler.Task.run(Task.scala:58) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 15/03/31 10:47:22 ERROR Executor: Exception in task 3.0 in stage 9.0 (TID 44) org.apache.spark.util.TaskCompletionListenerException at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76) at org.apache.spark.scheduler.Task.run(Task.scala:58) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 15/03/31 10:47:22 WARN TaskSetManager: Lost task 3.0 in stage 9.0 (TID 44, localhost): org.apache.spark.util.TaskCompletionListenerException at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76) at 
org.apache.spark.scheduler.Task.run(Task.scala:58)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15/03/31 10:47:22 ERROR TaskSetManager: Task 3 in stage 9.0 failed 1 times; aborting job"

I don't know if this helps, but in the Spark UI I found an additional log entry for the failed tasks:

org.apache.spark.streaming.dstream.DStream.print(DStream.scala:624)

which corresponds to https://github.com/cloudera/spark/blob/cdh5.3.2-release/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala

For integration, I'm using the Spark version shipped with Cloudera 5.3.2, whereas in development I'm using Spark 1.3.0. Could this cause problems?

Thanks for your answer.

Regards,
Nicolas PHUNG

On Mon, Mar 30, 2015 at 7:05 PM, Ted Yu <yuzhih...@gmail.com> wrote:

> Nicolas:
> See if there was occurrence of the following exception in the log:
>           errs => throw new SparkException(
>             s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
>               errs.mkString("\n")),
>
> Cheers
>
> On Mon, Mar 30, 2015 at 9:40 AM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> This line
>>
>> at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(
>> KafkaRDD.scala:158)
>>
>> is the attempt to close the underlying kafka simple consumer.
>>
>> We can add a null pointer check, but the underlying issue of the consumer
>> being null probably indicates a problem earlier. Do you see any previous
>> error messages?
>>
>> Also, can you clarify for the successful and failed cases which topics
>> you are attempting this on, how many partitions there are, and whether
>> there are messages in the partitions? There's an existing jira regarding
>> empty partitions.
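The null pointer check mentioned above could look roughly like the following. This is only a minimal sketch of the general pattern, not the actual KafkaRDD code: `FakeConsumer` and `GuardedIterator` are hypothetical names invented for illustration, and the real consumer class is not modelled. As noted in the reply, such a guard would only hide the symptom; it would not explain why the consumer was never created.

```scala
// Hypothetical stand-in for a Kafka consumer; NOT the real SimpleConsumer API.
final class FakeConsumer {
  var closed = false
  def close(): Unit = closed = true
}

// Hypothetical iterator-like holder whose consumer may never be assigned,
// for example when the connection step fails or is never reached.
final class GuardedIterator(connect: () => FakeConsumer) {
  // Stays null until open() succeeds.
  private var consumer: FakeConsumer = null

  def open(): Unit = consumer = connect()

  // Null-safe close: Option(null) is None, so close() is only called
  // on a consumer that was actually created.
  def close(): Unit = Option(consumer).foreach(_.close())
}
```

With this guard, calling `close()` on an iterator that was never opened is a no-op instead of a NullPointerException.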
>>
>> On Mon, Mar 30, 2015 at 11:05 AM, Nicolas Phung <nicolas.ph...@gmail.com>
>> wrote:
>>
>>> Hello,
>>>
>>> I'm using spark-streaming-kafka 1.3.0 with the new consumer "Approach 2:
>>> Direct Approach (No Receivers)" (
>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html).
>>> I'm using the following code snippets :
>>>
>>> // Create direct kafka stream with brokers and topics
>>> val messages = KafkaUtils.createDirectStream[String, Array[Byte],
>>>   StringDecoder, DefaultDecoder](
>>>   ssc, kafkaParams, topicsSet)
>>>
>>> // Get the stuff from Kafka and print them
>>> val raw = messages.map(_._2)
>>> val dStream: DStream[RawScala] = raw.map(
>>>   byte => {
>>>     // Avro Decoder
>>>     println("Byte length: " + byte.length)
>>>     val rawDecoder = new AvroDecoder[Raw](schema = Raw.getClassSchema)
>>>     RawScala.toScala(rawDecoder.fromBytes(byte))
>>>   }
>>> )
>>> // Useful for debug
>>> dStream.print()
>>>
>>> I launch my Spark Streaming and everything is fine if there's no
>>> incoming logs from Kafka.
When I send a log, I get the following error:
>>>
>>> 15/03/30 17:34:40 ERROR TaskContextImpl: Error in TaskCompletionListener
>>> java.lang.NullPointerException
>>>     at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator.close(KafkaRDD.scala:158)
>>>     at org.apache.spark.util.NextIterator.closeIfNeeded(NextIterator.scala:63)
>>>     at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
>>>     at org.apache.spark.streaming.kafka.KafkaRDD$KafkaRDDIterator$$anonfun$1.apply(KafkaRDD.scala:101)
>>>     at org.apache.spark.TaskContextImpl$$anon$1.onTaskCompletion(TaskContextImpl.scala:49)
>>>     at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:68)
>>>     at org.apache.spark.TaskContextImpl$$anonfun$markTaskCompleted$1.apply(TaskContextImpl.scala:66)
>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>     at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:66)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:58)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 3.0 in stage 28.0 (TID 94) in 12 ms on localhost (2/4)
>>> 15/03/30 17:34:40 INFO TaskSetManager: Finished task 2.0 in stage 28.0 (TID 93) in 13 ms on localhost (3/4)
>>> 15/03/30 17:34:40 ERROR Executor: Exception in task 0.0 in stage 28.0 (TID 91)
>>> org.apache.spark.util.TaskCompletionListenerException
>>>     at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:58)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:745)
>>> 15/03/30 17:34:40 WARN TaskSetManager: Lost task 0.0 in stage 28.0 (TID 91, localhost): org.apache.spark.util.TaskCompletionListenerException
>>>     at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:58)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> 15/03/30 17:34:40 ERROR TaskSetManager: Task 0 in stage 28.0 failed 1 times; aborting job
>>> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Removed TaskSet 28.0, whose tasks have all completed, from pool
>>> 15/03/30 17:34:40 INFO TaskSchedulerImpl: Cancelling stage 28
>>> 15/03/30 17:34:40 INFO DAGScheduler: Job 28 failed: print at HotFCANextGen.scala:63, took 0,041068 s
>>> 15/03/30 17:34:40 INFO JobScheduler: Starting job streaming job 1427729680000 ms.1 from job set of time 1427729680000 ms
>>> 15/03/30 17:34:40 ERROR JobScheduler: Error running job streaming job 1427729680000 ms.0
>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 28.0 failed 1 times, most recent failure: Lost task 0.0 in stage 28.0 (TID 91, localhost): org.apache.spark.util.TaskCompletionListenerException
>>>     at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:76)
>>>     at org.apache.spark.scheduler.Task.run(Task.scala:58)
>>>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>     at java.lang.Thread.run(Thread.java:745)
>>>
>>> Driver stacktrace:
>>>     at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1214)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1203)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1202)
>>>     at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>     at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1202)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>>     at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:696)
>>>     at scala.Option.foreach(Option.scala:236)
>>>     at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:696)
>>>     at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1420)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>> The same code snippet works fine when the topic has a single partition.
>>> The issue occurs only when the topic has multiple partitions. Am I doing
>>> something wrong? Can you help me find the right way to write this for a
>>> Kafka topic with multiple partitions?
>>>
>>> Regards,
>>> Nicolas PHUNG