[jira] [Comment Edited] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2016-03-01 Thread Reynold Xin (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15175066#comment-15175066
 ] 

Reynold Xin edited comment on SPARK-12177 at 3/2/16 5:32 AM:
-

This thread is getting too long for me to follow, but my instinct is that maybe 
we should have two subprojects and support both. Otherwise it would be very bad 
for Kafka 0.8 users upgrading to Spark 2.0.

It's much more difficult to upgrade Kafka, which is a message bus, than it is 
to upgrade Spark.
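If the two-subprojects route were taken, a user's build could look roughly like the 
sketch below (sbt syntax; the artifact names and version are purely illustrative, 
nothing here has been decided in this thread):

// Hypothetical sbt dependencies if Spark shipped two Kafka integration subprojects;
// artifact names and version are illustrative only.
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-8" % "2.0.0"

// or, for an app that has moved to the new 0.9 consumer API:
// libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-9" % "2.0.0"

An app would depend on exactly one of the two, so 0.8 users would not be forced to 
upgrade their Kafka clusters just to pick up a new Spark release.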





> Update KafkaDStreams to new Kafka 0.9 Consumer API
> --
>
> Key: SPARK-12177
> URL: https://issues.apache.org/jira/browse/SPARK-12177
> Project: Spark
>  Issue Type: Improvement
>  Components: Streaming
>Affects Versions: 1.6.0
>Reporter: Nikita Tarasenko
>  Labels: consumer, kafka
>
> Kafka 0.9 has already been released, and it introduces a new consumer API that is 
> not compatible with the old one. So I added the new consumer API as separate 
> classes in the package org.apache.spark.streaming.kafka.v09. I didn't remove the 
> old classes, for backward compatibility: users will not need to change their old 
> Spark applications when they upgrade to the new Spark version.
> Please review my changes.






[jira] [Comment Edited] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2016-01-20 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15109056#comment-15109056
 ] 

Mario Briggs edited comment on SPARK-12177 at 1/20/16 6:16 PM:
---

bq. I totally understand what you mean. However, Kafka has its own assembly in 
Spark, and the way the code is structured right now, both the new API and the old 
API would go into the same assembly, so it's important to have a different package 
name. Also, for our end users transitioning from the old to the new API, I foresee 
them having two versions of their Spark-Kafka app: one that works with the old API 
and one with the new API. I think it would be an easier transition if they could 
include both Kafka API versions on the Spark classpath and pick and choose which 
app to run, without mucking with Maven dependencies and re-compiling when they want 
to switch. Let me know if you disagree.

Hey Mark,
Since this is WIP, I have at least one more option in mind besides what I suggested 
above; I only put one out to get the conversation rolling, so thanks for chipping 
in. Thanks also for bringing up the kafka-assembly part: the assembly jar does 
include all dependencies, so we would include Kafka's v0.9 jars, I guess? I remember 
Cody mentioning that a v0.8 to v0.9 upgrade on the Kafka side involves upgrading the 
brokers; I think that is not required when a client uses the v0.9 jar but consumes 
only the older high-level/low-level API and talks to a v0.8 Kafka cluster. So we 
could go with a single kafka-assembly, in which case my suggestion above is itself 
broken, since we would have clashes between identical package and class names.

One thought behind not putting the version in the package name or class name (I 
see that Flink puts it in the class name) was to avoid forcing us to create 
v0.10/v0.11 packages (and forcing customers to change code and recompile) even if 
those Kafka releases have no client-API or other changes that warrant a new 
version. (Also, Spark has got away without a version number until now, which means 
less work in Spark, so I'm not sure we want to start forcing this work going 
forward.) Once we introduce the version number, we need to keep it in sync with 
Kafka.

That's why one earlier idea I mentioned in this JIRA was: 'The public API 
signatures (of KafkaUtils in the v0.9 subproject) are different and do not clash 
(with KafkaUtils in the original kafka subproject), and hence can be added to the 
existing KafkaUtils class in the original kafka subproject.' This also addresses 
the issues you mention above. Cody mentioned that we need to get others on the 
same page for this idea, so I guess we really need the committers to chime in 
here. Of course, I forgot to answer Nikita's follow-up question: 'do you mean 
that we would change the original KafkaUtils by adding new functions for the new 
DirectIS/KafkaRDD but using them from a separate module with kafka09 classes'? 
To be clear, these new public methods added to the original kafka subproject's 
KafkaUtils will make use of the DirectKafkaInputDStream, KafkaRDD, 
KafkaRDDPartition and OffsetRange classes that live in a new v09 package 
(internal, of course). In short, we don't have a new subproject. (I left the 
KafkaCluster class off the list, because I'm thinking it makes more sense to 
call that class something like 'KafkaClient' going forward.)
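A minimal sketch of the idea, with deliberately simplified and hypothetical 
signatures (the real KafkaUtils methods take a StreamingContext and more 
parameters): the old-consumer and new-consumer entry points can live in one class 
because their parameter types differ, so nothing clashes.

import java.{util => ju}

// Hypothetical illustration only: one object carries both an 0.8-style and an
// 0.9-style entry point; the overloads coexist because their parameter types differ.
object KafkaUtilsSketch {
  // 0.8-style shape (simplified): old-consumer config as a Scala Map
  def createDirectStream(kafkaParams: Map[String, String], topics: Set[String]): String =
    s"old-consumer stream for ${topics.mkString(",")}"

  // 0.9-style shape (hypothetical): new-consumer config as a Java Map of Objects
  def createDirectStream(kafkaParams: ju.Map[String, Object], topics: ju.Collection[String]): String =
    s"new-consumer stream for $topics"
}

A caller compiled against the old shape keeps working unchanged, while new code can 
pass new-consumer configuration (e.g. "bootstrap.servers") through the second 
overload.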



[jira] [Comment Edited] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2015-12-30 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15075230#comment-15075230
 ] 

Mario Briggs edited comment on SPARK-12177 at 12/30/15 5:24 PM:


You could also get just a few of the records you want, i.e. not all in one shot. 
A gist below:

override def getNext(): R = {
  if (iter == null || !iter.hasNext) {
    iter = consumer.poll(pollTime).iterator()
  }

  if (!iter.hasNext) {
    if (requestOffset < part.untilOffset) {
      // need to make another poll() and recheck above. So make a
      // recursive call, i.e. 'return getNext()' here?
    }
    finished = true
    null.asInstanceOf[R]
  } else {
    ...
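One alternative to the recursive call would be a bounded retry loop around poll(); 
a rough sketch, assuming a retry cap is acceptable (the helper name and parameters 
are illustrative, not from either implementation):

import java.{util => ju}
import org.apache.kafka.clients.consumer.{Consumer, ConsumerRecord}

object PollHelperSketch {
  // Hypothetical helper: keep polling until at least one record arrives or we give up
  // after maxAttempts polls, instead of recursing inside getNext().
  def pollUntilData[K, V](
      consumer: Consumer[K, V],
      pollTime: Long,
      maxAttempts: Int): ju.Iterator[ConsumerRecord[K, V]] = {
    var iter = consumer.poll(pollTime).iterator()
    var attempts = 1
    while (!iter.hasNext && attempts < maxAttempts) {
      iter = consumer.poll(pollTime).iterator()
      attempts += 1
    }
    // the caller still has to check hasNext and compare offsets against untilOffset
    iter
  }
}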





[jira] [Comment Edited] (SPARK-12177) Update KafkaDStreams to new Kafka 0.9 Consumer API

2015-12-28 Thread Mario Briggs (JIRA)

[ 
https://issues.apache.org/jira/browse/SPARK-12177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15073020#comment-15073020
 ] 

Mario Briggs edited comment on SPARK-12177 at 12/28/15 7:02 PM:


Hi Nikita,
Thanks. Here are my review comments. I couldn't find a way to add them on the PR 
review in GitHub, so I've added them here. My comments are a little detailed, since 
I too developed an implementation and tested it along with my colleague Praveen :-) 
after the initial discussion on the dev list.

A - KafkaCluster class
 getPartitions()
 seek()
 callers of the above methods
 all other methods that use withConsumer()

These should not return an 'Either', but rather just the expected object (the 
'Right'). The reason for the 'Either' in the earlier code was that the earlier 
Kafka client had to try the operation against all the 'seedBrokers' and handle the 
case where some of them were down. Similarly, when dealing with 'leaders', the 
client had to try the operation on all leaders for a TP (TopicAndPartition). With 
the new kafka-clients API we don't have to try against all the seedBrokers, 
leaders, etc., since the new KafkaConsumer object handles all those details 
internally.
Notice that in the earlier code, withBrokers() tries to connect() and invoke the 
passed-in function multiple times via brokers.foreach(), hence the need to 
accumulate errors. The earlier code also returned immediately when it succeeded 
with one of the brokers. None of this applies with the new KafkaConsumer object.
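For example, a seek helper could simply act and let any exception propagate, 
rather than wrapping errors in an Either. A rough sketch against the new API (the 
object and method shape are illustrative, not Nikita's or our code):

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object KafkaClusterSketch {
  // Hypothetical sketch: with the new KafkaConsumer there is no broker-by-broker retry
  // loop to manage, so helpers can return their result (or throw) directly, no Either.
  def seek(consumer: KafkaConsumer[_, _], offsets: Map[TopicPartition, Long]): Unit = {
    // take ownership of the partitions we care about, then position each one
    consumer.assign(offsets.keys.toList.asJava)
    offsets.foreach { case (tp, offset) => consumer.seek(tp, offset) }
  }
}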

getPartitions() - 
https://github.com/nikit-os/spark/blob/kafka-09-consumer-api/external/kafka-v09/src/main/scala/org/apache/spark/streaming/kafka/v09/KafkaCluster.scala#L62
The consumer.partitionsFor() Java API returns null if the topic doesn't exist. If 
you don't handle that, you run into an NPE when the user specifies a topic that 
doesn't exist or makes a typo in the topic name (and silently not raising an error 
saying the topic doesn't exist isn't right either).
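A rough sketch of the kind of guard meant here (the object and method names are 
made up for illustration): check for the null and fail with a message that names 
the topic, instead of letting an NPE surface later.

import scala.collection.JavaConverters._
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object PartitionLookupSketch {
  // Hypothetical guard: partitionsFor() returns null for an unknown topic, so translate
  // that into an explicit error rather than an eventual NullPointerException.
  def partitionsForOrFail(consumer: KafkaConsumer[_, _], topic: String): Seq[TopicPartition] = {
    val infos = consumer.partitionsFor(topic)
    if (infos == null) {
      throw new IllegalArgumentException(
        s"Topic '$topic' does not exist (or returned no partition metadata); check the topic name")
    }
    infos.asScala.map(pi => new TopicPartition(pi.topic(), pi.partition())).toList
  }
}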

Our implementation is at 
https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaCluster.scala.
If it is easier for you, we can issue a PR to your repo; let us know.

B - KafkaRDD class
 getPreferredLocations()

This method is missing in your code. The earlier implementation from Cody had the 
optimisation that if Kafka and the Spark code (KafkaRDD) were running on the same 
cluster, the RDD partition for a particular TopicPartition would be local to that 
TopicPartition's leader. Could you please add code to bring back this 
functionality?

Our implementation pulled this info inside getPartitions - 
https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala#L52.
It would probably be more efficient to do it inside compute() of the DStream, but 
that meant exposing 'leaders' on the KafkaRDD constructor, as discussed here - 
https://mail-archives.apache.org/mod_mbox/spark-dev/201512.mbox/%3CCAKWX9VV34Dto9irT3ZZfH78EoXO3bV3VHN8YYvTxfnyvGcRsQw%40mail.gmail.com%3E
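For reference, the 0.8-style shape of the override is roughly the sketch below, 
assuming it lives inside the KafkaRDD class and that KafkaRDDPartition carries the 
leader's host (as Cody's earlier code did); treat it as illustrative rather than a 
drop-in:

// Hypothetical sketch inside a KafkaRDD subclass: if the partition object knows the
// leader's host, report it as the preferred location so Spark can schedule the task there.
override def getPreferredLocations(thePart: Partition): Seq[String] = {
  val part = thePart.asInstanceOf[KafkaRDDPartition]
  // an empty Seq means "no preference"; otherwise Spark prefers (but does not require) this host
  if (part.host == null || part.host.isEmpty) Seq.empty else Seq(part.host)
}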

C - KafkaRDDIterator class

 getNext()

As mentioned in issue #1 noted here - 
https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/README.md#issues-noted-with-kafka-090-consumer-api
- KafkaConsumer.poll(x) is not guaranteed to return data when x is a small value. 
We are following up on this issue with Kafka - 
https://issues.apache.org/jira/browse/KAFKA-3044 . I see that you have made this a 
configurable value in your implementation, which is good, but either way, until 
this behaviour is clarified (and even otherwise), we need this assert - 
https://github.com/mariobriggs/spark/blob/kafka0.9-streaming/external/kafka-newconsumer/src/main/scala/org/apache/spark/streaming/kafka/KafkaRDD.scala#L171
- or else we will silently skip data without the user knowing it (whether with the 
default value or with a smaller user-specified value).
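The guard amounts to something like the sketch below (names are made up for 
illustration); it would run when the iterator is exhausted, before marking the 
partition finished:

object RangeCheckSketch {
  // Hypothetical guard: fail loudly if we exhausted the consumer before reaching the
  // requested end offset, instead of silently finishing the partition early.
  def assertRangeConsumed(topic: String, partition: Int, nextOffset: Long, untilOffset: Long): Unit = {
    if (nextOffset < untilOffset) {
      throw new IllegalStateException(
        s"Ran out of messages before reaching ending offset $untilOffset for $topic-$partition " +
        s"(stopped at offset $nextOffset); continuing would silently skip data")
    }
  }
}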

D - Non-use of the new consumer's TopicPartition class

You have already figured out that this class is not Serializable, and hence in the 
public interface you have used the older TopicAndPartition class. We have raised 
this issue with Kafka - https://issues.apache.org/jira/browse/KAFKA-3029 - and may 
be provided with a serializable one (yet to see). However, using the older 
TopicAndPartition class in our public API introduces a dependency on the older 
kafka core rather than just the kafka-clients jar, which I would think is not the 
preferred approach. If we are not provided with a serializable TopicPartition, then 
we should rather use our own serializable object (or just a tuple of String, Int, 
Long) inside DirectKafkaInputDStream to ensure it is Serializable.
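A minimal sketch of what such an internal serializable stand-in could look like 
(the class name is made up for illustration):

import org.apache.kafka.common.TopicPartition

// Hypothetical serializable stand-in for the new consumer's TopicPartition (KAFKA-3029):
// a plain case class (case classes are Serializable), used internally and converted back
// to TopicPartition at the point where the consumer API is actually called.
case class SerializableTopicPartition(topic: String, partition: Int) {
  def toTopicPartition: TopicPartition = new TopicPartition(topic, partition)
}

object SerializableTopicPartition {
  def apply(tp: TopicPartition): SerializableTopicPartition =
    SerializableTopicPartition(tp.topic(), tp.partition())
}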


