Scaling Kafka Streaming to Thousands of Partitions

2019-05-25 Thread Charles Chao
Hi,

We have been using Spark Kafka streaming for real time processing with
success. The scale of this stream has been increasing with data growth, and
we have been able to scale up by adding more brokers to the Kafka cluster,
adding more partitions to the topic, and adding more executors to the spark
streaming app.

At this time our biggest topic has about 750 partitions, and in every mini
batch of the streaming app the driver fetches the metadata for this topic
from Kafka and arranges the tasks. Will this step become a bottleneck if we
continue to scale in this way? Are there any best practices for scaling up
the streaming job?
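
For reference, here is a minimal sketch of how a stream like ours gets set up
(the topic name, brokers, group id, batch interval and rate limit below are
placeholders, and it assumes the spark-streaming-kafka-0-10 direct API rather
than our exact code). With the direct approach the driver asks Kafka for the
latest offset of every partition at the start of each batch and builds one
offset range per partition, which is the metadata step I mean:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object BigTopicStream {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("big-topic-stream")
      // cap per-partition ingest so a slow batch can't fall too far behind (illustrative value)
      .set("spark.streaming.kafka.maxRatePerPartition", "10000")
    val ssc = new StreamingContext(conf, Seconds(30))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "broker1:9092,broker2:9092",   // placeholder brokers
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "big-topic-consumer",                    // placeholder group id
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // one Spark partition per Kafka partition; with ~750 partitions the driver
    // computes ~750 offset ranges at the start of every batch
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("big-topic"), kafkaParams))

    stream.map(record => (record.key, record.value)).count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}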

Thanks,

Charles


Re: Shuffle memory woes

2016-02-07 Thread Charles Chao
 "The dataset is 100gb at most, the spills can up to 10T-100T"

-- I have had the same experiences, although not to this extreme (the
spills were < 10T while the input was ~ 100s gb) and haven't found any
solution yet. I don't believe this is related to input data format. in my
case, I got my input data by loading from Hive tables.

On Sun, Feb 7, 2016 at 6:28 AM, Sea <261810...@qq.com> wrote:

> Hi, Corey:
> "The dataset is 100gb at most, the spills can up to 10T-100T" -- are your
> input files in lzo format, and do you use sc.textFile()? If memory is not
> enough, spark will spill 3-4x of the input data to disk.
>
>
> -- Original Message --
> *From:* "Corey Nolet";
> *Sent:* Sunday, February 7, 2016, 8:56 PM
> *To:* "Igor Berman";
> *Cc:* "user";
> *Subject:* Re: Shuffle memory woes
>
> As for the second part of your questions- we have a fairly complex join
> process which requires a ton of stage orchestration from our driver. I've
> written some code to be able to walk down our DAG tree and execute siblings
> in the tree concurrently where possible (forcing cache to disk on children
> that have multiple children themselves so that they can be run
> concurrently). Ultimately, we have seen significant speedup in our jobs by
> keeping tasks as busy as possible processing concurrent stages. Funny
> enough though, the stage that is causing problems with shuffling for us has
> a lot of children and doesn't even run concurrently with any other stages
> so I ruled out the concurrency of the stages as a culprit for the
> shuffling problem we're seeing.
>
> On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet  wrote:
>
>> Igor,
>>
>> I don't think the question is "why can't it fit stuff in memory". I know
>> why it can't fit stuff in memory- because it's a large dataset that needs
>> to have a reduceByKey() run on it. My understanding is that when it doesn't
>> fit into memory it needs to spill in order to consolidate intermediary
>> files into a single file. The more data you need to run through this, the
>> more it will need to spill. My finding is that once it gets stuck in this
>> spill chain with our dataset it's all over at that point, because it will
>> spill and spill and spill and spill and spill. If I give the shuffle enough
>> memory it won't, irrespective of the number of partitions we have (I've
>> done everything from repartition(500) to repartition(2500)). It's not a
>> matter of running out of memory on a single node because the data is
>> skewed. It's more a matter of the shuffle buffer filling up and needing to
>> spill. I think what may be happening is that it gets to a point where it's
>> spending more time reading/writing from disk while doing the spills than it
>> is actually processing any data. I can tell this because I can see that the
>> spills sometimes get up into the 10's to 100's of TB where the input data
>> was maybe 100gb at most. Unfortunately my code is on a
>> private internal network and I'm not able to share it.
>>
>> On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman 
>> wrote:
>>
>>> so can you provide code snippets? In particular it's interesting to see
>>> what your transformation chain is and how many partitions there are on each
>>> side of the shuffle operation.
>>>
>>> the question is why it can't fit stuff in memory when you are shuffling
>>> - maybe your partitioner on the "reduce" side is not configured properly? I
>>> mean if the map side is ok and you are just reducing by key or something, it
>>> should be ok, so some detail is missing... skewed data? aggregate by key?
>>>
>>> On 6 February 2016 at 20:13, Corey Nolet  wrote:
>>>
 Igor,

 Thank you for the response but unfortunately, the problem I'm referring
 to goes beyond this. I have set the shuffle memory fraction to be 90% and
 set the cache memory to be 0. Repartitioning the RDD helped a tad on the
 map side but didn't do much for the spilling when there was no longer any
 memory left for the shuffle. Also the new auto-memory management doesn't
 seem like it'll have too much of an effect after I've already given most of
 the memory I've allocated to the shuffle. The problem I'm having is most
 specifically related to the shuffle performance declining by several orders
 of magnitude when it needs to spill multiple times (it ends up spilling
 several hundred times for me when it can't fit stuff into memory).
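
(A rough SparkConf sketch of the split described above, using the standard
Spark 1.x, pre-1.6 legacy memory-management keys; the fractions are the ones
Corey mentions, everything else is illustrative:)

import org.apache.spark.{SparkConf, SparkContext}

object ShuffleHeavyJob {
  def main(args: Array[String]): Unit = {
    // Roughly the split described above, in Spark 1.x (pre-1.6 legacy memory
    // management) conf keys: ~90% of the managed heap fraction for shuffle,
    // essentially nothing reserved for the RDD cache.
    val conf = new SparkConf()
      .setAppName("shuffle-heavy-job")   // placeholder app name
      .set("spark.shuffle.memoryFraction", "0.9")
      .set("spark.storage.memoryFraction", "0.0")
    val sc = new SparkContext(conf)
    // ... the reduceByKey-heavy pipeline would go here ...
    sc.stop()
  }
}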



 On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman 
 wrote:

> Hi,
> usually you can solve this in 2 steps:
> make the rdd have more partitions
> play with the shuffle memory fraction
>
> in spark 1.6 the cache vs shuffle memory fractions are adjusted
> automatically
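
(A small self-contained sketch of those two steps, with made-up input data
and an illustrative partition count; nothing here comes from the original
poster's job:)

import org.apache.spark.{SparkConf, SparkContext}

object ShuffleTuningSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("shuffle-tuning-sketch"))

    // Stand-in for the real input (imagine hundreds of GB loaded from Hive here).
    val pairs = sc.parallelize(1 to 1000000).map(i => (i % 1000, 1L))

    // Step 1: give the wide operation an explicit, larger partition count so
    // each shuffle partition is small enough to aggregate in memory.
    val counts = pairs.reduceByKey(_ + _, 2000)   // 2000 is illustrative
    println(counts.count())

    // Step 2: the shuffle memory fraction is usually submitted as conf rather
    // than code, e.g. --conf spark.shuffle.memoryFraction=0.6 (pre-1.6).
    // On 1.6+ the cache/shuffle split is handled automatically under the
    // unified spark.memory.fraction, as noted above.
    sc.stop()
  }
}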
>
> On 5 February 2016 at 23:07, Corey Nolet  wrote:
>
>> I just recently discovered that my jobs were taking several
>> hours to complete because of excess shuffle spills. What I found was that
>> when I hit the high point where I didn't have enough mem

Re: Use KafkaRDD to Batch Process Messages from Kafka

2016-01-22 Thread Charles Chao
Thanks a lot for the help! I'll definitely check out KafkaCluster.scala.
I'll probably first try to use that api from java, and later try to build
the subproject.

thanks,

Charles

On Fri, Jan 22, 2016 at 12:26 PM, Cody Koeninger  wrote:

> Yes, you should query Kafka if you want to know the latest available
> offsets.
>
> There's code to make this straightforward in KafkaCluster.scala, but the
> interface isn't public.  There's an outstanding pull request to expose the
> api at
>
> https://issues.apache.org/jira/browse/SPARK-10963
>
> but frankly it appears unlikely that a committer will merge it.
>
> Your options are:
>  - use that api from java (since javac apparently doesn't respect scala
> privacy)
> - apply that patch or its equivalent and rebuild (just the
> spark-streaming-kafka subproject, you don't have to redeploy spark)
> - write / find equivalent code yourself
>
> If you want to build a patched version of the subproject and need a hand,
> just ask on the list.
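
(To make the first option concrete, a rough Scala sketch of fetching the
latest offsets through KafkaCluster by compiling a helper into the same
package so its non-public members resolve. The method names are taken from
KafkaCluster.scala in the 1.x spark-streaming-kafka artifact as remembered,
so treat it as an outline to check against your build rather than a tested
recipe. The resulting array can then be passed straight to
KafkaUtils.createRDD:)

// Compiled into this package so KafkaCluster's non-public members resolve
// (an assumption about the 1.x spark-streaming-kafka artifact).
package org.apache.spark.streaming.kafka

import kafka.common.TopicAndPartition

object LatestOffsets {

  /** Latest available offset for every partition of the given topic. */
  def latestOffsets(kafkaParams: Map[String, String],
                    topic: String): Map[TopicAndPartition, Long] = {
    val kc = new KafkaCluster(kafkaParams)
    val partitions = kc.getPartitions(Set(topic)).fold(
      errs => throw new RuntimeException(errs.mkString("\n")), identity)
    kc.getLatestLeaderOffsets(partitions).fold(
      errs => throw new RuntimeException(errs.mkString("\n")),
      offsets => offsets.map { case (tp, leaderOffset) => tp -> leaderOffset.offset })
  }

  /** Offset ranges for KafkaUtils.createRDD: from the offsets the previous
    * hourly run saved (how those are stored is up to the job) until "latest". */
  def offsetRanges(fromOffsets: Map[TopicAndPartition, Long],
                   untilOffsets: Map[TopicAndPartition, Long]): Array[OffsetRange] =
    fromOffsets.map { case (tp, from) =>
      OffsetRange(tp.topic, tp.partition, from, untilOffsets(tp))
    }.toArray
}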
>
>
> On Fri, Jan 22, 2016 at 1:30 PM, Charles Chao 
> wrote:
>
>> Hi,
>>
>> I have been using DirectKafkaInputDStream in Spark Streaming to consume
>> kafka messages and it’s been working very well. Now I have the need to
>> batch process messages from Kafka, for example, retrieve all messages every
>> hour and process them, output to destinations like Hive or HDFS. I would
>> like to use KafkaRDD and a normal Spark job to achieve this, so that much of
>> the logic in my streaming code can be reused.
>>
>> In the excellent blog post *Exactly-Once Spark Streaming from Apache
>> Kafka*, there are code examples about using KafkaRDD. However, it
>> requires an array of OffsetRange, which needs the start and end offsets
>> specified.
>>
>> My question is, should I write additional code to talk to Kafka and
>> retrieve the latest offset for each partition every time this hourly job is
>> run? Or is there any way to let KafkaUtils know to “read till latest” when
>> creating the KafkaRDD?
>>
>> Thanks,
>>
>> Charles
>>
>>
>


Use KafkaRDD to Batch Process Messages from Kafka

2016-01-22 Thread Charles Chao
Hi,

I have been using DirectKafkaInputDStream in Spark Streaming to consume kafka
messages and it's been working very well. Now I have the need to batch process
messages from Kafka, for example, retrieve all messages every hour and process
them, output to destinations like Hive or HDFS. I would like to use KafkaRDD
and a normal Spark job to achieve this, so that much of the logic in my
streaming code can be reused.

In the excellent blog post Exactly-Once Spark Streaming from Apache Kafka,
there are code examples about using KafkaRDD. However, it requires an array of
OffsetRange, which needs the start and end offsets specified.
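
For context, the usage I have in mind looks roughly like the sketch below
(topic, brokers, and offsets are placeholders); the part I don't know how to
fill in nicely is the untilOffset side of each range.

import kafka.serializer.StringDecoder
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

object HourlyKafkaBatch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("hourly-kafka-batch"))

    // placeholder broker list
    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

    // One OffsetRange per topic partition: (topic, partition, fromOffset, untilOffset).
    // fromOffset would come from the previous run's bookkeeping; untilOffset is
    // the value I'd like to be "latest" at the time the job starts.
    val offsetRanges = Array(
      OffsetRange("my-topic", 0, 1000L, 2000L),
      OffsetRange("my-topic", 1, 1000L, 2000L)
    )

    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)

    // the same per-record logic as the streaming job would be reused here
    println(rdd.count())
    sc.stop()
  }
}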

My question is, should I write additional code to talk to Kafka and retrieve 
the latest offset for each partition every time this hourly job is run? Or is 
there any way to let KafkaUtils know to "read till latest" when creating the 
KafkaRDD?

Thanks,

Charles



Re: Event logging not working when worker machine terminated

2015-09-09 Thread Charles Chao
Fixed in 1.3.1

https://issues.apache.org/jira/browse/SPARK-6950

Thanks, 

Charles





On 9/9/15, 8:54 AM, "David Rosenstrauch"  wrote:

>Thanks for the info.  Do you know if there's a ticket already open for
>this issue?  If so, I'd like to monitor it.
>
>Thanks,
>
>DR
>
>On 09/09/2015 11:50 AM, Charles Chao wrote:
>> I have encountered the same problem after migrating from 1.2.2 to 1.3.0.
>> After some searching this appears to be a bug introduced in 1.3.
>>Hopefully
>> it's fixed in 1.4.
>>
>> Thanks,
>>
>> Charles
>>
>>
>>
>>
>>
>> On 9/9/15, 7:30 AM, "David Rosenstrauch"  wrote:
>>
>>> Standalone.
>>>
>>> On 09/08/2015 11:18 PM, Jeff Zhang wrote:
>>>> What cluster mode do you use ? Standalone/Yarn/Mesos ?
>>>>
>>>>
>>>> On Wed, Sep 9, 2015 at 11:15 AM, David Rosenstrauch
>>>>
>>>> wrote:
>>>>
>>>>> Our Spark cluster is configured to write application history event
>>>>> logging
>>>>> to a directory on HDFS.  This all works fine.  (I've tested it with
>>>>> Spark
>>>>> shell.)
>>>>>
>>>>> However, on a large, long-running job that we ran tonight, one of our
>>>>> machines at the cloud provider had issues and had to be terminated
>>>>>and
>>>>> replaced in the middle of the job.
>>>>>
>>>>> The job completed correctly, and shows in state FINISHED in the
>>>>> "Completed
>>>>> Applications" section of the Spark GUI.  However, when I try to look
>>>>> at the
>>>>> application's history, the GUI says "Application history not found"
>>>>>and
>>>>> "Application ... is still in progress".
>>>>>
>>>>> The reason appears to be the machine that was terminated.  When I
>>>>> click on
>>>>> the executor list for that job, Spark is showing the executor from
>>>>>the
>>>>> terminated machine as still in state RUNNING.
>>>>>
>>>>> Any solution/workaround for this?  BTW, I'm running Spark v1.3.0.
>>>>>
>>>>> Thanks,
>>>>>
>>>>> DR
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>
>>>
>>>
>>
>>
>>
>
>
>





Re: Event logging not working when worker machine terminated

2015-09-09 Thread Charles Chao
I have encountered the same problem after migrating from 1.2.2 to 1.3.0.
After some searching this appears to be a bug introduced in 1.3. Hopefully
it's fixed in 1.4.

Thanks, 

Charles





On 9/9/15, 7:30 AM, "David Rosenstrauch"  wrote:

>Standalone.
>
>On 09/08/2015 11:18 PM, Jeff Zhang wrote:
>> What cluster mode do you use ? Standalone/Yarn/Mesos ?
>>
>>
>> On Wed, Sep 9, 2015 at 11:15 AM, David Rosenstrauch 
>> wrote:
>>
>>> Our Spark cluster is configured to write application history event
>>>logging
>>> to a directory on HDFS.  This all works fine.  (I've tested it with
>>>Spark
>>> shell.)
>>>
>>> However, on a large, long-running job that we ran tonight, one of our
>>> machines at the cloud provider had issues and had to be terminated and
>>> replaced in the middle of the job.
>>>
>>> The job completed correctly, and shows in state FINISHED in the
>>>"Completed
>>> Applications" section of the Spark GUI.  However, when I try to look
>>>at the
>>> application's history, the GUI says "Application history not found" and
>>> "Application ... is still in progress".
>>>
>>> The reason appears to be the machine that was terminated.  When I
>>>click on
>>> the executor list for that job, Spark is showing the executor from the
>>> terminated machine as still in state RUNNING.
>>>
>>> Any solution/workaround for this?  BTW, I'm running Spark v1.3.0.
>>>
>>> Thanks,
>>>
>>> DR
>>>
>>>
>>>
>>
>>
>
>
>

