Thanks. I'll keep an eye on this. Our implementation of the DStream
basically accepts a function to compute current offsets. The implementation
of the function fetches the list of topics from ZooKeeper once in a while. It
then adds consumer offsets for newly added topics to the currentOffsets
held in memory and deletes removed topics. The "once in a while" is
pluggable as well, and we are planning to use ZK watches instead of a
time-based refresh. This works for us because we use ZK extensively for a
lot of other bookkeeping.
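
A rough sketch of that refresh step, in case it helps. It uses plain Scala
collections only, with no Spark or ZK calls. The names TopicPartition,
refreshOffsets, livePartitions and startingOffset are made up for this
illustration and are not taken from Spark or from our actual code; the
lookup of stored consumer offsets is passed in as a function so the
sketch stays independent of where the offsets live.

```scala
// Illustrative sketch only: all names here are hypothetical.
final case class TopicPartition(topic: String, partition: Int)

object OffsetRefresh {
  /** Reconcile the in-memory offsets with the partitions that exist now.
    *
    * @param currentOffsets offsets held in memory from earlier batches
    * @param livePartitions partitions of the topics listed right now
    *                       (assumed to come from the periodic topic fetch)
    * @param startingOffset assumed lookup of the stored consumer offset
    *                       for a partition that is not tracked yet
    */
  def refreshOffsets(
      currentOffsets: Map[TopicPartition, Long],
      livePartitions: Set[TopicPartition],
      startingOffset: TopicPartition => Long
  ): Map[TopicPartition, Long] = {
    // Drop partitions whose topics have been removed.
    val retained = currentOffsets.filter { case (tp, _) => livePartitions.contains(tp) }
    // Look up consumer offsets for partitions that were added since the last refresh.
    val added = (livePartitions -- currentOffsets.keySet)
      .map(tp => tp -> startingOffset(tp))
      .toMap
    retained ++ added
  }
}
```

Partitions that are already tracked keep their in-memory offset, so the
lookup is only invoked for newly added ones. The same function can be
called from a timer or from a ZK watch callback.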


On Fri, Sep 25, 2015 at 1:16 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Yes, the partition IDs are the same.
>
> As far as the failure / subclassing goes, you may want to keep an eye on
> https://issues.apache.org/jira/browse/SPARK-10320 , not sure if the
> suggestions in there will end up going anywhere.
>
> On Fri, Sep 25, 2015 at 3:01 PM, Neelesh <neele...@gmail.com> wrote:
>
>> For the 1-1 mapping case, can I use TaskContext.get().partitionId as an
>> index into the offset ranges?
>> For the failure case, yes, I'm subclassing DirectKafkaInputDStream.
>> As for failures, different partitions in the same batch may be talking to
>> different RDBMS servers due to multitenancy - a spark streaming app is
>> consuming from several topics, each topic mapped to a customer for example.
>> It is quite possible that in a batch, several partitions belonging to the
>> same customer may fail, and others will go through. We don't want the whole
>> job to be killed because of one failing customer and affect others in the
>> same job. Hope that makes sense.
>>
>> thnx
>>
>> On Fri, Sep 25, 2015 at 12:52 PM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>> Your success case will work fine, it is a 1-1 mapping as you said.
>>>
>>> To handle failures in exactly the way you describe, you'd need to
>>> subclass or modify DirectKafkaInputDStream and change the way compute()
>>> works.
>>>
>>> Unless you really are going to have very fine-grained failures (why
>>> would only a given partition be failing while the rest are fine?) it's
>>> going to be easier to just fail the whole task and retry, or eventually
>>> kill the job.
>>>
>>> On Fri, Sep 25, 2015 at 1:55 PM, Neelesh <neele...@gmail.com> wrote:
>>>
>>>> Thanks Petr, Cody. This is a reasonable place to start for me. What I'm
>>>> trying to achieve:
>>>>
>>>> stream.foreachRDD { rdd =>
>>>>   rdd.foreachPartition { p =>
>>>>     Try(myFunc(...)) match {
>>>>       case Success(s) =>
>>>>         // update the watermark for this partition; of course, the
>>>>         // expectation is that this works only if there is a 1-1
>>>>         // mapping at this point in time
>>>>       case Failure(e) =>
>>>>         // tell the driver not to generate a partition for this Kafka
>>>>         // topic+partition for a while, by updating some shared
>>>>         // state (ZK)
>>>>     }
>>>>   }
>>>> }
>>>>
>>>> I was looking for that mapping to the Kafka partition that's bound to
>>>> a task, from inside the task execution code, in cases where the
>>>> intermediate operations do not change partitions, shuffle, etc.
>>>>
>>>> -neelesh
>>>>
>>>> On Fri, Sep 25, 2015 at 11:14 AM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>>
>>>>>
>>>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>>>>
>>>>> also has an example of how to close over the offset ranges so they are
>>>>> available on executors.
>>>>>
>>>>> On Fri, Sep 25, 2015 at 12:50 PM, Neelesh <neele...@gmail.com> wrote:
>>>>>
>>>>>> Hi,
>>>>>>    We are using DirectKafkaInputDStream and store completed consumer
>>>>>> offsets in Kafka (0.8.2). However, some of our use cases require that
>>>>>> offsets not be written if processing of a partition fails with certain
>>>>>> exceptions. This allows us to build various backoff strategies for that
>>>>>> partition, instead of either blindly committing consumer offsets
>>>>>> regardless of errors (because KafkaRDD as HasOffsetRanges is available
>>>>>> only on the driver) or relying on Spark's retry logic and continuing
>>>>>> without remedial action.
>>>>>>
>>>>>> I was playing with SparkListener and found that while one can listen
>>>>>> on taskCompletedEvent on the driver and even figure out that there was an
>>>>>> error, there is no way of mapping this task back to the partition and
>>>>>> retrieving offset range, topic & kafka partition # etc.
>>>>>>
>>>>>> Any pointers appreciated!
>>>>>>
>>>>>> Thanks!
>>>>>> -neelesh
>>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
