Yes, the partition IDs are the same. As far as the failure / subclassing goes, you may want to keep an eye on https://issues.apache.org/jira/browse/SPARK-10320 , not sure if the suggestions in there will end up going anywhere.
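[Editor's note] The 1-1 indexing idea confirmed above can be sketched without a Spark dependency. `OffsetRange` below is a hypothetical stand-in for Spark's `org.apache.spark.streaming.kafka.OffsetRange`, and the hard-coded `partitionId` stands in for `TaskContext.get().partitionId()` inside a running task; only the array-indexing relationship is shown.

```scala
// Sketch (no Spark dependency): with DirectKafkaInputDStream, before any
// shuffle, RDD partition i corresponds to offsetRanges(i), so the task's
// partition id can index directly into the batch's offset ranges.
// OffsetRange is a stand-in for Spark's kafka OffsetRange class.
case class OffsetRange(topic: String, partition: Int,
                       fromOffset: Long, untilOffset: Long)

object PartitionIndexSketch {
  def main(args: Array[String]): Unit = {
    val offsetRanges = Array(
      OffsetRange("tenant-a", 0, 100L, 150L),
      OffsetRange("tenant-b", 1, 200L, 260L)
    )
    // Inside a task this would be TaskContext.get().partitionId():
    val partitionId = 1
    val range = offsetRanges(partitionId)
    println(s"${range.topic}-${range.partition}: " +
            s"${range.fromOffset}..${range.untilOffset}")
  }
}
```

Note this indexing is only safe on the direct stream's own RDD, before any repartition or shuffle, which is exactly the caveat discussed below.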
On Fri, Sep 25, 2015 at 3:01 PM, Neelesh <neele...@gmail.com> wrote:

> For the 1-1 mapping case, can I use TaskContext.get().partitionId as an
> index into the offset ranges?
>
> For the failure case, yes, I'm subclassing DirectKafkaInputDStream. As
> for failures, different partitions in the same batch may be talking to
> different RDBMS servers due to multitenancy: a Spark Streaming app is
> consuming from several topics, each topic mapped to a customer, for
> example. It is quite possible that in a batch, several partitions
> belonging to the same customer fail while others go through. We don't
> want the whole job to be killed because of one failing customer,
> affecting others in the same job. Hope that makes sense.
>
> thnx
>
> On Fri, Sep 25, 2015 at 12:52 PM, Cody Koeninger <c...@koeninger.org> wrote:
>
>> Your success case will work fine; it is a 1-1 mapping, as you said.
>>
>> To handle failures in exactly the way you describe, you'd need to
>> subclass or modify DirectKafkaInputDStream and change the way compute()
>> works.
>>
>> Unless you really are going to have very fine-grained failures (why
>> would only a given partition be failing while the rest are fine?), it's
>> going to be easier to just fail the whole task and retry, or eventually
>> kill the job.
>>
>> On Fri, Sep 25, 2015 at 1:55 PM, Neelesh <neele...@gmail.com> wrote:
>>
>>> Thanks Petr, Cody. This is a reasonable place to start for me.
>>> What I'm trying to achieve:
>>>
>>> stream.foreachRDD { rdd =>
>>>   rdd.foreachPartition { p =>
>>>     Try(myFunc(...)) match {
>>>       case Success(s) =>
>>>         // update the watermark for this partition; of course, the
>>>         // expectation is that this works only if there is a 1-1
>>>         // mapping at this point in time
>>>       case Failure(e) =>
>>>         // tell the driver not to generate a partition for this Kafka
>>>         // topic+partition for a while, by updating some shared
>>>         // state (ZK)
>>>     }
>>>   }
>>> }
>>>
>>> I was looking for that mapping, i.e. the Kafka partition bound to a
>>> task, from inside the task execution code, in cases where the
>>> intermediate operations do not change partitions, shuffle, etc.
>>>
>>> -neelesh
>>>
>>> On Fri, Sep 25, 2015 at 11:14 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>>> http://spark.apache.org/docs/latest/streaming-kafka-integration.html#approach-2-direct-approach-no-receivers
>>>>
>>>> also has an example of how to close over the offset ranges so they
>>>> are available on executors.
>>>>
>>>> On Fri, Sep 25, 2015 at 12:50 PM, Neelesh <neele...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> We are using DirectKafkaInputDStream and store completed consumer
>>>>> offsets in Kafka (0.8.2). However, some of our use cases require
>>>>> that offsets not be written if processing of a partition fails with
>>>>> certain exceptions. This allows us to build various backoff
>>>>> strategies for that partition, instead of either blindly committing
>>>>> consumer offsets regardless of errors (because KafkaRDD as
>>>>> HasOffsetRanges is available only on the driver) or relying on
>>>>> Spark's retry logic and continuing without remedial action.
>>>>>
>>>>> I was playing with SparkListener and found that while one can
>>>>> listen for taskCompleted events on the driver and even figure out
>>>>> that there was an error, there is no way of mapping the task back
>>>>> to the partition and retrieving the offset range, topic & Kafka
>>>>> partition #, etc.
>>>>>
>>>>> Any pointers appreciated!
>>>>>
>>>>> Thanks!
>>>>> -neelesh
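[Editor's note] The commit-on-success / back-off-on-failure pattern sketched in the thread can be simulated without a cluster. `myFunc`, the `committed` map, and the `backoff` set below are hypothetical stand-ins: in a real job the shared state would live in ZooKeeper and the loop body would run inside `rdd.foreachPartition`, with the partition id coming from `TaskContext.get().partitionId()`. Only the per-partition Try/Success/Failure bookkeeping is shown.

```scala
import scala.util.{Try, Success, Failure}

// Stand-in for Spark's kafka OffsetRange (one per Kafka partition per batch).
case class OffsetRange(topic: String, partition: Int,
                       fromOffset: Long, untilOffset: Long)

object PerPartitionCommitSketch {
  // Hypothetical shared state; in the thread's design this lives in ZK.
  val committed = scala.collection.mutable.Map.empty[Int, Long] // partition -> offset
  val backoff   = scala.collection.mutable.Set.empty[Int]       // partitions to skip

  // Stand-in for the thread's myFunc: pretend partition 1's tenant RDBMS is down.
  def myFunc(range: OffsetRange): Unit =
    if (range.partition == 1)
      sys.error(s"RDBMS down for ${range.topic}-${range.partition}")

  def processBatch(offsetRanges: Array[OffsetRange]): Unit =
    offsetRanges.foreach { range =>
      Try(myFunc(range)) match {
        case Success(_) =>
          // Commit the watermark only for the partition that succeeded.
          committed(range.partition) = range.untilOffset
        case Failure(e) =>
          // Don't commit; mark the partition so the driver can skip it
          // for a while instead of failing the whole batch.
          backoff += range.partition
      }
    }

  def main(args: Array[String]): Unit = {
    processBatch(Array(
      OffsetRange("orders", 0, 100L, 150L),
      OffsetRange("orders", 1, 200L, 260L)))
    println(s"committed=$committed backoff=$backoff")
  }
}
```

The key property, matching the thread's goal, is that one failing tenant's partition is left uncommitted and flagged for backoff while the other partitions in the same batch still commit their offsets.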