I did try without repartition initially, but that was even worse: instead of
the allocated 100 executors, only 30 (the number of kafka partitions) would
do the work. MyFunc is a CPU-bound task, so adding more memory per executor
wouldn't help, and I saw that each of the 30 executors was only using one
thread/core on each Spark box. I could go play with threading inside MyFunc,
but I don't want to mess with in-app threading on top of all the parallelism
already involved, and I don't think threading outside of what the framework
does is really desirable.

With repartition, there is shuffle involved, but at least the computation
load spreads across all 100 executors instead of just 30.
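To make the comparison concrete, here is a minimal sketch of the direct-stream
pipeline being discussed. The broker list, topic name, batch interval, the
100-partition count, and the myFunc/myOutputFunc stubs are placeholders for
illustration, not the actual app:

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import kafka.serializer.StringDecoder

// Placeholder stand-ins for the real CPU-bound task and output writer.
def myFunc(msg: String, someParams: Map[String, String]): String = msg
object myOutputFunc { def write(rec: String): Unit = println(rec) }
val someParams = Map.empty[String, String]

val ssc = new StreamingContext(
  new SparkConf().setAppName("direct-stream-sketch"), Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092") // placeholder
val topics = Set("mytopic")                                     // placeholder

// The direct stream creates exactly one Spark partition per Kafka partition
// (30 here), so without a shuffle only 30 executors run the map tasks.
val kIn = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, topics)

// One shuffle up front spreads the CPU-bound work across all 100 executors.
val k = kIn.repartition(100)
val dataout = k.map(x => myFunc(x._2, someParams))
dataout.foreachRDD { rdd =>
  rdd.foreachPartition { recs =>
    recs.foreach(rec => myOutputFunc.write(rec))
  }
}
ssc.start()
ssc.awaitTermination()
```

Whether the shuffle cost is worth it depends on how expensive myFunc is
relative to moving the raw messages over the network.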

On Fri, Jun 19, 2015 at 7:14 PM, Cody Koeninger <c...@koeninger.org> wrote:

> If that's the case, you're still only using as many read executors as
> there are kafka partitions.
>
> I'd remove the repartition. If you weren't doing any shuffles in the old
> job, and are doing a shuffle in the new job, it's not really comparable.
>
> On Fri, Jun 19, 2015 at 8:16 PM, Tim Smith <secs...@gmail.com> wrote:
>
>> On Fri, Jun 19, 2015 at 5:15 PM, Tathagata Das <t...@databricks.com>
>> wrote:
>>
>>> Also, can you find from the spark UI the break up of the stages in each
>>> batch's jobs, and find which stage is taking more time after a while?
>>>
>>
>> Sure, will try to debug/troubleshoot. Are there enhancements to this
>> specific API between 1.3 and 1.4 that could substantially change its
>> behaviour?
>>
>>
>>> On Fri, Jun 19, 2015 at 4:51 PM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>> when you say your old version was
>>>>
>>>> k = createStream .....
>>>>
>>>> were you manually creating multiple receivers?  Because otherwise
>>>> you're only using one receiver on one executor...
>>>>
>>>
>> Yes, sorry, the earlier/stable version was more like:
>> // n being the number of kafka partitions, 1 receiver per partition
>> val kInStreams = (1 to n).map{ _ => KafkaUtils.createStream ............ }
>> val k = ssc.union(kInStreams)
>> val dataout = k.map(x => myFunc(x._2, someParams))
>> dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
>> myOutputFunc.write(rec) }) )
>>
>> Thanks,
>>
>> Tim
>>
>>
>>
>>
>>
>>>
>>>> If that's the case I'd try direct stream without the repartitioning.
>>>>
>>>>
>>>> On Fri, Jun 19, 2015 at 6:43 PM, Tim Smith <secs...@gmail.com> wrote:
>>>>
>>>>> Essentially, I went from:
>>>>> k = createStream .....
>>>>> val dataout = k.map(x => myFunc(x._2, someParams))
>>>>> dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
>>>>> myOutputFunc.write(rec) }) )
>>>>>
>>>>> To:
>>>>> kIn = createDirectStream .....
>>>>> // since #kafka partitions < #spark-executors
>>>>> k = kIn.repartition(numberOfExecutors)
>>>>> val dataout = k.map(x => myFunc(x._2, someParams))
>>>>> dataout.foreachRDD ( rdd => rdd.foreachPartition(rec => {
>>>>> myOutputFunc.write(rec) }) )
>>>>>
>>>>> With the new API, the app starts up and works fine for a while but I
>>>>> guess it starts to deteriorate after that. With the existing
>>>>> "createStream" API, the app does deteriorate, but over a much longer
>>>>> period: hours vs days.
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Fri, Jun 19, 2015 at 1:40 PM, Tathagata Das <t...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> Yes, please tell us what operation are you using.
>>>>>>
>>>>>> TD
>>>>>>
>>>>>> On Fri, Jun 19, 2015 at 11:42 AM, Cody Koeninger <c...@koeninger.org>
>>>>>> wrote:
>>>>>>
>>>>>>> Is there any more info you can provide / relevant code?
>>>>>>>
>>>>>>> On Fri, Jun 19, 2015 at 1:23 PM, Tim Smith <secs...@gmail.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Update on performance of the new API: the new code using the
>>>>>>>> createDirectStream API ran overnight and when I checked the app state
>>>>>>>> in the morning, there were massive scheduling delays :(
>>>>>>>>
>>>>>>>> Not sure why and haven't investigated a whole lot. For now,
>>>>>>>> switched back to the createStream API build of my app. Yes, for the
>>>>>>>> record, this is with CDH 5.4.1 and Spark 1.3.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Jun 18, 2015 at 7:05 PM, Tim Smith <secs...@gmail.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> Thanks for the super-fast response, TD :)
>>>>>>>>>
>>>>>>>>> I will now go bug my hadoop vendor to upgrade from 1.3 to 1.4.
>>>>>>>>> Cloudera, are you listening? :D
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Thu, Jun 18, 2015 at 7:02 PM, Tathagata Das <
>>>>>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Are you using Spark 1.3.x? That explains it. This issue has been
>>>>>>>>>> fixed in Spark 1.4.0. Bonus: you get a fancy new streaming UI with
>>>>>>>>>> more awesome stats. :)
>>>>>>>>>>
>>>>>>>>>> On Thu, Jun 18, 2015 at 7:01 PM, Tim Smith <secs...@gmail.com>
>>>>>>>>>> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> I just switched from "createStream" to the "createDirectStream"
>>>>>>>>>>> API for kafka and while things otherwise seem happy, the first
>>>>>>>>>>> thing I noticed is that stream/receiver stats are gone from the
>>>>>>>>>>> Spark UI :( Those stats were very handy for keeping an eye on
>>>>>>>>>>> health of the app.
>>>>>>>>>>>
>>>>>>>>>>> What's the best way to re-create those in the Spark UI? Maintain
>>>>>>>>>>> Accumulators? It would be really nice to get back receiver-like
>>>>>>>>>>> stats even though I understand that "createDirectStream" is a
>>>>>>>>>>> receiver-less design.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>>
>>>>>>>>>>> Tim
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>