Streaming UI tab showing empty events and very different metrics than on 1.5.2
On Thu, Jun 23, 2016 at 5:06 AM, Colin Kincaid Williams <disc...@uw.edu> wrote:
> After a bit of effort I moved from a Spark cluster running 1.5.2 to a Yarn cluster running 1.6.1 jars. I'm still setting the maxRatePerPartition. The completed batches are no longer showing the number of events processed in the Streaming UI tab. I'm getting around 4k inserts per second into HBase, but I haven't yet tried to remove or reset the maxRatePerPartition. I will attach a screenshot of the UI tab. It shows significantly lower figures for processing and delay times than the previously posted shot. It also shows the batches as empty; however, I see the requests hitting HBase.
>
> Then it's possible my issues were related to running on the Spark 1.5.2 cluster. Also, is the missing event count in the completed batches a bug? Should I file an issue?
>
> On Tue, Jun 21, 2016 at 9:04 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
>> Thanks @Cody, I will try that out. In the interim, I tried to validate my HBase cluster by running a random write test and saw 30-40K writes per second. This suggests there is noticeable room for improvement.
>>
>> On Tue, Jun 21, 2016 at 8:32 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>> Take HBase out of the equation and just measure what your read performance is by doing something like
>>>
>>> createDirectStream(...).foreach(_.println)
>>>
>>> not take() or print()
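A minimal sketch of what that read-only measurement could look like in the Java API used by the gist, assuming Spark 1.6.x with the Kafka 0.8 integration (spark-streaming-kafka); the class name, topic, and broker list below are illustrative, not taken from the original application:

    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    import kafka.serializer.DefaultDecoder;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaPairInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    public class KafkaReadThroughputCheck {
      public static void main(String[] args) throws Exception {
        JavaSparkContext jsc = new JavaSparkContext(new SparkConf().setAppName("Kafka read throughput"));
        JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Duration(1000));

        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "broker1:9092,broker2:9092");
        Set<String> topics = new HashSet<String>(Arrays.asList("FromTable"));

        JavaPairInputDStream<byte[], byte[]> stream = KafkaUtils.createDirectStream(
            jssc, byte[].class, byte[].class,
            DefaultDecoder.class, DefaultDecoder.class, kafkaParams, topics);

        // Only count each batch -- no HBase writes -- so the Streaming UI reflects
        // pure Kafka read and deserialization throughput.
        stream.foreachRDD(new VoidFunction<JavaPairRDD<byte[], byte[]>>() {
          @Override
          public void call(JavaPairRDD<byte[], byte[]> rdd) {
            System.out.println("records in batch: " + rdd.count());
          }
        });

        jssc.start();
        jssc.awaitTermination();
      }
    }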
>>> On Tue, Jun 21, 2016 at 3:19 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>> @Cody I was able to bring my processing time down to a second by setting maxRatePerPartition as discussed. My bad that I didn't recognize it as the cause of my scheduling delay.
>>>>
>>>> Since then I've tried experimenting with a larger Spark context duration. I've been trying to get some noticeable improvement inserting messages from Kafka -> HBase using the above application. I'm currently getting around 3500 inserts per second on a 9-node HBase cluster. So far, I haven't been able to get much more throughput, so I'm looking for advice on how I should tune Kafka and Spark for this job.
>>>>
>>>> I can create a Kafka topic with as many partitions as I want. I can set the Duration and maxRatePerPartition. I have 1.7 billion messages that I can insert rather quickly into the Kafka queue, and I'd like to get them into HBase as quickly as possible.
>>>>
>>>> I'm looking for advice regarding the number of Kafka topic partitions, the streaming Duration, maxRatePerPartition, and any other Spark settings or code changes I should make to try to get a better consumption rate.
>>>>
>>>> Thanks for all the help so far; this is the first Spark application I have written.
>>>>
>>>> On Mon, Jun 20, 2016 at 12:32 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>> I'll try dropping maxRatePerPartition to 400, or maybe even lower. However, even at application start-up I have this large scheduling delay. I will report my progress later on.
>>>>>
>>>>> On Mon, Jun 20, 2016 at 2:12 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>> If your batch time is 1 second and your average processing time is 1.16 seconds, you're always going to be falling behind. That would explain why you've built up an hour of scheduling delay after eight hours of running.
>>>>>>
>>>>>> On Sat, Jun 18, 2016 at 4:40 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>>>> Hi Mich again,
>>>>>>>
>>>>>>> Regarding batch window, etc.: I have provided the sources, but I'm not currently calling the window function. Did you see the program source? It's only 100 lines.
>>>>>>>
>>>>>>> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>>
>>>>>>> So I would expect I'm using defaults, other than what has been shown in the configuration. For example, in the launcher configuration I set --conf spark.streaming.kafka.maxRatePerPartition=500, and I believe there are 500 messages for the duration set in the application: JavaStreamingContext jssc = new JavaStreamingContext(jsc, new Duration(1000));
>>>>>>>
>>>>>>> Then with the --num-executors 6 submit flag and spark.streaming.kafka.maxRatePerPartition=500, I think that's how we arrive at the 3000 events per batch in the UI, pasted above. Feel free to correct me if I'm wrong.
>>>>>>>
>>>>>>> Then are you suggesting that I set the window? Maybe following this as a reference:
>>>>>>>
>>>>>>> https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/windows.html
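One note on the arithmetic, for what it's worth: spark.streaming.kafka.maxRatePerPartition caps the read rate per Kafka partition for the direct stream, so with the 6 topic partitions mentioned further down, 500 messages/second x 6 partitions x 1 s batch interval = 3000 records per batch, which is the figure the UI shows; the Kafka partition count, not --num-executors, is what enters that bound. If a window were wanted, a minimal sketch against the dstream from the gist might look like the following (the 60 s / 10 s values are illustrative, and both must be multiples of the batch interval):

    import org.apache.spark.streaming.Durations;
    import org.apache.spark.streaming.api.java.JavaPairDStream;

    // 'dstream' is the JavaPairDStream<byte[], byte[]> from the gist.
    // Re-group the last 60 seconds of records, recomputed every 10 seconds; without an
    // explicit window, each computation only sees the records of one 1-second batch.
    JavaPairDStream<byte[], byte[]> windowed =
        dstream.window(Durations.seconds(60), Durations.seconds(10));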
>>>>>>> On Sat, Jun 18, 2016 at 8:08 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>>> Ok. What is the set-up for these, please?
>>>>>>>> batch window
>>>>>>>> window length
>>>>>>>> sliding interval
>>>>>>>> And also, in each batch window how much data do you get in (number of messages in the topic, or whatever)?
>>>>>>>>
>>>>>>>> Dr Mich Talebzadeh
>>>>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>
>>>>>>>> On 18 June 2016 at 21:01, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>>>> I believe you have an issue with performance? Have you checked the Spark GUI (default port 4040) for details, including shuffles etc.?
>>>>>>>>>
>>>>>>>>> HTH
>>>>>>>>>
>>>>>>>>> Dr Mich Talebzadeh
>>>>>>>>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>> http://talebzadehmich.wordpress.com
>>>>>>>>>
>>>>>>>>> On 18 June 2016 at 20:59, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>>>>>>> There are 25 nodes in the Spark cluster.
>>>>>>>>>>
>>>>>>>>>> On Sat, Jun 18, 2016 at 7:53 PM, Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
>>>>>>>>>> > How many nodes are in your cluster?
>>>>>>>>>> >
>>>>>>>>>> > --num-executors 6 \
>>>>>>>>>> > --driver-memory 4G \
>>>>>>>>>> > --executor-memory 2G \
>>>>>>>>>> > --total-executor-cores 12 \
>>>>>>>>>> >
>>>>>>>>>> > Dr Mich Talebzadeh
>>>>>>>>>> > LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>>>>>>>>> > http://talebzadehmich.wordpress.com
>>>>>>>>>> >
>>>>>>>>>> > On 18 June 2016 at 20:40, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>>>>>>> >> I updated my app to Spark 1.5.2 streaming so that it consumes from Kafka using the direct API and inserts content into an HBase cluster, as described in this thread. I was away from this project for a while due to events in my family.
>>>>>>>>>> >>
>>>>>>>>>> >> Currently my scheduling delay is high, but the processing time is stable at around a second. I changed my setup to use 6 Kafka partitions on a set of smaller Kafka brokers with fewer disks. I've included some details below, including the script I use to launch the application. I'm using a Spark-on-HBase library whose version corresponds to my HBase cluster. Is it apparent that there is something wrong with my launch method that could be causing the delay, related to the included jars? Or is there something wrong with the very simple approach I'm taking in the application?
>>>>>>>>>> >>
>>>>>>>>>> >> Any advice is appreciated.
>>>>>>>>>> >>
>>>>>>>>>> >> The application:
>>>>>>>>>> >> https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>>>>> >>
>>>>>>>>>> >> From the Streaming UI I get something like this (Completed Batches, last 1000 out of 27136):
>>>>>>>>>> >>
>>>>>>>>>> >> Batch Time          | Input Size  | Scheduling Delay | Processing Time | Total Delay | Output Ops: Succeeded/Total
>>>>>>>>>> >> 2016/06/18 11:21:32 | 3000 events | 1.2 h            | 1 s             | 1.2 h       | 1/1
>>>>>>>>>> >> 2016/06/18 11:21:31 | 3000 events | 1.2 h            | 1 s             | 1.2 h       | 1/1
>>>>>>>>>> >> 2016/06/18 11:21:30 | 3000 events | 1.2 h            | 1 s             | 1.2 h       | 1/1
>>>>>>>>>> >>
>>>>>>>>>> >> Here's how I'm launching the Spark application.
>>>>>>>>>> >> #!/usr/bin/env bash
>>>>>>>>>> >>
>>>>>>>>>> >> export SPARK_CONF_DIR=/home/colin.williams/spark
>>>>>>>>>> >> export HADOOP_CONF_DIR=/etc/hadoop/conf
>>>>>>>>>> >> export HADOOP_CLASSPATH=/home/colin.williams/hbase/conf/:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/*:/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/hbase-protocol-0.98.6-cdh5.3.0.jar
>>>>>>>>>> >>
>>>>>>>>>> >> /opt/spark-1.5.2-bin-hadoop2.4/bin/spark-submit \
>>>>>>>>>> >> --class com.example.KafkaToHbase \
>>>>>>>>>> >> --master spark://spark_master:7077 \
>>>>>>>>>> >> --deploy-mode client \
>>>>>>>>>> >> --num-executors 6 \
>>>>>>>>>> >> --driver-memory 4G \
>>>>>>>>>> >> --executor-memory 2G \
>>>>>>>>>> >> --total-executor-cores 12 \
>>>>>>>>>> >> --jars /home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/zookeeper/zookeeper-3.4.5-cdh5.3.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/guava-12.0.1.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/protobuf-java-2.5.0.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-protocol.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-client.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-common.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop2-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-hadoop-compat.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/hbase-server.jar,/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/htrace-core.jar \
>>>>>>>>>> >> --conf spark.app.name="Kafka To Hbase" \
>>>>>>>>>> >> --conf spark.eventLog.dir="hdfs:///user/spark/applicationHistory" \
>>>>>>>>>> >> --conf spark.eventLog.enabled=false \
>>>>>>>>>> >> --conf spark.eventLog.overwrite=true \
>>>>>>>>>> >> --conf spark.serializer=org.apache.spark.serializer.KryoSerializer \
>>>>>>>>>> >> --conf spark.streaming.backpressure.enabled=false \
>>>>>>>>>> >> --conf spark.streaming.kafka.maxRatePerPartition=500 \
>>>>>>>>>> >> --driver-class-path /home/colin.williams/kafka-hbase.jar \
>>>>>>>>>> >> --driver-java-options -Dspark.executor.extraClassPath=/home/colin.williams/CDH-5.3.0-1.cdh5.3.0.p0.30/lib/hbase/lib/* \
>>>>>>>>>> >> /home/colin.williams/kafka-hbase.jar "FromTable" "ToTable" "broker1:9092,broker2:9092"
>>>>>>>>>> >>
>>>>>>>>>> >> On Tue, May 3, 2016 at 8:20 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>>>>>>> >> > Thanks Cody, I can see that the partitions are well distributed... Then I'm in the process of using the direct api.
>>>>>>>>>> >> >
>>>>>>>>>> >> > On Tue, May 3, 2016 at 6:51 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>>> >> >> 60 partitions in and of itself shouldn't be a big performance issue (as long as producers are distributing across partitions evenly).
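Once the job is on the direct stream, one way to check that "distributing across partitions evenly" condition on the consuming side is to log how many records each batch pulled from every partition. A rough sketch against the Kafka 0.8 direct API (available from Spark 1.3; the VoidFunction form of foreachRDD shown here needs 1.6, as used at the top of this thread); 'stream' is assumed to be the JavaPairInputDStream returned by createDirectStream, and the cast below only succeeds on that stream before any transformation:

    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.VoidFunction;
    import org.apache.spark.streaming.kafka.HasOffsetRanges;
    import org.apache.spark.streaming.kafka.OffsetRange;

    stream.foreachRDD(new VoidFunction<JavaPairRDD<byte[], byte[]>>() {
      @Override
      public void call(JavaPairRDD<byte[], byte[]> rdd) {
        // Each batch RDD produced by the direct stream carries the Kafka offset
        // ranges it was built from, one range per topic-partition.
        OffsetRange[] ranges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
        for (OffsetRange r : ranges) {
          long records = r.untilOffset() - r.fromOffset();
          System.out.println(r.topic() + " partition " + r.partition() + ": " + records + " records");
        }
      }
    });

Skew would show up as some partitions consistently carrying far more records per batch than others.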
>>>>>>>>>> >> >> On Tue, May 3, 2016 at 1:44 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>>>>>>> >> >>> Thanks again Cody. Regarding the details: 66 Kafka partitions on 3 Kafka servers, likely 8-core systems with 10 disks each. Maybe the issue with the receiver was the large number of partitions. I had miscounted the disks, and so 11*3*2 is how I decided to partition my topic on insertion (by my own, unjustified reasoning, on a first attempt). This worked well enough for me; I put 1.7 billion entries into Kafka from a MapReduce job in five and a half hours.
>>>>>>>>>> >> >>>
>>>>>>>>>> >> >>> I was concerned about using Spark 1.5.2 because I'm currently putting my data into a CDH 5.3 HDFS cluster, using hbase-spark 0.98 library jars built for Spark 1.2 on CDH 5.3. But after debugging quite a bit yesterday, I tried building against 1.5.2. So far it's running without issue on a Spark 1.5.2 cluster. I'm not sure there was much improvement using the same code, but I'll see how the direct API handles it. In the end I can reduce the number of partitions in Kafka if it causes big performance issues.
>>>>>>>>>> >> >>>
>>>>>>>>>> >> >>> On Tue, May 3, 2016 at 4:08 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>>> >> >>>> print() isn't really the best way to benchmark things, since it calls take(10) under the covers, but 380 records / second for a single receiver doesn't sound right in any case.
>>>>>>>>>> >> >>>>
>>>>>>>>>> >> >>>> Am I understanding correctly that you're trying to process a large number of already-existing Kafka messages, not keep up with an incoming stream? Can you give any details (e.g. hardware, number of topic-partitions, etc.)?
>>>>>>>>>> >> >>>>
>>>>>>>>>> >> >>>> Really though, I'd try to start with Spark 1.6 and direct streams, or even just kafkacat, as a baseline.
>>>>>>>>>> >> >>>>
>>>>>>>>>> >> >>>> On Mon, May 2, 2016 at 7:01 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>>>>>>> >> >>>>> Hello again. I searched for "backport kafka" in the list archives but couldn't find anything but a post from Spark 0.7.2. I was going to use accumulators to make a counter, but then saw the Receiver Statistics on the Streaming tab.
>>>>>>>>>> >> >>>>> Then I removed all other "functionality" except:
>>>>>>>>>> >> >>>>>
>>>>>>>>>> >> >>>>> JavaPairReceiverInputDStream<byte[], byte[]> dstream = KafkaUtils
>>>>>>>>>> >> >>>>>     // createStream(JavaStreamingContext jssc, Class<K> keyTypeClass, Class<V> valueTypeClass, Class<U> keyDecoderClass, Class<T> valueDecoderClass, java.util.Map<String,String> kafkaParams, java.util.Map<String,Integer> topics, StorageLevel storageLevel)
>>>>>>>>>> >> >>>>>     .createStream(jssc, byte[].class, byte[].class,
>>>>>>>>>> >> >>>>>         kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class,
>>>>>>>>>> >> >>>>>         kafkaParamsMap, topicMap, StorageLevel.MEMORY_AND_DISK_SER());
>>>>>>>>>> >> >>>>>
>>>>>>>>>> >> >>>>> dstream.print();
>>>>>>>>>> >> >>>>>
>>>>>>>>>> >> >>>>> Then in the Receiver Statistics for the single receiver, I'm seeing around 380 records / second. To get anywhere near my 10% mentioned above, I'd need to run around 21 receivers, assuming 380 records / second, just using the print output. This seems awfully high to me, considering that I wrote 80000+ records a second to Kafka from a MapReduce job, and that my bottleneck was likely HBase. Again using the 380 estimate, I would need 200+ receivers to reach a similar amount of reads.
>>>>>>>>>> >> >>>>>
>>>>>>>>>> >> >>>>> Even given the issues with the 1.2 receivers, is this the expected way to use the Kafka streaming API, or am I doing something terribly wrong?
>>>>>>>>>> >> >>>>>
>>>>>>>>>> >> >>>>> My application looks like https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
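For context on the receiver arithmetic above: 10% of the 80000+/second write rate is roughly 8000 records/second, and 8000 / 380 is about 21 receivers, while matching 80000 would take roughly 210, hence the 200+ estimate. The documented way to scale a receiver-based job is to create several input streams and union them; a hedged sketch in the Java API, reusing the jssc, kafkaParamsMap, and topicMap from the snippet above (Cody's later advice in this thread is to move to the direct stream instead):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.spark.storage.StorageLevel;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.kafka.KafkaUtils;

    int numReceivers = 21;  // illustrative, from the estimate above
    List<JavaPairDStream<byte[], byte[]>> streams = new ArrayList<JavaPairDStream<byte[], byte[]>>();
    for (int i = 0; i < numReceivers; i++) {
      streams.add(KafkaUtils.createStream(jssc, byte[].class, byte[].class,
          kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class,
          kafkaParamsMap, topicMap, StorageLevel.MEMORY_AND_DISK_SER()));
    }
    // Union the per-receiver streams into one DStream before further processing,
    // so downstream logic stays the same as with a single receiver.
    JavaPairDStream<byte[], byte[]> unioned = jssc.union(streams.get(0), streams.subList(1, streams.size()));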
>>>>>>>>>> >> >>>>> On Mon, May 2, 2016 at 6:09 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>>>>>> >> >>>>>> Have you tested for read throughput (without writing to hbase, just deserialize)?
>>>>>>>>>> >> >>>>>>
>>>>>>>>>> >> >>>>>> Are you limited to using Spark 1.2, or is upgrading possible? The Kafka direct stream is available starting with 1.3. If you're stuck on 1.2, I believe there have been some attempts to backport it; search the mailing list archives.
>>>>>>>>>> >> >>>>>>
>>>>>>>>>> >> >>>>>> On Mon, May 2, 2016 at 12:54 PM, Colin Kincaid Williams <disc...@uw.edu> wrote:
>>>>>>>>>> >> >>>>>>> I've written an application to get content from a Kafka topic with 1.7 billion entries, get the protobuf-serialized entries, and insert them into HBase. Currently the environment that I'm running in is Spark 1.2.
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>> With 8 executors and 2 cores, and 2 jobs, I'm only getting between 0-2500 writes / second. This will take much too long to consume the entries.
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>> I currently believe that the Spark Kafka receiver is the bottleneck. I've tried both 1.2 receivers, with the WAL and without, and didn't notice any large performance difference. I've tried many different Spark configuration options, but can't seem to get better performance.
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>> I saw 80000 requests / second inserting these records into Kafka using yarn / hbase / protobuf / kafka in a bulk fashion.
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>> While HBase inserts might not deliver the same throughput, I'd like to at least get 10%.
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>> My application looks like https://gist.github.com/drocsid/b0efa4ff6ff4a7c3c8bb56767d0b6877
>>>>>>>>>> >> >>>>>>>
>>>>>>>>>> >> >>>>>>> This is my first Spark application. I'd appreciate any assistance.
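The gist writes to HBase through a Spark-on-HBase library, so the following is only a generic sketch of the write path for comparison, not the original code: batching Puts per partition with the plain HBase 0.98 client. The table name comes from the launch script above, but the column family, qualifier, and the use of the Kafka key as the row key are placeholders; the Function-based foreachRDD form is used because it exists in Spark 1.2:

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HConnection;
    import org.apache.hadoop.hbase.client.HConnectionManager;
    import org.apache.hadoop.hbase.client.HTableInterface;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.api.java.function.VoidFunction;
    import scala.Tuple2;

    // 'stream' is the Kafka DStream of (key, protobuf-serialized value) pairs.
    stream.foreachRDD(new Function<JavaPairRDD<byte[], byte[]>, Void>() {
      @Override
      public Void call(JavaPairRDD<byte[], byte[]> rdd) {
        rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<byte[], byte[]>>>() {
          @Override
          public void call(Iterator<Tuple2<byte[], byte[]>> records) throws Exception {
            // One HBase connection and one batched put() per partition,
            // rather than one round trip per record.
            Configuration conf = HBaseConfiguration.create();
            HConnection conn = HConnectionManager.createConnection(conf);
            HTableInterface table = conn.getTable("ToTable");
            try {
              List<Put> puts = new ArrayList<Put>();
              while (records.hasNext()) {
                Tuple2<byte[], byte[]> kv = records.next();
                Put put = new Put(kv._1());  // placeholder: Kafka key as row key
                put.add(Bytes.toBytes("cf"), Bytes.toBytes("v"), kv._2());
                puts.add(put);
              }
              table.put(puts);
            } finally {
              table.close();
              conn.close();
            }
          }
        });
        return null;
      }
    });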
---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org