@Cody I was able to bring my processing time down to a second by
setting maxRatePerPartition as discussed. My bad that I didn't
recognize it as the cause of my scheduling delay.
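
(A rough sketch of why the scheduling delay kept growing, using the 1 second batch interval and 1.16 second average processing time quoted further down in this thread. The class and method names are hypothetical, not Spark APIs, and it assumes batches run strictly one after another with a constant processing time.)

```java
import java.time.Duration;

// Hypothetical helper, not part of Spark: models how scheduling delay
// accumulates when a batch takes longer to process than the batch interval.
final class SchedulingDelaySketch {

    // Delay added to the queue by every batch, in milliseconds (never negative).
    static long delayPerBatchMillis(long batchIntervalMillis, long processingMillis) {
        return Math.max(0L, processingMillis - batchIntervalMillis);
    }

    // Total accumulated delay after the job has run for the given wall-clock time.
    static Duration accumulatedDelay(long batchIntervalMillis, long processingMillis, Duration runTime) {
        long batches = runTime.toMillis() / batchIntervalMillis;
        return Duration.ofMillis(batches * delayPerBatchMillis(batchIntervalMillis, processingMillis));
    }

    public static void main(String[] args) {
        // 1 s batches, 1.16 s average processing time, 8 hours of running.
        Duration delay = accumulatedDelay(1000L, 1160L, Duration.ofHours(8));
        System.out.println("Accumulated delay in minutes: " + delay.toMinutes());
    }
}
```

With those numbers each batch adds 160 ms of delay, so eight hours of 1 second batches comes to roughly an hour and a quarter, which is in line with the delay I was seeing in the UI.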

Since then I've experimented with a larger batch Duration on the
streaming context, trying to get a noticeable improvement inserting
messages from Kafka -> Hbase with the application from this thread.
I'm currently getting around 3500 inserts / second on a 9 node hbase
cluster, and so far I haven't been able to get much more throughput.
I'm looking for advice on how I should tune Kafka and Spark for this
job.

I can create a kafka topic with as many partitions as I want. I can
set the Duration and maxRatePerPartition. I have 1.7 billion messages
that I can insert rather quickly into the Kafka queue, and I'd like to
get them into Hbase as quickly as possible.
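
To put numbers on those knobs, here is a back-of-the-envelope sketch. It assumes the direct stream caps each batch at partitions * maxRatePerPartition * batch seconds, and that the insert rate stays constant; the class and method names are made up for illustration and are not Spark or Kafka APIs.

```java
// Hypothetical helper, not part of Spark or Kafka: rough numbers for the
// direct stream rate cap and for the time needed to drain a backlog.
final class IngestEstimateSketch {

    // Upper bound on records per batch:
    // partitions * maxRatePerPartition (records/sec per partition) * batch length in seconds.
    static long maxRecordsPerBatch(int partitions, long maxRatePerPartition, double batchSeconds) {
        return Math.round(partitions * maxRatePerPartition * batchSeconds);
    }

    // Hours needed to consume a backlog at a sustained rate of records per second.
    static double hoursToDrain(long backlog, double recordsPerSecond) {
        return backlog / recordsPerSecond / 3600.0;
    }

    public static void main(String[] args) {
        // 6 partitions, maxRatePerPartition=500, 1 second Duration.
        System.out.println("Max records per batch: " + maxRecordsPerBatch(6, 500L, 1.0));
        // 1.7 billion messages at the current 3500 inserts / second.
        System.out.println("Hours to drain: " + hoursToDrain(1_700_000_000L, 3500.0));
    }
}
```

With the 6 partitions, maxRatePerPartition=500 and 1 second Duration from earlier in the thread this gives the 3000 events per batch shown in the UI, and at 3500 inserts / second the 1.7 billion messages would take roughly 135 hours.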

I'm looking for advice on the number of Kafka topic partitions, the
streaming Duration, maxRatePerPartition, and any other spark settings
or code changes that I should make to get a better consumption rate.

Thanks for all the help so far, this is the first Spark application I
have written.

On Mon, Jun 20, 2016 at 12:32 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
> I'll try dropping maxRatePerPartition to 400, or maybe even lower.
> However, even when the application starts up I have this large
> scheduling delay. I will report my progress later on.
>
> On Mon, Jun 20, 2016 at 2:12 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> If your batch time is 1 second and your average processing time is
>> 1.16 seconds, you're always going to be falling behind.  That would
>> explain why you've built up an hour of scheduling delay after eight
>> hours of running.
>>
>> On Sat, Jun 18, 2016 at 4:40 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>> Hi Mich again,
>>>
>>> Regarding batch window, etc. I have provided the sources, but I'm not
>>> currently calling the window function. Did you see the program source?
>>> It's only 100 lines.
>>>
>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>
>>> Then I would expect I'm using defaults, other than what has been shown
>>> in the configuration.
>>>
>>> For example:
>>>
>>> In the launcher configuration I set
>>> --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>> and I believe there are 500 messages for the duration set in the
>>> application:
>>> JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Duration(1000));
>>>
>>>
>>> Then with the --num-executors 6 \ submit flag, and the
>>> spark.streaming.kafka.maxRatePerPartition=500 I think that's how we
>>> arrive at the 3000 events per batch in the UI, pasted above.
>>>
>>> Feel free to correct me if I'm wrong.
>>>
>>> Then are you suggesting that I set the window?
>>>
>>> Maybe following this as reference:
>>>
>>> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html
>>>
>>> On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh
>>> <mich.talebza...@gmail.com> wrote:
>>>> Ok
>>>>
>>>> What is the set up for these please?
>>>>
>>>> batch window
>>>> window length
>>>> sliding interval
>>>>
>>>> And also in each batch window how much data do you get in (no of
>>>> messages in the topic whatever)?
>>>>
>>>>
>>>>
>>>>
>>>> Dr Mich Talebzadeh
>>>>
>>>>
>>>>
>>>> LinkedIn
>>>> https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>
>>>>
>>>>
>>>> http://talebzadehmich.wordpress.com
>>>>
>>>>
>>>>
>>>>
>>>> On 18 June 2016 at 21:01, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>
>>>>> I believe you have an issue with performance?
>>>>>
>>>>> have you checked spark GUI (default 4040) for details including shuffles
>>>>> etc?
>>>>>
>>>>> HTH
>>>>>
>>>>> Dr Mich Talebzadeh
>>>>>
>>>>> On 18 June 2016 at 20:59, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>>>
>>>>>> There are 25 nodes in the spark cluster.
>>>>>>
>>>>>> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh
>>>>>> <mich.talebza...@gmail.com> wrote:
>>>>>> > how many nodes are in your cluster?
>>>>>> >
>>>>>> > --num-executors 6 \
>>>>>> >  --driver-memory 4G \
>>>>>> >  --executor-memory 2G \
>>>>>> >  --total-executor-cores 12 \
>>>>>> >
>>>>>> >
>>>>>> > Dr Mich Talebzadeh
>>>>>> >
>>>>>> > On 18 June 2016 at 20:40, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>>> >>
>>>>>> >> I updated my app to Spark 1.5.2 streaming so that it consumes from
>>>>>> >> Kafka using the direct api and inserts content into an hbase cluster,
>>>>>> >> as described in this thread. I was away from this project for a while
>>>>>> >> due to events in my family.
>>>>>> >>
>>>>>> >> Currently my scheduling delay is high, but the processing time is
>>>>>> >> stable around a second. I changed my setup to use 6 kafka partitions
>>>>>> >> on a set of smaller kafka brokers, with fewer disks. I've included
>>>>>> >> some details below, including the script I use to launch the
>>>>>> >> application. I'm using a Spark on Hbase library, whose version is
>>>>>> >> relevant to my Hbase cluster. Is it apparent there is something wrong
>>>>>> >> with my launch method that could be causing the delay, related to the
>>>>>> >> included jars?
>>>>>> >>
>>>>>> >> Or is there something wrong with the very simple approach I'm taking
>>>>>> >> for the application?
>>>>>> >>
>>>>>> >> Any advice is appreciated.
>>>>>> >>
>>>>>> >>
>>>>>> >> The application:
>>>>>> >>
>>>>>> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>> >>
>>>>>> >>
>>>>>> >> From the streaming UI I get something like:
>>>>>> >>
>>>>>> >> Completed Batches (last 1000 out of 27136)
>>>>>> >>
>>>>>> >> Batch Time           Input Size   Scheduling Delay  Processing Time  Total Delay  Output Ops: Succeeded/Total
>>>>>> >> 2016/06/18 11:21:32  3000 events  1.2 h             1 s              1.2 h        1/1
>>>>>> >> 2016/06/18 11:21:31  3000 events  1.2 h             1 s              1.2 h        1/1
>>>>>> >> 2016/06/18 11:21:30  3000 events  1.2 h             1 s              1.2 h        1/1
>>>>>> >>
>>>>>> >>
>>>>>> >> Here's how I'm launching the spark application.
>>>>>> >>
>>>>>> >>
>>>>>> >> #!/usr/bin/env bash
>>>>>> >>
>>>>>> >> export SPARK_CONF_DIR=/home/colin.williams/spark
>>>>>> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>>>>> >> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>>>>>> >>
>>>>>> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>>>>>> >>   --class com.example.KafkaToHbase \
>>>>>> >>   --master spark://spark_master:7077 \
>>>>>> >>   --deploy-mode client \
>>>>>> >>   --num-executors 6 \
>>>>>> >>   --driver-memory 4G \
>>>>>> >>   --executor-memory 2G \
>>>>>> >>   --total-executor-cores 12 \
>>>>>> >>   --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
>>>>>> >>   --conf spark.app.name="Kafka To Hbase" \
>>>>>> >>   --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>>>>>> >>   --conf spark.eventLog.enabled=false \
>>>>>> >>   --conf spark.eventLog.overwrite=true \
>>>>>> >>   --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>>>>> >>   --conf spark.streaming.backpressure.enabled=false \
>>>>>> >>   --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>>>>> >>   --driver-class-path /home/colin.williams/kafka-hbase.jar \
>>>>>> >>   --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
>>>>>> >>   /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
>>>>>> >>
>>>>>> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>>> >> > Thanks Cody, I can see that the partitions are well distributed...
>>>>>> >> > Then I'm in the process of using the direct api.
>>>>>> >> >
>>>>>> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>> >> >> 60 partitions in and of itself shouldn't be a big performance issue
>>>>>> >> >> (as long as producers are distributing across partitions evenly).
>>>>>> >> >>
>>>>>> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>>> >> >>> Thanks again Cody. Regarding the details: 66 kafka partitions on 3
>>>>>> >> >>> kafka servers, likely 8 core systems with 10 disks each. Maybe the
>>>>>> >> >>> issue with the receiver was the large number of partitions. I had
>>>>>> >> >>> miscounted the disks and so 11*3*2 is how I decided to partition
>>>>>> >> >>> my topic on insertion (by my own, unjustified reasoning, on a
>>>>>> >> >>> first attempt). This worked well enough for me: I put 1.7 billion
>>>>>> >> >>> entries into Kafka on a map reduce job in 5 and a half hours.
>>>>>> >> >>>
>>>>>> >> >>> I was concerned using spark 1.5.2 because I'm currently putting my
>>>>>> >> >>> data into a CDH 5.3 HDFS cluster, using hbase-spark .98 library
>>>>>> >> >>> jars built for spark 1.2 on CDH 5.3. But after debugging quite a
>>>>>> >> >>> bit yesterday, I tried building against 1.5.2. So far it's running
>>>>>> >> >>> without issue on a Spark 1.5.2 cluster. I'm not sure there was too
>>>>>> >> >>> much improvement using the same code, but I'll see how the direct
>>>>>> >> >>> api handles it. In the end I can reduce the number of partitions
>>>>>> >> >>> in Kafka if it causes big performance issues.
>>>>>> >> >>>
>>>>>> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>> >> >>>> print() isn't really the best way to benchmark things, since it
>>>>>> >> >>>> calls take(10) under the covers, but 380 records / second for a
>>>>>> >> >>>> single receiver doesn't sound right in any case.
>>>>>> >> >>>>
>>>>>> >> >>>> Am I understanding correctly that you're trying to process a
>>>>>> >> >>>> large number of already-existing kafka messages, not keep up
>>>>>> >> >>>> with an incoming stream?  Can you give any details (e.g.
>>>>>> >> >>>> hardware, number of topicpartitions, etc)?
>>>>>> >> >>>>
>>>>>> >> >>>> Really though, I'd try to start with spark 1.6 and direct
>>>>>> >> >>>> streams, or even just kafkacat, as a baseline.
>>>>>> >> >>>>
>>>>>> >> >>>>
>>>>>> >> >>>>
>>>>>> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams
>>>>>> >> >>>> <disc...@uw.edu> wrote:
>>>>>> >> >>>>> Hello again. I searched for "backport kafka" in the list
>>>>>> >> >>>>> archives but couldn't find anything but a post from Spark
>>>>>> >> >>>>> 0.7.2. I was going to use accumulators to make a counter, but
>>>>>> >> >>>>> then saw on the Streaming tab the Receiver Statistics. Then I
>>>>>> >> >>>>> removed all other "functionality" except:
>>>>>> >> >>>>>
>>>>>> >> >>>>>
>>>>>> >> >>>>>     // createStream(JavaStreamingContext jssc, Class<K> keyTypeClass,
>>>>>> >> >>>>>     //     Class<V> valueTypeClass, Class<U> keyDecoderClass,
>>>>>> >> >>>>>     //     Class<T> valueDecoderClass,
>>>>>> >> >>>>>     //     java.util.Map<String,String> kafkaParams,
>>>>>> >> >>>>>     //     java.util.Map<String,Integer> topics,
>>>>>> >> >>>>>     //     StorageLevel storageLevel)
>>>>>> >> >>>>>     JavaPairReceiverInputDStream<byte[], byte[]> dstream = KafkaUtils
>>>>>> >> >>>>>       .createStream(jssc, byte[].class, byte[].class,
>>>>>> >> >>>>>         kafka.serializer.DefaultDecoder.class,
>>>>>> >> >>>>>         kafka.serializer.DefaultDecoder.class,
>>>>>> >> >>>>>         kafkaParamsMap, topicMap,
>>>>>> >> >>>>>         StorageLevel.MEMORY_AND_DISK_SER());
>>>>>> >> >>>>>
>>>>>> >> >>>>>     dstream.print();
>>>>>> >> >>>>>
>>>>>> >> >>>>> Then in the Receiver Stats for the single receiver, I'm seeing
>>>>>> >> >>>>> around 380 records / second. Then to get anywhere near my 10%
>>>>>> >> >>>>> mentioned above, I'd need to run around 21 receivers, assuming
>>>>>> >> >>>>> 380 records / second, just using the print output. This seems
>>>>>> >> >>>>> awfully high to me, considering that I wrote 80000+ records a
>>>>>> >> >>>>> second to Kafka from a mapreduce job, and that my bottleneck
>>>>>> >> >>>>> was likely Hbase. Again using the 380 estimate, I would need
>>>>>> >> >>>>> 200+ receivers to reach a similar amount of reads.
>>>>>> >> >>>>>
>>>>>> >> >>>>> Even given the issues with the 1.2 receivers, is this the
>>>>>> >> >>>>> expected way to use the Kafka streaming API, or am I doing
>>>>>> >> >>>>> something terribly wrong?
>>>>>> >> >>>>>
>>>>>> >> >>>>> My application looks like
>>>>>> >> >>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>> >> >>>>>
>>>>>> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>> >> >>>>>> Have you tested for read throughput (without writing to hbase,
>>>>>> >> >>>>>> just deserialize)?
>>>>>> >> >>>>>>
>>>>>> >> >>>>>> Are you limited to using spark 1.2, or is upgrading possible?
>>>>>> >> >>>>>> The kafka direct stream is available starting with 1.3.  If
>>>>>> >> >>>>>> you're stuck on 1.2, I believe there have been some attempts
>>>>>> >> >>>>>> to backport it, search the mailing list archives.
>>>>>> >> >>>>>>
>>>>>> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams
>>>>>> >> >>>>>> <disc...@uw.edu> wrote:
>>>>>> >> >>>>>>> I've written an application to get content from a kafka
>>>>>> >> >>>>>>> topic with 1.7 billion entries, get the protobuf serialized
>>>>>> >> >>>>>>> entries, and insert into hbase. Currently the environment
>>>>>> >> >>>>>>> that I'm running in is Spark 1.2.
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting
>>>>>> >> >>>>>>> between 0-2500 writes / second. This will take much too long
>>>>>> >> >>>>>>> to consume the entries.
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> I currently believe that the spark kafka receiver is the
>>>>>> >> >>>>>>> bottleneck. I've tried both 1.2 receivers, with the WAL and
>>>>>> >> >>>>>>> without, and didn't notice any large performance difference.
>>>>>> >> >>>>>>> I've tried many different spark configuration options, but
>>>>>> >> >>>>>>> can't seem to get better performance.
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> I saw 80000 requests / second inserting these records into
>>>>>> >> >>>>>>> kafka using yarn / hbase / protobuf / kafka in a bulk
>>>>>> >> >>>>>>> fashion.
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> While hbase inserts might not deliver the same throughput,
>>>>>> >> >>>>>>> I'd like to at least get 10%.
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> My application looks like
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> This is my first spark application. I'd appreciate any
>>>>>> >> >>>>>>> assistance.
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>>
>>>>>> >> >>>>>>> ---------------------------------------------------------------------
>>>>>> >> >>>>>>> To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
>>>>>> >> >>>>>>> For additional commands, e-mail: user-h...@spark.apache.org
>>>>>> >> >>>>>>>
