Scaling Kafka Streaming to Thousands of Partitions
Hi, We have been using Spark Kafka streaming for real-time processing with success. The scale of this stream has been increasing with data growth, and we have been able to scale up by adding more brokers to the Kafka cluster, adding more partitions to the topic, and adding more executors to the Spark streaming app. At this time our biggest topic has about 750 partitions, and in every mini batch of the streaming app, the driver fetches the metadata for this topic from Kafka and arranges the tasks. I wonder whether this step will become a bottleneck if we continue to scale in this way. Are there any best practices for scaling up the streaming job? Thanks, Charles
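For context, with the direct Kafka stream each topic partition maps to one Spark task per batch, so the per-batch driver work (metadata and offset queries, task planning) grows with partition count. A minimal sketch of the setup under discussion, against the Spark 1.x spark-streaming-kafka API; the broker list and topic name are placeholders, not from the original mail:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

// Placeholder broker list and topic; the job in the mail reportedly
// has ~750 partitions on its biggest topic.
val conf = new SparkConf().setAppName("kafka-direct-example")
val ssc = new StreamingContext(conf, Seconds(10))
val kafkaParams = Map("metadata.broker.list" -> "broker1:9092,broker2:9092")

// createDirectStream creates one RDD partition (and thus one task) per
// Kafka partition in each batch; the driver queries partition metadata
// and latest offsets once per batch to plan those tasks.
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
  ssc, kafkaParams, Set("mytopic"))

stream.foreachRDD { rdd =>
  // rdd.partitions.size == number of Kafka partitions of the topic
  println(s"batch planned with ${rdd.partitions.size} tasks")
}
```

The per-batch metadata fetch itself is a handful of requests to the brokers, so in practice the task-scheduling overhead on the driver tends to dominate before the metadata fetch does.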
Re: Shuffle memory woes
"The dataset is 100gb at most, the spills can up to 10T-100T" -- I have had the same experiences, although not to this extreme (the spills were < 10T while the input was ~ 100s gb) and haven't found any solution yet. I don't believe this is related to input data format. in my case, I got my input data by loading from Hive tables. On Sun, Feb 7, 2016 at 6:28 AM, Sea <261810...@qq.com> wrote: > Hi,Corey: >"The dataset is 100gb at most, the spills can up to 10T-100T", Are your > input files lzo format, and you use sc.text() ? If memory is not enough, > spark will spill 3-4x of input data to disk. > > > -- 原始邮件 -- > *发件人:* "Corey Nolet";; > *发送时间:* 2016年2月7日(星期天) 晚上8:56 > *收件人:* "Igor Berman"; > *抄送:* "user"; > *主题:* Re: Shuffle memory woes > > As for the second part of your questions- we have a fairly complex join > process which requires a ton of stage orchestration from our driver. I've > written some code to be able to walk down our DAG tree and execute siblings > in the tree concurrently where possible (forcing cache to disk on children > that that have multiple chiildren themselves so that they can be run > concurrently). Ultimatey, we have seen significant speedup in our jobs by > keeping tasks as busy as possible processing concurrent stages. Funny > enough though, the stage that is causing problems with shuffling for us has > a lot of children and doesn't even run concurrently with any other stages > so I ruled out the concurrency of the stages as a culprit for the > shuffliing problem we're seeing. > > On Sun, Feb 7, 2016 at 7:49 AM, Corey Nolet wrote: > >> Igor, >> >> I don't think the question is "why can't it fit stuff in memory". I know >> why it can't fit stuff in memory- because it's a large dataset that needs >> to have a reduceByKey() run on it. My understanding is that when it doesn't >> fit into memory it needs to spill in order to consolidate intermediary >> files into a single file. 
The more data you need to run through this, the >> more it will need to spill. My finding is that once it gets stuck in this >> spill chain with our dataset, it's all over at that point because it will >> spill and spill and spill and spill and spill. If I give the shuffle enough >> memory it won't, irrespective of the number of partitions we have (I've >> done everything from repartition(500) to repartition(2500)). It's not a >> matter of running out of memory on a single node because the data is >> skewed. It's more a matter of the shuffle buffer filling up and needing to >> spill. I think what may be happening is that it gets to a point where it's >> spending more time reading/writing from disk while doing the spills than it >> is actually processing any data. I can tell this because I can see that the >> spills sometimes get up into the 10's to 100's of TB where the input data >> was maybe 100gb at most. Unfortunately my code is on a >> private internal network and I'm not able to share it. >> >> On Sun, Feb 7, 2016 at 3:38 AM, Igor Berman >> wrote: >> >>> so can you provide code snippets: especially it's interesting to see >>> what your transformation chain is, and how many partitions there are on each >>> side of the shuffle operation. >>> >>> the question is why it can't fit stuff in memory when you are shuffling >>> - maybe your partitioner on the "reduce" side is not configured properly? I >>> mean if the map side is ok, and you are just reducing by key or something, it should >>> be ok, so some detail is missing... skewed data? aggregate by key? >>> >>> On 6 February 2016 at 20:13, Corey Nolet wrote: >>> Igor, Thank you for the response but unfortunately, the problem I'm referring to goes beyond this. I have set the shuffle memory fraction to be 90% and set the cache memory to be 0. Repartitioning the RDD helped a tad on the map side but didn't do much for the spilling when there was no longer any memory left for the shuffle. 
Also the new auto-memory management doesn't seem like it'll have too much of an effect after I've already given most of the memory I've allocated to the shuffle. The problem I'm having is most specifically related to the shuffle performance declining by several orders of magnitude when it needs to spill multiple times (it ends up spilling several hundred times for me when it can't fit stuff into memory). On Sat, Feb 6, 2016 at 6:40 AM, Igor Berman wrote: > Hi, > usually you can solve this in 2 steps: > make the rdd have more partitions > play with the shuffle memory fraction > > in spark 1.6 cache vs shuffle memory fractions are adjusted > automatically > > On 5 February 2016 at 23:07, Corey Nolet wrote: > >> I just recently had a discovery that my jobs were taking several >> hours to complete because of excess shuffle spills. What I found was >> that >> when I hit the high point where I didn't have enough mem
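For reference, the knobs discussed in this thread look roughly like the following. The values are the ones mentioned in the messages above (90% shuffle, cache at 0), shown here as config only, not as a recommendation:

```
# Spark <= 1.5 (legacy memory management): static fractions.
# 90% of executor heap for shuffle, cache effectively disabled.
spark.shuffle.memoryFraction   0.9
spark.storage.memoryFraction   0.0

# Spark 1.6+ (unified memory management, the "auto-memory management"
# mentioned above): execution and storage share one region, sized by
# spark.memory.fraction, and borrow from each other automatically.
spark.memory.fraction          0.75
```

Under unified memory management the legacy fractions are ignored unless `spark.memory.useLegacyMode` is enabled, which is why tuning them has little effect on 1.6.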
Re: Use KafkaRDD to Batch Process Messages from Kafka
Thanks a lot for the help! I'll definitely check out KafkaCluster.scala. I'll probably first try to use that api from Java, and later try to build the subproject. thanks, Charles On Fri, Jan 22, 2016 at 12:26 PM, Cody Koeninger wrote: > Yes, you should query Kafka if you want to know the latest available > offsets. > > There's code to make this straightforward in KafkaCluster.scala, but the > interface isn't public. There's an outstanding pull request to expose the > api at > > https://issues.apache.org/jira/browse/SPARK-10963 > > but frankly it appears unlikely that a committer will merge it. > > Your options are: > - use that api from java (since javac apparently doesn't respect scala > privacy) > - apply that patch or its equivalent and rebuild (just the > spark-streaming-kafka subproject, you don't have to redeploy spark) > - write / find equivalent code yourself > > If you want to build a patched version of the subproject and need a hand, > just ask on the list. > > > On Fri, Jan 22, 2016 at 1:30 PM, Charles Chao > wrote: > >> Hi, >> >> I have been using DirectKafkaInputDStream in Spark Streaming to consume >> kafka messages and it's been working very well. Now I have the need to >> batch process messages from Kafka, for example, retrieve all messages every >> hour and process them, and output to destinations like Hive or HDFS. I would >> like to use KafkaRDD and a normal Spark job to achieve this, so that much of >> the logic in my streaming code can be reused. >> >> In the excellent blog post *Exactly-Once Spark Streaming from Apache >> Kafka*, there are code examples about using KafkaRDD. However, it >> requires an array of OffsetRange, which needs the start and end >> offset specified. >> >> My question is, should I write additional code to talk to Kafka and >> retrieve the latest offset for each partition every time this hourly job is >> run? Or is there any way to let KafkaUtils know to "read till latest" when >> creating the KafkaRDD? 
>> >> Thanks, >> >> Charles >> >> >
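Putting Cody's first option together with the blog post's API, an hourly batch job could look roughly like this. Only a sketch: `KafkaUtils.createRDD` and `OffsetRange` are the real spark-streaming-kafka API, but `fetchLatestOffsets` and `loadLastProcessedOffsets` are hypothetical helpers standing in for code that queries Kafka (e.g. via `KafkaCluster.getLatestLeaderOffsets`) and for wherever the previous run's offsets were persisted:

```scala
import kafka.serializer.StringDecoder
import org.apache.spark.SparkContext
import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

// Hypothetical helpers -- not part of the Spark API. The first would wrap
// KafkaCluster.getLatestLeaderOffsets (or equivalent consumer code), the
// second would read the offsets the previous hourly run recorded
// (e.g. in HDFS or a database).
def fetchLatestOffsets(topic: String): Map[Int, Long] = ???
def loadLastProcessedOffsets(topic: String): Map[Int, Long] = ???

def hourlyBatch(sc: SparkContext, kafkaParams: Map[String, String], topic: String): Unit = {
  val latest = fetchLatestOffsets(topic)
  val last = loadLastProcessedOffsets(topic)

  // One OffsetRange per partition: from where the last run stopped,
  // up to the latest offset available right now ("read till latest").
  val offsetRanges: Array[OffsetRange] = latest.map { case (partition, until) =>
    OffsetRange(topic, partition, last.getOrElse(partition, 0L), until)
  }.toArray

  val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
    sc, kafkaParams, offsetRanges)

  // Process the (key, value) pairs; here, just dump the values to HDFS.
  rdd.map(_._2).saveAsTextFile(s"/output/$topic/${System.currentTimeMillis}")
  // On success, persist `latest` so the next run picks up from there.
}
```

Persisting the upper bound only after the output succeeds is what gives the batch job the same at-least-once/exactly-once story as the streaming version in the blog post.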
Use KafkaRDD to Batch Process Messages from Kafka
Hi, I have been using DirectKafkaInputDStream in Spark Streaming to consume kafka messages and it's been working very well. Now I have the need to batch process messages from Kafka, for example, retrieve all messages every hour and process them, and output to destinations like Hive or HDFS. I would like to use KafkaRDD and a normal Spark job to achieve this, so that much of the logic in my streaming code can be reused. In the excellent blog post Exactly-Once Spark Streaming from Apache Kafka, there are code examples about using KafkaRDD. However, it requires an array of OffsetRange, which needs the start and end offset specified. My question is, should I write additional code to talk to Kafka and retrieve the latest offset for each partition every time this hourly job is run? Or is there any way to let KafkaUtils know to "read till latest" when creating the KafkaRDD? Thanks, Charles
Re: Event logging not working when worker machine terminated
Fixed in 1.3.1 https://issues.apache.org/jira/browse/SPARK-6950 Thanks, Charles On 9/9/15, 8:54 AM, "David Rosenstrauch" wrote: >Thanks for the info. Do you know if there's a ticket already open for >this issue? If so, I'd like to monitor it. > >Thanks, > >DR > >On 09/09/2015 11:50 AM, Charles Chao wrote: >> I have encountered the same problem after migrating from 1.2.2 to 1.3.0. >> After some searching this appears to be a bug introduced in 1.3. >> Hopefully >> it's fixed in 1.4. >> >> Thanks, >> >> Charles >> >> >> >> >> >> On 9/9/15, 7:30 AM, "David Rosenstrauch" wrote: >> >>> Standalone. >>> >>> On 09/08/2015 11:18 PM, Jeff Zhang wrote: >>>> What cluster mode do you use ? Standalone/Yarn/Mesos ? >>>> >>>> >>>> On Wed, Sep 9, 2015 at 11:15 AM, David Rosenstrauch >>>> >>>> wrote: >>>> >>>>> Our Spark cluster is configured to write application history event >>>>> logging >>>>> to a directory on HDFS. This all works fine. (I've tested it with >>>>> Spark >>>>> shell.) >>>>> >>>>> However, on a large, long-running job that we ran tonight, one of our >>>>> machines at the cloud provider had issues and had to be terminated >>>>> and >>>>> replaced in the middle of the job. >>>>> >>>>> The job completed correctly, and shows in state FINISHED in the >>>>> "Completed >>>>> Applications" section of the Spark GUI. However, when I try to look >>>>> at the >>>>> application's history, the GUI says "Application history not found" >>>>> and >>>>> "Application ... is still in progress". >>>>> >>>>> The reason appears to be the machine that was terminated. When I >>>>> click on >>>>> the executor list for that job, Spark is showing the executor from >>>>> the >>>>> terminated machine as still in state RUNNING. >>>>> >>>>> Any solution/workaround for this? BTW, I'm running Spark v1.3.0. 
>>>>> >>>>> Thanks, >>>>> >>>>> DR >>>>> >>>>> - >>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org >>>>> For additional commands, e-mail: user-h...@spark.apache.org
Re: Event logging not working when worker machine terminated
I have encountered the same problem after migrating from 1.2.2 to 1.3.0. After some searching this appears to be a bug introduced in 1.3. Hopefully it's fixed in 1.4. Thanks, Charles On 9/9/15, 7:30 AM, "David Rosenstrauch" wrote: >Standalone. > >On 09/08/2015 11:18 PM, Jeff Zhang wrote: >> What cluster mode do you use ? Standalone/Yarn/Mesos ? >> >> >> On Wed, Sep 9, 2015 at 11:15 AM, David Rosenstrauch >> wrote: >> >>> Our Spark cluster is configured to write application history event >>> logging >>> to a directory on HDFS. This all works fine. (I've tested it with >>> Spark >>> shell.) >>> >>> However, on a large, long-running job that we ran tonight, one of our >>> machines at the cloud provider had issues and had to be terminated and >>> replaced in the middle of the job. >>> >>> The job completed correctly, and shows in state FINISHED in the >>> "Completed >>> Applications" section of the Spark GUI. However, when I try to look >>> at the >>> application's history, the GUI says "Application history not found" and >>> "Application ... is still in progress". >>> >>> The reason appears to be the machine that was terminated. When I >>> click on >>> the executor list for that job, Spark is showing the executor from the >>> terminated machine as still in state RUNNING. >>> >>> Any solution/workaround for this? BTW, I'm running Spark v1.3.0. >>> >>> Thanks, >>> >>> DR