I replaced the writeToKafka statements with rdd.count() and, sure enough, the app is stable: after the first few batches, total delay stays well within the batch window (20 seconds). Here are the total delay lines from the driver log:

15/02/13 06:14:26 INFO JobScheduler: Total delay: 6.521 s for time 1423808060000 ms (execution: 6.404 s)
15/02/13 06:15:22 INFO JobScheduler: Total delay: 42.396 s for time 1423808080000 ms (execution: 42.338 s)
15/02/13 06:16:21 INFO JobScheduler: Total delay: 81.879 s for time 1423808100000 ms (execution: 59.483 s)
15/02/13 06:16:40 INFO JobScheduler: Total delay: 80.242 s for time 1423808120000 ms (execution: 18.363 s)
15/02/13 06:16:50 INFO JobScheduler: Total delay: 70.342 s for time 1423808140000 ms (execution: 10.100 s)
15/02/13 06:16:56 INFO JobScheduler: Total delay: 56.551 s for time 1423808160000 ms (execution: 6.209 s)
15/02/13 06:17:06 INFO JobScheduler: Total delay: 46.405 s for time 1423808180000 ms (execution: 9.854 s)
15/02/13 06:17:13 INFO JobScheduler: Total delay: 33.443 s for time 1423808200000 ms (execution: 7.038 s)
15/02/13 06:17:21 INFO JobScheduler: Total delay: 21.483 s for time 1423808220000 ms (execution: 8.039 s)
15/02/13 06:17:26 INFO JobScheduler: Total delay: 6.697 s for time 1423808240000 ms (execution: 5.213 s)
15/02/13 06:17:45 INFO JobScheduler: Total delay: 5.814 s for time 1423808260000 ms (execution: 5.767 s)
15/02/13 06:18:06 INFO JobScheduler: Total delay: 6.905 s for time 1423808280000 ms (execution: 6.858 s)
15/02/13 06:18:28 INFO JobScheduler: Total delay: 8.604 s for time 1423808300000 ms (execution: 8.556 s)
15/02/13 06:18:45 INFO JobScheduler: Total delay: 5.631 s for time 1423808320000 ms (execution: 5.583 s)
15/02/13 06:19:04 INFO JobScheduler: Total delay: 4.838 s for time 1423808340000 ms (execution: 4.791 s)
15/02/13 06:19:24 INFO JobScheduler: Total delay: 4.467 s for time 1423808360000 ms (execution: 4.422 s)
15/02/13 06:19:45 INFO JobScheduler: Total delay: 5.779 s for time 1423808380000 ms (execution: 5.733 s)
15/02/13 06:20:04 INFO JobScheduler: Total delay: 4.747 s for time 1423808400000 ms (execution: 4.701 s)
15/02/13 06:20:24 INFO JobScheduler: Total delay: 4.829 s for time 1423808420000 ms (execution: 4.782 s)
15/02/13 06:20:44 INFO JobScheduler: Total delay: 4.724 s for time 1423808440000 ms (execution: 4.678 s)
15/02/13 06:21:04 INFO JobScheduler: Total delay: 4.110 s for time 1423808460000 ms (execution: 4.064 s)
15/02/13 06:21:24 INFO JobScheduler: Total delay: 4.562 s for time 1423808480000 ms (execution: 4.514 s)
15/02/13 06:21:43 INFO JobScheduler: Total delay: 3.999 s for time 1423808500000 ms (execution: 3.954 s)
15/02/13 06:22:04 INFO JobScheduler: Total delay: 4.353 s for time 1423808520000 ms (execution: 4.309 s)
15/02/13 06:22:24 INFO JobScheduler: Total delay: 4.712 s for time 1423808540000 ms (execution: 4.667 s)
15/02/13 06:22:44 INFO JobScheduler: Total delay: 4.726 s for time 1423808560000 ms (execution: 4.681 s)
15/02/13 06:23:07 INFO JobScheduler: Total delay: 7.860 s for time 1423808580000 ms (execution: 7.816 s)
15/02/13 06:23:28 INFO JobScheduler: Total delay: 8.426 s for time 1423808600000 ms (execution: 8.383 s)
15/02/13 06:23:43 INFO JobScheduler: Total delay: 3.857 s for time 1423808620000 ms (execution: 3.814 s)
15/02/13 06:24:03 INFO JobScheduler: Total delay: 3.936 s for time 1423808640000 ms (execution: 3.892 s)
15/02/13 06:24:23 INFO JobScheduler: Total delay: 3.810 s for time 1423808660000 ms (execution: 3.767 s)
15/02/13 06:24:43 INFO JobScheduler: Total delay: 3.889 s for time 1423808680000 ms (execution: 3.845 s)
15/02/13 06:25:03 INFO JobScheduler: Total delay: 3.553 s for time 1423808700000 ms (execution: 3.510 s)
15/02/13 06:25:27 INFO JobScheduler: Total delay: 7.031 s for time 1423808720000 ms (execution: 6.989 s)
15/02/13 06:25:43 INFO JobScheduler: Total delay: 3.636 s for time 1423808740000 ms (execution: 3.594 s)
15/02/13 06:26:03 INFO JobScheduler: Total delay: 3.425 s for time 1423808760000 ms (execution: 3.383 s)
15/02/13 06:26:23 INFO JobScheduler: Total delay: 3.939 s for time 1423808780000 ms (execution: 3.897 s)
15/02/13 06:26:43 INFO JobScheduler: Total delay: 3.640 s for time 1423808800000 ms (execution: 3.596 s)
15/02/13 06:27:03 INFO JobScheduler: Total delay: 3.905 s for time 1423808820000 ms (execution: 3.861 s)
15/02/13 06:27:24 INFO JobScheduler: Total delay: 4.068 s for time 1423808840000 ms (execution: 4.026 s)
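These numbers fit a simple queueing picture. In Spark Streaming, total delay is the scheduling delay (how long a batch waits for earlier batches to finish) plus its own execution time, so a few slow batches push every later batch back until enough fast batches drain the backlog. The sketch below is plain Scala, not Spark code. It assumes batches run strictly one after another and it ignores the small per-batch overhead visible in the log.

```scala
// Minimal queueing model of a micro-batch scheduler. Assumption: batches are
// due every `batchIntervalSec` seconds and run strictly one at a time, in order.
object BatchQueueSim {
  /** Total delay of each batch: time from when the batch is due until its job ends. */
  def totalDelays(batchIntervalSec: Double, executionSec: Seq[Double]): Vector[Double] = {
    val (_, delays) = executionSec.zipWithIndex.foldLeft((0.0, Vector.empty[Double])) {
      case ((previousEnd, acc), (exec, i)) =>
        val dueAt = i * batchIntervalSec
        // A batch cannot start before it is due, nor before the previous batch finishes.
        val start = math.max(dueAt, previousEnd)
        val end = start + exec
        (end, acc :+ (end - dueAt)) // scheduling delay + execution time
    }
    delays
  }
}
```

Fed the execution times of the first five batches above (6.404, 42.338, 59.483, 18.363 and 10.100 s) with a 20 s interval, the model gives total delays of about 6.40, 42.34, 81.82, 80.18 and 70.28 s, each within about 0.12 s of the logged value. Once execution time drops below 20 s, every batch removes (20 s minus its execution time) from the backlog, which is why total delay is back to a few seconds by 06:17:26.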
On Thu, Feb 12, 2015 at 9:54 PM, Tim Smith <secs...@gmail.com> wrote:
> TD - I will try count() and report back. Meanwhile, attached is the entire driver log, which includes the error logs about missing blocks.
>
> Cody - Let me research a bit how to do connection pooling. Sorry, I am not really a programmer. I did see the connection pooling advice in the Spark Streaming Programming Guide as an optimization but wasn't sure how to implement it. Do you think it will have a significant impact on performance?
>
> Saisai - I think, ideally, I'd rather not do any dStream partitioning. Instead, I'd have 1 receiver for each Kafka partition (so in this case 23 receivers for 23 Kafka partitions) and then as many or more executors to handle processing of the dStreams. Right? Trouble is, I tried this approach and it didn't work. Even if I set 23 receivers and set the partition count to 1 for each dStream (effectively, no stream splitting), my performance is extremely poor/laggy. Should I modify my code to remove dStream partitioning altogether and then try setting as many receivers as Kafka partitions?
>
> On Thu, Feb 12, 2015 at 9:45 PM, Saisai Shao <sai.sai.s...@gmail.com> wrote:
>> Hi Tim,
>>
>> I think this code will still introduce a shuffle even when you call repartition on each input stream. Actually, this style of implementation will generate more jobs (one job per input stream) than unioning everything into one stream with DStream.union(), and as I understand it, union normally has no special overhead.
>>
>> Also, as Cody said, creating a Producer per partition could be a potential overhead; a producer pool, or sharing one Producer per executor, might be better :).
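Saisai and Cody both point at the per-partition `new KafkaOutputService(...)` call. The Spark Streaming Programming Guide's foreachRDD design patterns describe the usual alternative: a static, lazily initialized pool or shared client per executor JVM. Below is a minimal plain-Scala sketch of that sharing pattern. `MessageSink`, `CountingSink`, `SinkRegistry` and `SharedSinks` are hypothetical stand-ins, not Kafka or Spark APIs; a real version would wrap an actual Kafka producer, which must be safe to share across task threads.

```scala
import java.util.concurrent.atomic.AtomicInteger
import scala.collection.mutable

// Hypothetical stand-in for a Kafka producer; not a real Kafka API.
trait MessageSink {
  def send(topic: String, msg: String): Unit
}

// Toy sink that only counts messages, so the sharing behaviour can be checked.
final class CountingSink extends MessageSink {
  val sent = new AtomicInteger(0)
  def send(topic: String, msg: String): Unit = { sent.incrementAndGet(); () }
}

// Creates at most one sink per topic and returns the same instance to every caller.
class SinkRegistry {
  private val sinks = mutable.Map.empty[String, CountingSink]
  private var created = 0

  def forTopic(topic: String): CountingSink = synchronized {
    sinks.getOrElseUpdate(topic, { created += 1; new CountingSink })
  }

  def createdCount: Int = synchronized(created)
}

// A Scala `object` is initialised once per JVM (per class loader), so on an
// executor it behaves as a per-executor singleton shared by all tasks.
object SharedSinks extends SinkRegistry

object ProducerSharingDemo {
  /** Sends `records` messages from each of `partitions` simulated partitions. */
  def simulate(registry: SinkRegistry, partitions: Int, records: Int): (Int, Int) = {
    for (p <- 0 until partitions; r <- 0 until records)
      registry.forTopic("out-topic").send("out-topic", s"p$p-r$r")
    (registry.createdCount, registry.forTopic("out-topic").sent.get)
  }
}
```

Simulating 150 partitions of 10 records each through one registry creates a single sink that receives all 1,500 messages. In the streaming job, the idea would be to fetch the shared client inside `foreachPartition` (something like `SharedSinks.forTopic(topic)`) instead of constructing a new writer, and not to close it at the end of each partition.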
>>
>> // Process stream from each receiver separately
>> // (do not attempt to merge all streams and then re-partition,
>> // this causes unnecessary and high amount of shuffle in the job)
>> for (k <- kInStreams)
>> {
>>   // Re-partition stream from each receiver across all compute nodes to spread out
>>   // processing load and allow per-partition processing,
>>   // and set persistence level to spill to disk along with serialization
>>   val kInMsgParts = k.repartition(otherConf("dStreamPartitions").toInt).
>>
>> 2015-02-13 13:27 GMT+08:00 Cody Koeninger <c...@koeninger.org>:
>>> outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
>>>   val writer = new KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
>>>   writer.output(rec)
>>> }) )
>>>
>>> So this is creating a new Kafka producer for every new output partition, right? Have you tried pooling the producers?
>>>
>>> On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith <secs...@gmail.com> wrote:
>>>> 1) Yes, if I disable writing out to Kafka and replace it with a very lightweight action like rdd.take(1), the app is stable.
>>>>
>>>> 2) The partitions I spoke of in the previous mail are the number of partitions I create from each dStream. But yes, since I do the processing and writing out per partition, each dStream partition ends up getting written to a Kafka partition. Broadly, the flow is:
>>>> 5 Spark/Kafka receivers -> split each dStream into 30 partitions (150 partitions) -> apply some transformation logic to each partition -> write out each partition to Kafka (Kafka has 23 partitions). Let me increase the number of partitions on the Kafka side and see if that helps.
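To put Cody's question in numbers, take the figures in Tim's reply (5 receivers, 30 partitions per stream) and the 20 s batch window mentioned elsewhere in the thread. A back-of-the-envelope sketch, assuming every batch writes all 150 partitions and each output partition builds one producer:

```scala
// Back-of-the-envelope count of producer constructions. Assumes every batch
// writes out all partitions and each output partition builds its own producer.
object ProducerChurn {
  def perPartitionProducers(streams: Int, partitionsPerStream: Int,
                            batchSec: Int, runSec: Int): Long = {
    val batches = runSec / batchSec // whole batches completed in the run
    streams.toLong * partitionsPerStream * batches
  }
}
```

For one hour this gives 5 × 30 × 180 = 27,000 producer constructions, each typically opening its own broker connections. With one producer shared per executor JVM, the total would stay at or below the executor count (100 in Tim's setup) for the lifetime of the job.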
>>>>
>>>> Here's what the main block of code looks like (I added persistence back):
>>>>
>>>> val kInStreams = (1 to otherConf("inStreamCount").toInt).map{_ =>
>>>>   KafkaUtils.createStream(ssc, otherConf("kafkaConsumerZk").toString, otherConf("kafkaConsumerGroupId").toString,
>>>>     Map(otherConf("kafkaConsumerTopic").toString -> 1), StorageLevel.MEMORY_AND_DISK_SER) }
>>>>
>>>> if (!configMap.keySet.isEmpty)
>>>> {
>>>>   // Process stream from each receiver separately
>>>>   // (do not attempt to merge all streams and then re-partition,
>>>>   // this causes unnecessary and high amount of shuffle in the job)
>>>>   for (k <- kInStreams)
>>>>   {
>>>>     // Re-partition stream from each receiver across all compute nodes to spread out
>>>>     // processing load and allow per-partition processing,
>>>>     // and set persistence level to spill to disk along with serialization
>>>>     val kInMsgParts = k.repartition(otherConf("dStreamPartitions").toInt).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>>>
>>>>     val outdata = kInMsgParts.map(x=>myfunc(x._2,configMap)).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>>>
>>>>     // Write each transformed partition to Kafka via the "writer" object
>>>>     outdata.foreachRDD( rdd => rdd.foreachPartition(rec =>
>>>>     {
>>>>       val writer = new KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
>>>>       writer.output(rec)
>>>>     }) )
>>>>   }
>>>> }
>>>>
>>>> Here's the life-cycle of a lost block:
>>>>
>>>> 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in memory on nodedn1-23-acme.com:34526 (size: 164.7 KB, free: 379.5 MB)
>>>> 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in memory on nodedn1-22-acme.com:42084 (size: 164.7 KB, free: 366.5 MB)
>>>> 15/02/12 16:31:21 INFO BlockManagerInfo: Added input-4-1423758372200 on disk on nodedn1-22-acme.com:42084 (size: 164.7 KB)
>>>> 15/02/12 16:31:23 INFO BlockManagerInfo: Added input-4-1423758372200 on disk on nodedn1-23-acme.com:34526 (size: 164.7 KB)
>>>> 15/02/12 16:32:27 WARN TaskSetManager: Lost task 54.0 in stage 16291.0 (TID 1042569, nodedn1-13-acme.com): java.lang.Exception: Could not compute split, block input-4-1423758372200 not found
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.1 in stage 16291.0 (TID 1042575) on executor nodedn1-21-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 1]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.2 in stage 16291.0 (TID 1042581) on executor nodedn1-21-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 2]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.3 in stage 16291.0 (TID 1042584) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 3]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.4 in stage 16291.0 (TID 1042586) on executor nodedn1-11-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 4]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.5 in stage 16291.0 (TID 1042589) on executor nodedn1-14-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 5]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.6 in stage 16291.0 (TID 1042594) on executor nodedn1-15-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 6]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.7 in stage 16291.0 (TID 1042597) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 7]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.8 in stage 16291.0 (TID 1042600) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 8]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.9 in stage 16291.0 (TID 1042606) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 9]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.10 in stage 16291.0 (TID 1042609) on executor nodedn1-19-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 10]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.11 in stage 16291.0 (TID 1042611) on executor nodedn1-14-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 11]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.12 in stage 16291.0 (TID 1042615) on executor nodedn1-22-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 12]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.13 in stage 16291.0 (TID 1042618) on executor nodedn1-19-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 13]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.14 in stage 16291.0 (TID 1042622) on executor nodedn1-21-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 14]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.15 in stage 16291.0 (TID 1042625) on executor nodedn1-16-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 15]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.16 in stage 16291.0 (TID 1042628) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 16]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.17 in stage 16291.0 (TID 1042633) on executor nodedn1-16-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 17]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.18 in stage 16291.0 (TID 1042636) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 18]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.19 in stage 16291.0 (TID 1042640) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 19]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.20 in stage 16291.0 (TID 1042642) on executor nodedn1-16-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 20]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.21 in stage 16291.0 (TID 1042645) on executor nodedn1-16-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 21]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.22 in stage 16291.0 (TID 1042648) on executor nodedn1-14-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 22]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.23 in stage 16291.0 (TID 1042654) on executor nodedn1-23-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 23]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.24 in stage 16291.0 (TID 1042660) on executor nodedn1-22-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 24]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.25 in stage 16291.0 (TID 1042662) on executor nodedn1-21-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 25]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.26 in stage 16291.0 (TID 1042663) on executor nodedn1-14-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 26]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.27 in stage 16291.0 (TID 1042665) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 27]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.28 in stage 16291.0 (TID 1042667) on executor nodedn1-19-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 28]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.29 in stage 16291.0 (TID 1042671) on executor nodedn1-14-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 29]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.30 in stage 16291.0 (TID 1042672) on executor nodedn1-12-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 30]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.31 in stage 16291.0 (TID 1042674) on executor nodedn1-14-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 31]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.32 in stage 16291.0 (TID 1042677) on executor nodedn1-19-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 32]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.33 in stage 16291.0 (TID 1042680) on executor nodedn1-22-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 33]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.34 in stage 16291.0 (TID 1042681) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 34]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.35 in stage 16291.0 (TID 1042682) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 35]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.36 in stage 16291.0 (TID 1042687) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 36]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.37 in stage 16291.0 (TID 1042689) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 37]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.38 in stage 16291.0 (TID 1042690) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 38]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.39 in stage 16291.0 (TID 1042693) on executor nodedn1-22-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 39]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.40 in stage 16291.0 (TID 1042697) on executor nodedn1-22-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 40]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.41 in stage 16291.0 (TID 1042700) on executor nodedn1-13-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 41]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.42 in stage 16291.0 (TID 1042706) on executor nodedn1-15-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 42]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.43 in stage 16291.0 (TID 1042710) on executor nodedn1-14-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 43]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.44 in stage 16291.0 (TID 1042713) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 44]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.45 in stage 16291.0 (TID 1042716) on executor nodedn1-19-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 45]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.46 in stage 16291.0 (TID 1042718) on executor nodedn1-15-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 46]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.47 in stage 16291.0 (TID 1042720) on executor nodedn1-12-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 47]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.48 in stage 16291.0 (TID 1042724) on executor nodedn1-12-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 48]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.49 in stage 16291.0 (TID 1042729) on executor nodedn1-16-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 49]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.50 in stage 16291.0 (TID 1042730) on executor nodedn1-21-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 50]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.51 in stage 16291.0 (TID 1042733) on executor nodedn1-21-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 51]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.52 in stage 16291.0 (TID 1042736) on executor nodedn1-19-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 52]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.53 in stage 16291.0 (TID 1042740) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 53]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.54 in stage 16291.0 (TID 1042743) on executor nodedn1-22-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 54]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.55 in stage 16291.0 (TID 1042745) on executor nodedn1-18-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 55]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.56 in stage 16291.0 (TID 1042754) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 56]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.57 in stage 16291.0 (TID 1042758) on executor nodedn1-17-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 57]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.58 in stage 16291.0 (TID 1042762) on executor nodedn1-12-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 58]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.59 in stage 16291.0 (TID 1042766) on executor nodedn1-23-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 59]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.60 in stage 16291.0 (TID 1042774) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 60]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.61 in stage 16291.0 (TID 1042779) on executor nodedn1-13-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 61]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.62 in stage 16291.0 (TID 1042789) on executor nodedn1-20-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 62]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.63 in stage 16291.0 (TID 1042793) on executor nodedn1-15-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 63]
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 54 in stage 16291.0 failed 64 times, most recent failure: Lost task 54.63 in stage 16291.0 (TID 1042793, nodedn1-15-acme.com): java.lang.Exception: Could not compute split, block input-4-1423758372200 not found
>>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 54 in stage 16291.0 failed 64 times, most recent failure: Lost task 54.63 in stage 16291.0 (TID 1042793, nodedn1-15-acme.com): java.lang.Exception: Could not compute split, block input-4-1423758372200 not found
>>>>
>>>> Thanks for looking into it.
>>>>
>>>> On Thu, Feb 12, 2015 at 8:10 PM, Tathagata Das <t...@databricks.com> wrote:
>>>>> Hey Tim,
>>>>>
>>>>> Let me get the key points.
>>>>> 1. If you are not writing back to Kafka, the delay is stable? That is, if instead of "foreachRDD { // write to kafka }" you do "dstream.count", then the delay is stable. Right?
>>>>> 2. If so, then Kafka is the bottleneck. Is it the number of partitions you spoke of in the second mail that determines the parallelism in writes? Is it stable with 30 partitions?
>>>>>
>>>>> Regarding the block exception, could you give me a trace of INFO-level logging that leads to this error? Basically, I want to trace the lifecycle of the block.
>>>>>
>>>>> TD
>>>>>
>>>>> On Thu, Feb 12, 2015 at 6:29 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>> Hi Gerard,
>>>>>>
>>>>>> Great write-up and really good guidance in there.
>>>>>>
>>>>>> I have to be honest, I don't know why, but setting the # of partitions for each dStream to a low number (5-10) just causes the app to choke/crash. Setting it to 20 gets the app going, but with not-so-great delays. Bump it up to 30 and I start winning the war: processing time is consistently below the batch window (20 seconds), except for one batch every few batches where the compute time spikes to 10x the usual.
>>>>>>
>>>>>> Following your guide, I took out some "logInfo" statements I had in the app, but it didn't seem to make much difference :(
>>>>>>
>>>>>> With a higher time window (20 seconds), I got the app to run stably for a few hours but then ran into the dreaded "java.lang.Exception: Could not compute split, block input-0-1423761240800 not found". I wonder if I need to add RDD persistence back?
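TD asks for the lifecycle of the missing block, and Tim extracts it from the driver log. A minimal plain-Scala sketch of that kind of extraction: it keeps every log line that mentions a given block ID, in file order, and splits out timestamp, level and message. It assumes the `yy/MM/dd HH:mm:ss LEVEL message` layout seen in this thread; lines in any other layout are skipped.

```scala
import scala.io.Source

// Extracts the history of one block from a Spark log. Assumes the
// "yy/MM/dd HH:mm:ss LEVEL message" line layout seen in this thread.
object BlockTrace {
  private val LogLine = """(\d{2}/\d{2}/\d{2} \d{2}:\d{2}:\d{2}) (\w+) (.*)""".r

  /** (timestamp, level, message) for every line mentioning `blockId`, in log order. */
  def trace(lines: Iterator[String], blockId: String): List[(String, String, String)] =
    lines.filter(_.contains(blockId)).collect {
      case LogLine(ts, level, msg) => (ts, level, msg)
    }.toList

  /** Convenience wrapper that reads a whole log file and closes it afterwards. */
  def traceFile(path: String, blockId: String): List[(String, String, String)] = {
    val src = Source.fromFile(path)
    try trace(src.getLines(), blockId) finally src.close()
  }
}
```

Something like `BlockTrace.traceFile("driver.log", "input-4-1423758372200")` (the file name is hypothetical) would list the in-memory adds, then the spills to disk, then the failed task attempts, which makes the gap between the block being stored and the first "not found" easy to read off.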
>>>>>>
>>>>>> Also, I am reaching out to Virdata with some ProServ inquiries.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>> On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas <gerard.m...@gmail.com> wrote:
>>>>>>> Hi Tim,
>>>>>>>
>>>>>>> From this: "There are 5 kafka receivers and each incoming stream is split into 40 partitions", I suspect that you're creating too many tasks for Spark to process on time.
>>>>>>> Could you try some of the 'knobs' I describe here to see if that would help?
>>>>>>>
>>>>>>> http://www.virdata.com/tuning-spark/
>>>>>>>
>>>>>>> -kr, Gerard.
>>>>>>>
>>>>>>> On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith <secs...@gmail.com> wrote:
>>>>>>>> Just read the thread "Are these numbers abnormal for spark streaming?" and I think I am seeing similar results, that is, increasing the window seems to be the trick here. I will have to monitor for a few hours/days before I can conclude (there are so many knobs/dials).
>>>>>>>>
>>>>>>>> On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>>>>> On Spark 1.2 (I have been seeing this behaviour since 1.0), I have a streaming app that consumes data from Kafka and writes it back to Kafka (different topic). My big problem has been Total Delay. While execution time is usually less than the window size (in seconds), the total delay ranges from minutes to hours (and keeps going up).
>>>>>>>>>
>>>>>>>>> For a little while, I thought I had solved the issue by bumping up the driver memory. Then I expanded my Kafka cluster to add more nodes and the issue came up again. I tried a few things to smoke out the issue, and something tells me the driver is the bottleneck again:
>>>>>>>>>
>>>>>>>>> 1) From my app, I took out the entire write-out-to-Kafka piece. Sure enough, execution time, scheduling delay and hence total delay fell to sub-second. This assured me that whatever processing I do before writing back to Kafka isn't the bottleneck.
>>>>>>>>>
>>>>>>>>> 2) In my app, I had RDD persistence set at different points, but my code wasn't really re-using any RDDs, so I took out all explicit persist() statements and set "spar...unpersist" to "true" in the context. After this, no matter how much memory I give my executors, the total delay stays in the same range. I tried per-executor memory from 2G to 12G with no change in total delay, so the executors aren't memory starved. Also, in the Spark UI, under the Executors tab, all executors show 0/1060MB used when per-executor memory is set to 2GB, for example.
>>>>>>>>>
>>>>>>>>> 3) The input rate limit on the Kafka consumer caps spikes in incoming data.
>>>>>>>>>
>>>>>>>>> 4) Tried FIFO and FAIR scheduling, but it didn't make any difference.
>>>>>>>>>
>>>>>>>>> 5) Adding executors beyond a certain point seems useless (I guess the excess ones just sit idle).
>>>>>>>>>
>>>>>>>>> At any given point in time, the Spark UI shows only one batch pending processing. So with just one batch pending, why would the scheduling delay run into minutes/hours if execution time is within the batch window duration? There aren't any failed stages or jobs.
>>>>>>>>>
>>>>>>>>> Right now, I have 100 executors (I have tried anywhere from 50 to 150), each with 2GB and 4 cores, and the driver running with 16GB. There are 5 Kafka receivers and each incoming stream is split into 40 partitions. Per receiver, the input rate is restricted to 20000 messages per second.
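The configuration in this first message can be turned into a quick capacity estimate. A minimal sketch, assuming a 20 s batch (the window Tim mentions later in the thread), records spread evenly across partitions, the rate limit always reached, and each receiver holding one executor core for as long as it runs:

```scala
// Rough per-batch numbers for a receiver-based setup. Assumptions: the rate
// limit is always reached, records spread evenly over partitions, and each
// receiver permanently occupies one executor core.
object CapacityMath {
  final case class Setup(receivers: Int, partitionsPerStream: Int, maxRatePerReceiver: Int,
                         batchSec: Int, executors: Int, coresPerExecutor: Int)

  /** Upper bound on records ingested per batch. */
  def maxRecordsPerBatch(s: Setup): Long =
    s.receivers.toLong * s.maxRatePerReceiver * s.batchSec

  /** Tasks per batch when each stream is repartitioned on its own. */
  def tasksPerBatch(s: Setup): Int = s.receivers * s.partitionsPerStream

  /** Average records per task under an even spread. */
  def recordsPerTask(s: Setup): Long = maxRecordsPerBatch(s) / tasksPerBatch(s)

  /** Cores left for processing after the receivers take theirs. */
  def processingCores(s: Setup): Int = s.executors * s.coresPerExecutor - s.receivers
}
```

For `Setup(5, 40, 20000, 20, 100, 4)` this gives at most 2,000,000 records and 200 tasks per batch (about 10,000 records per task) against 395 processing cores. Since there are fewer tasks per batch than cores, extra executors would have nothing to run, which is consistent with point 5 above.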
>>>>>>>>>
>>>>>>>>> Can anyone help me with clues or areas to look into for troubleshooting the issue?
>>>>>>>>>
>>>>>>>>> One nugget I found buried in the code says:
>>>>>>>>> "The scheduler delay includes the network delay to send the task to the worker machine and to send back the result (but not the time to fetch the task result, if it needed to be fetched from the block manager on the worker)."
>>>>>>>>>
>>>>>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
>>>>>>>>>
>>>>>>>>> Could this be an issue with the driver being a bottleneck? All the executors posting their logs/stats to the driver?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Tim