Hi Tim,

I think you can try it this way:

Create one receiver per executor and set the number of threads for each
topic to more than 1. The total number of consumer threads will then be:
total consumers = (number of receivers) * (threads per receiver). Make sure
this total is less than or equal to the number of Kafka partitions. In this
case I think the parallelism is sufficient, since received blocks are
distributed across the executors, so you don't need to repartition to
increase the parallelism.
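
As a rough illustration of the sizing rule above, here is a small sketch in
plain Scala (no Spark needed). The object and method names are made up for
this example, and the numbers are the 5 receivers and 23 Kafka partitions
mentioned earlier in this thread:

```scala
// Sizing sketch for receiver-based Kafka input. Names are hypothetical.
object ReceiverSizing {

  /** Total consumer threads = receivers * threads per receiver. */
  def totalConsumers(receivers: Int, threadsPerReceiver: Int): Int =
    receivers * threadsPerReceiver

  /** Largest per-receiver thread count that keeps the total at or below the
    * Kafka partition count. Never returns less than 1, so if there are more
    * receivers than partitions the total will still exceed the partitions.
    */
  def maxThreadsPerReceiver(kafkaPartitions: Int, receivers: Int): Int = {
    require(kafkaPartitions > 0 && receivers > 0, "counts must be positive")
    math.max(1, kafkaPartitions / receivers)
  }

  def main(args: Array[String]): Unit = {
    val partitions = 23 // Kafka partitions mentioned in this thread
    val receivers  = 5  // receivers mentioned in this thread
    val threads    = maxThreadsPerReceiver(partitions, receivers)
    val total      = totalConsumers(receivers, threads)
    println(s"threads per receiver = $threads, total consumers = $total")
  }
}
```

The resulting thread count is what I would put as the value in the topic map
passed to KafkaUtils.createStream, i.e. in place of the "-> 1" in your code.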

Also, with Kafka's high-level API, Kafka partitions may not be distributed
equally across all the receivers, so some tasks may process more data than
others. Alternatively, you can try DirectKafkaInputDStream in Spark 1.3,
which should be more balanced because each Kafka partition maps to one
Spark partition.
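
To make the imbalance concrete, here is a small sketch in plain Scala. It
assumes partitions are dealt out to consumer threads in contiguous ranges,
with the first few threads taking one extra partition each; the real
assignment depends on the consumer configuration and on rebalancing, so
treat this only as an illustration. The object and method names are
hypothetical:

```scala
// Illustration of uneven partition ownership under range-style assignment.
object PartitionSpread {

  /** Number of partitions owned by each consumer thread when `partitions`
    * are split into contiguous ranges: every thread gets
    * partitions / consumers, and the first (partitions % consumers)
    * threads get one more.
    */
  def partitionsPerConsumer(partitions: Int, consumers: Int): Seq[Int] = {
    require(partitions >= 0 && consumers > 0, "invalid counts")
    val base  = partitions / consumers
    val extra = partitions % consumers
    (0 until consumers).map(i => if (i < extra) base + 1 else base)
  }

  def main(args: Array[String]): Unit = {
    // 23 partitions over 20 consumer threads: a few threads own two
    // partitions while the rest own one.
    println(partitionsPerConsumer(23, 20).mkString(", "))
  }
}
```

With the direct approach there is no such assignment step on the Spark side,
since each Kafka partition becomes one partition of the RDD for the batch.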


Also, does "set partition count to 1 for each dStream" mean
dstream.repartition(1)? If so, I think it will still introduce a shuffle
and move all the data into one partition.

Thanks
Saisai

2015-02-13 13:54 GMT+08:00 Tim Smith <secs...@gmail.com>:

> TD - I will try count() and report back. Meanwhile, attached is the entire
> driver log that includes the error logs about missing blocks.
>
> Cody - Let me research a bit about how to do connection pooling. Sorry, I
> am not really a programmer. I did see the connection pooling advice in the
> Spark Streaming Programming guide as an optimization but wasn't sure how to
> implement it. But do you think it will have a significant impact on
> performance?
>
> Saisai - I think, ideally, I'd rather not do any dStream partitioning.
> Instead have 1 receiver for each kafka partition (so in this case 23
> receivers for 23 kafka partitions) and then have as many or more executors
> to handle processing of the dStreams. Right? Trouble is, I tried this
> approach and it didn't work. Even if I set 23 receivers, and set partition
> count to 1 for each dStream (effectively, no stream splitting), my
> performance is extremely poor/laggy. Should I modify my code to remove
> dStream partitioning altogether and then try setting as many receivers as
> kafka partitions?
>
>
>
>
>
> On Thu, Feb 12, 2015 at 9:45 PM, Saisai Shao <sai.sai.s...@gmail.com>
> wrote:
>
>> Hi Tim,
>>
>> I think this code will still introduce a shuffle even though you call
>> repartition on each input stream separately. In fact, this style of
>> implementation generates more jobs (one job per input stream) than
>> unioning them into one stream with DStream.union(), and union normally
>> has no special overhead, as far as I understand.
>>
>> Also, as Cody said, creating a Producer per partition could add
>> overhead; a producer pool or sharing one Producer per executor might be
>> better :).
>>
>>
>>          // Process stream from each receiver separately
>>          // (do not attempt to merge all streams and then re-partition,
>> this causes un-necessary and high amount of shuffle in the job)
>>          for (k <- kInStreams)
>>                 {
>>                  // Re-partition stream from each receiver across all
>> compute nodes to spread out processing load and allows per partition
>> processing
>>                  // and, set persistence level to spill to disk along
>> with serialization
>>                  val kInMsgParts =
>> k.repartition(otherConf("dStreamPartitions").toInt).
>>
>> 2015-02-13 13:27 GMT+08:00 Cody Koeninger <c...@koeninger.org>:
>>
>>> outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
>>>
>>>  val writer = new
>>> KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
>>>
>>>  writer.output(rec)
>>>
>>> }) )
>>>
>>>
>>> So this is creating a new kafka producer for every new output partition,
>>> right?  Have you tried pooling the producers?
>>>
>>> On Thu, Feb 12, 2015 at 10:52 PM, Tim Smith <secs...@gmail.com> wrote:
>>>
>>>> 1) Yes, if I disable writing out to kafka and replace it with some very
>>>> lightweight action like rdd.take(1), the app is stable.
>>>>
>>>> 2) The partitions I spoke of in the previous mail are the number of
>>>> partitions I create from each dStream. But yes, since I do processing and
>>>> writing out, per partition, each dStream partition ends up getting written
>>>> to a kafka partition. Flow is, broadly:
>>>> 5 Spark/Kafka Receivers -> Split each dStream into 30 partitions (150
>>>> partitions) -> Apply some transformation logic to each partition -> write
>>>> out each partition to kafka (kafka has 23 partitions). Let me increase the
>>>> number of partitions on the kafka side and see if that helps.
>>>>
>>>> Here's what the main block of code looks like (I added persistence
>>>> back):
>>>>
>>>> val kInStreams = (1 to otherConf("inStreamCount").toInt).map{_ =>
>>>> KafkaUtils.createStream(ssc,otherConf("kafkaConsumerZk").toString,otherConf("kafkaConsumerGroupId").toString,
>>>> Map(otherConf("kafkaConsumerTopic").toString -> 1),
>>>> StorageLevel.MEMORY_AND_DISK_SER) }
>>>>
>>>> if (!configMap.keySet.isEmpty)
>>>>         {
>>>>          // Process stream from each receiver separately
>>>>          // (do not attempt to merge all streams and then re-partition,
>>>>          // this causes un-necessary and high amount of shuffle in the job)
>>>>          for (k <- kInStreams)
>>>>                 {
>>>>                  // Re-partition stream from each receiver across all
>>>>                  // compute nodes to spread out processing load and allow
>>>>                  // per partition processing
>>>>                  // and, set persistence level to spill to disk along
>>>>                  // with serialization
>>>>                  val kInMsgParts =
>>>>                    k.repartition(otherConf("dStreamPartitions").toInt).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>>>
>>>>                  val outdata =
>>>>                    kInMsgParts.map(x=>myfunc(x._2,configMap)).persist(org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK_SER)
>>>>
>>>>                  // Write each transformed partition to Kafka via the
>>>>                  // "writer" object
>>>>                  outdata.foreachRDD( rdd => rdd.foreachPartition(rec => {
>>>>                    val writer = new KafkaOutputService(otherConf("kafkaProducerTopic").toString, propsMap)
>>>>                    writer.output(rec)
>>>>                  }) )
>>>>                 }
>>>>         }
>>>>
>>>>
>>>> Here's the life-cycle of a lost block:
>>>>
>>>> 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
>>>> memory on nodedn1-23-acme.com:34526 (size: 164.7 KB, free: 379.5 MB)
>>>> 15/02/12 16:26:12 INFO BlockManagerInfo: Added input-4-1423758372200 in
>>>> memory on nodedn1-22-acme.com:42084 (size: 164.7 KB, free: 366.5 MB)
>>>> 15/02/12 16:31:21 INFO BlockManagerInfo: Added input-4-1423758372200 on
>>>> disk on nodedn1-22-acme.com:42084 (size: 164.7 KB)
>>>> 15/02/12 16:31:23 INFO BlockManagerInfo: Added input-4-1423758372200 on
>>>> disk on nodedn1-23-acme.com:34526 (size: 164.7 KB)
>>>> 15/02/12 16:32:27 WARN TaskSetManager: Lost task 54.0 in stage 16291.0
>>>> (TID 1042569, nodedn1-13-acme.com): java.lang.Exception: Could not
>>>> compute split, block input-4-1423758372200 not found
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.1 in stage 16291.0
>>>> (TID 1042575) on executor nodedn1-21-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 1]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.2 in stage 16291.0
>>>> (TID 1042581) on executor nodedn1-21-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 2]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.3 in stage 16291.0
>>>> (TID 1042584) on executor nodedn1-20-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 3]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.4 in stage 16291.0
>>>> (TID 1042586) on executor nodedn1-11-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 4]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.5 in stage 16291.0
>>>> (TID 1042589) on executor nodedn1-14-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 5]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.6 in stage 16291.0
>>>> (TID 1042594) on executor nodedn1-15-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 6]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.7 in stage 16291.0
>>>> (TID 1042597) on executor nodedn1-20-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 7]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.8 in stage 16291.0
>>>> (TID 1042600) on executor nodedn1-17-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 8]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.9 in stage 16291.0
>>>> (TID 1042606) on executor nodedn1-20-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 9]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.10 in stage 16291.0
>>>> (TID 1042609) on executor nodedn1-19-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 10]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.11 in stage 16291.0
>>>> (TID 1042611) on executor nodedn1-14-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 11]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.12 in stage 16291.0
>>>> (TID 1042615) on executor nodedn1-22-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 12]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.13 in stage 16291.0
>>>> (TID 1042618) on executor nodedn1-19-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 13]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.14 in stage 16291.0
>>>> (TID 1042622) on executor nodedn1-21-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 14]
>>>> 15/02/12 16:32:27 INFO TaskSetManager: Lost task 54.15 in stage 16291.0
>>>> (TID 1042625) on executor nodedn1-16-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 15]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.16 in stage 16291.0
>>>> (TID 1042628) on executor nodedn1-20-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 16]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.17 in stage 16291.0
>>>> (TID 1042633) on executor nodedn1-16-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 17]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.18 in stage 16291.0
>>>> (TID 1042636) on executor nodedn1-17-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 18]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.19 in stage 16291.0
>>>> (TID 1042640) on executor nodedn1-17-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 19]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.20 in stage 16291.0
>>>> (TID 1042642) on executor nodedn1-16-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 20]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.21 in stage 16291.0
>>>> (TID 1042645) on executor nodedn1-16-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 21]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.22 in stage 16291.0
>>>> (TID 1042648) on executor nodedn1-14-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 22]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.23 in stage 16291.0
>>>> (TID 1042654) on executor nodedn1-23-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 23]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.24 in stage 16291.0
>>>> (TID 1042660) on executor nodedn1-22-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 24]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.25 in stage 16291.0
>>>> (TID 1042662) on executor nodedn1-21-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 25]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.26 in stage 16291.0
>>>> (TID 1042663) on executor nodedn1-14-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 26]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.27 in stage 16291.0
>>>> (TID 1042665) on executor nodedn1-17-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 27]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.28 in stage 16291.0
>>>> (TID 1042667) on executor nodedn1-19-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 28]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.29 in stage 16291.0
>>>> (TID 1042671) on executor nodedn1-14-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 29]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.30 in stage 16291.0
>>>> (TID 1042672) on executor nodedn1-12-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 30]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.31 in stage 16291.0
>>>> (TID 1042674) on executor nodedn1-14-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 31]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.32 in stage 16291.0
>>>> (TID 1042677) on executor nodedn1-19-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 32]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.33 in stage 16291.0
>>>> (TID 1042680) on executor nodedn1-22-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 33]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.34 in stage 16291.0
>>>> (TID 1042681) on executor nodedn1-20-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 34]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.35 in stage 16291.0
>>>> (TID 1042682) on executor nodedn1-17-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 35]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.36 in stage 16291.0
>>>> (TID 1042687) on executor nodedn1-20-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 36]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.37 in stage 16291.0
>>>> (TID 1042689) on executor nodedn1-17-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 37]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.38 in stage 16291.0
>>>> (TID 1042690) on executor nodedn1-17-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 38]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.39 in stage 16291.0
>>>> (TID 1042693) on executor nodedn1-22-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 39]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.40 in stage 16291.0
>>>> (TID 1042697) on executor nodedn1-22-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 40]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.41 in stage 16291.0
>>>> (TID 1042700) on executor nodedn1-13-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 41]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.42 in stage 16291.0
>>>> (TID 1042706) on executor nodedn1-15-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 42]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.43 in stage 16291.0
>>>> (TID 1042710) on executor nodedn1-14-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 43]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.44 in stage 16291.0
>>>> (TID 1042713) on executor nodedn1-17-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 44]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.45 in stage 16291.0
>>>> (TID 1042716) on executor nodedn1-19-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 45]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.46 in stage 16291.0
>>>> (TID 1042718) on executor nodedn1-15-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 46]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.47 in stage 16291.0
>>>> (TID 1042720) on executor nodedn1-12-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 47]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.48 in stage 16291.0
>>>> (TID 1042724) on executor nodedn1-12-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 48]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.49 in stage 16291.0
>>>> (TID 1042729) on executor nodedn1-16-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 49]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.50 in stage 16291.0
>>>> (TID 1042730) on executor nodedn1-21-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 50]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.51 in stage 16291.0
>>>> (TID 1042733) on executor nodedn1-21-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 51]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.52 in stage 16291.0
>>>> (TID 1042736) on executor nodedn1-19-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 52]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.53 in stage 16291.0
>>>> (TID 1042740) on executor nodedn1-20-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 53]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.54 in stage 16291.0
>>>> (TID 1042743) on executor nodedn1-22-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 54]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.55 in stage 16291.0
>>>> (TID 1042745) on executor nodedn1-18-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 55]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.56 in stage 16291.0
>>>> (TID 1042754) on executor nodedn1-17-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 56]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.57 in stage 16291.0
>>>> (TID 1042758) on executor nodedn1-17-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 57]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.58 in stage 16291.0
>>>> (TID 1042762) on executor nodedn1-12-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 58]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.59 in stage 16291.0
>>>> (TID 1042766) on executor nodedn1-23-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 59]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.60 in stage 16291.0
>>>> (TID 1042774) on executor nodedn1-20-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 60]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.61 in stage 16291.0
>>>> (TID 1042779) on executor nodedn1-13-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 61]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.62 in stage 16291.0
>>>> (TID 1042789) on executor nodedn1-20-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 62]
>>>> 15/02/12 16:32:28 INFO TaskSetManager: Lost task 54.63 in stage 16291.0
>>>> (TID 1042793) on executor nodedn1-15-acme.com: java.lang.Exception
>>>> (Could not compute split, block input-4-1423758372200 not found) [duplicate
>>>> 63]
>>>> org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>>> 54 in stage 16291.0 failed 64 times, most recent failure: Lost task 54.63
>>>> in stage 16291.0 (TID 1042793, nodedn1-15-acme.com):
>>>> java.lang.Exception: Could not compute split, block input-4-1423758372200
>>>> not found
>>>> Exception in thread "main" org.apache.spark.SparkException: Job aborted
>>>> due to stage failure: Task 54 in stage 16291.0 failed 64 times, most recent
>>>> failure: Lost task 54.63 in stage 16291.0 (TID 1042793,
>>>> nodedn1-15-acme.com): java.lang.Exception: Could not compute split,
>>>> block input-4-1423758372200 not found
>>>>
>>>>
>>>> Thanks for looking into it.
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Thu, Feb 12, 2015 at 8:10 PM, Tathagata Das <t...@databricks.com>
>>>> wrote:
>>>>
>>>>> Hey Tim,
>>>>>
>>>>> Let me get the key points.
>>>>> 1. If you are not writing back to Kafka, the delay is stable? That is,
>>>>> instead of "foreachRDD { // write to kafka }"  if you do "dstream.count",
>>>>> then the delay is stable. Right?
>>>>> 2. If so, then Kafka is the bottleneck. Is it the number of partitions
>>>>> that you spoke of in the second mail that determines the parallelism
>>>>> in writes? Is it stable with 30 partitions?
>>>>>
>>>>> Regarding the block exception, could you give me a trace of info level
>>>>> logging that leads to this error? Basically I want to trace the lifecycle of
>>>>> the block.
>>>>>
>>>>> TD
>>>>>
>>>>>
>>>>>
>>>>> On Thu, Feb 12, 2015 at 6:29 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>>
>>>>>> Hi Gerard,
>>>>>>
>>>>>> Great write-up and really good guidance in there.
>>>>>>
>>>>>> I have to be honest, I don't know why but setting # of partitions for
>>>>>> each dStream to a low number (5-10) just causes the app to choke/crash.
>>>>>> Setting it to 20 gets the app going but with not so great delays. Bump it
>>>>>> up to 30 and I start winning the war where processing time is
>>>>>> consistently below batch time window (20 seconds) except for a batch
>>>>>> every few batches where the compute time spikes 10x the usual.
>>>>>>
>>>>>> Following your guide, I took out some "logInfo" statements I had in
>>>>>> the app but didn't seem to make much difference :(
>>>>>>
>>>>>> With a higher time window (20 seconds), I got the app to run stably
>>>>>> for a few hours but then ran into the dreaded "java.lang.Exception: Could
>>>>>> not compute split, block input-0-1423761240800 not found". Wonder if I
>>>>>> need to add RDD persistence back?
>>>>>>
>>>>>> Also, I am reaching out to Virdata with some ProServ inquiries.
>>>>>>
>>>>>> Thanks
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Feb 12, 2015 at 4:30 AM, Gerard Maas <gerard.m...@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Tim,
>>>>>>>
>>>>>>> From this: " There are 5 kafka receivers and each incoming stream
>>>>>>> is split into 40 partitions"  I suspect that you're creating too
>>>>>>> many tasks for Spark to process on time.
>>>>>>> Could you try some of the 'knobs' I describe here to see if that
>>>>>>> would help?
>>>>>>>
>>>>>>> http://www.virdata.com/tuning-spark/
>>>>>>>
>>>>>>> -kr, Gerard.
>>>>>>>
>>>>>>> On Thu, Feb 12, 2015 at 8:44 AM, Tim Smith <secs...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Just read the thread "Are these numbers abnormal for spark
>>>>>>>> streaming?" and I think I am seeing similar results - that is -
>>>>>>>> increasing the window seems to be the trick here. I will have to
>>>>>>>> monitor for a few hours/days before I can conclude (there are so many
>>>>>>>> knobs/dials).
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Wed, Feb 11, 2015 at 11:16 PM, Tim Smith <secs...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> On Spark 1.2 (have been seeing this behaviour since 1.0), I have a
>>>>>>>>> streaming app that consumes data from Kafka and writes it back to
>>>>>>>>> Kafka (different topic). My big problem has been Total Delay. While
>>>>>>>>> execution time is usually < window size (in seconds), the total delay
>>>>>>>>> ranges from minutes to hours (and keeps going up).
>>>>>>>>>
>>>>>>>>> For a little while, I thought I had solved the issue by bumping up
>>>>>>>>> the driver memory. Then I expanded my Kafka cluster to add more nodes
>>>>>>>>> and the issue came up again. I tried a few things to smoke out the
>>>>>>>>> issue and something tells me the driver is the bottleneck again:
>>>>>>>>>
>>>>>>>>> 1) From my app, I took out the entire write-out-to-kafka piece.
>>>>>>>>> Sure enough, execution, scheduling delay and hence total delay fell
>>>>>>>>> to sub second. This assured me that whatever processing I do before
>>>>>>>>> writing back to kafka isn't the bottleneck.
>>>>>>>>>
>>>>>>>>> 2) In my app, I had RDD persistence set at different points but my
>>>>>>>>> code wasn't really re-using any RDDs so I took out all explicit
>>>>>>>>> persist() statements. And added, "spar...unpersist" to "true" in the
>>>>>>>>> context. After this, it doesn't seem to matter how much memory I give
>>>>>>>>> my executor, the total delay seems to be in the same range. I tried
>>>>>>>>> per executor memory from 2G to 12G with no change in total delay so
>>>>>>>>> executors aren't memory starved. Also, in the SparkUI, under the
>>>>>>>>> Executors tab, all executors show 0/1060MB used when per executor
>>>>>>>>> memory is set to 2GB, for example.
>>>>>>>>>
>>>>>>>>> 3) Input rate in the kafka consumer restricts spikes in incoming
>>>>>>>>> data.
>>>>>>>>>
>>>>>>>>> 4) Tried FIFO and FAIR but didn't make any difference.
>>>>>>>>>
>>>>>>>>> 5) Adding executors beyond a certain point seems useless (I guess
>>>>>>>>> excess ones just sit idle).
>>>>>>>>>
>>>>>>>>> At any given point in time, the SparkUI shows only one batch
>>>>>>>>> pending processing. So with just one batch pending processing, why
>>>>>>>>> would the scheduling delay run into minutes/hours if execution time is
>>>>>>>>> within the batch window duration? There aren't any failed stages or
>>>>>>>>> jobs.
>>>>>>>>>
>>>>>>>>> Right now, I have 100 executors (I have tried setting executors
>>>>>>>>> from 50-150), each with 2GB and 4 cores and the driver running with
>>>>>>>>> 16GB. There are 5 kafka receivers and each incoming stream is split
>>>>>>>>> into 40 partitions. Per receiver, input rate is restricted to 20000
>>>>>>>>> messages per second.
>>>>>>>>>
>>>>>>>>> Can anyone help me with clues or areas to look into, for
>>>>>>>>> troubleshooting the issue?
>>>>>>>>>
>>>>>>>>> One nugget I found buried in the code says:
>>>>>>>>> "The scheduler delay includes the network delay to send the task
>>>>>>>>> to the worker machine and to send back the result (but not the time to
>>>>>>>>> fetch the task result, if it needed to be fetched from the block
>>>>>>>>> manager on the worker)."
>>>>>>>>>
>>>>>>>>> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
>>>>>>>>>
>>>>>>>>> Could this be an issue with the driver being a bottleneck? All the
>>>>>>>>> executors posting their logs/stats to the driver?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>>
>>>>>>>>> Tim
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
> For additional commands, e-mail: user-h...@spark.apache.org
>
