[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15075205#comment-15075205 ]
Mario Briggs commented on SPARK-12177:
--------------------------------------

Very good point about the frequent creation of KafkaConsumer instances. In fact, Praveen is investigating whether that is the reason the 'position()' method hangs when we have batch intervals at 200ms and below.

One way to try to optimize it is this: since the 'compute' method in DirectKafkaInputDStream runs in the driver, why not store the 'KafkaConsumer' rather than the KafkaCluster as a member variable of that class? Of course we will need to mark it transient so that no attempt is made to serialize it, which means we must always check for null and re-initialize it if required before use. The only use of the consumer here is to find the new latest offsets, so we will have to adapt that method to work with an existing consumer object.

Another option is to let KafkaCluster hold a KafkaConsumer instance as a member variable, with the same caveats about it being transient.

This also means moving the part that fetches the leader IP address for getPreferredLocations() out of KafkaRDD.getPartitions() and into DirectKafkaInputDStream.compute(), and passing 'leaders' as a constructor param to KafkaRDD (I now realize that KafkaRDD is private, so we are not exposing that on a public API as I thought earlier).

> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --------------------------------------------------
>
>                 Key: SPARK-12177
>                 URL: https://issues.apache.org/jira/browse/SPARK-12177
>             Project: Spark
>          Issue Type: Improvement
>          Components: Streaming
>    Affects Versions: 1.6.0
>            Reporter: Nikita Tarasenko
>              Labels: consumer, kafka
>
> Kafka 0.9 has already been released, and it introduces a new consumer API that is not
> compatible with the old one. So I added the new consumer API, in separate
> classes in the package org.apache.spark.streaming.kafka.v09. I did not remove
> the old classes, for backward compatibility: users will not need to change
> their old Spark applications when they upgrade to a new Spark version.
> Please review my changes

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
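The transient-consumer idea from the comment above can be sketched as follows. This is a minimal, hypothetical illustration, not actual Spark code: `StubConsumer` stands in for org.apache.kafka.clients.consumer.KafkaConsumer, and `DirectStreamLike` stands in for DirectKafkaInputDStream; the point is only the @transient field plus the null-check-and-reinitialize access pattern.

```scala
// Stand-in for the real KafkaConsumer; the real position() takes a
// TopicPartition and returns the next offset to fetch.
class StubConsumer {
  def position(): Long = 42L
}

// Stand-in for DirectKafkaInputDStream, which must be Serializable
// (DStreams are checkpointed), while the consumer must not be.
class DirectStreamLike extends Serializable {
  // @transient: the consumer is skipped during serialization, so after
  // deserialization this field comes back null and must be rebuilt.
  @transient private var consumer: StubConsumer = _

  // Always go through this accessor: re-initialize if the transient
  // field was lost (or was never created on this JVM).
  private def getOrCreateConsumer(): StubConsumer = {
    if (consumer == null) {
      consumer = new StubConsumer
    }
    consumer
  }

  // The driver-side use case from the comment: ask the (cached)
  // consumer for the latest offsets instead of creating a new
  // consumer on every batch.
  def latestOffset(): Long = getOrCreateConsumer().position()
}
```

Because every access goes through `getOrCreateConsumer()`, the object survives a serialize/deserialize round trip: the consumer is simply rebuilt on first use afterwards, which is exactly the "check if null and re-initialize if required" behavior described above.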