[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15075205#comment-15075205 ]

Mario Briggs commented on SPARK-12177:
--------------------------------------

Very good point about the frequent creation of KafkaConsumer. In fact, Praveen is
investigating whether that is the reason the 'position()' method hangs when we have
batch intervals of 200ms and below.
So one way to optimize it is this: since the 'compute' method in
DirectKafkaInputDStream runs in the driver, why not store the 'KafkaConsumer',
rather than the KafkaCluster, as a member variable in this class. Of course we
will need to mark it transient so that no attempt is made to serialize it, which
means we must always check whether it is null and re-initialize it if required
before use. The only use of the Consumer here is to find the new latest offsets,
so we will have to massage that method to work with an existing consumer object.
Another option is to let KafkaCluster hold a KafkaConsumer instance as a member
variable, with the same caveats about it being transient.
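
A rough sketch of the first option (class and method names are mine for
illustration, not the actual Spark code; assumes the Kafka 0.9 consumer API):

{code}
import java.{util => ju}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

class DirectStreamSketch(kafkaParams: ju.Map[String, Object]) extends Serializable {

  // @transient: never serialized with the DStream; after deserialization
  // (e.g. checkpoint recovery) the field comes back null.
  @transient private var consumer: KafkaConsumer[Array[Byte], Array[Byte]] = null

  // Always go through this accessor: re-create the consumer if it is null.
  private def getConsumer(): KafkaConsumer[Array[Byte], Array[Byte]] = {
    if (consumer == null) {
      consumer = new KafkaConsumer[Array[Byte], Array[Byte]](kafkaParams)
    }
    consumer
  }

  // Find the latest offset per partition with the one long-lived consumer,
  // instead of creating a new KafkaConsumer on every batch.
  def latestOffsets(partitions: Seq[TopicPartition]): Map[TopicPartition, Long] = {
    val c = getConsumer()
    c.assign(ju.Arrays.asList(partitions: _*))
    c.seekToEnd(partitions: _*) // varargs in the 0.9 API
    partitions.map(tp => tp -> c.position(tp)).toMap
  }
}
{code}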

This also means moving the fetching of the leader IP addresses for
getPreferredLocations() out of KafkaRDD.getPartitions() and into
DirectKafkaInputDStream.compute(), and having 'leaders' as a constructor param to
KafkaRDD, along the lines of the sketch below (I now realize that KafkaRDD is
private, so we are not exposing that on a public API as I thought earlier).
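
Roughly (all names here are illustrative; the real KafkaRDD and its partition
class look different):

{code}
import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

// Hypothetical partition type carrying its topic/partition identity.
class KafkaPartitionSketch(val index: Int, val topic: String, val partition: Int)
  extends Partition

// 'leaders' is fetched once in DirectKafkaInputDStream.compute() on the
// driver and passed in, so getPartitions() makes no Kafka calls at all.
class KafkaRDDSketch(
    sc: SparkContext,
    topicPartitions: Seq[(String, Int)],   // (topic, partition); offsets omitted
    leaders: Map[(String, Int), String])   // (topic, partition) -> leader host
  extends RDD[Array[Byte]](sc, Nil) {

  override def getPartitions: Array[Partition] =
    topicPartitions.zipWithIndex.map { case ((t, p), i) =>
      new KafkaPartitionSketch(i, t, p): Partition
    }.toArray

  // Schedule each partition on its Kafka leader's host when possible.
  override def getPreferredLocations(split: Partition): Seq[String] = {
    val part = split.asInstanceOf[KafkaPartitionSketch]
    leaders.get((part.topic, part.partition)).toSeq
  }

  // Message fetching elided; irrelevant to the preferred-locations change.
  override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] =
    Iterator.empty
}
{code}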
  

> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --------------------------------------------------
>
>                 Key: SPARK-12177
>                 URL: https://issues.apache.org/jira/browse/SPARK-12177
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.6.0
>            Reporter: Nikita Tarasenko
>              Labels: consumer, kafka
>
> Kafka 0.9 has already been released, and it introduces a new consumer API that
> is not compatible with the old one. So, I added the new consumer API in separate
> classes in the package org.apache.spark.streaming.kafka.v09. I didn't remove the
> old classes, for backward compatibility: users will not need to change their old
> Spark applications when they upgrade to the new Spark version.
> Please review my changes.


