After a long talk with Patrick and TD (thanks, guys), I opened the following
JIRA:

https://issues.apache.org/jira/browse/SPARK-4964

The sample PR has an implementation for both the batch and the DStream case,
and a link to a project with example usage.

On Fri, Dec 19, 2014 at 4:36 PM, Koert Kuipers <ko...@tresata.com> wrote:

> yup, we at tresata do the idempotent store the same way. very simple
> approach.
>
> On Fri, Dec 19, 2014 at 5:32 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>>
>> That KafkaRDD code is dead simple.
>>
>> Given a user specified map
>>
>> (topic1, partition0) -> (startingOffset, endingOffset)
>> (topic1, partition1) -> (startingOffset, endingOffset)
>> ...
>> turn each one of those entries into a partition of an rdd, using the
>> simple
>> consumer.
>> That's it.  No recovery logic, no state, nothing - for any failures, bail
>> on the rdd and let it retry.
>> Spark stays out of the business of being a distributed database.
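>>
>> A rough sketch of the shape of that (names here are illustrative, not the
>> actual PR code, and the actual fetch is left as a comment):
>>
>>   import org.apache.spark.{Partition, SparkContext, TaskContext}
>>   import org.apache.spark.rdd.RDD
>>
>>   // one entry of the user-specified offset map becomes one rdd partition
>>   case class KafkaRddPartition(index: Int, topic: String, partition: Int,
>>       startingOffset: Long, endingOffset: Long) extends Partition
>>
>>   class KafkaRDD(sc: SparkContext,
>>       offsets: Map[(String, Int), (Long, Long)])
>>     extends RDD[Array[Byte]](sc, Nil) {
>>
>>     override def getPartitions: Array[Partition] =
>>       offsets.toSeq.zipWithIndex.map {
>>         case (((topic, part), (start, end)), i) =>
>>           KafkaRddPartition(i, topic, part, start, end): Partition
>>       }.toArray
>>
>>     override def compute(split: Partition, ctx: TaskContext): Iterator[Array[Byte]] = {
>>       val p = split.asInstanceOf[KafkaRddPartition]
>>       // open a SimpleConsumer against the leader for (p.topic, p.partition),
>>       // fetch from p.startingOffset until p.endingOffset, then close.
>>       ???
>>     }
>>   }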
>>
>> The client code does any transformation it wants, then stores the data and
>> offsets.  There are two ways of doing this, either based on idempotence or
>> a transactional data store.
>>
>> For idempotent stores:
>>
>> 1. manipulate data
>> 2. save data to store
>> 3. save ending offsets to the same store
>>
>> If you fail between 2 and 3, the offsets haven't been stored; you start
>> again at the same beginning offsets, do the same calculations in the same
>> order, and overwrite the same data. All is good.
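>>
>> In code, the idempotent pattern is roughly the following (KeyValueStore is
>> a stand-in for whatever store you use; the point is just that overwrites
>> are safe and the offsets are written last):
>>
>>   trait KeyValueStore {
>>     def put(key: String, value: Array[Byte]): Unit  // overwriting is safe
>>   }
>>
>>   def processBatch(store: KeyValueStore,
>>                    batch: Seq[(String, Array[Byte])],  // deterministic keys
>>                    endingOffsets: Map[(String, Int), Long]): Unit = {
>>     // 1. manipulate data (elided)
>>     // 2. save data - a re-run overwrites the same keys with the same values
>>     batch.foreach { case (key, value) => store.put(key, value) }
>>     // 3. save ending offsets last, to the same store
>>     endingOffsets.foreach { case ((topic, part), offset) =>
>>       store.put(s"offsets/$topic/$part", offset.toString.getBytes("UTF-8"))
>>     }
>>   }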
>>
>>
>> For transactional stores:
>>
>> 1. manipulate data
>> 2. begin transaction
>> 3. save data to the store
>> 4. save offsets
>> 5. commit transaction
>>
>> If you fail before 5, the transaction rolls back.  To make this less
>> heavyweight, you can write the data outside the transaction and then
>> update
>> a pointer to the current data inside the transaction.
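>>
>> As a sketch against a plain JDBC store (table names and schema are made up
>> for illustration; step 1 happens before this, and the offsets table is
>> assumed to have one pre-existing row per (topic, part)):
>>
>>   import java.sql.Connection
>>
>>   def saveTransactionally(conn: Connection,
>>                           rows: Seq[(String, String)],
>>                           endingOffsets: Map[(String, Int), Long]): Unit = {
>>     conn.setAutoCommit(false)        // 2. begin transaction
>>     try {
>>       val insert = conn.prepareStatement("insert into results (k, v) values (?, ?)")
>>       rows.foreach { case (k, v) =>  // 3. save data inside the transaction
>>         insert.setString(1, k); insert.setString(2, v); insert.executeUpdate()
>>       }
>>       val saveOffsets = conn.prepareStatement(
>>         "update offsets set off = ? where topic = ? and part = ?")
>>       endingOffsets.foreach { case ((topic, part), off) =>  // 4. save offsets
>>         saveOffsets.setLong(1, off); saveOffsets.setString(2, topic); saveOffsets.setInt(3, part)
>>         saveOffsets.executeUpdate()
>>       }
>>       conn.commit()                  // 5. commit - data and offsets land together
>>     } catch {
>>       case e: Exception => conn.rollback(); throw e
>>     }
>>   }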
>>
>>
>> Again, spark has nothing much to do with guaranteeing exactly once.  In
>> fact, the current streaming api actively impedes my ability to do the
>> above.  I'm just suggesting providing an api that doesn't get in the way
>> of
>> exactly-once.
>>
>>
>>
>>
>>
>> On Fri, Dec 19, 2014 at 3:57 PM, Hari Shreedharan <
>> hshreedha...@cloudera.com
>> > wrote:
>>
>> > Can you explain your basic algorithm for the once-only delivery? It is
>> > quite a bit of very Kafka-specific code that would take more time to read
>> > than I can currently afford. If you can explain your algorithm a bit, it
>> > might help.
>> >
>> > Thanks,
>> > Hari
>> >
>> >
>> > On Fri, Dec 19, 2014 at 1:48 PM, Cody Koeninger <c...@koeninger.org>
>> > wrote:
>> >
>> >>
>> >> The problems you guys are discussing come from trying to store state in
>> >> spark, so don't do that.  Spark isn't a distributed database.
>> >>
>> >> Just map kafka partitions directly to rdds, let user code specify the
>> >> range of offsets explicitly, and let them be in charge of committing
>> >> offsets.
>> >>
>> >> Using the simple consumer isn't that bad, I'm already using this in
>> >> production with the code I linked to, and tresata apparently has been
>> as
>> >> well.  Again, for everyone saying this is impossible, have you read
>> either
>> >> of those implementations and looked at the approach?
>> >>
>> >>
>> >>
>> >> On Fri, Dec 19, 2014 at 2:27 PM, Sean McNamara <
>> >> sean.mcnam...@webtrends.com> wrote:
>> >>
>> >>> Please feel free to correct me if I’m wrong, but I think the exactly
>> >>> once spark streaming semantics can easily be solved using
>> updateStateByKey.
>> >>> Make the key going into updateStateByKey be a hash of the event, or
>> pluck
>> >>> off some uuid from the message.  The updateFunc would only emit the
>> message
>> >>> if the key did not exist, and the user has complete control over the
>> window
>> >>> of time / state lifecycle for detecting duplicates.  It also makes it
>> >>> really easy to detect and take action (alert?) when you DO see a
>> duplicate,
>> >>> or make memory tradeoffs within an error bound using a sketch
>> algorithm.
>> >>> The kafka simple consumer is insanely complex; if possible I think it
>> would
>> >>> be better (and vastly more flexible) to get reliability using the
>> >>> primitives that spark so elegantly provides.
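>> >>>
>> >>> A rough sketch of that dedup-by-key idea (the hash and stream wiring are
>> >>> illustrative; checkpointing must be enabled, and in practice you'd also
>> >>> expire old state):
>> >>>
>> >>>   import org.apache.spark.streaming.StreamingContext._
>> >>>   import org.apache.spark.streaming.dstream.DStream
>> >>>
>> >>>   def dedup(events: DStream[String]): DStream[String] = {
>> >>>     // key each event by a content hash (or a uuid plucked off the message)
>> >>>     val keyed = events.map(e => (e.hashCode.toString, e))
>> >>>     val state = keyed.updateStateByKey[(String, Boolean)] {
>> >>>       (newEvents: Seq[String], old: Option[(String, Boolean)]) =>
>> >>>         old match {
>> >>>           case Some((e, _)) => Some((e, false))  // seen before: not new
>> >>>           case None if newEvents.nonEmpty => Some((newEvents.head, true))
>> >>>           case None => None
>> >>>         }
>> >>>     }
>> >>>     // only emit an event in the batch where its key first appears
>> >>>     state.filter { case (_, (_, isNew)) => isNew }
>> >>>       .map { case (_, (e, _)) => e }
>> >>>   }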
>> >>>
>> >>> Cheers,
>> >>>
>> >>> Sean
>> >>>
>> >>>
>> >>> > On Dec 19, 2014, at 12:06 PM, Hari Shreedharan <
>> >>> hshreedha...@cloudera.com> wrote:
>> >>> >
>> >>> > Hi Dibyendu,
>> >>> >
>> >>> > Thanks for the details on the implementation. But I still do not believe
>> >>> > that it guarantees no duplicates - what they achieve is that the same
>> >>> > batch is processed exactly the same way every time (though it may be
>> >>> > processed more than once) - so it depends on the operation being
>> >>> > idempotent. I believe Trident uses ZK to keep track of the transactions -
>> >>> > a batch can be processed multiple times in failure scenarios (for
>> >>> > example, the transaction is processed but before ZK is updated the
>> >>> > machine fails, causing a "new" node to process it again).
>> >>> >
>> >>> > I don't think it is impossible to do this in Spark Streaming as well
>> >>> and
>> >>> > I'd be really interested in working on it at some point in the near
>> >>> future.
>> >>> >
>> >>> > On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <
>> >>> > dibyendu.bhattach...@gmail.com> wrote:
>> >>> >
>> >>> >> Hi,
>> >>> >>
>> >>> >> Thanks to Jerry for mentioning the Kafka Spout for Trident. Storm
>> >>> >> Trident achieves the exactly-once guarantee by processing tuples in
>> >>> >> batches and assigning the same transaction-id to a given batch. The
>> >>> >> replay of a given batch with a transaction-id will have the exact same
>> >>> >> set of tuples, and batches are replayed in the exact same order as
>> >>> >> before the failure.
>> >>> >>
>> >>> >> With this paradigm, if the downstream system processes data for a given
>> >>> >> batch under a given transaction-id, and the same batch is emitted again
>> >>> >> during a failure, you can check whether that transaction-id has already
>> >>> >> been processed and hence guarantee exactly-once semantics.
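>> >>> >>
>> >>> >> A sketch of that downstream check (the store interface is hypothetical;
>> >>> >> the point is keying the write by transaction-id):
>> >>> >>
>> >>> >>   trait TxStore {
>> >>> >>     def lastCommittedTxId(partition: Int): Option[Long]
>> >>> >>     def commit(partition: Int, txId: Long, data: Seq[Array[Byte]]): Unit
>> >>> >>   }
>> >>> >>
>> >>> >>   // only apply a replayed batch if its transaction-id hasn't been seen
>> >>> >>   def applyBatch(store: TxStore, partition: Int, txId: Long,
>> >>> >>                  data: Seq[Array[Byte]]): Unit = {
>> >>> >>     val alreadyDone = store.lastCommittedTxId(partition).exists(_ >= txId)
>> >>> >>     if (!alreadyDone) store.commit(partition, txId, data)
>> >>> >>   }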
>> >>> >>
>> >>> >> And this can only be achieved in Spark if we use the low-level Kafka
>> >>> >> consumer API to process the offsets. This low-level Kafka consumer (
>> >>> >> https://github.com/dibbhatt/kafka-spark-consumer) implements a Spark
>> >>> >> Kafka consumer which uses the Kafka low-level APIs. All of the
>> >>> >> Kafka-related logic has been taken from the Storm-Kafka spout, which
>> >>> >> manages all Kafka re-balance and fault-tolerance aspects and Kafka
>> >>> >> metadata management.
>> >>> >>
>> >>> >> Presently this consumer ensures that during Receiver failure, it will
>> >>> >> re-emit the exact same block with the same set of messages. Every
>> >>> >> message has the details of its partition, offset and topic, which can
>> >>> >> address SPARK-3146.
>> >>> >>
>> >>> >> As this low-level consumer has complete control over the Kafka offsets,
>> >>> >> we can implement a Trident-like feature on top of it, such as assigning
>> >>> >> a transaction-id to a given block and re-emitting the same block with
>> >>> >> the same set of messages during Driver failure.
>> >>> >>
>> >>> >> Regards,
>> >>> >> Dibyendu
>> >>> >>
>> >>> >>
>> >>> >> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <
>> saisai.s...@intel.com>
>> >>> >> wrote:
>> >>> >>>
>> >>> >>> Hi all,
>> >>> >>>
>> >>> >>> I agree with Hari that strong exactly-once semantics are very hard to
>> >>> >>> guarantee, especially in failure situations. From my understanding,
>> >>> >>> even the current implementation of ReliableKafkaReceiver cannot fully
>> >>> >>> guarantee exactly-once semantics after a failure. First is the ordering
>> >>> >>> of data replayed from the last checkpoint; this is hard to guarantee
>> >>> >>> when multiple partitions are involved. Second is the design complexity
>> >>> >>> of achieving this: you can refer to the Kafka Spout in Trident, where
>> >>> >>> we have to dig into the very details of Kafka's metadata management
>> >>> >>> system to achieve this, not to mention rebalance and fault-tolerance.
>> >>> >>>
>> >>> >>> Thanks
>> >>> >>> Jerry
>> >>> >>>
>> >>> >>> -----Original Message-----
>> >>> >>> From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
>> >>> >>> Sent: Friday, December 19, 2014 5:57 AM
>> >>> >>> To: Cody Koeninger
>> >>> >>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
>> >>> >>> Subject: Re: Which committers care about Kafka?
>> >>> >>>
>> >>> >>> But idempotency is not that easy to achieve sometimes. A strong
>> >>> >>> once-only semantic through a proper API would be super useful; but I'm
>> >>> >>> not implying this is easy to achieve.
>> >>> >>> On 18 Dec 2014 21:52, "Cody Koeninger" <c...@koeninger.org>
>> wrote:
>> >>> >>>
>> >>> >>>> If the downstream store for the output data is idempotent or
>> >>> >>>> transactional, and that downstream store also is the system of
>> >>> record
>> >>> >>>> for kafka offsets, then you have exactly-once semantics.  Commit
>> >>> >>>> offsets with / after the data is stored.  On any failure, restart
>> >>> from
>> >>> >>> the last committed offsets.
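>> >>> >>>>
>> >>> >>>> e.g. on restart the job just asks the store where it left off (the
>> >>> >>>> store API here is a placeholder):
>> >>> >>>>
>> >>> >>>>   trait OffsetStore {
>> >>> >>>>     def lastCommitted(topic: String, partition: Int): Option[Long]
>> >>> >>>>   }
>> >>> >>>>
>> >>> >>>>   // starting offsets for the next run come from the store, not zookeeper
>> >>> >>>>   def startingOffsets(store: OffsetStore,
>> >>> >>>>                       partitions: Seq[(String, Int)],
>> >>> >>>>                       defaultOffset: Long = 0L): Map[(String, Int), Long] =
>> >>> >>>>     partitions.map { case (topic, part) =>
>> >>> >>>>       (topic, part) -> store.lastCommitted(topic, part).getOrElse(defaultOffset)
>> >>> >>>>     }.toMap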
>> >>> >>>>
>> >>> >>>> Yes, this approach is biased towards the etl-like use cases
>> rather
>> >>> >>>> than near-realtime-analytics use cases.
>> >>> >>>>
>> >>> >>>> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <
>> >>> >>>> hshreedha...@cloudera.com
>> >>> >>>>> wrote:
>> >>> >>>>>
>> >>> >>>>> I get what you are saying. But getting exactly once right is an
>> >>> >>>>> extremely hard problem - especially in presence of failure. The
>> >>> >>>>> issue is failures
>> >>> >>>> can
>> >>> >>>>> happen in a bunch of places. For example, before the
>> notification
>> >>> of
>> >>> >>>>> downstream store being successful reaches the receiver that
>> updates
>> >>> >>>>> the offsets, the node fails. The store was successful, but
>> >>> >>>>> duplicates came in either way. This is something worth
>> discussing
>> >>> by
>> >>> >>>>> itself - but without uuids etc this might not really be solved
>> even
>> >>> >>> when you think it is.
>> >>> >>>>>
>> >>> >>>>> Anyway, I will look at the links. Even I am interested in all of
>> >>> the
>> >>> >>>>> features you mentioned - no HDFS WAL for Kafka and once-only
>> >>> >>>>> delivery,
>> >>> >>>> but
>> >>> >>>>> I doubt the latter is really possible to guarantee - though I
>> >>> really
>> >>> >>>> would
>> >>> >>>>> love to have that!
>> >>> >>>>>
>> >>> >>>>> Thanks,
>> >>> >>>>> Hari
>> >>> >>>>>
>> >>> >>>>>
>> >>> >>>>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
>> >>> >>>>> <c...@koeninger.org>
>> >>> >>>>> wrote:
>> >>> >>>>>
>> >>> >>>>>> Thanks for the replies.
>> >>> >>>>>>
>> >>> >>>>>> Regarding skipping WAL, it's not just about optimization.  If
>> you
>> >>> >>>>>> actually want exactly-once semantics, you need control of kafka
>> >>> >>>>>> offsets
>> >>> >>>> as
>> >>> >>>>>> well, including the ability to not use zookeeper as the system
>> of
>> >>> >>>>>> record for offsets.  Kafka already is a reliable system that
>> has
>> >>> >>>>>> strong
>> >>> >>>> ordering
>> >>> >>>>>> guarantees (within a partition) and does not mandate the use of
>> >>> >>>> zookeeper
>> >>> >>>>>> to store offsets.  I think there should be a spark api that
>> acts
>> >>> as
>> >>> >>>>>> a
>> >>> >>>> very
>> >>> >>>>>> simple intermediary between Kafka and the user's choice of
>> >>> >>>>>> downstream
>> >>> >>>> store.
>> >>> >>>>>>
>> >>> >>>>>> Take a look at the links I posted - if there's already been 2
>> >>> >>>> independent
>> >>> >>>>>> implementations of the idea, chances are it's something people
>> >>> need.
>> >>> >>>>>>
>> >>> >>>>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <
>> >>> >>>>>> hshreedha...@cloudera.com> wrote:
>> >>> >>>>>>>
>> >>> >>>>>>> Hi Cody,
>> >>> >>>>>>>
>> >>> >>>>>>> I am an absolute +1 on SPARK-3146. I think we can implement
>> >>> >>>>>>> something pretty simple and lightweight for that one.
>> >>> >>>>>>>
>> >>> >>>>>>> For the Kafka DStream skipping the WAL implementation - this
>> is
>> >>> >>>>>>> something I discussed with TD a few weeks ago. Though it is a
>> >>> good
>> >>> >>>> idea to
>> >>> >>>>>>> implement this to avoid unnecessary HDFS writes, it is an
>> >>> >>>> optimization. For
>> >>> >>>>>>> that reason, we must be careful in implementation. There are a
>> >>> >>>>>>> couple
>> >>> >>>> of
>> >>> >>>>>>> issues that we need to ensure work properly - specifically
>> >>> >>> ordering.
>> >>> >>>> To
>> >>> >>>>>>> ensure we pull messages from different topics and partitions
>> in
>> >>> >>>>>>> the
>> >>> >>>> same
>> >>> >>>>>>> order after failure, we’d still have to persist the metadata
>> to
>> >>> >>>>>>> HDFS
>> >>> >>>> (or
>> >>> >>>>>>> some other system) - this metadata must contain the order of
>> >>> >>>>>>> messages consumed, so we know how to re-read the messages. I
>> am
>> >>> >>>>>>> planning to
>> >>> >>>> explore
>> >>> >>>>>>> this once I have some time (probably in Jan). In addition, we
>> >>> must
>> >>> >>>>>>> also ensure bucketing functions work fine as well. I will
>> file a
>> >>> >>>>>>> placeholder jira for this one.
>> >>> >>>>>>>
>> >>> >>>>>>> I also wrote an API to write data back to Kafka a while back -
>> >>> >>>>>>> https://github.com/apache/spark/pull/2994 . I am hoping that
>> >>> this
>> >>> >>>>>>> will get pulled in soon, as this is something I know people
>> want.
>> >>> >>>>>>> I am open
>> >>> >>>> to
>> >>> >>>>>>> feedback on that - anything that I can do to make it better.
>> >>> >>>>>>>
>> >>> >>>>>>> Thanks,
>> >>> >>>>>>> Hari
>> >>> >>>>>>>
>> >>> >>>>>>>
>> >>> >>>>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
>> >>> >>>>>>> <pwend...@gmail.com>
>> >>> >>>>>>> wrote:
>> >>> >>>>>>>
>> >>> >>>>>>>> Hey Cody,
>> >>> >>>>>>>>
>> >>> >>>>>>>> Thanks for reaching out with this. The lead on streaming is
>> TD -
>> >>> >>>>>>>> he is traveling this week though so I can respond a bit. To
>> the
>> >>> >>>>>>>> high level point of whether Kafka is important - it
>> definitely
>> >>> >>>>>>>> is. Something like 80% of Spark Streaming deployments
>> >>> >>>>>>>> (anecdotally) ingest data from Kafka. Also, good support for
>> >>> >>>>>>>> Kafka is something we generally want in Spark and not a
>> library.
>> >>> >>>>>>>> In some cases IIRC there were user libraries that used
>> unstable
>> >>> >>>>>>>> Kafka API's and we were somewhat waiting on Kafka to
>> stabilize
>> >>> >>>>>>>> them to merge things upstream. Otherwise users wouldn't be
>> able
>> >>> >>>>>>>> to use newer Kafka versions. This is a high level impression
>> >>> only
>> >>> >>>>>>>> though, I haven't talked to TD about this recently so it's
>> worth
>> >>> >>> revisiting given the developments in Kafka.
>> >>> >>>>>>>>
>> >>> >>>>>>>> Please do bring things up like this on the dev list if there
>> are
>> >>> >>>>>>>> blockers for your usage - thanks for pinging it.
>> >>> >>>>>>>>
>> >>> >>>>>>>> - Patrick
>> >>> >>>>>>>>
>> >>> >>>>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger
>> >>> >>>>>>>> <c...@koeninger.org>
>> >>> >>>>>>>> wrote:
>> >>> >>>>>>>>> Now that 1.2 is finalized... who are the go-to people to get
>> >>> >>>>>>>>> some long-standing Kafka related issues resolved?
>> >>> >>>>>>>>>
>> >>> >>>>>>>>> The existing api is not sufficiently safe nor flexible for
>> our
>> >>> >>>>>>>> production
>> >>> >>>>>>>>> use. I don't think we're alone in this viewpoint, because
>> I've
>> >>> >>>>>>>>> seen several different patches and libraries to fix the same
>> >>> >>>>>>>>> things we've
>> >>> >>>>>>>> been
>> >>> >>>>>>>>> running into.
>> >>> >>>>>>>>>
>> >>> >>>>>>>>> Regarding flexibility
>> >>> >>>>>>>>>
>> >>> >>>>>>>>> https://issues.apache.org/jira/browse/SPARK-3146
>> >>> >>>>>>>>>
>> >>> >>>>>>>>> has been outstanding since August, and IMHO an equivalent of
>> >>> >>>>>>>>> this is absolutely necessary. We wrote a similar patch
>> >>> >>>>>>>>> ourselves, then found
>> >>> >>>>>>>> that
>> >>> >>>>>>>>> PR and have been running it in production. We wouldn't be
>> able
>> >>> >>>>>>>>> to
>> >>> >>>> get
>> >>> >>>>>>>> our
>> >>> >>>>>>>>> jobs done without it. It also allows users to solve a whole
>> >>> >>>>>>>>> class of problems for themselves (e.g. SPARK-2388, arbitrary
>> >>> >>>>>>>>> delay of
>> >>> >>>>>>>> messages, etc).
>> >>> >>>>>>>>>
>> >>> >>>>>>>>> Regarding safety, I understand the motivation behind
>> >>> >>>>>>>>> WriteAheadLog
>> >>> >>>> as
>> >>> >>>>>>>> a
>> >>> >>>>>>>>> general solution for streaming unreliable sources, but Kafka
>> >>> >>>>>>>>> already
>> >>> >>>>>>>> is a
>> >>> >>>>>>>>> reliable source. I think there's a need for an api that
>> treats
>> >>> >>>>>>>>> it as such. Even aside from the performance issues of
>> >>> >>>>>>>>> duplicating the write-ahead log in kafka into another
>> >>> >>>>>>>>> write-ahead log in hdfs, I
>> >>> >>>> need
>> >>> >>>>>>>>> exactly-once semantics in the face of failure (I've had
>> >>> >>>>>>>>> failures
>> >>> >>>> that
>> >>> >>>>>>>>> prevented reloading a spark streaming checkpoint, for
>> >>> instance).
>> >>> >>>>>>>>>
>> >>> >>>>>>>>> I've got an implementation I've been using:
>> >>> >>>>>>>>>
>> >>> >>>>>>>>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka
>> >>> >>>>>>>>>
>> >>> >>>>>>>>> Tresata has something similar at
>> >>> >>>>>>>> https://github.com/tresata/spark-kafka,
>> >>> >>>>>>>>> and I know there were earlier attempts based on Storm code.
>> >>> >>>>>>>>>
>> >>> >>>>>>>>> Trying to distribute these kinds of fixes as libraries
>> rather
>> >>> >>>>>>>>> than
>> >>> >>>>>>>> patches
>> >>> >>>>>>>>> to Spark is problematic, because large portions of the
>> >>> >>>> implementation
>> >>> >>>>>>>> are
>> >>> >>>>>>>>> private[spark].
>> >>> >>>>>>>>>
>> >>> >>>>>>>>> I'd like to help, but i need to know whose attention to get.
>> >>> >>>>>>>>
>> >>> >>>>>>>>
>> >>> >>>>>>>>
>> >>> >>>>>>>>
>> >>> >>>>>>>
>> >>> >>>>>
>> >>> >>>>
>> >>> >>>
>> >>> >>
>> >>>
>> >>>
>> >>
>> >
>>
>
