I replaced the writeToKafka statements with rdd.count() and, sure enough, I
have a stable app with total delay well within the batch window (20
seconds). Here are the total-delay lines from the driver log:
15/02/13 06:14:26 INFO JobScheduler: Total delay: 6.521 s for time 1423808060000 ms (execution: 6.404 s)
15/02/13 06:15:22 INFO JobScheduler: Total delay: 42.396 s for time 1423808080000 ms (execution: 42.338 s)
15/02/13 06:16:21 INFO JobScheduler: Total delay: 81.879 s for time 1423808100000 ms (execution: 59.483 s)
15/02/13 06:16:40 INFO JobScheduler: Total delay: 80.242 s for time 1423808120000 ms (execution: 18.363 s)
15/02/13 06:16:50 INFO JobScheduler: Total delay: 70.342 s for time 1423808140000 ms (execution: 10.100 s)
15/02/13 06:16:56 INFO JobScheduler: Total delay: 56.551 s for time 1423808160000 ms (execution: 6.209 s)
15/02/13 06:17:06 INFO JobScheduler: Total delay: 46.405 s for time 1423808180000 ms (execution: 9.854 s)
15/02/13 06:17:13 INFO JobScheduler: Total delay: 33.443 s for time 1423808200000 ms (execution: 7.038 s)
15/02/13 06:17:21 INFO JobScheduler: Total delay: 21.483 s for time 1423808220000 ms (execution: 8.039 s)
15/02/13 06:17:26 INFO JobScheduler: Total delay: 6.697 s for time 1423808240000 ms (execution: 5.213 s)
15/02/13 06:17:45 INFO JobScheduler: Total delay: 5.814 s for time 1423808260000 ms (execution: 5.767 s)
15/02/13 06:18:06 INFO JobScheduler: Total delay: 6.905 s for time 1423808280000 ms (execution: 6.858 s)
15/02/13 06:18:28 INFO JobScheduler: Total delay: 8.604 s for time 1423808300000 ms (execution: 8.556 s)
15/02/13 06:18:45 INFO JobScheduler: Total delay: 5.631 s for time 1423808320000 ms (execution: 5.583 s)
15/02/13 06:19:04 INFO JobScheduler: Total delay: 4.838 s for time 1423808340000 ms (execution: 4.791 s)
15/02/13 06:19:24 INFO JobScheduler: Total delay: 4.467 s for time 1423808360000 ms (execution: 4.422 s)
15/02/13 06:19:45 INFO JobScheduler: Total delay: 5.779 s for time 1423808380000 ms (execution: 5.733 s)
15/02/13 06:20:04 INFO JobScheduler: Total delay: 4.747 s for time 1423808400000 ms (execution: 4.701 s)
15/02/13 06:20:24 INFO JobScheduler: Total delay: 4.829 s for time 1423808420000 ms (execution: 4.782 s)
15/02/13 06:20:44 INFO JobScheduler: Total delay: 4.724 s for time 1423808440000 ms (execution: 4.678 s)
15/02/13 06:21:04 INFO JobScheduler: Total delay: 4.110 s for time 1423808460000 ms (execution: 4.064 s)
15/02/13 06:21:24 INFO JobScheduler: Total delay: 4.562 s for time 1423808480000 ms (execution: 4.514 s)
15/02/13 06:21:43 INFO JobScheduler: Total delay: 3.999 s for time 1423808500000 ms (execution: 3.954 s)
15/02/13 06:22:04 INFO JobScheduler: Total delay: 4.353 s for time 1423808520000 ms (execution: 4.309 s)
15/02/13 06:22:24 INFO JobScheduler: Total delay: 4.712 s for time 1423808540000 ms (execution: 4.667 s)
15/02/13 06:22:44 INFO JobScheduler: Total delay: 4.726 s for time 1423808560000 ms (execution: 4.681 s)
15/02/13 06:23:07 INFO JobScheduler: Total delay: 7.860 s for time 1423808580000 ms (execution: 7.816 s)
15/02/13 06:23:28 INFO JobScheduler: Total delay: 8.426 s for time 1423808600000 ms (execution: 8.383 s)
15/02/13 06:23:43 INFO JobScheduler: Total delay: 3.857 s for time 1423808620000 ms (execution: 3.814 s)
15/02/13 06:24:03 INFO JobScheduler: Total delay: 3.936 s for time 1423808640000 ms (execution: 3.892 s)
15/02/13 06:24:23 INFO JobScheduler: Total delay: 3.810 s for time 1423808660000 ms (execution: 3.767 s)
15/02/13 06:24:43 INFO JobScheduler: Total delay: 3.889 s for time 1423808680000 ms (execution: 3.845 s)
15/02/13 06:25:03 INFO JobScheduler: Total delay: 3.553 s for time 1423808700000 ms (execution: 3.510 s)
15/02/13 06:25:27 INFO JobScheduler: Total delay: 7.031 s for time 1423808720000 ms (execution: 6.989 s)
15/02/13 06:25:43 INFO JobScheduler: Total delay: 3.636 s for time 1423808740000 ms (execution: 3.594 s)
15/02/13 06:26:03 INFO JobScheduler: Total delay: 3.425 s for time 1423808760000 ms (execution: 3.383 s)
15/02/13 06:26:23 INFO JobScheduler: Total delay: 3.939 s for time 1423808780000 ms (execution: 3.897 s)
15/02/13 06:26:43 INFO JobScheduler: Total delay: 3.640 s for time 1423808800000 ms (execution: 3.596 s)
15/02/13 06:27:03 INFO JobScheduler: Total delay: 3.905 s for time 1423808820000 ms (execution: 3.861 s)
15/02/13 06:27:24 INFO JobScheduler: Total delay: 4.068 s for time 1423808840000 ms (execution: 4.026 s)




On Thu, Feb 12, 2015 at 9:54 PM, Tim Smith <secs...@gmail.com> wrote:

> TD - I will try count() and report back. Meanwhile, attached is the entire
> driver log, which includes the errors about the missing blocks.
>
> Cody - Let me research a bit how to do connection pooling. Sorry, I am
> not really a programmer. I did see the connection-pooling advice in the
> Spark Streaming Programming Guide as an optimization but wasn't sure how
> to implement it. Do you think it will have a significant impact on
> performance?
>
> Saisai - Ideally, I'd rather not do any dStream partitioning at all.
> Instead, have one receiver for each Kafka partition (so in this case 23
> receivers for 23 Kafka partitions) and then have as many or more executors
> to handle processing of the dStreams. Right? Trouble is, I tried this
> approach and it didn't work. Even if I set 23 receivers and set the
> partition count to 1 for each dStream (effectively, no stream splitting),
> performance is extremely poor/laggy. Should I modify my code to remove
> dStream partitioning altogether and then try setting as many receivers as
> there are Kafka partitions?
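>
> i.e., something like this (an untested sketch of what I mean, with no
> repartition step at all):
>
> // one receiver per Kafka partition (23 total); process each receiver's
> // stream as-is -- no repartition(), no explicit persist()
> val kInStreams = (1 to 23).map { _ =>
>   KafkaUtils.createStream(ssc,
>     otherConf("kafkaConsumerZk").toString,
>     otherConf("kafkaConsumerGroupId").toString,
>     Map(otherConf("kafkaConsumerTopic").toString -> 1),
>     StorageLevel.MEMORY_AND_DISK_SER) }
>
> for (k <- kInStreams) {
>   val outdata = k.map(x => myfunc(x._2, configMap))
>   outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
>     val writer = new KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
>     writer.output(rec)
>   }) )
> }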
>
>
>
>
>
> On Thu, Feb 12, 2015 at 9:45 PM, Saisai Shao <sai.sai.s...@gmail.com>
> wrote:
>
>> Hi Tim,
>>
>> I think this code will still introduce a shuffle even though you call
>> repartition on each input stream. In fact, this style of implementation
>> will generate more jobs (one job per input stream) than unioning everything
>> into one stream with DStream.union() would, and union normally has no
>> special overhead, as I understand it.
>>
>> Also, as Cody said, creating a Producer per partition could be a source of
>> overhead; a producer pool, or sharing one Producer per executor, might be
>> better :).
>>
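>> Something like this is what I mean by unioning first (an untested sketch,
>> reusing your variable names):
>>
>>   // Merge all receiver streams into a single DStream, then repartition once.
>>   // This runs one set of jobs per batch instead of one set per input stream.
>>   val unified = ssc.union(kInStreams.toSeq)
>>   val kInMsgParts = unified.repartition(otherConf("dStreamPartitions").toInt)
>>   val outdata = kInMsgParts.map(x => myfunc(x._2, configMap))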
>>
>>   // Process the stream from each receiver separately
>>   // (do not attempt to merge all streams and then re-partition;
>>   // this causes an unnecessary and large amount of shuffle in the job)
>>   for (k <- kInStreams)
>>   {
>>     // Re-partition the stream from each receiver across all compute nodes
>>     // to spread out the processing load and allow per-partition processing,
>>     // and set the persistence level to spill to disk with serialization
>>     val kInMsgParts = k.repartition(otherConf("dStreamPartitions").toInt).
>>
>> 2015-02-13 13:27 GMT+08:00 Cody Koeninger <c...@koeninger.org>:
>>
>>> outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
>>>   val writer = new KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
>>>   writer.output(rec)
>>> }) )
>>>
>>>
>>> So this is creating a new kafka producer for every new output partition,
>>> right?  Have you tried pooling the producers?
>>>
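>>> One common pattern (untested sketch; KafkaOutputService is your own class,
>>> so adapt as needed) is to cache a single writer per executor JVM instead of
>>> constructing one per partition:
>>>
>>> // Caches one KafkaOutputService per executor JVM; the factory closure is
>>> // only invoked the first time a task on that executor asks for the writer.
>>> object WriterHolder {
>>>   @volatile private var service: KafkaOutputService = null
>>>   def get(mk: () => KafkaOutputService): KafkaOutputService = {
>>>     if (service == null) synchronized {
>>>       if (service == null) service = mk()
>>>     }
>>>     service
>>>   }
>>> }
>>>
>>> outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
>>>   // otherConf and propsMap are still captured by the closure, as before
>>>   val writer = WriterHolder.get(() =>
>>>     new KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap))
>>>   writer.output(rec)
>>> }) )
>>>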
>>> On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith <secs...@gmail.com> wrote:
>>>
>>>> 1) Yes, if I disable writing out to Kafka and replace it with some very
>>>> lightweight action like rdd.take(1), the app is stable.
>>>>
>>>> 2) The partitions I spoke of in the previous mail are the number of
>>>> partitions I create from each dStream. But yes, since I do the processing
>>>> and the writing out per partition, each dStream partition ends up getting
>>>> written to a Kafka partition. The flow is, broadly: 5 Spark/Kafka receivers
>>>> -> split each dStream into 30 partitions (150 partitions total) -> apply
>>>> some transformation logic to each partition -> write out each partition to
>>>> Kafka (Kafka has 23 partitions). Let me increase the number of partitions
>>>> on the Kafka side and see if that helps.
>>>>
>>>> Here's what the main block of code looks like (I added persistence back):
>>>>
>>>> val kInStreams = (1 to otherConf("inStreamCount").toInt).map { _ =>
>>>>   KafkaUtils.createStream(ssc,
>>>>     otherConf("kafkaConsumerZk").toString,
>>>>     otherConf("kafkaConsumerGroupId").toString,
>>>>     Map(otherConf("kafkaConsumerTopic").toString -> 1),
>>>>     StorageLevel.MEMORY_AND_DISK_SER) }
>>>>
>>>> if (!configMap.keySet.isEmpty)
>>>> {
>>>>   // Process the stream from each receiver separately
>>>>   // (do not attempt to merge all streams and then re-partition;
>>>>   // this causes an unnecessary and large amount of shuffle in the job)
>>>>   for (k <- kInStreams)
>>>>   {
>>>>     // Re-partition the stream from each receiver across all compute nodes
>>>>     // to spread out the processing load and allow per-partition processing,
>>>>     // and set the persistence level to spill to disk with serialization
>>>>     val kInMsgParts = k.repartition(otherConf("dStreamPartitions").toInt)
>>>>       .persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>>>
>>>>     val outdata = kInMsgParts.map(x => myfunc(x._2, configMap))
>>>>       .persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>>>
>>>>     // Write each transformed partition to Kafka via the "writer" object
>>>>     outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
>>>>       val writer = new KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
>>>>       writer.output(rec)
>>>>     }) )
>>>>   }
>>>> }
>>>>
>>>>
>>>> Here's the lifecycle of a lost block:
>>>>
>>>> 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in memory on nodedn1-23-acme.com:34526 (size: 164.7 KB, free: 379.5 MB)
>>>> 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in memory on nodedn1-22-acme.com:42084 (size: 164.7 KB, free: 366.5 MB)
>>>> 15/02/12 16:31:21 INFO BlockManagerInfo: Added input-4-1423758372200 on disk on nodedn1-22-acme.com:42084 (size: 164.7 KB)
>>>> 15/02/12 16:31:23 INFO BlockManagerInfo: Added input-4-1423758372200 on disk on nodedn1-23-acme.com:34526 (size: 164.7 KB)
>>>> 15/02/12 16:32:27 WARN TaskSetManager: Lost task 54.0 in stage 16291.0 (TID 1042569, nodedn1-13-acme.com): java.lang.Exception: Could not compute split, block input-4-1423758372200 not found
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.1 in stage 16291.0 (TID 1042575) on executor nodedn1-21-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 1]
>>>> [... 61 more identical "Could not compute split, block input-4-1423758372200 not found" failures for task attempts 54.2 through 54.62 (duplicates 2-62), spread across executors nodedn1-11 through nodedn1-23, all between 16:32:27 and 16:32:28 ...]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.63 in stage 16291.0 (TID 1042793) on executor nodedn1-15-acme.com: java.lang.Exception (Could not compute split, block input-4-1423758372200 not found) [duplicate 63]
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 54 in stage 16291.0 failed 64 times, most recent failure: Lost task 54.63 in stage 16291.0 (TID 1042793, nodedn1-15-acme.com): java.lang.Exception: Could not compute split, block input-4-1423758372200 not found
>>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 54 in stage 16291.0 failed 64 times, most recent failure: Lost task 54.63 in stage 16291.0 (TID 1042793, nodedn1-15-acme.com): java.lang.Exception: Could not compute split, block input-4-1423758372200 not found
>>>>
>>>>
>>>> Thanks for looking into it.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Feb 12, 2015 at 8:10 PM, Tathagata Das <t...@databricks.com>
>>>> wrote:
>>>>
>>>>> Hey Tim,
>>>>>
>>>>> Let me get the key points.
>>>>> 1. If you are not writing back to Kafka, the delay is stable? That is,
>>>>> instead of "foreachRDD { // write to kafka }", if you do "dstream.count"
>>>>> (see the snippet below), then the delay is stable. Right?
>>>>> 2. If so, then Kafka is the bottleneck. Is it the number of partitions,
>>>>> which you spoke of in the second mail, that determines the parallelism of
>>>>> the writes? Is it stable with 30 partitions?
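>>>>>
>>>>> By "dstream.count" I mean keeping everything else the same but swapping the
>>>>> Kafka write for a cheap action that still forces the computation, e.g.
>>>>> (untested sketch):
>>>>>
>>>>> outdata.foreachRDD(rdd => rdd.count())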
>>>>>
>>>>> Regarding the block exception, could you give me a trace of the info-level
>>>>> logging that leads to this error? Basically, I want to trace the lifecycle
>>>>> of the block.
>>>>>
>>>>> TD
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Feb 12, 2015 at 6:29 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>
>>>>>> Hi Gerard,
>>>>>>
>>>>>> Great write-up and really good guidance in there.
>>>>>>
>>>>>> I have to be honest: I don't know why, but setting the number of
>>>>>> partitions for each dStream to a low number (5-10) just causes the app to
>>>>>> choke/crash. Setting it to 20 gets the app going, but with not-so-great
>>>>>> delays. Bumping it up to 30, I start winning the war: processing time is
>>>>>> consistently below the batch window (20 seconds), except for a batch every
>>>>>> few batches where the compute time spikes to 10x the usual.
>>>>>>
>>>>>> Following your guide, I took out some "logInfo" statements I had in the
>>>>>> app, but that didn't seem to make much difference :(
>>>>>>
>>>>>> With a larger batch window (20 seconds), I got the app to run stably for a
>>>>>> few hours but then ran into the dreaded "java.lang.Exception: Could not
>>>>>> compute split, block input-0-1423761240800 not found". I wonder if I need
>>>>>> to add RDD persistence back?
>>>>>>
>>>>>> Also, I am reaching out to Virdata with some ProServ inquiries.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas <gerard.m...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Tim,
>>>>>>>
>>>>>>> From this: "There are 5 kafka receivers and each incoming stream is
>>>>>>> split into 40 partitions", I suspect that you're creating too many tasks
>>>>>>> for Spark to process on time.
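>>>>>>> (Rough arithmetic: 5 streams x 40 partitions = 200 tasks per
>>>>>>> transformation per batch; and since each stream is processed as its own
>>>>>>> set of jobs, the per-batch task count multiplies quickly.)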
>>>>>>> Could you try some of the 'knobs' I describe here to see if that
>>>>>>> would help?
>>>>>>>
>>>>>>> http://www.virdata.com/tuning-spark/
>>>>>>>
>>>>>>> -kr, Gerard.
>>>>>>>
>>>>>>> On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith <secs...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Just read the thread "Are these numbers abnormal for spark
>>>>>>>> streaming?" and I think I am seeing similar results; that is, increasing
>>>>>>>> the window seems to be the trick here. I will have to monitor for a few
>>>>>>>> hours/days before I can conclude anything (there are so many
>>>>>>>> knobs/dials).
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith <secs...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> On Spark 1.2 (I have been seeing this behaviour since 1.0), I have a
>>>>>>>>> streaming app that consumes data from Kafka and writes it back to Kafka
>>>>>>>>> (a different topic). My big problem has been total delay. While
>>>>>>>>> execution time is usually below the window size (in seconds), the total
>>>>>>>>> delay ranges from minutes to hours (and keeps going up).
>>>>>>>>>
>>>>>>>>> For a little while, I thought I had solved the issue by bumping up
>>>>>>>>> the driver memory. Then I expanded my Kafka cluster to add more nodes 
>>>>>>>>> and
>>>>>>>>> the issue came up again. I tried a few things to smoke out the issue 
>>>>>>>>> and
>>>>>>>>> something tells me the driver is the bottleneck again:
>>>>>>>>>
>>>>>>>>> 1) From my app, I took out the entire write-out-to-Kafka piece. Sure
>>>>>>>>> enough, execution time, scheduling delay, and hence total delay all
>>>>>>>>> fell to sub-second levels. This assured me that whatever processing I
>>>>>>>>> do before writing back to Kafka isn't the bottleneck.
>>>>>>>>>
>>>>>>>>> 2) In my app, I had RDD persistence set at different points, but my
>>>>>>>>> code wasn't really re-using any RDDs, so I took out all explicit
>>>>>>>>> persist() statements and set "spark.streaming.unpersist" to "true" in
>>>>>>>>> the context (see the conf sketch after this list). After this, it
>>>>>>>>> doesn't seem to matter how much memory I give my executors; the total
>>>>>>>>> delay stays in the same range. I tried per-executor memory from 2G to
>>>>>>>>> 12G with no change in total delay, so the executors aren't
>>>>>>>>> memory-starved. Also, in the Spark UI, under the Executors tab, all
>>>>>>>>> executors show 0/1060MB used when per-executor memory is set to 2GB,
>>>>>>>>> for example.
>>>>>>>>>
>>>>>>>>> 3) The input-rate limit on the Kafka consumers restricts spikes in
>>>>>>>>> incoming data.
>>>>>>>>>
>>>>>>>>> 4) I tried both FIFO and FAIR scheduling, but it didn't make any
>>>>>>>>> difference.
>>>>>>>>>
>>>>>>>>> 5) Adding executors beyond a certain point seems useless (I guess the
>>>>>>>>> excess ones just sit idle).
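>>>>>>>>>
>>>>>>>>> For reference, points 2 and 3 correspond to driver-side settings along
>>>>>>>>> these lines (a from-memory sketch, not the exact code; it assumes the
>>>>>>>>> rate limit is the standard receiver throttle,
>>>>>>>>> spark.streaming.receiver.maxRate):
>>>>>>>>>
>>>>>>>>> val conf = new SparkConf()
>>>>>>>>>   .set("spark.streaming.unpersist", "true")          // eagerly free processed input blocks
>>>>>>>>>   .set("spark.streaming.receiver.maxRate", "20000")  // cap each receiver's intake
>>>>>>>>> val ssc = new StreamingContext(conf, Seconds(20))    // 20-second batch window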
>>>>>>>>>
>>>>>>>>> At any given point in time, the Spark UI shows only one batch pending
>>>>>>>>> processing. So, with just one batch pending, why would the scheduling
>>>>>>>>> delay run into minutes/hours if the execution time is within the batch
>>>>>>>>> window? There aren't any failed stages or jobs.
>>>>>>>>>
>>>>>>>>> Right now, I have 100 executors (I have tried anywhere from 50 to 150),
>>>>>>>>> each with 2GB and 4 cores, and the driver running with 16GB. There are
>>>>>>>>> 5 kafka receivers and each incoming stream is split into 40 partitions.
>>>>>>>>> Per receiver, the input rate is restricted to 20000 messages per
>>>>>>>>> second.
>>>>>>>>>
>>>>>>>>> Can anyone give me clues, or point me at areas to look into, for
>>>>>>>>> troubleshooting this issue?
>>>>>>>>>
>>>>>>>>> One nugget I found buried in the code says:
>>>>>>>>> "The scheduler delay includes the network delay to send the task
>>>>>>>>> to the worker machine and to send back the result (but not the time to
>>>>>>>>> fetch the task result, if it needed to be fetched from the block 
>>>>>>>>> manager on
>>>>>>>>> the worker)."
>>>>>>>>>
>>>>>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
>>>>>>>>>
>>>>>>>>> Could this be an issue with the driver being a bottleneck? All the
>>>>>>>>> executors posting their logs/stats to the driver?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Tim
