@Cody I was able to bring my processing time down to a second by setting maxRatePerPartition as discussed. My bad for not recognizing it as the cause of my scheduling delay.
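
For reference, my understanding of the sizing arithmetic with the direct API is: records per batch is capped at maxRatePerPartition x number of topic partitions x batch duration in seconds, so 500 x 6 x 1 = 3000 events per batch, which lines up with the 3000-event batches shown in the UI further down.

On the code side, the change I'm considering next is buffering the HBase puts per partition instead of writing them one at a time, to amortize the RPC round trips. Below is a minimal sketch of what I mean, not my actual code: it bypasses the Spark on HBase library I'm really using and goes through the plain HBase 0.98 client; the class name, topic, table, column family, and rowkey scheme are made-up placeholders, and the 5-second batch duration and 1000-put chunk size are guesses, not recommendations.

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import kafka.serializer.DefaultDecoder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

// Sketch only: assumes spark-streaming-kafka_2.10 1.5.2 and the CDH 5.3
// (HBase 0.98) client jars on the classpath.
public class KafkaToHbaseSketch {
  public static void main(String[] args) throws Exception {
    SparkConf conf = new SparkConf().setAppName("Kafka To Hbase");
    // A larger batch gives each partition more records to amortize HBase
    // round trips over; 5 s is a guess, not a recommendation.
    JavaStreamingContext jssc =
        new JavaStreamingContext(new JavaSparkContext(conf), new Duration(5000));

    Map<String, String> kafkaParams = new HashMap<String, String>();
    kafkaParams.put("metadata.broker.list", "broker1:9092,broker2:9092");
    Set<String> topics = Collections.singleton("my-topic"); // placeholder topic

    // Direct stream: one Spark partition per Kafka topic partition, no receivers.
    JavaPairInputDStream<byte[], byte[]> stream = KafkaUtils.createDirectStream(
        jssc, byte[].class, byte[].class,
        DefaultDecoder.class, DefaultDecoder.class, kafkaParams, topics);

    // On Spark 1.5 foreachRDD takes a Function<RDD, Void>, hence the return null.
    stream.foreachRDD(rdd -> {
      rdd.foreachPartition(records -> {
        // One table handle per partition task; fine for a sketch, though a
        // shared connection would be cheaper. autoFlush off so puts buffer
        // client-side instead of one RPC per put.
        Configuration hbaseConf = HBaseConfiguration.create();
        HTable table = new HTable(hbaseConf, "my-table"); // placeholder table
        table.setAutoFlush(false, true);
        List<Put> puts = new ArrayList<Put>();
        while (records.hasNext()) {
          Tuple2<byte[], byte[]> record = records.next();
          Put put = new Put(record._1()); // rowkey = Kafka message key (made-up scheme)
          put.add(Bytes.toBytes("d"), Bytes.toBytes("v"), record._2()); // placeholder cf:qual
          puts.add(put);
          if (puts.size() >= 1000) { // flush in chunks to bound memory
            table.put(puts);
            puts.clear();
          }
        }
        if (!puts.isEmpty()) {
          table.put(puts);
        }
        table.flushCommits();
        table.close();
      });
      return null;
    });

    jssc.start();
    jssc.awaitTermination();
  }
}

If the Spark on HBase library already buffers writes client-side, this may not buy much, but it at least shows where I think the time is going.
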
Since then I've been experimenting with a larger batch duration on the StreamingContext. I've been trying to get a noticeable improvement inserting messages from Kafka -> HBase using the above application. I'm currently getting around 3500 inserts / second on a 9 node HBase cluster, and so far I haven't been able to get much more throughput than that. So I'm looking for advice on how I should tune Kafka and Spark for this job. I can create a Kafka topic with as many partitions as I want, and I can set the Duration and maxRatePerPartition. I have 1.7 billion messages that I can insert rather quickly into the Kafka queue, and I'd like to get them into HBase as quickly as possible. Specifically, I'm looking for advice regarding the number of Kafka topic partitions, the streaming Duration, maxRatePerPartition, and any other Spark settings or code changes (such as the batching sketch above) that would get me a better consumption rate. Thanks for all the help so far; this is the first Spark application I have written.

On Mon, Jun 20, 2016 at 12:32 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
> I'll try dropping maxRatePerPartition to 400, or maybe even lower.
> However, even at application startup I have this large scheduling
> delay. I will report my progress later on.
>
> On Mon, Jun 20, 2016 at 2:12 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> If your batch time is 1 second and your average processing time is
>> 1.16 seconds, you're always going to be falling behind. That would
>> explain why you've built up an hour of scheduling delay after eight
>> hours of running.
>>
>> On Sat, Jun 18, 2016 at 4:40 PM, Colin Kincaid Williams <disc...@uw.edu>
>> wrote:
>>> Hi Mich again,
>>>
>>> Regarding batch window, etc.: I have provided the sources, but I'm not
>>> currently calling the window function. Did you see the program source?
>>> It's only 100 lines.
>>>
>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>
>>> So I would expect I'm using the defaults, other than what has been
>>> shown in the configuration.
>>>
>>> For example, in the launcher configuration I set
>>> --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>> and I believe there are 500 messages per partition for the duration set
>>> in the application:
>>>
>>> JavaStreamingContext jssc = new JavaStreamingContext(jsc, new
>>> Duration(1000));
>>>
>>> Then with the --num-executors 6 \ submit flag and
>>> spark.streaming.kafka.maxRatePerPartition=500, I think that's how we
>>> arrive at the 3000 events per batch shown in the UI, pasted above.
>>>
>>> Feel free to correct me if I'm wrong.
>>>
>>> Then are you suggesting that I set the window?
>>>
>>> Maybe following this as reference:
>>>
>>> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html
>>>
>>> On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh
>>> <mich.talebza...@gmail.com> wrote:
>>>> Ok
>>>>
>>>> What is the set up for these please?
>>>>
>>>> batch window
>>>> window length
>>>> sliding interval
>>>>
>>>> And also, in each batch window, how much data do you get in (number of
>>>> messages in the topic, whatever)?
>>>>
>>>> On 18 June 2016 at 21:01, Mich Talebzadeh <mich.talebza...@gmail.com>
>>>> wrote:
>>>>>
>>>>> I believe you have an issue with performance?
>>>>>
>>>>> Have you checked the Spark GUI (default port 4040) for details,
>>>>> including shuffles etc.?
>>>>>
>>>>> HTH
>>>>>
>>>>> On 18 June 2016 at 20:59, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>>>
>>>>>> There are 25 nodes in the spark cluster.
>>>>>>
>>>>>> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
>>>>>> <mich.talebza...@gmail.com> wrote:
>>>>>> > How many nodes are in your cluster?
>>>>>> >
>>>>>> > --num-executors 6 \
>>>>>> > --driver-memory 4G \
>>>>>> > --executor-memory 2G \
>>>>>> > --total-executor-cores 12 \
>>>>>> >
>>>>>> > On 18 June 2016 at 20:40, Colin Kincaid Williams <disc...@uw.edu>
>>>>>> > wrote:
>>>>>> >>
>>>>>> >> I updated my app to Spark 1.5.2 streaming so that it consumes from
>>>>>> >> Kafka using the direct API and inserts content into an HBase
>>>>>> >> cluster, as described in this thread. I was away from this project
>>>>>> >> for a while due to events in my family.
>>>>>> >>
>>>>>> >> Currently my scheduling delay is high, but the processing time is
>>>>>> >> stable at around a second. I changed my setup to use 6 Kafka
>>>>>> >> partitions on a set of smaller Kafka brokers, with fewer disks. I've
>>>>>> >> included some details below, including the script I use to launch
>>>>>> >> the application. I'm using a Spark on HBase library whose version
>>>>>> >> matches my HBase cluster. Is it apparent that there is something
>>>>>> >> wrong with my launch method that could be causing the delay, related
>>>>>> >> to the included jars?
>>>>>> >>
>>>>>> >> Or is there something wrong with the very simple approach I'm taking
>>>>>> >> in the application?
>>>>>> >>
>>>>>> >> Any advice is appreciated.
>>>>>> >>
>>>>>> >> The application:
>>>>>> >>
>>>>>> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>> >>
>>>>>> >> From the streaming UI I get something like the following (Completed
>>>>>> >> Batches, last 1000 out of 27136):
>>>>>> >>
>>>>>> >> Batch Time           Input Size   Scheduling Delay  Processing Time  Total Delay  Output Ops: Succeeded/Total
>>>>>> >> 2016/06/18 11:21:32  3000 events  1.2 h             1 s              1.2 h        1/1
>>>>>> >> 2016/06/18 11:21:31  3000 events  1.2 h             1 s              1.2 h        1/1
>>>>>> >> 2016/06/18 11:21:30  3000 events  1.2 h             1 s              1.2 h        1/1
>>>>>> >>
>>>>>> >> Here's how I'm launching the Spark application.
>>>>>> >>
>>>>>> >> #!/usr/bin/env bash
>>>>>> >>
>>>>>> >> export SPARK_CONF_DIR=/home/colin.williams/spark
>>>>>> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>>>>> >> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>>>>>> >>
>>>>>> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>>>>>> >>   --class com.example.KafkaToHbase \
>>>>>> >>   --master spark://spark_master:7077 \
>>>>>> >>   --deploy-mode client \
>>>>>> >>   --num-executors 6 \
>>>>>> >>   --driver-memory 4G \
>>>>>> >>   --executor-memory 2G \
>>>>>> >>   --total-executor-cores 12 \
>>>>>> >>   --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
>>>>>> >>   --conf spark.app.name="Kafka To Hbase" \
>>>>>> >>   --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>>>>>> >>   --conf spark.eventLog.enabled=false \
>>>>>> >>   --conf spark.eventLog.overwrite=true \
>>>>>> >>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>>>>> >>   --conf spark.streaming.backpressure.enabled=false \
>>>>>> >>   --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>>>>> >>   --driver-class-path /home/colin.williams/kafka-hbase.jar \
>>>>>> >>   --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
>>>>>> >>   /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
>>>>>> >>
>>>>>> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams
>>>>>> >> <disc...@uw.edu> wrote:
>>>>>> >> > Thanks Cody, I can see that the partitions are well distributed...
>>>>>> >> > So now I'm in the process of moving to the direct API.
>>>>>> >> >
>>>>>> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <c...@koeninger.org>
>>>>>> >> > wrote:
>>>>>> >> >> 60 partitions in and of itself shouldn't be a big performance
>>>>>> >> >> issue (as long as producers are distributing across partitions
>>>>>> >> >> evenly).
>>>>>> >> >>
>>>>>> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams
>>>>>> >> >> <disc...@uw.edu> wrote:
>>>>>> >> >>> Thanks again Cody. Regarding the details: 66 Kafka partitions on
>>>>>> >> >>> 3 Kafka servers, likely 8-core systems with 10 disks each. Maybe
>>>>>> >> >>> the issue with the receiver was the large number of partitions.
>>>>>> >> >>> I had miscounted the disks, so 11*3*2 is how I decided to
>>>>>> >> >>> partition my topic on insertion (by my own, unjustified
>>>>>> >> >>> reasoning, on a first attempt). This worked well enough for me:
>>>>>> >> >>> I put 1.7 billion entries into Kafka with a MapReduce job in 5
>>>>>> >> >>> and a half hours.
>>>>>> >> >>>
>>>>>> >> >>> I was concerned about using Spark 1.5.2 because I'm currently
>>>>>> >> >>> putting my data into a CDH 5.3 HDFS cluster, using hbase-spark
>>>>>> >> >>> .98 library jars built for Spark 1.2 on CDH 5.3. But after
>>>>>> >> >>> debugging quite a bit yesterday, I tried building against 1.5.2,
>>>>>> >> >>> and so far it's running without issue on a Spark 1.5.2 cluster.
>>>>>> >> >>> I'm not sure there was much improvement using the same code, but
>>>>>> >> >>> I'll see how the direct API handles it. In the end I can reduce
>>>>>> >> >>> the number of partitions in Kafka if it causes big performance
>>>>>> >> >>> issues.
>>>>>> >> >>>
>>>>>> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger
>>>>>> >> >>> <c...@koeninger.org> wrote:
>>>>>> >> >>>> print() isn't really the best way to benchmark things, since it
>>>>>> >> >>>> calls take(10) under the covers, but 380 records / second for a
>>>>>> >> >>>> single receiver doesn't sound right in any case.
>>>>>> >> >>>>
>>>>>> >> >>>> Am I understanding correctly that you're trying to process a
>>>>>> >> >>>> large number of already-existing Kafka messages, not keep up
>>>>>> >> >>>> with an incoming stream? Can you give any details (e.g.
>>>>>> >> >>>> hardware, number of topic-partitions, etc.)?
>>>>>> >> >>>>
>>>>>> >> >>>> Really though, I'd try to start with Spark 1.6 and direct
>>>>>> >> >>>> streams, or even just kafkacat, as a baseline.
>>>>>> >> >>>>
>>>>>> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
>>>>>> >> >>>> <disc...@uw.edu> wrote:
>>>>>> >> >>>>> Hello again. I searched for "backport kafka" in the list
>>>>>> >> >>>>> archives but couldn't find anything but a post from Spark
>>>>>> >> >>>>> 0.7.2. I was going to use accumulators to make a counter, but
>>>>>> >> >>>>> then saw the Receiver Statistics on the Streaming tab. Then I
>>>>>> >> >>>>> removed all other "functionality" except:
>>>>>> >> >>>>>
>>>>>> >> >>>>> JavaPairReceiverInputDStream<byte[], byte[]> dstream = KafkaUtils
>>>>>> >> >>>>>     // createStream(JavaStreamingContext jssc,
>>>>>> >> >>>>>     //     Class<K> keyTypeClass, Class<V> valueTypeClass,
>>>>>> >> >>>>>     //     Class<U> keyDecoderClass, Class<T> valueDecoderClass,
>>>>>> >> >>>>>     //     java.util.Map<String,String> kafkaParams,
>>>>>> >> >>>>>     //     java.util.Map<String,Integer> topics,
>>>>>> >> >>>>>     //     StorageLevel storageLevel)
>>>>>> >> >>>>>     .createStream(jssc, byte[].class, byte[].class,
>>>>>> >> >>>>>         kafka.serializer.DefaultDecoder.class,
>>>>>> >> >>>>>         kafka.serializer.DefaultDecoder.class,
>>>>>> >> >>>>>         kafkaParamsMap, topicMap,
>>>>>> >> >>>>>         StorageLevel.MEMORY_AND_DISK_SER());
>>>>>> >> >>>>>
>>>>>> >> >>>>> dstream.print();
>>>>>> >> >>>>>
>>>>>> >> >>>>> Then in the Receiver Stats for the single receiver, I'm seeing
>>>>>> >> >>>>> around 380 records / second.
>>>>>> >> >>>>> Then to get anywhere near the 10% I mentioned above, I'd need
>>>>>> >> >>>>> to run around 21 receivers, assuming 380 records / second each,
>>>>>> >> >>>>> just using the print output. This seems awfully high to me,
>>>>>> >> >>>>> considering that I wrote 80,000+ records a second to Kafka from
>>>>>> >> >>>>> a MapReduce job, and that my bottleneck was likely HBase. Again
>>>>>> >> >>>>> using the 380 estimate, I would need 200+ receivers to reach a
>>>>>> >> >>>>> similar rate of reads.
>>>>>> >> >>>>>
>>>>>> >> >>>>> Even given the issues with the 1.2 receivers, is this the
>>>>>> >> >>>>> expected way to use the Kafka streaming API, or am I doing
>>>>>> >> >>>>> something terribly wrong?
>>>>>> >> >>>>>
>>>>>> >> >>>>> My application looks like
>>>>>> >> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>> >> >>>>>
>>>>>> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger
>>>>>> >> >>>>> <c...@koeninger.org> wrote:
>>>>>> >> >>>>>> Have you tested for read throughput (without writing to HBase,
>>>>>> >> >>>>>> just deserialize)?
>>>>>> >> >>>>>>
>>>>>> >> >>>>>> Are you limited to using Spark 1.2, or is upgrading possible?
>>>>>> >> >>>>>> The Kafka direct stream is available starting with 1.3. If
>>>>>> >> >>>>>> you're stuck on 1.2, I believe there have been some attempts
>>>>>> >> >>>>>> to backport it; search the mailing list archives.
>>>>>> >> >>>>>>
>>>>>> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams
>>>>>> >> >>>>>> <disc...@uw.edu> wrote:
>>>>>> >> >>>>>>> I've written an application to get content from a Kafka topic
>>>>>> >> >>>>>>> with 1.7 billion entries, take the protobuf-serialized
>>>>>> >> >>>>>>> entries, and insert them into HBase. Currently the
>>>>>> >> >>>>>>> environment that I'm running in is Spark 1.2.
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting
>>>>>> >> >>>>>>> between 0 and 2500 writes / second. This will take much too
>>>>>> >> >>>>>>> long to consume the entries.
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> I currently believe that the Spark Kafka receiver is the
>>>>>> >> >>>>>>> bottleneck. I've tried the 1.2 receivers both with the WAL
>>>>>> >> >>>>>>> and without, and didn't notice any large performance
>>>>>> >> >>>>>>> difference. I've tried many different Spark configuration
>>>>>> >> >>>>>>> options, but can't seem to get better performance.
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> I saw 80,000 requests / second inserting these records into
>>>>>> >> >>>>>>> Kafka using yarn / hbase / protobuf / kafka in a bulk fashion.
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> While HBase inserts might not deliver the same throughput,
>>>>>> >> >>>>>>> I'd like to at least get 10% of it.
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> My application looks like
>>>>>> >> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> This is my first Spark application. I'd appreciate any
>>>>>> >> >>>>>>> assistance.
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org