[jira] [Comment Edited] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API
[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175066#comment-15175066 ] Reynold Xin edited comment on SPARK-12177 at 3/2/16 5:32 AM:
-

This thread is getting too long for me to follow, but my instinct is that maybe we should have two subprojects and support both. Otherwise it is very bad for Kafka 0.8 users when upgrading to Spark 2.0. It is much more difficult to upgrade Kafka, which is a message bus, than to just upgrade Spark.

was (Author: rxin):
This thread is getting too long for me to follow, but my instinct is that maybe we should have two subprojects and support both.

> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --
>
> Key: SPARK-12177
> URL: https://issues.apache.org/jira/browse/SPARK-12177
> Project: Spark
> Issue Type: Improvement
> Components: Streaming
> Affects Versions: 1.6.0
> Reporter: Nikita Tarasenko
> Labels: consumer, kafka
>
> Kafka 0.9 has already been released, and it introduces a new consumer API that is not compatible with the old one. So, I added the new consumer API. I made separate classes in package org.apache.spark.streaming.kafka.v09 with the changed API. I did not remove the old classes, for backward compatibility: users will not need to change their old Spark applications when they upgrade to the new Spark version. Please review my changes.

--
This message was sent by Atlassian JIRA (v6.3.4#6332)
-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
[jira] [Comment Edited] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API
[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15109056#comment-15109056 ] Mario Briggs edited comment on SPARK-12177 at 1/20/16 6:16 PM:
---

bq. I totally understand what you mean. However, Kafka has its own assembly in Spark and, the way the code is structured right now, both the new API and the old API would go in the same assembly, so it's important to have a different package name. Also, I think for our end users transitioning from the old to the new API, I foresee them having two versions of their Spark-Kafka app: one that works with the old API and one with the new API. And I think it would be an easier transition if they could include both Kafka API versions in the Spark classpath and pick and choose which app to run, without mucking with Maven dependencies and recompiling when they want to switch. Let me know if you disagree.

Hey Mark,

Since this is WIP, I myself have at least one more option different from what I suggested above... I put out just one to get the conversation rolling, so thanks for chipping in.

Thanks for bringing up the kafka-assembly part. The assembly jar does include all dependencies too, so we would include Kafka v0.9's jars, I guess? I remember Cody mentioning that a v0.8 to v0.9 upgrade on the Kafka side involves upgrading the brokers. I think that is not required when a client uses a v0.9 jar but consumes only the older high-level/low-level API and talks to a v0.8 Kafka cluster. So we can go with one kafka-assembly, and then my suggestion above is itself broken, since we would have clashes between identical package and class names.
One thought behind not introducing the version in the package name or class name (I see that Flink does it in the class name) was to avoid forcing us to create v0.10/v0.11 packages (and customers to change code and recompile) even if those releases of Kafka have no client-API or other changes that warrant a new version. (Also, Spark got away without putting a version number in until now, which means less work in Spark, so I am not sure we want to start forcing this work going forward.) Once we introduce the version number, we need to ensure it stays in sync with Kafka. That is why one earlier idea I mentioned in this JIRA was: "The public API signatures (of KafkaUtils in the v0.9 subproject) are different and do not clash (with KafkaUtils in the original kafka subproject) and hence can be added to the existing (original kafka subproject) KafkaUtils class." This also addresses the issues you mention above. Cody mentioned that we need to get others on the same page for this idea, so I guess we really need the committers to chime in here.

Of course, I forgot to answer Nikita's follow-up question: "do you mean that we would change the original KafkaUtils by adding new functions for the new DirectIS/KafkaRDD but using them from a separate module with kafka09 classes"? To be clear, these new public methods added to the original kafka subproject's KafkaUtils will make use of the DirectKafkaInputDStream, KafkaRDD, KafkaRDDPartition, and OffsetRange classes that live in a new v09 package (internal, of course). In short, we don't have a new subproject. (I skipped the KafkaCluster class from the list, because I am thinking it makes more sense to call that class something like 'KafkaClient' going forward.)

was (Author: mariobriggs):
bq. I totally understand what you mean. However, kafka has its own assembly in Spark and the way the code is structured right now, both the new API and old API would go in the same assembly so it's important to have a different package name.
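The "non-clashing signatures on the existing KafkaUtils" idea could be sketched roughly as below. This is only an illustration under assumed names: the stub traits/classes stand in for the real Spark and Kafka types, and the 0.9-style signature shown is hypothetical, not taken from the PR. The point is just that the new consumer configures deserializers through kafkaParams, so a new-API entry point naturally has a different parameter list and overloads rather than clashes.

```scala
trait Decoder[T]            // stub for kafka.serializer.Decoder (0.8 API)
class ConsumerRecord[K, V]  // stub for the new consumer's record type (0.9 API)
class InputDStream[T]       // stub for the Spark Streaming type

object KafkaUtils {
  // existing 0.8-style entry point (unchanged): decoders passed explicitly
  def createDirectStream[K, V](
      kafkaParams: Map[String, String],
      topics: Set[String],
      keyDecoder: Class[_ <: Decoder[K]],
      valueDecoder: Class[_ <: Decoder[V]]): InputDStream[(K, V)] = ???

  // hypothetical 0.9-style entry point: deserializers are supplied via
  // kafkaParams, so the arity differs and this simply overloads the old
  // method; internally it would delegate to classes in the internal v09
  // package, so no new subproject is needed.
  def createDirectStream[K, V](
      kafkaParams: Map[String, Object],
      topics: Set[String]): InputDStream[ConsumerRecord[K, V]] = ???
}
```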
[jira] [Comment Edited] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API
[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15075230#comment-15075230 ] Mario Briggs edited comment on SPARK-12177 at 12/30/15 5:24 PM:

You could also get just a few of the records you want, i.e. not all in one shot. So, a gist below:

{code}
override def getNext(): R = {
  if (iter == null || !iter.hasNext) {
    iter = consumer.poll(pollTime).iterator()
  }
  if (!iter.hasNext) {
    if (requestOffset < part.untilOffset) {
      // need to make another poll() and recheck above.
      // So make a recursive call, i.e. 'return getNext()' here?
    }
    finished = true
    null.asInstanceOf[R]
  } else {
    ...
{code}
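Rather than the recursive call pondered in the gist above, the same "poll again until the requested range is exhausted" logic can be written iteratively, which avoids any stack-depth concern. A rough sketch only: consumer, pollTime, part, requestOffset, finished, and messageHandler are assumed from the surrounding KafkaRDDIterator and are not verbatim from any PR.

```scala
override def getNext(): R = {
  // Poll in a loop (instead of recursing) until we either get records or
  // we have genuinely reached the end of this partition's offset range.
  // A real implementation may also want a bounded retry count here, since
  // poll(pollTime) with a small timeout is not guaranteed to return data.
  while ((iter == null || !iter.hasNext) && requestOffset < part.untilOffset) {
    iter = consumer.poll(pollTime).iterator()
  }
  if (iter == null || !iter.hasNext) {
    finished = true
    null.asInstanceOf[R]
  } else {
    val record = iter.next()
    requestOffset = record.offset + 1
    messageHandler(record)
  }
}
```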
[jira] [Comment Edited] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API
[ https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15073020#comment-15073020 ] Mario Briggs edited comment on SPARK-12177 at 12/28/15 7:02 PM:

Hi Nikita, thanks. Here are my review comments. I couldn't find a way to add them on the PR review in GitHub, so I added them here. My comments are a little detailed, since I too developed an implementation, and tested it along with my colleague Praveen :-), after the initial discussion on the dev list.

A - KafkaCluster class

getPartitions(), seek(), the callers of those methods, and all other methods that use withConsumer() should not return an 'Either', but rather just the expected object (the 'Right'). The reason for the 'Either' object in the earlier code was that the earlier Kafka client had to try the operation on all the 'seedBrokers' and handle the case where some of them were down. Similarly, when dealing with 'leaders', the client had to try the operation on all leaders for a TP (TopicAndPartition). When we use the new kafka-clients API, we don't have to deal with trying against all the seedBrokers, leaders, etc., since the new KafkaConsumer object internally handles all those details. Notice that in the earlier code, withBrokers() tries to connect() and invoke the passed-in function multiple times via brokers.foreach(), hence the need to accumulate errors. The earlier code also did a 'return' immediately when successful with one of the brokers. None of this applies with the new KafkaConsumer object.

getPartitions() - https://github.com/nikit-os/spark/blob/kafka-09-consumer-api/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaCluster.scala#L62

The consumer.partitionsFor() Java API returns null if the topic doesn't exist.
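One way to guard that null, sketched under assumed names (consumer, TopicAndPartition, and SparkException follow the code under review but are not verbatim from it):

```scala
import scala.collection.JavaConverters._

def getPartitions(topics: Set[String]): Set[TopicAndPartition] =
  topics.flatMap { topic =>
    // The 0.9 Java consumer's partitionsFor() returns null (not an empty
    // list) for a topic that does not exist.
    val infos = consumer.partitionsFor(topic)
    if (infos == null) {
      throw new SparkException(
        s"Topic '$topic' does not exist; check the topic name for typos")
    }
    infos.asScala.map(pi => TopicAndPartition(pi.topic, pi.partition))
  }
```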
If you don't handle that null, you run into an NPE when the user specifies a topic that doesn't exist or makes a typo in the topic name. (Also, not returning an exception saying the partition doesn't exist is not right.) Our implementation is at https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala. If it is easier for you, we can issue a PR to your repo; let us know.

B - KafkaRDD class

getPreferredLocations() is missing in your code. The earlier implementation from Cody had the optimisation that if Kafka and the Spark code (KafkaRDD) were running on the same cluster, then the RDD partition for a particular TopicPartition would be local to that TopicPartition's leader. Could you please add code to bring back this functionality? Our implementation pulled this info inside getPartitions - https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala#L52. It is probably more efficient to do it inside compute() of the DStream, but that meant exposing 'leaders' on the KafkaRDD constructor, as discussed - https://mail-archives.apache.org/mod_mbox/spark-dev/201512.mbox/%3CCAKWX9VV34Dto9irT3ZZfH78EoXO3bV3VHN8YYvTxfnyvGcRsQw%40mail.gmail.com%3E

C - KafkaRDDIterator class

getNext(): As mentioned in issue #1 noted here - https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/README.md#issues-noted-with-kafka-090-consumer-api - KafkaConsumer.poll(x) is not guaranteed to return data when x is a small value. We are following up on this issue with Kafka - https://issues.apache.org/jira/browse/KAFKA-3044.
I see that you have made this a configurable value in your implementation, which is good. But either way, until this behaviour is clarified (and even otherwise), we need this assert - https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala#L171 - or else we will be silently skipping data without the user knowing it (whether with the default value or a user-specified smaller value).

D - Non-use of the new consumer's TopicPartition class

You have already figured out that this class is not Serializable, and hence in the public interface you have used the older TopicAndPartition class. We have raised this issue with Kafka - https://issues.apache.org/jira/browse/KAFKA-3029 - and may yet be provided with a serializable one (remains to be seen). However, using the older TopicAndPartition class in our public API introduces a dependency on the older kafka core jar rather than just the kafka-clients jar, which I would think is not the preferred approach. If we are not provided with a serializable TopicPartition, then we should rather use our own serializable object (or just a tuple of string, int, Long) inside DirectKafkaInputDStream to ensure it is Serializable.
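For point D, until Kafka provides a Serializable TopicPartition (KAFKA-3029), the "own serializable object" mentioned above could be as small as a case class. A hypothetical sketch only; the name and the conversion helper are illustrative, not from the PR:

```scala
// Minimal Serializable stand-in for org.apache.kafka.common.TopicPartition,
// which is not Serializable in the 0.9.0 client. Scala case classes are
// Serializable by default, so instances can travel in task closures and
// checkpoint data.
case class SerializableTopicPartition(topic: String, partition: Int) {
  // Rebuild the real (non-serializable) client object where it is needed,
  // e.g. on the executor:
  // def toTopicPartition = new org.apache.kafka.common.TopicPartition(topic, partition)
}
```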