Yes, the partition IDs are the same.

As for the failure handling / subclassing, you may want to keep an eye on
https://issues.apache.org/jira/browse/SPARK-10320, though I'm not sure whether
the suggestions in there will end up going anywhere.
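
To make the 1-1 case concrete, here is a minimal sketch of the per-partition bookkeeping discussed in this thread. It is plain Scala with stand-in types so it runs without Spark: the OffsetRange case class only mimics the fields of the real org.apache.spark.streaming.kafka.OffsetRange, and Outcome, Commit, BackOff and PartitionOutcomes.handle are hypothetical names made up for illustration. In an actual job the partition ID would come from TaskContext.get.partitionId inside foreachPartition, and the offset ranges would be captured on the driver through HasOffsetRanges.

```scala
import scala.util.{Failure, Success, Try}

// Stand-in for Spark's OffsetRange (assumption: same four fields),
// defined here so the sketch compiles without Spark on the classpath.
final case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

// Hypothetical result type: what a task reports for its partition.
sealed trait Outcome
final case class Commit(range: OffsetRange) extends Outcome
final case class BackOff(range: OffsetRange, reason: String) extends Outcome

object PartitionOutcomes {
  // With a 1-1 mapping (no shuffle or repartition), the task's partition ID
  // indexes directly into the batch's offset ranges.
  def handle[A](partitionId: Int, offsetRanges: Array[OffsetRange], records: Iterator[A])(
      process: Iterator[A] => Unit): Outcome = {
    val range = offsetRanges(partitionId)
    // Try captures non-fatal exceptions, so one failing partition
    // does not fail the others in the same batch.
    Try(process(records)) match {
      case Success(_) => Commit(range)              // safe to advance the watermark
      case Failure(e) => BackOff(range, e.toString) // skip this topic+partition for a while
    }
  }
}
```

How the outcome reaches the driver (for example through the shared ZooKeeper state mentioned further down) is deliberately left out, and the sketch does not cover the change to compute() that skipping a partition would need.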

On Fri, Sep 25, 2015 at 3:01 PM, Neelesh <neele...@gmail.com> wrote:

> For the 1-1 mapping case, can I use TaskContext.get().partitionId as an
> index into the offset ranges?
> For the failure case, yes, I'm subclassing DirectKafkaInputDStream. As
> for failures, different partitions in the same batch may be talking to
> different RDBMS servers due to multitenancy: a Spark Streaming app is
> consuming from several topics, with each topic mapped to a customer, for
> example. It is quite possible that in a batch, several partitions belonging
> to the same customer may fail while the others go through. We don't want the
> whole job to be killed because of one failing customer, affecting the others
> in the same job. Hope that makes sense.
>
> Thanks
>
> On Fri, Sep 25, 2015 at 12:52 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Your success case will work fine, it is a 1-1 mapping as you said.
>>
>> To handle failures in exactly the way you describe, you'd need to
>> subclass or modify DirectKafkaInputDStream and change the way compute()
>> works.
>>
>> Unless you really are going to have very fine-grained failures (why would
>> only a given partition be failing while the rest are fine?), it's going to
>> be easier to just fail the whole task and retry, or eventually kill the job.
>>
>> On Fri, Sep 25, 2015 at 1:55 PM, Neelesh <neele...@gmail.com> wrote:
>>
>>> Thanks Petr, Cody. This is a reasonable place to start for me. What I'm
>>> trying to achieve is:
>>>
>>> stream.foreachRDD { rdd =>
>>>   rdd.foreachPartition { p =>
>>>     Try(myFunc(...)) match {
>>>       // Expected to work only while there is still a 1-1 mapping
>>>       // between Spark partitions and Kafka partitions.
>>>       case Success(s) => // update the watermark for this partition
>>>       case Failure(e) => // tell the driver, by updating some shared
>>>                          // state (ZK), not to generate a partition for
>>>                          // this Kafka topic+partition for a while
>>>     }
>>>   }
>>> }
>>>
>>> I was looking for that mapping between a task and the Kafka partition
>>> bound to it, available inside the task execution code, in cases where the
>>> intermediate operations do not change partitions, shuffle, etc.
>>>
>>> -neelesh
>>>
>>> On Fri, Sep 25, 2015 at 11:14 AM, Cody Koeninger <c...@koeninger.org>
>>> wrote:
>>>
>>>>
>>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>>>
>>>> also has an example of how to close over the offset ranges so they are
>>>> available on executors.
>>>>
>>>> On Fri, Sep 25, 2015 at 12:50 PM, Neelesh <neele...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>    We are using DirectKafkaInputDStream and store completed consumer
>>>>> offsets in Kafka (0.8.2). However, some of our use cases require that
>>>>> offsets not be written if processing of a partition fails with certain
>>>>> exceptions. This allows us to build various backoff strategies for that
>>>>> partition, instead of either blindly committing consumer offsets
>>>>> regardless of errors (because KafkaRDD as HasOffsetRanges is available
>>>>> only on the driver) or relying on Spark's retry logic and continuing
>>>>> without remedial action.
>>>>>
>>>>> I was playing with SparkListener and found that while one can listen
>>>>> on taskCompletedEvent on the driver and even figure out that there was an
>>>>> error, there is no way of mapping this task back to the partition and
>>>>> retrieving offset range, topic & kafka partition # etc.
>>>>>
>>>>> Any pointers appreciated!
>>>>>
>>>>> Thanks!
>>>>> -neelesh
>>>>>
>>>>
>>>>
>>>
>>
>
