yup, we at tresata do the idempotent store the same way. very simple approach.
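for example, something along these lines - a simplified sketch with a made-up Store interface (saveResults / saveOffsets / loadOffsets are hypothetical), not our actual code; the only requirement is that both writes overwrite the same rows when a batch is replayed:

import org.apache.spark.SparkContext._
import org.apache.spark.rdd.RDD

// made-up store interface: both saves must overwrite (upsert) on replay
trait Store {
  def saveResults(rows: Seq[(String, Long)]): Unit                // 2. save data
  def saveOffsets(offsets: Map[(String, Int), Long]): Unit        // 3. save ending offsets
  def loadOffsets(): Map[(String, Int), Long]                     // where to resume after a crash
}

def processBatch(store: Store, batch: RDD[String], endingOffsets: Map[(String, Int), Long]): Unit = {
  // 1. manipulate data - deterministic, so a replay produces exactly the same rows
  val counts = batch.map(msg => (msg, 1L)).reduceByKey(_ + _).collect().toSeq
  // 2. save data - a replay overwrites the same rows with the same values
  store.saveResults(counts)
  // 3. save ending offsets to the same store - a crash between 2 and 3 just means
  //    the next run starts from the old offsets and overwrites the same data again
  store.saveOffsets(endingOffsets)
}

on restart you call loadOffsets() and feed the result back in as the starting offsets for the next batch, which is exactly the "restart from the last committed offsets" step Cody describes below.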
On Fri, Dec 19, 2014 at 5:32 PM, Cody Koeninger <c...@koeninger.org> wrote:

> That KafkaRDD code is dead simple.

> Given a user-specified map

> (topic1, partition0) -> (startingOffset, endingOffset)
> (topic1, partition1) -> (startingOffset, endingOffset)
> ...

> turn each one of those entries into a partition of an rdd, using the simple consumer. That's it. No recovery logic, no state, nothing - for any failures, bail on the rdd and let it retry. Spark stays out of the business of being a distributed database.

> The client code does any transformation it wants, then stores the data and offsets. There are two ways of doing this, either based on idempotence or a transactional data store.

> For idempotent stores:

> 1. manipulate data
> 2. save data to store
> 3. save ending offsets to the same store

> If you fail between 2 and 3, the offsets haven't been stored, you start again at the same beginning offsets, do the same calculations in the same order, overwrite the same data, all is good.

> For transactional stores:

> 1. manipulate data
> 2. begin transaction
> 3. save data to the store
> 4. save offsets
> 5. commit transaction

> If you fail before 5, the transaction rolls back. To make this less heavyweight, you can write the data outside the transaction and then update a pointer to the current data inside the transaction.

> Again, spark has nothing much to do with guaranteeing exactly once. In fact, the current streaming api actively impedes my ability to do the above. I'm just suggesting providing an api that doesn't get in the way of exactly-once.

> On Fri, Dec 19, 2014 at 3:57 PM, Hari Shreedharan <hshreedha...@cloudera.com> wrote:

>> Can you explain your basic algorithm for the once-only-delivery? It is quite a bit of very Kafka-specific code, and would take more time to read than I can currently afford. If you can explain your algorithm a bit, it might help.

>> Thanks,
>> Hari

>> On Fri, Dec 19, 2014 at 1:48 PM, Cody Koeninger <c...@koeninger.org> wrote:

>>> The problems you guys are discussing come from trying to store state in spark, so don't do that. Spark isn't a distributed database.

>>> Just map kafka partitions directly to rdds, let user code specify the range of offsets explicitly, and let them be in charge of committing offsets.

>>> Using the simple consumer isn't that bad, I'm already using this in production with the code I linked to, and tresata apparently has been as well. Again, for everyone saying this is impossible, have you read either of those implementations and looked at the approach?

>>> On Fri, Dec 19, 2014 at 2:27 PM, Sean McNamara <sean.mcnam...@webtrends.com> wrote:

>>>> Please feel free to correct me if I'm wrong, but I think the exactly-once spark streaming semantics can easily be solved using updateStateByKey. Make the key going into updateStateByKey be a hash of the event, or pluck off some uuid from the message. The updateFunc would only emit the message if the key did not exist, and the user has complete control over the window of time / state lifecycle for detecting duplicates. It also makes it really easy to detect and take action (alert?) when you DO see a duplicate, or make memory tradeoffs within an error bound using a sketch algorithm. The kafka simple consumer is insanely complex; if possible I think it would be better (and vastly more flexible) to get reliability using the primitives that spark so elegantly provides.

>>>> Cheers,

>>>> Sean
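for what it's worth, a rough sketch of the updateStateByKey de-duplication Sean describes above - the Event type and uuid key are made up, checkpointing has to be enabled, and a real job would also expire old state (return None) to bound memory:

import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.dstream.DStream

case class Event(uuid: String, payload: String)

// emit each uuid only on the batch in which it is first seen
def dedup(events: DStream[Event]): DStream[Event] = {
  val keyed = events.map(e => (e.uuid, e))
  // state per uuid: (event, alreadyEmitted)
  val withState = keyed.updateStateByKey[(Event, Boolean)] { (newValues, state) =>
    state match {
      case Some((e, _)) => Some((e, true))                           // seen in an earlier batch
      case None         => newValues.headOption.map(e => (e, false)) // first sighting
    }
  }
  withState.filter { case (_, (_, emitted)) => !emitted }.map { case (_, (e, _)) => e }
}

the catch, as Cody points out above, is that the seen-key state then lives in spark itself rather than in the downstream store.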
>>>> On Dec 19, 2014, at 12:06 PM, Hari Shreedharan <hshreedha...@cloudera.com> wrote:

>>>>> Hi Dibyendu,

>>>>> Thanks for the details on the implementation. But I still do not believe that it gives no duplicates - what they achieve is that the same batch is processed exactly the same way every time (but note it may be processed more than once) - so it depends on the operation being idempotent. I believe Trident uses ZK to keep track of the transactions - a batch can be processed multiple times in failure scenarios (for example, the transaction is processed but before ZK is updated the machine fails, causing a "new" node to process it again).

>>>>> I don't think it is impossible to do this in Spark Streaming as well, and I'd be really interested in working on it at some point in the near future.

>>>>> On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:

>>>>>> Hi,

>>>>>> Thanks to Jerry for mentioning the Kafka Spout for Trident. Storm Trident achieves the exactly-once guarantee by processing tuples in batches and assigning the same transaction-id to a given batch. The replay of a given batch with a transaction-id will have the exact same set of tuples, and replays of batches happen in the exact same order as before the failure.

>>>>>> With this paradigm, if the downstream system has processed data for a batch with a given transaction-id, and the same batch is emitted again during failure, you can check whether that transaction-id has already been processed and hence guarantee exactly-once semantics.

>>>>>> And this can only be achieved in Spark if we use the low-level Kafka consumer API to manage the offsets. This low-level Kafka consumer (https://github.com/dibbhatt/kafka-spark-consumer) implements a Spark Kafka consumer which uses the Kafka low-level APIs. All of the Kafka-related logic has been taken from the Storm-Kafka spout, which manages all Kafka re-balance and fault-tolerance aspects and Kafka metadata management.

>>>>>> Presently this consumer ensures that during Receiver failure it will re-emit the exact same block with the same set of messages. Every message has the details of its partition, offset and topic, which can tackle SPARK-3146.

>>>>>> As this low-level consumer has complete control over the Kafka offsets, we can implement a Trident-like feature on top of it, such as a transaction-id for a given block, and re-emit the same block with the same set of messages during Driver failure.

>>>>>> Regards,
>>>>>> Dibyendu
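the downstream check Dibyendu describes stays small once the store remembers the last transaction-id it applied - a sketch with a made-up TxStore interface, not code from the linked consumer:

// made-up external store that records the last transaction-id applied per partition
trait TxStore {
  def lastCommitted(partition: Int): Option[Long]
  def commit(partition: Int, txId: Long, rows: Seq[String]): Unit // writes rows + txId atomically
}

// a replayed block carries the same txId and the same messages, so it is simply skipped
def applyBlock(store: TxStore, partition: Int, txId: Long, rows: Seq[String]): Unit = {
  if (store.lastCommitted(partition).forall(_ < txId)) {
    store.commit(partition, txId, rows)
  }
}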
>>>>>> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <saisai.s...@intel.com> wrote:

>>>>>>> Hi all,

>>>>>>> I agree with Hari that strong exactly-once semantics is very hard to guarantee, especially in failure situations. From my understanding, even the current implementation of ReliableKafkaReceiver cannot fully guarantee exactly-once semantics after a failure: first, the ordering of data replayed from the last checkpoint is hard to guarantee when multiple partitions are injected in; second, there is the design complexity of achieving this - you can refer to the Kafka Spout in Trident, where we have to dig into the very details of Kafka's metadata management to achieve this, not to mention rebalance and fault-tolerance.

>>>>>>> Thanks
>>>>>>> Jerry

>>>>>>> -----Original Message-----
>>>>>>> From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
>>>>>>> Sent: Friday, December 19, 2014 5:57 AM
>>>>>>> To: Cody Koeninger
>>>>>>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
>>>>>>> Subject: Re: Which committers care about Kafka?

>>>>>>> But idempotency is not that easy to achieve sometimes. A strong once-only semantic through a proper API would be super useful; but I'm not implying this is easy to achieve.

>>>>>>> On 18 Dec 2014 21:52, "Cody Koeninger" <c...@koeninger.org> wrote:

>>>>>>>> If the downstream store for the output data is idempotent or transactional, and that downstream store also is the system of record for kafka offsets, then you have exactly-once semantics. Commit offsets with / after the data is stored. On any failure, restart from the last committed offsets.

>>>>>>>> Yes, this approach is biased towards the etl-like use cases rather than near-realtime-analytics use cases.

>>>>>>>> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <hshreedha...@cloudera.com> wrote:

>>>>>>>>> I get what you are saying. But getting exactly once right is an extremely hard problem - especially in the presence of failure. The issue is that failures can happen in a bunch of places. For example, the node fails before the notification of the downstream store being successful reaches the receiver that updates the offsets. The store was successful, but duplicates come in either way. This is something worth discussing by itself - but without uuids etc. this might not really be solved even when you think it is.

>>>>>>>>> Anyway, I will look at the links. Even I am interested in all of the features you mentioned - no HDFS WAL for Kafka and once-only delivery - but I doubt the latter is really possible to guarantee, though I really would love to have that!

>>>>>>>>> Thanks,
>>>>>>>>> Hari
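spelling out the transactional variant from Cody's numbered steps at the top of the thread (begin transaction, save data, save offsets, commit) with plain jdbc - the results and offsets tables and their columns are made up for illustration:

import java.sql.DriverManager

def saveTransactionally(jdbcUrl: String, rows: Seq[(String, Long)], offsets: Map[(String, Int), Long]): Unit = {
  val conn = DriverManager.getConnection(jdbcUrl)
  conn.setAutoCommit(false)                                          // 2. begin transaction
  try {
    val insert = conn.prepareStatement("insert into results (word, cnt) values (?, ?)")
    rows.foreach { case (word, cnt) =>                               // 3. save data to the store
      insert.setString(1, word); insert.setLong(2, cnt); insert.executeUpdate()
    }
    // assumes the offsets table was seeded once per topic / partition
    val update = conn.prepareStatement("update offsets set until_offset = ? where topic = ? and kafka_partition = ?")
    offsets.foreach { case ((topic, part), off) =>                   // 4. save offsets
      update.setLong(1, off); update.setString(2, topic); update.setInt(3, part); update.executeUpdate()
    }
    conn.commit()                                                    // 5. commit transaction
  } catch {
    case e: Exception => conn.rollback(); throw e                    // a failure before 5 rolls everything back
  } finally {
    conn.close()
  }
}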
>>>>>>>>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger <c...@koeninger.org> wrote:

>>>>>>>>>> Thanks for the replies.

>>>>>>>>>> Regarding skipping the WAL, it's not just about optimization. If you actually want exactly-once semantics, you need control of kafka offsets as well, including the ability to not use zookeeper as the system of record for offsets. Kafka already is a reliable system that has strong ordering guarantees (within a partition) and does not mandate the use of zookeeper to store offsets. I think there should be a spark api that acts as a very simple intermediary between Kafka and the user's choice of downstream store.

>>>>>>>>>> Take a look at the links I posted - if there have already been 2 independent implementations of the idea, chances are it's something people need.

>>>>>>>>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <hshreedha...@cloudera.com> wrote:

>>>>>>>>>>> Hi Cody,

>>>>>>>>>>> I am an absolute +1 on SPARK-3146. I think we can implement something pretty simple and lightweight for that one.

>>>>>>>>>>> For the Kafka DStream skipping the WAL implementation - this is something I discussed with TD a few weeks ago. Though it is a good idea to implement this to avoid unnecessary HDFS writes, it is an optimization. For that reason, we must be careful in the implementation. There are a couple of issues that we need to ensure work properly - specifically ordering. To ensure we pull messages from different topics and partitions in the same order after failure, we'd still have to persist the metadata to HDFS (or some other system) - this metadata must contain the order of messages consumed, so we know how to re-read the messages. I am planning to explore this once I have some time (probably in Jan). In addition, we must also ensure bucketing functions work fine as well. I will file a placeholder jira for this one.

>>>>>>>>>>> I also wrote an API to write data back to Kafka a while back - https://github.com/apache/spark/pull/2994 . I am hoping that this will get pulled in soon, as this is something I know people want. I am open to feedback on that - anything that I can do to make it better.

>>>>>>>>>>> Thanks,
>>>>>>>>>>> Hari
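that PR aside, a generic sketch of what writing a stream back to Kafka tends to look like with the standalone kafka producer client - the broker list, topic and string serializers here are just placeholders, and this is not the API from the PR:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.streaming.dstream.DStream

def writeToKafka(out: DStream[String], brokers: String, topic: String): Unit = {
  out.foreachRDD { rdd =>
    rdd.foreachPartition { records =>
      // one producer per partition per batch; a real api would pool and reuse these
      val props = new Properties()
      props.put("bootstrap.servers", brokers)
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      val producer = new KafkaProducer[String, String](props)
      try records.foreach(r => producer.send(new ProducerRecord[String, String](topic, r)))
      finally producer.close()
    }
  }
}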
>>>>>>>>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell <pwend...@gmail.com> wrote:

>>>>>>>>>>>> Hey Cody,

>>>>>>>>>>>> Thanks for reaching out with this. The lead on streaming is TD - he is traveling this week though, so I can respond a bit. To the high-level point of whether Kafka is important - it definitely is. Something like 80% of Spark Streaming deployments (anecdotally) ingest data from Kafka. Also, good support for Kafka is something we generally want in Spark and not a library.

>>>>>>>>>>>> In some cases IIRC there were user libraries that used unstable Kafka API's and we were somewhat waiting on Kafka to stabilize them to merge things upstream. Otherwise users wouldn't be able to use newer Kafka versions. This is a high-level impression only though; I haven't talked to TD about this recently, so it's worth revisiting given the developments in Kafka.

>>>>>>>>>>>> Please do bring things up like this on the dev list if there are blockers for your usage - thanks for pinging it.

>>>>>>>>>>>> - Patrick

>>>>>>>>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger <c...@koeninger.org> wrote:

>>>>>>>>>>>>> Now that 1.2 is finalized... who are the go-to people to get some long-standing Kafka-related issues resolved?

>>>>>>>>>>>>> The existing api is not sufficiently safe nor flexible for our production use. I don't think we're alone in this viewpoint, because I've seen several different patches and libraries to fix the same things we've been running into.

>>>>>>>>>>>>> Regarding flexibility:

>>>>>>>>>>>>> https://issues.apache.org/jira/browse/SPARK-3146

>>>>>>>>>>>>> has been outstanding since August, and IMHO an equivalent of this is absolutely necessary. We wrote a similar patch ourselves, then found that PR and have been running it in production. We wouldn't be able to get our jobs done without it. It also allows users to solve a whole class of problems for themselves (e.g. SPARK-2388, arbitrary delay of messages, etc).

>>>>>>>>>>>>> Regarding safety, I understand the motivation behind WriteAheadLog as a general solution for streaming unreliable sources, but Kafka already is a reliable source. I think there's a need for an api that treats it as such. Even aside from the performance issues of duplicating the write-ahead log in kafka into another write-ahead log in hdfs, I need exactly-once semantics in the face of failure (I've had failures that prevented reloading a spark streaming checkpoint, for instance).

>>>>>>>>>>>>> I've got an implementation I've been using:

>>>>>>>>>>>>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka

>>>>>>>>>>>>> Tresata has something similar at https://github.com/tresata/spark-kafka, and I know there were earlier attempts based on Storm code.

>>>>>>>>>>>>> Trying to distribute these kinds of fixes as libraries rather than patches to Spark is problematic, because large portions of the implementation are private[spark].
>>>>>>>>>>>>> I'd like to help, but I need to know whose attention to get.

>>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>>>>>>>>>>>> For additional commands, e-mail: dev-h...@spark.apache.org