Can you explain your basic algorithm for the once-only-delivery? It is quite a 
bit of very Kafka-specific code, that would take more time to read than I can 
currently afford? If you can explain your algorithm a bit, it might help.




Thanks, Hari

On Fri, Dec 19, 2014 at 1:48 PM, Cody Koeninger <c...@koeninger.org>
wrote:

> The problems you guys are discussing come from trying to store state in
> spark, so don't do that.  Spark isn't a distributed database.
> Just map kafka partitions directly to rdds, llet user code specify the
> range of offsets explicitly, and let them be in charge of committing
> offsets.
> Using the simple consumer isn't that bad, I'm already using this in
> production with the code I linked to, and tresata apparently has been as
> well.  Again, for everyone saying this is impossible, have you read either
> of those implementations and looked at the approach?
> On Fri, Dec 19, 2014 at 2:27 PM, Sean McNamara <sean.mcnam...@webtrends.com>
> wrote:
>> Please feel free to correct me if I’m wrong, but I think the exactly once
>> spark streaming semantics can easily be solved using updateStateByKey. Make
>> the key going into updateStateByKey be a hash of the event, or pluck off
>> some uuid from the message.  The updateFunc would only emit the message if
>> the key did not exist, and the user has complete control over the window of
>> time / state lifecycle for detecting duplicates.  It also makes it really
>> easy to detect and take action (alert?) when you DO see a duplicate, or
>> make memory tradeoffs within an error bound using a sketch algorithm.  The
>> kafka simple consumer is insanely complex, if possible I think it would be
>> better (and vastly more flexible) to get reliability using the primitives
>> that spark so elegantly provides.
>>
>> Cheers,
>>
>> Sean
>>
>>
>> > On Dec 19, 2014, at 12:06 PM, Hari Shreedharan <
>> hshreedha...@cloudera.com> wrote:
>> >
>> > Hi Dibyendu,
>> >
>> > Thanks for the details on the implementation. But I still do not believe
>> > that it is no duplicates - what they achieve is that the same batch is
>> > processed exactly the same way every time (but see it may be processed
>> more
>> > than once) - so it depends on the operation being idempotent. I believe
>> > Trident uses ZK to keep track of the transactions - a batch can be
>> > processed multiple times in failure scenarios (for example, the
>> transaction
>> > is processed but before ZK is updated the machine fails, causing a "new"
>> > node to process it again).
>> >
>> > I don't think it is impossible to do this in Spark Streaming as well and
>> > I'd be really interested in working on it at some point in the near
>> future.
>> >
>> > On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <
>> > dibyendu.bhattach...@gmail.com> wrote:
>> >
>> >> Hi,
>> >>
>> >> Thanks to Jerry for mentioning the Kafka Spout for Trident. The Storm
>> >> Trident has done the exact-once guarantee by processing the tuple in a
>> >> batch  and assigning same transaction-id for a given batch . The replay
>> for
>> >> a given batch with a transaction-id will have exact same set of tuples
>> and
>> >> replay of batches happen in exact same order before the failure.
>> >>
>> >> Having this paradigm, if downstream system process data for a given
>> batch
>> >> for having a given transaction-id , and if during failure if same batch
>> is
>> >> again emitted , you can check if same transaction-id is already
>> processed
>> >> or not and hence can guarantee exact once semantics.
>> >>
>> >> And this can only be achieved in Spark if we use Low Level Kafka
>> consumer
>> >> API to process the offsets. This low level Kafka Consumer (
>> >> https://github.com/dibbhatt/kafka-spark-consumer) has implemented the
>> >> Spark Kafka consumer which uses Kafka Low Level APIs . All of the Kafka
>> >> related logic has been taken from Storm-Kafka spout and which manages
>> all
>> >> Kafka re-balance and fault tolerant aspects and Kafka metadata
>> managements.
>> >>
>> >> Presently this Consumer maintains that during Receiver failure, it will
>> >> re-emit the exact same Block with same set of messages . Every message
>> have
>> >> the details of its partition, offset and topic related details which can
>> >> tackle the SPARK-3146.
>> >>
>> >> As this Low Level consumer has complete control over the Kafka Offsets ,
>> >> we can implement Trident like feature on top of it like having
>> implement a
>> >> transaction-id for a given block , and re-emit the same block with same
>> set
>> >> of message during Driver failure.
>> >>
>> >> Regards,
>> >> Dibyendu
>> >>
>> >>
>> >> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <saisai.s...@intel.com>
>> >> wrote:
>> >>>
>> >>> Hi all,
>> >>>
>> >>> I agree with Hari that Strong exact-once semantics is very hard to
>> >>> guarantee, especially in the failure situation. From my understanding
>> even
>> >>> current implementation of ReliableKafkaReceiver cannot fully guarantee
>> the
>> >>> exact once semantics once failed, first is the ordering of data
>> replaying
>> >>> from last checkpoint, this is hard to guarantee when multiple
>> partitions
>> >>> are injected in; second is the design complexity of achieving this,
>> you can
>> >>> refer to the Kafka Spout in Trident, we have to dig into the very
>> details
>> >>> of Kafka metadata management system to achieve this, not to say
>> rebalance
>> >>> and fault-tolerance.
>> >>>
>> >>> Thanks
>> >>> Jerry
>> >>>
>> >>> -----Original Message-----
>> >>> From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
>> >>> Sent: Friday, December 19, 2014 5:57 AM
>> >>> To: Cody Koeninger
>> >>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
>> >>> Subject: Re: Which committers care about Kafka?
>> >>>
>> >>> But idempotency is not that easy t achieve sometimes. A strong only
>> once
>> >>> semantic through a proper API would  be superuseful; but I'm not
>> implying
>> >>> this is easy to achieve.
>> >>> On 18 Dec 2014 21:52, "Cody Koeninger" <c...@koeninger.org> wrote:
>> >>>
>> >>>> If the downstream store for the output data is idempotent or
>> >>>> transactional, and that downstream store also is the system of record
>> >>>> for kafka offsets, then you have exactly-once semantics.  Commit
>> >>>> offsets with / after the data is stored.  On any failure, restart from
>> >>> the last committed offsets.
>> >>>>
>> >>>> Yes, this approach is biased towards the etl-like use cases rather
>> >>>> than near-realtime-analytics use cases.
>> >>>>
>> >>>> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <
>> >>>> hshreedha...@cloudera.com
>> >>>>> wrote:
>> >>>>>
>> >>>>> I get what you are saying. But getting exactly once right is an
>> >>>>> extremely hard problem - especially in presence of failure. The
>> >>>>> issue is failures
>> >>>> can
>> >>>>> happen in a bunch of places. For example, before the notification of
>> >>>>> downstream store being successful reaches the receiver that updates
>> >>>>> the offsets, the node fails. The store was successful, but
>> >>>>> duplicates came in either way. This is something worth discussing by
>> >>>>> itself - but without uuids etc this might not really be solved even
>> >>> when you think it is.
>> >>>>>
>> >>>>> Anyway, I will look at the links. Even I am interested in all of the
>> >>>>> features you mentioned - no HDFS WAL for Kafka and once-only
>> >>>>> delivery,
>> >>>> but
>> >>>>> I doubt the latter is really possible to guarantee - though I really
>> >>>> would
>> >>>>> love to have that!
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Hari
>> >>>>>
>> >>>>>
>> >>>>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
>> >>>>> <c...@koeninger.org>
>> >>>>> wrote:
>> >>>>>
>> >>>>>> Thanks for the replies.
>> >>>>>>
>> >>>>>> Regarding skipping WAL, it's not just about optimization.  If you
>> >>>>>> actually want exactly-once semantics, you need control of kafka
>> >>>>>> offsets
>> >>>> as
>> >>>>>> well, including the ability to not use zookeeper as the system of
>> >>>>>> record for offsets.  Kafka already is a reliable system that has
>> >>>>>> strong
>> >>>> ordering
>> >>>>>> guarantees (within a partition) and does not mandate the use of
>> >>>> zookeeper
>> >>>>>> to store offsets.  I think there should be a spark api that acts as
>> >>>>>> a
>> >>>> very
>> >>>>>> simple intermediary between Kafka and the user's choice of
>> >>>>>> downstream
>> >>>> store.
>> >>>>>>
>> >>>>>> Take a look at the links I posted - if there's already been 2
>> >>>> independent
>> >>>>>> implementations of the idea, chances are it's something people need.
>> >>>>>>
>> >>>>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <
>> >>>>>> hshreedha...@cloudera.com> wrote:
>> >>>>>>>
>> >>>>>>> Hi Cody,
>> >>>>>>>
>> >>>>>>> I am an absolute +1 on SPARK-3146. I think we can implement
>> >>>>>>> something pretty simple and lightweight for that one.
>> >>>>>>>
>> >>>>>>> For the Kafka DStream skipping the WAL implementation - this is
>> >>>>>>> something I discussed with TD a few weeks ago. Though it is a good
>> >>>> idea to
>> >>>>>>> implement this to avoid unnecessary HDFS writes, it is an
>> >>>> optimization. For
>> >>>>>>> that reason, we must be careful in implementation. There are a
>> >>>>>>> couple
>> >>>> of
>> >>>>>>> issues that we need to ensure works properly - specifically
>> >>> ordering.
>> >>>> To
>> >>>>>>> ensure we pull messages from different topics and partitions in
>> >>>>>>> the
>> >>>> same
>> >>>>>>> order after failure, we’d still have to persist the metadata to
>> >>>>>>> HDFS
>> >>>> (or
>> >>>>>>> some other system) - this metadata must contain the order of
>> >>>>>>> messages consumed, so we know how to re-read the messages. I am
>> >>>>>>> planning to
>> >>>> explore
>> >>>>>>> this once I have some time (probably in Jan). In addition, we must
>> >>>>>>> also ensure bucketing functions work fine as well. I will file a
>> >>>>>>> placeholder jira for this one.
>> >>>>>>>
>> >>>>>>> I also wrote an API to write data back to Kafka a while back -
>> >>>>>>> https://github.com/apache/spark/pull/2994 . I am hoping that this
>> >>>>>>> will get pulled in soon, as this is something I know people want.
>> >>>>>>> I am open
>> >>>> to
>> >>>>>>> feedback on that - anything that I can do to make it better.
>> >>>>>>>
>> >>>>>>> Thanks,
>> >>>>>>> Hari
>> >>>>>>>
>> >>>>>>>
>> >>>>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
>> >>>>>>> <pwend...@gmail.com>
>> >>>>>>> wrote:
>> >>>>>>>
>> >>>>>>>> Hey Cody,
>> >>>>>>>>
>> >>>>>>>> Thanks for reaching out with this. The lead on streaming is TD -
>> >>>>>>>> he is traveling this week though so I can respond a bit. To the
>> >>>>>>>> high level point of whether Kafka is important - it definitely
>> >>>>>>>> is. Something like 80% of Spark Streaming deployments
>> >>>>>>>> (anecdotally) ingest data from Kafka. Also, good support for
>> >>>>>>>> Kafka is something we generally want in Spark and not a library.
>> >>>>>>>> In some cases IIRC there were user libraries that used unstable
>> >>>>>>>> Kafka API's and we were somewhat waiting on Kafka to stabilize
>> >>>>>>>> them to merge things upstream. Otherwise users wouldn't be able
>> >>>>>>>> to use newer Kakfa versions. This is a high level impression only
>> >>>>>>>> though, I haven't talked to TD about this recently so it's worth
>> >>> revisiting given the developments in Kafka.
>> >>>>>>>>
>> >>>>>>>> Please do bring things up like this on the dev list if there are
>> >>>>>>>> blockers for your usage - thanks for pinging it.
>> >>>>>>>>
>> >>>>>>>> - Patrick
>> >>>>>>>>
>> >>>>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger
>> >>>>>>>> <c...@koeninger.org>
>> >>>>>>>> wrote:
>> >>>>>>>>> Now that 1.2 is finalized... who are the go-to people to get
>> >>>>>>>>> some long-standing Kafka related issues resolved?
>> >>>>>>>>>
>> >>>>>>>>> The existing api is not sufficiently safe nor flexible for our
>> >>>>>>>> production
>> >>>>>>>>> use. I don't think we're alone in this viewpoint, because I've
>> >>>>>>>>> seen several different patches and libraries to fix the same
>> >>>>>>>>> things we've
>> >>>>>>>> been
>> >>>>>>>>> running into.
>> >>>>>>>>>
>> >>>>>>>>> Regarding flexibility
>> >>>>>>>>>
>> >>>>>>>>> https://issues.apache.org/jira/browse/SPARK-3146
>> >>>>>>>>>
>> >>>>>>>>> has been outstanding since August, and IMHO an equivalent of
>> >>>>>>>>> this is absolutely necessary. We wrote a similar patch
>> >>>>>>>>> ourselves, then found
>> >>>>>>>> that
>> >>>>>>>>> PR and have been running it in production. We wouldn't be able
>> >>>>>>>>> to
>> >>>> get
>> >>>>>>>> our
>> >>>>>>>>> jobs done without it. It also allows users to solve a whole
>> >>>>>>>>> class of problems for themselves (e.g. SPARK-2388, arbitrary
>> >>>>>>>>> delay of
>> >>>>>>>> messages, etc).
>> >>>>>>>>>
>> >>>>>>>>> Regarding safety, I understand the motivation behind
>> >>>>>>>>> WriteAheadLog
>> >>>> as
>> >>>>>>>> a
>> >>>>>>>>> general solution for streaming unreliable sources, but Kafka
>> >>>>>>>>> already
>> >>>>>>>> is a
>> >>>>>>>>> reliable source. I think there's a need for an api that treats
>> >>>>>>>>> it as such. Even aside from the performance issues of
>> >>>>>>>>> duplicating the write-ahead log in kafka into another
>> >>>>>>>>> write-ahead log in hdfs, I
>> >>>> need
>> >>>>>>>>> exactly-once semantics in the face of failure (I've had
>> >>>>>>>>> failures
>> >>>> that
>> >>>>>>>>> prevented reloading a spark streaming checkpoint, for instance).
>> >>>>>>>>>
>> >>>>>>>>> I've got an implementation i've been using
>> >>>>>>>>>
>> >>>>>>>>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kaf
>> >>>>>>>>> ka /src/main/scala/org/apache/spark/rdd/kafka
>> >>>>>>>>>
>> >>>>>>>>> Tresata has something similar at
>> >>>>>>>> https://github.com/tresata/spark-kafka,
>> >>>>>>>>> and I know there were earlier attempts based on Storm code.
>> >>>>>>>>>
>> >>>>>>>>> Trying to distribute these kinds of fixes as libraries rather
>> >>>>>>>>> than
>> >>>>>>>> patches
>> >>>>>>>>> to Spark is problematic, because large portions of the
>> >>>> implementation
>> >>>>>>>> are
>> >>>>>>>>> private[spark].
>> >>>>>>>>>
>> >>>>>>>>> I'd like to help, but i need to know whose attention to get.
>> >>>>>>>>
>> >>>>>>>> -----------------------------------------------------------------
>> >>>>>>>> ---- To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org For
>> >>>>>>>> additional commands, e-mail: dev-h...@spark.apache.org
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>
>> >>>>>
>> >>>>
>> >>>
>> >>
>>
>>

Reply via email to