After a long talk with Patrick and TD (thanks guys), I opened the following jira:
https://issues.apache.org/jira/browse/SPARK-4964

The sample PR has an implementation for the batch and the dstream case, and a link to a project with example usage.

On Fri, Dec 19, 2014 at 4:36 PM, Koert Kuipers <ko...@tresata.com> wrote:

> yup, we at tresata do the idempotent store the same way. very simple approach.
>
> On Fri, Dec 19, 2014 at 5:32 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> That KafkaRDD code is dead simple.
>>
>> Given a user-specified map
>>
>> (topic1, partition0) -> (startingOffset, endingOffset)
>> (topic1, partition1) -> (startingOffset, endingOffset)
>> ...
>>
>> turn each one of those entries into a partition of an rdd, using the simple consumer. That's it. No recovery logic, no state, nothing - for any failures, bail on the rdd and let it retry. Spark stays out of the business of being a distributed database.
>>
>> The client code does any transformation it wants, then stores the data and offsets. There are two ways of doing this, based on either idempotence or a transactional data store.
>>
>> For idempotent stores:
>>
>> 1. manipulate data
>> 2. save data to the store
>> 3. save ending offsets to the same store
>>
>> If you fail between 2 and 3, the offsets haven't been stored, so you start again at the same beginning offsets, do the same calculations in the same order, overwrite the same data, and all is good.
>>
>> For transactional stores:
>>
>> 1. manipulate data
>> 2. begin transaction
>> 3. save data to the store
>> 4. save offsets
>> 5. commit transaction
>>
>> If you fail before 5, the transaction rolls back. To make this less heavyweight, you can write the data outside the transaction and then update a pointer to the current data inside the transaction.
>>
>> Again, spark has nothing much to do with guaranteeing exactly once. In fact, the current streaming api actively impedes my ability to do the above.
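[Editor's note: for concreteness, the transactional recipe above can be sketched outside of Spark. This is a minimal plain-Python stand-in using sqlite as a hypothetical transactional store; the table and column names are made up for illustration.]

```python
import sqlite3

# hypothetical schema: result rows plus a per-(topic, partition) ending offset
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE results (value TEXT)")
conn.execute("CREATE TABLE offsets (topic TEXT, part INTEGER, "
             "ending_offset INTEGER, PRIMARY KEY (topic, part))")

def save_transactionally(conn, rows, topic, part, ending_offset):
    """Steps 2-5 of the transactional recipe: data and offsets commit
    atomically, so a failure before commit rolls everything back."""
    with conn:  # one sqlite transaction; commits on success, rolls back on error
        conn.executemany("INSERT INTO results (value) VALUES (?)",
                         [(r,) for r in rows])
        conn.execute("INSERT OR REPLACE INTO offsets (topic, part, ending_offset) "
                     "VALUES (?, ?, ?)", (topic, part, ending_offset))

save_transactionally(conn, ["a", "b"], "topic1", 0, ending_offset=2)

# on restart, the store itself says where to resume reading kafka
resume_at = conn.execute("SELECT ending_offset FROM offsets "
                         "WHERE topic = ? AND part = ?", ("topic1", 0)).fetchone()[0]
```

The idempotent variant is the same minus the transaction: because a replay reprocesses the identical offset range in the same order, re-running the writes as overwrites converges on the same final state.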
>> I'm just suggesting providing an api that doesn't get in the way of exactly-once.
>>
>> On Fri, Dec 19, 2014 at 3:57 PM, Hari Shreedharan <hshreedha...@cloudera.com> wrote:
>>
>> > Can you explain your basic algorithm for the once-only-delivery? It is quite a bit of very Kafka-specific code, which would take more time to read than I can currently afford. If you can explain your algorithm a bit, it might help.
>> >
>> > Thanks,
>> > Hari
>> >
>> > On Fri, Dec 19, 2014 at 1:48 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >
>> >> The problems you guys are discussing come from trying to store state in spark, so don't do that. Spark isn't a distributed database.
>> >>
>> >> Just map kafka partitions directly to rdds, let user code specify the range of offsets explicitly, and let them be in charge of committing offsets.
>> >>
>> >> Using the simple consumer isn't that bad. I'm already using this in production with the code I linked to, and tresata apparently has been as well. Again, for everyone saying this is impossible, have you read either of those implementations and looked at the approach?
>> >>
>> >> On Fri, Dec 19, 2014 at 2:27 PM, Sean McNamara <sean.mcnam...@webtrends.com> wrote:
>> >>
>> >>> Please feel free to correct me if I'm wrong, but I think the exactly-once spark streaming semantics can easily be solved using updateStateByKey. Make the key going into updateStateByKey a hash of the event, or pluck off some uuid from the message. The updateFunc would only emit the message if the key did not exist, and the user has complete control over the window of time / state lifecycle for detecting duplicates. It also makes it really easy to detect and take action (alert?) when you DO see a duplicate, or make memory tradeoffs within an error bound using a sketch algorithm.
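[Editor's note: a minimal sketch of Sean's dedup suggestion, with a plain dict standing in for the keyed state Spark would carry between batches. The real updateStateByKey updateFunc has a different signature; names here are illustrative.]

```python
def update_func(state, events):
    """Roughly what the updateStateByKey updateFunc would do: emit an
    event only the first time its key is seen, and surface duplicates
    so the caller can alert on them."""
    emitted, duplicates = [], []
    for key, event in events:
        if key in state:
            duplicates.append(event)  # key already seen: a duplicate
        else:
            state[key] = True         # remember the key for later batches
            emitted.append(event)
    return emitted, duplicates

state = {}  # stands in for the state updateStateByKey persists across batches
batch1, _ = update_func(state, [("uuid-1", "a"), ("uuid-2", "b")])
batch2, dups = update_func(state, [("uuid-2", "b"), ("uuid-3", "c")])
```

The state lifecycle - expiring keys after a window, or swapping the dict for a sketch to bound memory within an error bound - is where the tradeoffs Sean mentions come in.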
>> >>> The kafka simple consumer is insanely complex; if possible, I think it would be better (and vastly more flexible) to get reliability using the primitives that spark so elegantly provides.
>> >>>
>> >>> Cheers,
>> >>>
>> >>> Sean
>> >>>
>> >>> > On Dec 19, 2014, at 12:06 PM, Hari Shreedharan <hshreedha...@cloudera.com> wrote:
>> >>> >
>> >>> > Hi Dibyendu,
>> >>> >
>> >>> > Thanks for the details on the implementation. But I still do not believe that it gives no duplicates - what they achieve is that the same batch is processed exactly the same way every time (but note it may be processed more than once) - so it depends on the operation being idempotent. I believe Trident uses ZK to keep track of the transactions - a batch can be processed multiple times in failure scenarios (for example, the transaction is processed, but before ZK is updated the machine fails, causing a "new" node to process it again).
>> >>> >
>> >>> > I don't think it is impossible to do this in Spark Streaming as well, and I'd be really interested in working on it at some point in the near future.
>> >>> >
>> >>> > On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
>> >>> >
>> >>> >> Hi,
>> >>> >>
>> >>> >> Thanks to Jerry for mentioning the Kafka Spout for Trident. Storm Trident achieves its exactly-once guarantee by processing tuples in a batch and assigning the same transaction-id to a given batch. The replay of a given batch with a transaction-id will have the exact same set of tuples, and replay of batches happens in the exact same order as before the failure.
>> >>> >> Given this paradigm, if the downstream system processes data for a batch with a given transaction-id, and during failure the same batch is emitted again, you can check whether that transaction-id has already been processed, and hence can guarantee exactly-once semantics.
>> >>> >>
>> >>> >> And this can only be achieved in Spark if we use the low-level Kafka consumer API to process the offsets. This low-level Kafka consumer (https://github.com/dibbhatt/kafka-spark-consumer) has implemented a Spark Kafka consumer which uses the Kafka low-level APIs. All of the Kafka-related logic has been taken from the Storm-Kafka spout, which manages all Kafka re-balance and fault-tolerance aspects and Kafka metadata management.
>> >>> >>
>> >>> >> Presently this consumer guarantees that during Receiver failure, it will re-emit the exact same block with the same set of messages. Every message has the details of its partition, offset and topic, which can tackle SPARK-3146.
>> >>> >>
>> >>> >> As this low-level consumer has complete control over the Kafka offsets, we can implement a Trident-like feature on top of it, such as implementing a transaction-id for a given block and re-emitting the same block with the same set of messages during Driver failure.
>> >>> >>
>> >>> >> Regards,
>> >>> >> Dibyendu
>> >>> >>
>> >>> >> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <saisai.s...@intel.com> wrote:
>> >>> >>>
>> >>> >>> Hi all,
>> >>> >>>
>> >>> >>> I agree with Hari that strong exactly-once semantics is very hard to guarantee, especially in the failure situation.
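[Editor's note: Dibyendu's transaction-id check above can be sketched as follows - plain Python, with a dict standing in for whatever record of committed transaction-ids the downstream system actually keeps; names are illustrative.]

```python
processed = {}  # the downstream system's record of committed transaction-ids

def process_batch(txn_id, batch):
    """Trident-style replay handling: a replayed batch arrives with the
    same transaction-id and the same messages, so an id that has already
    been committed is skipped rather than applied twice."""
    if txn_id in processed:
        return processed[txn_id]  # replay after a failure: apply nothing
    result = sum(batch)           # stand-in for the real processing
    processed[txn_id] = result
    return result

first = process_batch(7, [1, 2, 3])
replayed = process_batch(7, [1, 2, 3])  # same batch re-emitted after failure
```

Note this only works because, as Dibyendu says, a replayed batch is guaranteed to carry the exact same set of messages; otherwise skipping a seen transaction-id would drop data.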
>> >>> >>> From my understanding, even the current implementation of ReliableKafkaReceiver cannot fully guarantee exactly-once semantics once failed. First is the ordering of data replayed from the last checkpoint; this is hard to guarantee when multiple partitions are injected in. Second is the design complexity of achieving this; you can refer to the Kafka Spout in Trident - we would have to dig into the very details of Kafka's metadata management system to achieve this, not to mention rebalance and fault-tolerance.
>> >>> >>>
>> >>> >>> Thanks
>> >>> >>> Jerry
>> >>> >>>
>> >>> >>> -----Original Message-----
>> >>> >>> From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
>> >>> >>> Sent: Friday, December 19, 2014 5:57 AM
>> >>> >>> To: Cody Koeninger
>> >>> >>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
>> >>> >>> Subject: Re: Which committers care about Kafka?
>> >>> >>>
>> >>> >>> But idempotency is not that easy to achieve sometimes. A strong only-once semantic through a proper API would be super useful; but I'm not implying this is easy to achieve.
>> >>> >>> On 18 Dec 2014 21:52, "Cody Koeninger" <c...@koeninger.org> wrote:
>> >>> >>>
>> >>> >>>> If the downstream store for the output data is idempotent or transactional, and that downstream store also is the system of record for kafka offsets, then you have exactly-once semantics. Commit offsets with / after the data is stored. On any failure, restart from the last committed offsets.
>> >>> >>>>
>> >>> >>>> Yes, this approach is biased towards the etl-like use cases rather than near-realtime-analytics use cases.
>> >>> >>>>
>> >>> >>>> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <hshreedha...@cloudera.com> wrote:
>> >>> >>>>>
>> >>> >>>>> I get what you are saying. But getting exactly-once right is an extremely hard problem - especially in the presence of failure. The issue is failures can happen in a bunch of places. For example, before the notification of the downstream store being successful reaches the receiver that updates the offsets, the node fails. The store was successful, but duplicates came in either way. This is something worth discussing by itself - but without uuids etc. this might not really be solved even when you think it is.
>> >>> >>>>>
>> >>> >>>>> Anyway, I will look at the links. Even I am interested in all of the features you mentioned - no HDFS WAL for Kafka and once-only delivery - but I doubt the latter is really possible to guarantee, though I really would love to have that!
>> >>> >>>>>
>> >>> >>>>> Thanks,
>> >>> >>>>> Hari
>> >>> >>>>>
>> >>> >>>>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >>> >>>>>
>> >>> >>>>>> Thanks for the replies.
>> >>> >>>>>>
>> >>> >>>>>> Regarding skipping the WAL, it's not just about optimization. If you actually want exactly-once semantics, you need control of kafka offsets as well, including the ability to not use zookeeper as the system of record for offsets. Kafka already is a reliable system that has strong ordering guarantees (within a partition) and does not mandate the use of zookeeper to store offsets.
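[Editor's note: Cody's earlier KafkaRDD description fits here - because the caller, not zookeeper, specifies the offset ranges, whatever store the results go to can be the system of record for offsets. A plain-Python stand-in for turning a user-specified map into rdd partitions; field names are made up for illustration.]

```python
# a user-specified map, as in the KafkaRDD description earlier in the thread
offset_ranges = {
    ("topic1", 0): (10, 13),
    ("topic1", 1): (5, 7),
}

def make_partitions(ranges):
    """One rdd partition per (topic, partition) -> (start, end) entry;
    a simple-consumer fetch would then read exactly that offset range,
    with no recovery logic - a failed partition is simply retried."""
    return [{"topic": topic, "kafka_partition": part,
             "start": start, "end": end}
            for (topic, part), (start, end) in sorted(ranges.items())]

partitions = make_partitions(offset_ranges)
```

On restart, the driver would rebuild `offset_ranges` from the last offsets committed to the downstream store, which is what makes the end-to-end pipeline exactly-once.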
>> >>> >>>>>> I think there should be a spark api that acts as a very simple intermediary between Kafka and the user's choice of downstream store.
>> >>> >>>>>>
>> >>> >>>>>> Take a look at the links I posted - if there have already been 2 independent implementations of the idea, chances are it's something people need.
>> >>> >>>>>>
>> >>> >>>>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <hshreedha...@cloudera.com> wrote:
>> >>> >>>>>>>
>> >>> >>>>>>> Hi Cody,
>> >>> >>>>>>>
>> >>> >>>>>>> I am an absolute +1 on SPARK-3146. I think we can implement something pretty simple and lightweight for that one.
>> >>> >>>>>>>
>> >>> >>>>>>> For the Kafka DStream skipping the WAL implementation - this is something I discussed with TD a few weeks ago. Though it is a good idea to implement this to avoid unnecessary HDFS writes, it is an optimization. For that reason, we must be careful in the implementation. There are a couple of issues that we need to ensure work properly - specifically ordering. To ensure we pull messages from different topics and partitions in the same order after failure, we'd still have to persist the metadata to HDFS (or some other system) - this metadata must contain the order of messages consumed, so we know how to re-read the messages. I am planning to explore this once I have some time (probably in Jan). In addition, we must also ensure bucketing functions work fine as well. I will file a placeholder jira for this one.
>> >>> >>>>>>>
>> >>> >>>>>>> I also wrote an API to write data back to Kafka a while back - https://github.com/apache/spark/pull/2994 . I am hoping that this will get pulled in soon, as this is something I know people want. I am open to feedback on that - anything that I can do to make it better.
>> >>> >>>>>>>
>> >>> >>>>>>> Thanks,
>> >>> >>>>>>> Hari
>> >>> >>>>>>>
>> >>> >>>>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell <pwend...@gmail.com> wrote:
>> >>> >>>>>>>
>> >>> >>>>>>>> Hey Cody,
>> >>> >>>>>>>>
>> >>> >>>>>>>> Thanks for reaching out with this. The lead on streaming is TD - he is traveling this week though, so I can respond a bit. To the high-level point of whether Kafka is important - it definitely is. Something like 80% of Spark Streaming deployments (anecdotally) ingest data from Kafka. Also, good support for Kafka is something we generally want in Spark and not in a library. In some cases IIRC there were user libraries that used unstable Kafka API's, and we were somewhat waiting on Kafka to stabilize them before merging things upstream. Otherwise users wouldn't be able to use newer Kafka versions. This is a high-level impression only though; I haven't talked to TD about this recently, so it's worth revisiting given the developments in Kafka.
>> >>> >>>>>>>>
>> >>> >>>>>>>> Please do bring things up like this on the dev list if there are blockers for your usage - thanks for pinging it.
>> >>> >>>>>>>>
>> >>> >>>>>>>> - Patrick
>> >>> >>>>>>>>
>> >>> >>>>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger <c...@koeninger.org> wrote:
>> >>> >>>>>>>>> Now that 1.2 is finalized...
>> >>> >>>>>>>>> who are the go-to people to get some long-standing Kafka-related issues resolved?
>> >>> >>>>>>>>>
>> >>> >>>>>>>>> The existing api is neither sufficiently safe nor flexible enough for our production use. I don't think we're alone in this viewpoint, because I've seen several different patches and libraries to fix the same things we've been running into.
>> >>> >>>>>>>>>
>> >>> >>>>>>>>> Regarding flexibility:
>> >>> >>>>>>>>>
>> >>> >>>>>>>>> https://issues.apache.org/jira/browse/SPARK-3146
>> >>> >>>>>>>>>
>> >>> >>>>>>>>> has been outstanding since August, and IMHO an equivalent of this is absolutely necessary. We wrote a similar patch ourselves, then found that PR and have been running it in production. We wouldn't be able to get our jobs done without it. It also allows users to solve a whole class of problems for themselves (e.g. SPARK-2388, arbitrary delay of messages, etc).
>> >>> >>>>>>>>>
>> >>> >>>>>>>>> Regarding safety, I understand the motivation behind WriteAheadLog as a general solution for streaming unreliable sources, but Kafka already is a reliable source. I think there's a need for an api that treats it as such. Even aside from the performance issues of duplicating the write-ahead log in kafka into another write-ahead log in hdfs, I need exactly-once semantics in the face of failure (I've had failures that prevented reloading a spark streaming checkpoint, for instance).
>> >>> >>>>>>>>>
>> >>> >>>>>>>>> I've got an implementation I've been using:
>> >>> >>>>>>>>>
>> >>> >>>>>>>>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka
>> >>> >>>>>>>>>
>> >>> >>>>>>>>> Tresata has something similar at https://github.com/tresata/spark-kafka, and I know there were earlier attempts based on Storm code.
>> >>> >>>>>>>>>
>> >>> >>>>>>>>> Trying to distribute these kinds of fixes as libraries rather than patches to Spark is problematic, because large portions of the implementation are private[spark].
>> >>> >>>>>>>>>
>> >>> >>>>>>>>> I'd like to help, but I need to know whose attention to get.
>> >>> >>>>>>>>
>> >>> >>>>>>>> ---------------------------------------------------------------------
>> >>> >>>>>>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> >>> >>>>>>>> For additional commands, e-mail: dev-h...@spark.apache.org