That's precisely what this constructor does:
KafkaUtils.createDirectStream[...](ssc,
kafkaConfig, topics)

Is there a reason to do that yourself?  In that case, look at how it's done
in Spark Streaming for inspiration:
https://github.com/apache/spark/blob/master/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala#L204

-kr, Gerard.




On Mon, Jan 25, 2016 at 5:53 PM, Ashish Soni <asoni.le...@gmail.com> wrote:

> Correct what i am trying to achieve is that before the streaming job
> starts query the topic meta data from kafka , determine all the partition
> and provide those to direct API.
>
> So my question is should i consider passing all the partition from command
> line and query kafka and find and provide , what is the correct approach.
>
> Ashish
>
> On Mon, Jan 25, 2016 at 11:38 AM, Gerard Maas <gerard.m...@gmail.com>
> wrote:
>
>> What are you trying to achieve?
>>
>> Looks like you want to provide offsets but you're not managing them
>> and I'm assuming you're using the direct stream approach.
>>
>> In that case, use the simpler constructor that takes the kafka config and
>> the topics. Let it figure it out the offsets (it will contact kafka and
>> request the partitions for the topics provided)
>>
>> KafkaUtils.createDirectStream[...](ssc, kafkaConfig, topics)
>>
>>  -kr, Gerard
>>
>> On Mon, Jan 25, 2016 at 5:31 PM, Ashish Soni <asoni.le...@gmail.com>
>> wrote:
>>
>>> Hi All ,
>>>
>>> What is the best way to tell spark streaming job for the no of partition
>>> to to a given topic -
>>>
>>> Should that be provided as a parameter or command line argument
>>> or
>>> We should connect to kafka in the driver program and query it
>>>
>>> Map<TopicAndPartition, Long> fromOffsets = new
>>> HashMap<TopicAndPartition, Long>();
>>> fromOffsets.put(new TopicAndPartition(driverArgs.inputTopic, 0), 0L);
>>>
>>> Thanks,
>>> Ashish
>>>
>>
>>
>

Reply via email to