For the 1-1 mapping case, can I use TaskContext.get().partitionId as an
index in to the offset ranges?
For the failure case, yes, I'm subclassing of DirectKafkaInputDStream. As
for failures, different partitions in the same batch may be talking to
different RDBMS servers due to multitenancy - a spark streaming app is
consuming from several topics, each topic mapped to a customer for example.
It is quite possible that in a batch, several partitions belonging to the
same customer may fail, and others will go through. We don't want the whole
job to be killed because of one failing customer,and affect others in the
same job. Hope that makes sense.

thnx

On Fri, Sep 25, 2015 at 12:52 PM, Cody Koeninger <c...@koeninger.org> wrote:

> Your success case will work fine, it is a 1-1 mapping as you said.
>
> To handle failures in exactly the way you describe, you'd need to subclass
> or modify DirectKafkaInputDStream and change the way compute() works.
>
> Unless you really are going to have very fine-grained failures (why would
> only a given partition be failing while the rest are fine?) it's going to
> be easier to just fail the whole task and retry, or eventually kill the job.
>
> On Fri, Sep 25, 2015 at 1:55 PM, Neelesh <neele...@gmail.com> wrote:
>
>> Thanks Petr, Cody. This is a reasonable place to start for me. What I'm
>> trying to achieve
>>
>> stream.foreachRDD {rdd=>
>>    rdd.foreachPartition { p=>
>>
>>        Try(myFunc(...))  match {
>>          case Sucess(s) => updatewatermark for this partition //of
>> course, expectation is that it will work only if there is a 1-1 mapping at
>> this point in time
>>          case Failure()  => Tell the driver not to generate a partition
>> for this kafka topic+partition for a while, by updating some shared state
>> (zk)
>>
>>        }
>>
>>  }
>> }
>>
>> I was looking for that mapping b/w kafka partition thats bound to a task
>> inside the task execution code, in cases where the intermediate operations
>> do not change partitions, shuffle etc.
>>
>> -neelesh
>>
>> On Fri, Sep 25, 2015 at 11:14 AM, Cody Koeninger <c...@koeninger.org>
>> wrote:
>>
>>>
>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>>
>>> also has an example of how to close over the offset ranges so they are
>>> available on executors.
>>>
>>> On Fri, Sep 25, 2015 at 12:50 PM, Neelesh <neele...@gmail.com> wrote:
>>>
>>>> Hi,
>>>>    We are using DirectKafkaInputDStream and store completed consumer
>>>> offsets in Kafka (0.8.2). However, some of our use case require that
>>>> offsets be not written if processing of a partition fails with certain
>>>> exceptions. This allows us to build various backoff strategies for that
>>>> partition, instead of either blindly committing consumer offsets regardless
>>>> of errors (because KafkaRDD as HasOffsetRanges is available only on the
>>>> driver)  or relying on Spark's retry logic and continuing without remedial
>>>> action.
>>>>
>>>> I was playing with SparkListener and found that while one can listen on
>>>> taskCompletedEvent on the driver and even figure out that there was an
>>>> error, there is no way of mapping this task back to the partition and
>>>> retrieving offset range, topic & kafka partition # etc.
>>>>
>>>> Any pointers appreciated!
>>>>
>>>> Thanks!
>>>> -neelesh
>>>>
>>>
>>>
>>
>

Reply via email to