[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15174485#comment-15174485
 ] 

Mansi Shah commented on SPARK-12177:
------------------------------------

Regarding caching the new consumer across batches: I did not know how to do this with the 
current implementation of the Spark plugin, so I ran a simple experiment that just 
reads messages from Kafka in batches. For a batch of 100K records, the cached-consumer 
model finishes in 68.3 secs, while its non-cached counterpart, where we recreate the 
consumer for every batch, takes 165.9 secs. This gets much worse as the batch sizes 
change. If we additionally make the batch sizes restrictive and throw out messages 
beyond the until offset, the same job takes 180 secs. 
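
For reference, here is roughly what I mean by a cached consumer, written directly 
against the 0.9 KafkaConsumer API rather than the Spark plugin. ConsumerCache and 
readBatch are just names I made up for this experiment-style sketch, and it assumes 
the messages up to untilOffset are already in Kafka:

{code:scala}
import java.{util => ju}
import scala.collection.JavaConverters._
import scala.collection.mutable
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

// Executor-side cache: one consumer per topic-partition, created on first use and
// reused for every subsequent batch instead of being recreated per batch.
object ConsumerCache {
  private val cache =
    mutable.Map.empty[TopicPartition, KafkaConsumer[Array[Byte], Array[Byte]]]

  def get(tp: TopicPartition,
          kafkaParams: ju.Map[String, Object]): KafkaConsumer[Array[Byte], Array[Byte]] =
    synchronized {
      cache.getOrElseUpdate(tp, {
        val c = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
        c.assign(ju.Arrays.asList(tp)) // manual assignment, no consumer-group rebalance
        c
      })
    }
}

object BatchReader {
  // Per batch we only seek to the starting offset and poll until untilOffset,
  // rather than paying new KafkaConsumer(...) + close() on every batch.
  def readBatch(tp: TopicPartition,
                fromOffset: Long,
                untilOffset: Long,
                kafkaParams: ju.Map[String, Object]): Seq[Array[Byte]] = {
    val consumer = ConsumerCache.get(tp, kafkaParams)
    consumer.seek(tp, fromOffset)
    val buf = mutable.ArrayBuffer.empty[Array[Byte]]
    var nextOffset = fromOffset
    while (nextOffset < untilOffset) {        // assumes messages up to untilOffset exist
      for (r <- consumer.poll(512).records(tp).asScala) {
        if (r.offset() < untilOffset) {
          buf += r.value()
          nextOffset = r.offset() + 1
        } else {
          nextOffset = untilOffset            // past the end of this batch's range
        }
      }
    }
    buf
  }
}
{code}

The non-cached variant in the experiment is the same loop, except that it does 
new KafkaConsumer / assign / seek / close inside readBatch on every call.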

I had one unrelated question: how come RDD batch sizes have nothing to do with the 
actual message size? If the batch size is fixed at 100K records, wouldn't that mean 
completely different things for messages that are 100 bytes versus 1 MB, just in terms 
of pure memory pressure? Do you have any thoughts on that?
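
To put rough numbers on the memory-pressure point (my own back-of-the-envelope 
arithmetic, nothing measured from the plugin):

{code:scala}
// Illustrative only: a fixed record-count batch means very different byte sizes
// depending on message size.
val recordsPerBatch = 100000L

val smallMessageBytes = 100L     // ~100 B per message
val largeMessageBytes = 1L << 20 // ~1 MiB per message

val smallBatchBytes = recordsPerBatch * smallMessageBytes // ~10 MB per partition
val largeBatchBytes = recordsPerBatch * largeMessageBytes // ~105 GB per partition

println(f"100 B messages: ${smallBatchBytes / 1e6}%.1f MB per batch per partition")
println(f"1 MiB messages: ${largeBatchBytes / 1e9}%.1f GB per batch per partition")
{code}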

Btw, the solution where we cache the records in the executor will unfortunately not 
work for us, as our system is truly distributed and a topic partition does not always 
reside on the same node the way it does in Kafka. We could do some predefined static 
allocation using getPreferredLocations, but that means we could never use locality and 
would always have to use this static assignment, even if the Spark cluster is colocated 
with the MapR cluster. That is why I was suggesting that the offsets be communicated 
back to the driver. We do not need to do this explicitly; just doing a Kafka commit on 
the executor should take care of it, along the lines of the sketch below. 
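
A minimal sketch of what I mean by just doing a Kafka commit on the executor, again 
written against the plain 0.9 consumer API; the helper names are mine, not the plugin's:

{code:scala}
import java.{util => ju}
import org.apache.kafka.clients.consumer.{KafkaConsumer, OffsetAndMetadata}
import org.apache.kafka.common.TopicPartition

object OffsetCommitSketch {
  // On the executor, after the partition's records have been processed:
  // commit the last processed offset back to Kafka under the job's group.id.
  def commitProcessed(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
                      tp: TopicPartition,
                      lastProcessedOffset: Long): Unit = {
    val offsets = new ju.HashMap[TopicPartition, OffsetAndMetadata]()
    // by convention the committed offset is the *next* offset to read
    offsets.put(tp, new OffsetAndMetadata(lastProcessedOffset + 1))
    consumer.commitSync(offsets)
  }

  // On the driver, before building the next batch's offset ranges: read the
  // committed offsets back from Kafka instead of having executors report them.
  def committedOffset(consumer: KafkaConsumer[Array[Byte], Array[Byte]],
                      tp: TopicPartition): Option[Long] =
    Option(consumer.committed(tp)).map(_.offset())
}
{code}

That way the driver never has to receive anything from the executors directly; Kafka 
itself carries the per-partition progress.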



> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --------------------------------------------------
>
>                 Key: SPARK-12177
>                 URL: https://issues.apache.org/jira/browse/SPARK-12177
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.6.0
>            Reporter: Nikita Tarasenko
>              Labels: consumer, kafka
>
> Kafka 0.9 has already been released, and it introduces a new consumer API that is not 
> compatible with the old one. So, I added the new consumer API. I made separate 
> classes in the package org.apache.spark.streaming.kafka.v09 with the changed API. I 
> did not remove the old classes, to preserve backward compatibility. Users will not 
> need to change their old Spark applications when they upgrade to the new Spark version.
> Please review my changes


