Hi Dibyendu,

Thanks for the details on the implementation. But I still do not believe
that it guarantees no duplicates - what they achieve is that the same batch
is processed exactly the same way every time (but note that it may be
processed more than once) - so it depends on the operation being
idempotent. I believe Trident uses ZK to keep track of the transactions - a
batch can be processed multiple times in failure scenarios (for example,
the transaction is processed but the machine fails before ZK is updated,
causing a "new" node to process it again).
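
To make the failure case above concrete, here is a minimal sketch (plain
Python, not Trident or Spark code) of a downstream store that stays correct
when the same batch is replayed. IdempotentSink, apply_batch and
simulate_replay are hypothetical names made up for illustration; the only
assumption taken from the discussion is that a replayed batch carries the
same transaction-id and the same tuples.

```python
class IdempotentSink:
    """Toy downstream store that remembers which transaction ids it applied."""

    def __init__(self):
        self.totals = {}            # key -> running count
        self.applied_txids = set()  # transaction ids already applied

    def apply_batch(self, txid, batch):
        """Apply a batch once; a replay with the same txid is a no-op.

        Returns True if the batch was applied, False if it was a duplicate.
        """
        if txid in self.applied_txids:
            return False
        for key, count in batch:
            self.totals[key] = self.totals.get(key, 0) + count
        self.applied_txids.add(txid)
        return True


def simulate_replay():
    """Process one batch, then replay it as if the tracker was never updated."""
    sink = IdempotentSink()
    batch = [("a", 1), ("b", 2), ("a", 3)]
    first = sink.apply_batch(7, batch)
    # Simulated failure: the store succeeded, but the external tracker (ZK in
    # the discussion) was not updated, so a new node emits the same batch.
    second = sink.apply_batch(7, batch)
    return first, second, sink.totals
```

In a real store the data update and the record of the transaction-id would
have to be written atomically, otherwise the same window for duplicates
reopens.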

I don't think it is impossible to do this in Spark Streaming as well and
I'd be really interested in working on it at some point in the near future.
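
As a rough illustration of the approach Cody describes further down in this
thread (the downstream store is also the system of record for offsets, and
offsets are committed together with the data), here is a sketch using an
in-memory SQLite database as a stand-in for the downstream store. The table
names, the helper functions and the simulated crash flag are all assumptions
for illustration; no Kafka or Spark API is used.

```python
import sqlite3


def open_store():
    """Create an in-memory store holding both results and consumer offsets."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE results (topic TEXT, partition_id INTEGER, "
        "msg_offset INTEGER, payload TEXT, "
        "PRIMARY KEY (topic, partition_id, msg_offset))"
    )
    conn.execute(
        "CREATE TABLE offsets (topic TEXT, partition_id INTEGER, "
        "next_offset INTEGER, PRIMARY KEY (topic, partition_id))"
    )
    conn.commit()
    return conn


def committed_offset(conn, topic, partition_id):
    """Return the next offset to read, or 0 if nothing was committed yet."""
    row = conn.execute(
        "SELECT next_offset FROM offsets WHERE topic = ? AND partition_id = ?",
        (topic, partition_id),
    ).fetchone()
    return row[0] if row else 0


def store_batch(conn, topic, partition_id, messages, fail_before_commit=False):
    """Write (offset, payload) messages and the new offset in one transaction."""
    if not messages:
        return
    try:
        with conn:  # commits on success, rolls back if an exception escapes
            conn.executemany(
                "INSERT INTO results VALUES (?, ?, ?, ?)",
                [(topic, partition_id, off, body) for off, body in messages],
            )
            if fail_before_commit:
                raise RuntimeError("simulated crash before commit")
            conn.execute(
                "INSERT OR REPLACE INTO offsets VALUES (?, ?, ?)",
                (topic, partition_id, messages[-1][0] + 1),
            )
    except RuntimeError:
        pass  # the rollback already discarded the partial write


def demo():
    """Fail once mid-batch, restart from the committed offset, then succeed."""
    conn = open_store()
    log = [(0, "x"), (1, "y"), (2, "z")]
    store_batch(conn, "t", 0, log, fail_before_commit=True)
    start = committed_offset(conn, "t", 0)
    replay = [m for m in log if m[0] >= start]
    store_batch(conn, "t", 0, replay)
    rows = conn.execute("SELECT COUNT(*) FROM results").fetchone()[0]
    return start, committed_offset(conn, "t", 0), rows
```

Because the data and the offset share a transaction, the failed attempt
leaves neither behind, and the restart re-reads from the last committed
offset without producing duplicate rows.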

On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <
dibyendu.bhattach...@gmail.com> wrote:

> Hi,
>
> Thanks to Jerry for mentioning the Kafka Spout for Trident. Storm Trident
> achieves the exactly-once guarantee by processing tuples in batches and
> assigning the same transaction-id to a given batch. A replay of a batch
> with a given transaction-id contains the exact same set of tuples, and
> batches are replayed in the exact same order as before the failure.
>
> With this paradigm, if the downstream system processes the data for a
> batch with a given transaction-id, and the same batch is emitted again
> after a failure, you can check whether that transaction-id has already
> been processed and hence guarantee exactly-once semantics.
>
> This can only be achieved in Spark if we use the low-level Kafka consumer
> API to manage the offsets. This low-level Kafka consumer (
> https://github.com/dibbhatt/kafka-spark-consumer) implements a Spark
> Kafka consumer that uses the Kafka low-level APIs. All of the Kafka-related
> logic has been taken from the Storm-Kafka spout, which manages Kafka
> rebalancing, fault tolerance, and Kafka metadata management.
>
> Presently this consumer ensures that on Receiver failure it re-emits the
> exact same Block with the same set of messages. Every message carries its
> topic, partition, and offset details, which can address SPARK-3146.
>
> As this low-level consumer has complete control over the Kafka offsets,
> we can implement a Trident-like feature on top of it, such as assigning a
> transaction-id to a given block and re-emitting the same block with the
> same set of messages on Driver failure.
>
> Regards,
> Dibyendu
>
>
> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <saisai.s...@intel.com>
> wrote:
>>
>> Hi all,
>>
>> I agree with Hari that strong exactly-once semantics are very hard to
>> guarantee, especially in failure situations. From my understanding, even
>> the current implementation of ReliableKafkaReceiver cannot fully guarantee
>> exactly-once semantics after a failure. First, the ordering of data
>> replayed from the last checkpoint is hard to guarantee when multiple
>> partitions are involved. Second, there is the design complexity of
>> achieving this: you can refer to the Kafka Spout in Trident, where we have
>> to dig into the details of Kafka's metadata management to achieve this,
>> not to mention rebalancing and fault tolerance.
>>
>> Thanks
>> Jerry
>>
>> -----Original Message-----
>> From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
>> Sent: Friday, December 19, 2014 5:57 AM
>> To: Cody Koeninger
>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
>> Subject: Re: Which committers care about Kafka?
>>
>> But idempotency is not that easy to achieve sometimes. A strong only-once
>> semantic through a proper API would be super useful; but I'm not implying
>> this is easy to achieve.
>> On 18 Dec 2014 21:52, "Cody Koeninger" <c...@koeninger.org> wrote:
>>
>> > If the downstream store for the output data is idempotent or
>> > transactional, and that downstream store also is the system of record
>> > for kafka offsets, then you have exactly-once semantics.  Commit
>> > offsets with / after the data is stored.  On any failure, restart from
>> > the last committed offsets.
>> >
>> > Yes, this approach is biased towards the etl-like use cases rather
>> > than near-realtime-analytics use cases.
>> >
>> > On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <
>> > hshreedha...@cloudera.com
>> > > wrote:
>> > >
>> > > I get what you are saying. But getting exactly once right is an
>> > > extremely hard problem - especially in presence of failure. The
>> > > issue is failures can happen in a bunch of places. For example,
>> > > before the notification of downstream store being successful reaches
>> > > the receiver that updates the offsets, the node fails. The store was
>> > > successful, but duplicates came in either way. This is something
>> > > worth discussing by itself - but without uuids etc this might not
>> > > really be solved even when you think it is.
>> > >
>> > > Anyway, I will look at the links. Even I am interested in all of the
>> > > features you mentioned - no HDFS WAL for Kafka and once-only
>> > > delivery, but I doubt the latter is really possible to guarantee -
>> > > though I really would love to have that!
>> > >
>> > > Thanks,
>> > > Hari
>> > >
>> > >
>> > > On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
>> > > <c...@koeninger.org>
>> > > wrote:
>> > >
>> > >> Thanks for the replies.
>> > >>
>> > >> Regarding skipping WAL, it's not just about optimization.  If you
>> > >> actually want exactly-once semantics, you need control of kafka
>> > >> offsets as well, including the ability to not use zookeeper as the
>> > >> system of record for offsets.  Kafka already is a reliable system
>> > >> that has strong ordering guarantees (within a partition) and does
>> > >> not mandate the use of zookeeper to store offsets.  I think there
>> > >> should be a spark api that acts as a very simple intermediary
>> > >> between Kafka and the user's choice of downstream store.
>> > >>
>> > >> Take a look at the links I posted - if there's already been 2
>> > >> independent implementations of the idea, chances are it's something
>> > >> people need.
>> > >>
>> > >> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <
>> > >> hshreedha...@cloudera.com> wrote:
>> > >>>
>> > >>> Hi Cody,
>> > >>>
>> > >>> I am an absolute +1 on SPARK-3146. I think we can implement
>> > >>> something pretty simple and lightweight for that one.
>> > >>>
>> > >>> For the Kafka DStream skipping the WAL implementation - this is
>> > >>> something I discussed with TD a few weeks ago. Though it is a good
>> > >>> idea to implement this to avoid unnecessary HDFS writes, it is an
>> > >>> optimization. For that reason, we must be careful in
>> > >>> implementation. There are a couple of issues that we need to ensure
>> > >>> work properly - specifically ordering. To ensure we pull messages
>> > >>> from different topics and partitions in the same order after
>> > >>> failure, we’d still have to persist the metadata to HDFS (or some
>> > >>> other system) - this metadata must contain the order of messages
>> > >>> consumed, so we know how to re-read the messages. I am planning to
>> > >>> explore this once I have some time (probably in Jan). In addition,
>> > >>> we must also ensure bucketing functions work fine as well. I will
>> > >>> file a placeholder jira for this one.
>> > >>>
>> > >>> I also wrote an API to write data back to Kafka a while back -
>> > >>> https://github.com/apache/spark/pull/2994 . I am hoping that this
>> > >>> will get pulled in soon, as this is something I know people want.
>> > >>> I am open to feedback on that - anything that I can do to make it
>> > >>> better.
>> > >>>
>> > >>> Thanks,
>> > >>> Hari
>> > >>>
>> > >>>
>> > >>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
>> > >>> <pwend...@gmail.com>
>> > >>> wrote:
>> > >>>
>> > >>>>  Hey Cody,
>> > >>>>
>> > >>>> Thanks for reaching out with this. The lead on streaming is TD -
>> > >>>> he is traveling this week though so I can respond a bit. To the
>> > >>>> high level point of whether Kafka is important - it definitely
>> > >>>> is. Something like 80% of Spark Streaming deployments
>> > >>>> (anecdotally) ingest data from Kafka. Also, good support for
>> > >>>> Kafka is something we generally want in Spark and not a library.
>> > >>>> In some cases IIRC there were user libraries that used unstable
>> > >>>> Kafka API's and we were somewhat waiting on Kafka to stabilize
>> > >>>> them to merge things upstream. Otherwise users wouldn't be able
>> > >>>> to use newer Kafka versions. This is a high level impression only
>> > >>>> though, I haven't talked to TD about this recently so it's worth
>> > >>>> revisiting given the developments in Kafka.
>> > >>>>
>> > >>>> Please do bring things up like this on the dev list if there are
>> > >>>> blockers for your usage - thanks for pinging it.
>> > >>>>
>> > >>>> - Patrick
>> > >>>>
>> > >>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger
>> > >>>> <c...@koeninger.org>
>> > >>>> wrote:
>> > >>>> > Now that 1.2 is finalized... who are the go-to people to get
>> > >>>> > some long-standing Kafka related issues resolved?
>> > >>>> >
>> > >>>> > The existing api is not sufficiently safe nor flexible for our
>> > >>>> > production use. I don't think we're alone in this viewpoint,
>> > >>>> > because I've seen several different patches and libraries to fix
>> > >>>> > the same things we've been running into.
>> > >>>> >
>> > >>>> > Regarding flexibility
>> > >>>> >
>> > >>>> > https://issues.apache.org/jira/browse/SPARK-3146
>> > >>>> >
>> > >>>> > has been outstanding since August, and IMHO an equivalent of
>> > >>>> > this is absolutely necessary. We wrote a similar patch
>> > >>>> > ourselves, then found that PR and have been running it in
>> > >>>> > production. We wouldn't be able to get our jobs done without it.
>> > >>>> > It also allows users to solve a whole class of problems for
>> > >>>> > themselves (e.g. SPARK-2388, arbitrary delay of messages, etc).
>> > >>>> >
>> > >>>> > Regarding safety, I understand the motivation behind
>> > >>>> > WriteAheadLog as a general solution for streaming unreliable
>> > >>>> > sources, but Kafka already is a reliable source. I think there's
>> > >>>> > a need for an api that treats it as such. Even aside from the
>> > >>>> > performance issues of duplicating the write-ahead log in kafka
>> > >>>> > into another write-ahead log in hdfs, I need exactly-once
>> > >>>> > semantics in the face of failure (I've had failures that
>> > >>>> > prevented reloading a spark streaming checkpoint, for instance).
>> > >>>> >
>> > >>>> > I've got an implementation i've been using
>> > >>>> >
>> > >>>> > https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka
>> > >>>> >
>> > >>>> > Tresata has something similar at
>> > >>>> > https://github.com/tresata/spark-kafka, and I know there were
>> > >>>> > earlier attempts based on Storm code.
>> > >>>> >
>> > >>>> > Trying to distribute these kinds of fixes as libraries rather
>> > >>>> > than patches to Spark is problematic, because large portions of
>> > >>>> > the implementation are private[spark].
>> > >>>> >
>> > >>>> > I'd like to help, but i need to know whose attention to get.
>> > >>>>
>> > >>>> ---------------------------------------------------------------------
>> > >>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> > >>>> For additional commands, e-mail: dev-h...@spark.apache.org
>> > >>>>
>> > >>>>
>> > >>>
>> > >
>> >
>>
>
