Can you explain your basic algorithm for once-only delivery? It is quite a bit of very Kafka-specific code, which would take more time to read than I can currently afford. If you can explain your algorithm a bit, it might help.
Thanks,
Hari

On Fri, Dec 19, 2014 at 1:48 PM, Cody Koeninger <c...@koeninger.org> wrote:
> The problems you guys are discussing come from trying to store state in spark, so don't do that. Spark isn't a distributed database.
>
> Just map kafka partitions directly to rdds, let user code specify the range of offsets explicitly, and let them be in charge of committing offsets.
>
> Using the simple consumer isn't that bad; I'm already using this in production with the code I linked to, and Tresata apparently has been as well. Again, for everyone saying this is impossible, have you read either of those implementations and looked at the approach?
>
> On Fri, Dec 19, 2014 at 2:27 PM, Sean McNamara <sean.mcnam...@webtrends.com> wrote:
>> Please feel free to correct me if I’m wrong, but I think the exactly-once spark streaming semantics can easily be solved using updateStateByKey. Make the key going into updateStateByKey a hash of the event, or pluck off some uuid from the message. The updateFunc would only emit the message if the key did not exist, and the user has complete control over the window of time / state lifecycle for detecting duplicates. It also makes it really easy to detect and take action (alert?) when you DO see a duplicate, or to make memory tradeoffs within an error bound using a sketch algorithm. The kafka simple consumer is insanely complex; if possible I think it would be better (and vastly more flexible) to get reliability using the primitives that spark so elegantly provides.
>>
>> Cheers,
>>
>> Sean
>>
>> > On Dec 19, 2014, at 12:06 PM, Hari Shreedharan <hshreedha...@cloudera.com> wrote:
>> >
>> > Hi Dibyendu,
>> >
>> > Thanks for the details on the implementation.
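Sean's updateStateByKey deduplication idea above can be sketched in plain Python (a sketch only: `dedupe` and the explicit `seen` set are illustrative stand-ins for the keyed state that updateStateByKey would carry across batches, under a user-chosen window/lifecycle policy):

```python
import hashlib

def event_key(event: bytes) -> str:
    # Key each event by a content hash (or pluck a uuid field off the
    # message instead, as Sean suggests).
    return hashlib.sha256(event).hexdigest()

def dedupe(batch, seen):
    # Emit an event only if its key has not been seen before; collect
    # duplicates so the user can alert on them. 'seen' models the keyed
    # state updateStateByKey carries from batch to batch.
    emitted, duplicates = [], []
    for event in batch:
        key = event_key(event)
        if key in seen:
            duplicates.append(event)  # could alert or count here
        else:
            seen.add(key)
            emitted.append(event)
    return emitted, duplicates
```

A bounded-memory variant would replace the exact `seen` set with a sketch (e.g. a Bloom filter), trading exactness for an error bound, as Sean notes.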
>> > But I still do not believe that it gives no duplicates - what they achieve is that the same batch is processed exactly the same way every time (but note it may be processed more than once) - so it depends on the operation being idempotent. I believe Trident uses ZK to keep track of the transactions - a batch can be processed multiple times in failure scenarios (for example, the transaction is processed but the machine fails before ZK is updated, causing a "new" node to process it again).
>> >
>> > I don't think it is impossible to do this in Spark Streaming as well, and I'd be really interested in working on it at some point in the near future.
>> >
>> > On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
>> >
>> >> Hi,
>> >>
>> >> Thanks to Jerry for mentioning the Kafka Spout for Trident. Storm Trident achieves the exactly-once guarantee by processing the tuples in a batch and assigning the same transaction-id to a given batch. The replay of a given batch with a transaction-id will have the exact same set of tuples, and replay of batches happens in the exact same order as before the failure.
>> >>
>> >> With this paradigm, if the downstream system processes data for a batch with a given transaction-id, and if during failure the same batch is emitted again, you can check whether that transaction-id has already been processed and hence guarantee exactly-once semantics.
>> >>
>> >> And this can only be achieved in Spark if we use the low-level Kafka consumer API to manage the offsets. This low-level Kafka consumer (https://github.com/dibbhatt/kafka-spark-consumer) implements a Spark Kafka consumer which uses the Kafka low-level APIs.
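A minimal model of the transaction-id check Dibyendu describes, in plain Python (a sketch under simplifying assumptions: a plain dict as the downstream store, a single `last_txid` cell, and batches replayed in strict order as Trident's transactional spouts guarantee):

```python
def apply_batch(store, txid, batch):
    # Replays of a failed batch carry the same txid and the exact same
    # tuples, so the sink can recognize and skip a batch it has already
    # applied. The state update and the txid record must land atomically.
    if store.get("last_txid") == txid:
        return  # already applied: skip the replay
    for key, value in batch:
        store[key] = store.get(key, 0) + value  # non-idempotent update
    store["last_txid"] = txid
```

Because the update and the txid are recorded together, even a non-idempotent operation (a counter here) ends up applied exactly once when the same batch is emitted twice.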
>> >> All of the Kafka-related logic has been taken from the Storm-Kafka spout, which manages all Kafka re-balance and fault-tolerance aspects and Kafka metadata management.
>> >>
>> >> Presently this consumer guarantees that during receiver failure it will re-emit the exact same block with the same set of messages. Every message carries the details of its partition, offset and topic, which can tackle SPARK-3146.
>> >>
>> >> As this low-level consumer has complete control over the Kafka offsets, we can implement a Trident-like feature on top of it, such as a transaction-id for a given block, and re-emit the same block with the same set of messages during driver failure.
>> >>
>> >> Regards,
>> >> Dibyendu
>> >>
>> >> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <saisai.s...@intel.com> wrote:
>> >>>
>> >>> Hi all,
>> >>>
>> >>> I agree with Hari that strong exactly-once semantics is very hard to guarantee, especially in failure situations. From my understanding, even the current implementation of ReliableKafkaReceiver cannot fully guarantee exactly-once semantics after a failure: first, the ordering of data replayed from the last checkpoint is hard to guarantee when multiple partitions are involved; second, there is the design complexity of achieving this - you can refer to the Kafka Spout in Trident, where we have to dig into the very details of Kafka's metadata management system, not to mention rebalance and fault-tolerance.
>> >>>
>> >>> Thanks
>> >>> Jerry
>> >>>
>> >>> -----Original Message-----
>> >>> From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
>> >>> Sent: Friday, December 19, 2014 5:57 AM
>> >>> To: Cody Koeninger
>> >>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
>> >>> Subject: Re: Which committers care about Kafka?
>> >>>
>> >>> But idempotency is not that easy to achieve sometimes. A strong exactly-once semantic through a proper API would be super useful; but I'm not implying this is easy to achieve.
>> >>>
>> >>> On 18 Dec 2014 21:52, "Cody Koeninger" <c...@koeninger.org> wrote:
>> >>>
>> >>>> If the downstream store for the output data is idempotent or transactional, and that downstream store also is the system of record for kafka offsets, then you have exactly-once semantics. Commit offsets with / after the data is stored. On any failure, restart from the last committed offsets.
>> >>>>
>> >>>> Yes, this approach is biased towards the etl-like use cases rather than near-realtime-analytics use cases.
>> >>>>
>> >>>> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <hshreedha...@cloudera.com> wrote:
>> >>>>>
>> >>>>> I get what you are saying. But getting exactly-once right is an extremely hard problem - especially in the presence of failure. The issue is failures can happen in a bunch of places. For example, the node fails before the notification of the downstream store being successful reaches the receiver that updates the offsets. The store was successful, but duplicates came in either way. This is something worth discussing by itself - but without uuids etc. this might not really be solved even when you think it is.
>> >>>>>
>> >>>>> Anyway, I will look at the links. Even I am interested in all of the features you mentioned - no HDFS WAL for Kafka and once-only delivery - but I doubt the latter is really possible to guarantee, though I really would love to have that!
>> >>>>>
>> >>>>> Thanks,
>> >>>>> Hari
>> >>>>>
>> >>>>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> >>>>>
>> >>>>>> Thanks for the replies.
>> >>>>>>
>> >>>>>> Regarding skipping the WAL, it's not just about optimization. If you actually want exactly-once semantics, you need control of kafka offsets as well, including the ability to not use zookeeper as the system of record for offsets. Kafka already is a reliable system that has strong ordering guarantees (within a partition) and does not mandate the use of zookeeper to store offsets. I think there should be a spark api that acts as a very simple intermediary between Kafka and the user's choice of downstream store.
>> >>>>>>
>> >>>>>> Take a look at the links I posted - if there have already been 2 independent implementations of the idea, chances are it's something people need.
>> >>>>>>
>> >>>>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <hshreedha...@cloudera.com> wrote:
>> >>>>>>>
>> >>>>>>> Hi Cody,
>> >>>>>>>
>> >>>>>>> I am an absolute +1 on SPARK-3146. I think we can implement something pretty simple and lightweight for that one.
>> >>>>>>>
>> >>>>>>> For the Kafka DStream skipping the WAL implementation - this is something I discussed with TD a few weeks ago. Though it is a good idea to implement this to avoid unnecessary HDFS writes, it is an optimization. For that reason, we must be careful in the implementation. There are a couple of issues that we need to ensure work properly - specifically ordering. To ensure we pull messages from different topics and partitions in the same order after failure, we’d still have to persist the metadata to HDFS (or some other system) - this metadata must contain the order of messages consumed, so we know how to re-read the messages. I am
I am >> >>>>>>> planning to >> >>>> explore >> >>>>>>> this once I have some time (probably in Jan). In addition, we must >> >>>>>>> also ensure bucketing functions work fine as well. I will file a >> >>>>>>> placeholder jira for this one. >> >>>>>>> >> >>>>>>> I also wrote an API to write data back to Kafka a while back - >> >>>>>>> https://github.com/apache/spark/pull/2994 . I am hoping that this >> >>>>>>> will get pulled in soon, as this is something I know people want. >> >>>>>>> I am open >> >>>> to >> >>>>>>> feedback on that - anything that I can do to make it better. >> >>>>>>> >> >>>>>>> Thanks, >> >>>>>>> Hari >> >>>>>>> >> >>>>>>> >> >>>>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell >> >>>>>>> <pwend...@gmail.com> >> >>>>>>> wrote: >> >>>>>>> >> >>>>>>>> Hey Cody, >> >>>>>>>> >> >>>>>>>> Thanks for reaching out with this. The lead on streaming is TD - >> >>>>>>>> he is traveling this week though so I can respond a bit. To the >> >>>>>>>> high level point of whether Kafka is important - it definitely >> >>>>>>>> is. Something like 80% of Spark Streaming deployments >> >>>>>>>> (anecdotally) ingest data from Kafka. Also, good support for >> >>>>>>>> Kafka is something we generally want in Spark and not a library. >> >>>>>>>> In some cases IIRC there were user libraries that used unstable >> >>>>>>>> Kafka API's and we were somewhat waiting on Kafka to stabilize >> >>>>>>>> them to merge things upstream. Otherwise users wouldn't be able >> >>>>>>>> to use newer Kakfa versions. This is a high level impression only >> >>>>>>>> though, I haven't talked to TD about this recently so it's worth >> >>> revisiting given the developments in Kafka. >> >>>>>>>> >> >>>>>>>> Please do bring things up like this on the dev list if there are >> >>>>>>>> blockers for your usage - thanks for pinging it. 
>> >>>>>>>>
>> >>>>>>>> - Patrick
>> >>>>>>>>
>> >>>>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger <c...@koeninger.org> wrote:
>> >>>>>>>>> Now that 1.2 is finalized... who are the go-to people to get some long-standing Kafka-related issues resolved?
>> >>>>>>>>>
>> >>>>>>>>> The existing api is not sufficiently safe nor flexible for our production use. I don't think we're alone in this viewpoint, because I've seen several different patches and libraries to fix the same things we've been running into.
>> >>>>>>>>>
>> >>>>>>>>> Regarding flexibility:
>> >>>>>>>>>
>> >>>>>>>>> https://issues.apache.org/jira/browse/SPARK-3146
>> >>>>>>>>>
>> >>>>>>>>> has been outstanding since August, and IMHO an equivalent of this is absolutely necessary. We wrote a similar patch ourselves, then found that PR and have been running it in production. We wouldn't be able to get our jobs done without it. It also allows users to solve a whole class of problems for themselves (e.g. SPARK-2388, arbitrary delay of messages, etc).
>> >>>>>>>>>
>> >>>>>>>>> Regarding safety, I understand the motivation behind WriteAheadLog as a general solution for streaming unreliable sources, but Kafka already is a reliable source. I think there's a need for an api that treats it as such. Even aside from the performance issues of duplicating the write-ahead log in kafka into another write-ahead log in hdfs, I need exactly-once semantics in the face of failure (I've had failures that prevented reloading a spark streaming checkpoint, for instance).
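The kind of api Cody is arguing for - kafka partitions mapped directly to rdd partitions over explicit offset ranges - can be modeled roughly in plain Python (the `OffsetRange` shape and `read_range` are hypothetical illustrations, with a list standing in for one partition's log; see the linked kafkaRdd branch for his actual implementation):

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class OffsetRange:
    topic: str
    partition: int
    from_offset: int   # inclusive
    until_offset: int  # exclusive

def read_range(log, r):
    # A fetch over an explicit offset range is deterministic: because
    # kafka is ordered within a partition, the same range always yields
    # the same messages, so recomputing a failed rdd partition replays
    # exactly the data the user's committed offsets describe.
    return log[r.from_offset:r.until_offset]
```

Exposing the ranges to user code (as SPARK-3146 asks for partition/offset access) is what lets the user, not zookeeper, be in charge of committing offsets.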
>> >>>>>>>>>
>> >>>>>>>>> I've got an implementation I've been using:
>> >>>>>>>>>
>> >>>>>>>>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka
>> >>>>>>>>>
>> >>>>>>>>> Tresata has something similar at https://github.com/tresata/spark-kafka, and I know there were earlier attempts based on Storm code.
>> >>>>>>>>>
>> >>>>>>>>> Trying to distribute these kinds of fixes as libraries rather than patches to Spark is problematic, because large portions of the implementation are private[spark].
>> >>>>>>>>>
>> >>>>>>>>> I'd like to help, but I need to know whose attention to get.
>> >>>>>>>>
>> >>>>>>>> ---------------------------------------------------------------------
>> >>>>>>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>> >>>>>>>> For additional commands, e-mail: dev-h...@spark.apache.org