[ 
https://issues.apache.org/jira/browse/SPARK-18779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pranav Nakhe updated SPARK-18779:
---------------------------------
    Description: 
I apologize for the earlier description, which wasn't very clear about the issue. 
Here is a detailed description and my use case:

I have a Spark application running that consumes Kafka messages using the 
Spark Kafka 0.10 integration. I now need to stop my Spark application, and the 
user then specifies a timestamp in the past from which the application should 
start reading messages again (replaying messages). The timestamp is mapped to a 
Kafka offset using the 'offsetsForTimes' API on KafkaConsumer, introduced in 
the 0.10.1.0 Kafka client. That offset is then used to create the DStream.
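For context, the timestamp-to-offset lookup described above might look roughly like the sketch below. This is only an illustration of the 'offsetsForTimes' call (available from kafka-clients 0.10.1.0); the broker address, group id, and replay window are placeholder assumptions, not values from this issue.

```scala
import java.util.Properties
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

// Placeholder consumer configuration -- adjust for your environment.
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
props.put("group.id", "replay-group")

val consumer = new KafkaConsumer[String, String](props)

// Example replay point: one hour in the past (placeholder).
val replayFromMillis: java.lang.Long = System.currentTimeMillis() - 3600 * 1000L

// Ask the broker, for every partition of the topic, for the earliest offset
// whose message timestamp is >= replayFromMillis.
val partitions = consumer.partitionsFor("Z1Topic").asScala
  .map(pi => new TopicPartition(pi.topic(), pi.partition()))
val query = partitions.map(tp => tp -> replayFromMillis).toMap.asJava

// offsetsForTimes returns null for partitions with no matching message.
val fromOffsets = consumer.offsetsForTimes(query).asScala
  .collect { case (tp, oat) if oat != null => tp -> oat.offset() }
// 'fromOffsets' is then used as the per-partition offset map when creating the DStream.
```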

Because the Kafka 0.10.0.1 client does not have the 'offsetsForTimes' API, I 
need to use the Kafka 0.10.1.0 client.

To achieve that behavior, I replaced the 0.10.0.1 jar in the Spark environment 
with the 0.10.1.0 jar. Things started working, but the application could read 
messages only from the first partition.

To reproduce the issue, I wrote a local program with the 0.10.1.0 jar on the 
classpath:

********************************
import java.util.{HashMap => JHashMap}
import org.apache.kafka.common.TopicPartition
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferBrokers
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe
import scala.collection.JavaConversions._

val topics = Set("Z1Topic")
val topicPartitionOffsetMap = new JHashMap[TopicPartition, Long]()
// Offsets hardcoded to 10 instead of being looked up via 'offsetsForTimes'
topicPartitionOffsetMap.put(new TopicPartition("Z1Topic", 0), 10L)
topicPartitionOffsetMap.put(new TopicPartition("Z1Topic", 1), 10L)

val stream = KafkaUtils.createDirectStream[String, String](ssc, PreferBrokers,
  Subscribe[String, String](topics, kafkaParams, topicPartitionOffsetMap))
val values = stream.map(record => record.value())
values.print()
********************************

This printed only the messages from the first partition, starting at offset 10 
(this is with the 0.10.1.0 client).

If I use the Kafka 0.10.0.1 client for the above program, things work fine and 
I receive messages from all partitions, but I can't use the 'offsetsForTimes' 
API (because it doesn't exist in the 0.10.0.1 client).

  was:
The Spark Streaming integration for Kafka 0.10 currently pulls in the Kafka 
client 0.10.0.1 as a Maven dependency. There is a newer Kafka client available, 
0.10.1.0. Would it be possible to have the Spark Kafka integration use Kafka 
client 0.10.1.0?

The reason we need the later Kafka client is that 0.10.1.0 introduced a new 
KafkaConsumer API, 'offsetsForTimes', which looks up the offsets for the given 
partitions by timestamp. We are using that to find the offset that needs to be 
fed to createDirectStream.

        Summary: Messages being received only from one partition when using 
Spark Streaming integration for Kafka 0.10 with kafka client library at 0.10.1  
(was: Upgrade Spark Streaming integration for Kafka 0.10 to use Kafka client 
library at 0.10.1)

I am updating the Summary and Description fields of the JIRA to represent the 
issue correctly.

> Messages being received only from one partition when using Spark Streaming 
> integration for Kafka 0.10 with kafka client library at 0.10.1
> -----------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-18779
>                 URL: https://issues.apache.org/jira/browse/SPARK-18779
>             Project: Spark
>          Issue Type: Improvement
>          Components: DStreams
>    Affects Versions: 2.0.2
>            Reporter: Pranav Nakhe
>



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
