In general such discussions happen or are posted on the dev lists. Could you please post a summary? Thanks.
Thanks,
Hari

On Wed, Dec 24, 2014 at 11:46 PM, Cody Koeninger <c...@koeninger.org> wrote:
> After a long talk with Patrick and TD (thanks guys), I opened the following jira
> https://issues.apache.org/jira/browse/SPARK-4964
> The sample PR has an implementation for the batch and the dstream case, and a link to a project with example usage.
>
> On Fri, Dec 19, 2014 at 4:36 PM, Koert Kuipers <ko...@tresata.com> wrote:
>> yup, we at tresata do the idempotent store the same way. very simple approach.
>>
>> On Fri, Dec 19, 2014 at 5:32 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>>
>>> That KafkaRDD code is dead simple.
>>>
>>> Given a user specified map
>>>
>>> (topic1, partition0) -> (startingOffset, endingOffset)
>>> (topic1, partition1) -> (startingOffset, endingOffset)
>>> ...
>>> turn each one of those entries into a partition of an rdd, using the simple consumer.
>>> That's it. No recovery logic, no state, nothing - for any failures, bail on the rdd and let it retry.
>>> Spark stays out of the business of being a distributed database.
>>>
>>> The client code does any transformation it wants, then stores the data and offsets. There are two ways of doing this, either based on idempotence or a transactional data store.
>>>
>>> For idempotent stores:
>>>
>>> 1. manipulate data
>>> 2. save data to store
>>> 3. save ending offsets to the same store
>>>
>>> If you fail between 2 and 3, the offsets haven't been stored, you start again at the same beginning offsets, do the same calculations in the same order, overwrite the same data, all is good.
>>>
>>> For transactional stores:
>>>
>>> 1. manipulate data
>>> 2. begin transaction
>>> 3. save data to the store
>>> 4. save offsets
>>> 5. commit transaction
>>>
>>> If you fail before 5, the transaction rolls back. To make this less heavyweight, you can write the data outside the transaction and then update a pointer to the current data inside the transaction.
>>>
>>> Again, spark has nothing much to do with guaranteeing exactly once. In fact, the current streaming api actively impedes my ability to do the above. I'm just suggesting providing an api that doesn't get in the way of exactly-once.
>>>
>>> On Fri, Dec 19, 2014 at 3:57 PM, Hari Shreedharan <hshreedha...@cloudera.com> wrote:
>>> > Can you explain your basic algorithm for the once-only-delivery? It is quite a bit of very Kafka-specific code, which would take more time to read than I can currently afford. If you can explain your algorithm a bit, it might help.
>>> >
>>> > Thanks,
>>> > Hari
>>> >
>>> > On Fri, Dec 19, 2014 at 1:48 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>> >
>>> >> The problems you guys are discussing come from trying to store state in spark, so don't do that. Spark isn't a distributed database.
>>> >>
>>> >> Just map kafka partitions directly to rdds, let user code specify the range of offsets explicitly, and let them be in charge of committing offsets.
>>> >>
>>> >> Using the simple consumer isn't that bad; I'm already using this in production with the code I linked to, and tresata apparently has been as well. Again, for everyone saying this is impossible, have you read either of those implementations and looked at the approach?
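
To make the idempotent-store recipe above concrete, here is a minimal Scala sketch of the same three steps. OffsetRange, IdempotentStore and runBatch are hypothetical stand-ins invented for illustration - they are not Spark or Kafka APIs - and the only point is the ordering: write the data, then write the ending offsets to the same store.

object IdempotentStorePattern {
  // Hypothetical stand-ins, invented for illustration; not Spark or Kafka APIs.
  case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

  trait IdempotentStore {
    def put(key: String, value: String): Unit          // keyed write: repeating it is harmless
    def saveOffsets(ranges: Seq[OffsetRange]): Unit    // persist ending offsets
    def loadOffsets(): Map[(String, Int), Long]        // last saved ending offsets per (topic, partition)
  }

  def runBatch(store: IdempotentStore,
               latestAvailable: Map[(String, Int), Long],                // newest offsets visible in Kafka
               fetch: Seq[OffsetRange] => Seq[(String, String)]): Unit = {
    // The batch covers everything from the last saved ending offsets up to
    // what is currently available; the ranges fully determine the input.
    val ranges = store.loadOffsets().map { case ((topic, part), from) =>
      OffsetRange(topic, part, from, latestAvailable((topic, part)))
    }.toSeq

    val records = fetch(ranges)                        // read exactly these offsets

    // 1. manipulate data (any deterministic transformation)
    val transformed = records.map { case (k, v) => (k, v.trim) }

    // 2. save data to the store; a repeat overwrites the same keys with the same values
    transformed.foreach { case (k, v) => store.put(k, v) }

    // 3. save ending offsets to the same store, last. A crash between 2 and 3
    //    means the same ranges are recomputed and rewritten identically on restart.
    store.saveOffsets(ranges)
  }
}

On restart, loadOffsets() is the single source of truth for where to resume, which is exactly the "system of record for offsets" role discussed later in the thread.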
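
The transactional variant might look roughly like the following, using plain JDBC as a stand-in for "a transactional data store". The table names and schema are invented for the example; rows is assumed to be the already-transformed output of step 1.

object TransactionalStorePattern {
  import java.sql.{Connection, DriverManager}

  // rows: transformed output (step 1); ranges: (topic, partition, untilOffset)
  def runBatch(jdbcUrl: String,
               rows: Seq[(String, String)],
               ranges: Seq[(String, Int, Long)]): Unit = {
    val conn: Connection = DriverManager.getConnection(jdbcUrl)
    conn.setAutoCommit(false)                          // 2. begin transaction
    try {
      val insert = conn.prepareStatement("INSERT INTO results (k, v) VALUES (?, ?)")
      rows.foreach { case (k, v) =>                    // 3. save data inside the transaction
        insert.setString(1, k); insert.setString(2, v); insert.executeUpdate()
      }
      val update = conn.prepareStatement(
        "UPDATE offsets SET until_offset = ? WHERE topic = ? AND kafka_partition = ?")
      ranges.foreach { case (topic, part, until) =>    // 4. save offsets in the same transaction
        update.setLong(1, until); update.setString(2, topic); update.setInt(3, part)
        update.executeUpdate()
      }
      conn.commit()                                    // 5. commit: data and offsets move together
    } catch {
      case e: Exception => conn.rollback(); throw e    // any failure before 5 rolls everything back
    } finally {
      conn.close()
    }
  }
}

Committing the offsets in the same transaction as the data is what ties the two together: restarting from the offsets table after a crash can never skip or double-count a batch.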
>>> >>
>>> >> On Fri, Dec 19, 2014 at 2:27 PM, Sean McNamara <sean.mcnam...@webtrends.com> wrote:
>>> >>
>>> >>> Please feel free to correct me if I’m wrong, but I think the exactly-once spark streaming semantics can easily be solved using updateStateByKey. Make the key going into updateStateByKey be a hash of the event, or pluck off some uuid from the message. The updateFunc would only emit the message if the key did not exist, and the user has complete control over the window of time / state lifecycle for detecting duplicates. It also makes it really easy to detect and take action (alert?) when you DO see a duplicate, or make memory tradeoffs within an error bound using a sketch algorithm. The kafka simple consumer is insanely complex; if possible I think it would be better (and vastly more flexible) to get reliability using the primitives that spark so elegantly provides.
>>> >>>
>>> >>> Cheers,
>>> >>>
>>> >>> Sean
>>> >>>
>>> >>> > On Dec 19, 2014, at 12:06 PM, Hari Shreedharan <hshreedha...@cloudera.com> wrote:
>>> >>> >
>>> >>> > Hi Dibyendu,
>>> >>> >
>>> >>> > Thanks for the details on the implementation. But I still do not believe that it gives no duplicates - what they achieve is that the same batch is processed exactly the same way every time (but note it may be processed more than once) - so it depends on the operation being idempotent. I believe Trident uses ZK to keep track of the transactions - a batch can be processed multiple times in failure scenarios (for example, the transaction is processed but before ZK is updated the machine fails, causing a "new" node to process it again).
>>> >>> >
>>> >>> > I don't think it is impossible to do this in Spark Streaming as well, and I'd be really interested in working on it at some point in the near future.
>>> >>> >
>>> >>> > On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya <dibyendu.bhattach...@gmail.com> wrote:
>>> >>> >
>>> >>> >> Hi,
>>> >>> >>
>>> >>> >> Thanks to Jerry for mentioning the Kafka Spout for Trident. Storm Trident achieves the exactly-once guarantee by processing the tuples in a batch and assigning the same transaction-id to a given batch. The replay of a given batch with a transaction-id will have the exact same set of tuples, and replay of batches happens in the exact same order as before the failure.
>>> >>> >>
>>> >>> >> With this paradigm, if the downstream system processes data for a batch having a given transaction-id, and if during failure the same batch is emitted again, you can check whether that transaction-id has already been processed and hence guarantee exactly-once semantics.
>>> >>> >>
>>> >>> >> This can only be achieved in Spark if we use the low-level Kafka consumer API to process the offsets. This low-level Kafka consumer (https://github.com/dibbhatt/kafka-spark-consumer) has implemented a Spark Kafka consumer which uses the Kafka low-level APIs.
>>> >>> >> All of the Kafka-related logic has been taken from the Storm-Kafka spout, which manages all Kafka re-balance and fault-tolerance aspects and Kafka metadata management.
>>> >>> >>
>>> >>> >> Presently this consumer ensures that during Receiver failure it will re-emit the exact same block with the same set of messages. Every message has the details of its partition, offset and topic, which can tackle SPARK-3146.
>>> >>> >>
>>> >>> >> As this low-level consumer has complete control over the Kafka offsets, we can implement a Trident-like feature on top of it, such as a transaction-id for a given block, and re-emit the same block with the same set of messages during Driver failure.
>>> >>> >>
>>> >>> >> Regards,
>>> >>> >> Dibyendu
>>> >>> >>
>>> >>> >> On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai <saisai.s...@intel.com> wrote:
>>> >>> >>>
>>> >>> >>> Hi all,
>>> >>> >>>
>>> >>> >>> I agree with Hari that strong exactly-once semantics are very hard to guarantee, especially in failure situations. From my understanding, even the current implementation of ReliableKafkaReceiver cannot fully guarantee exactly-once semantics once it fails. First is the ordering of data replayed from the last checkpoint, which is hard to guarantee when multiple partitions are injected in; second is the design complexity of achieving this - you can refer to the Kafka Spout in Trident, where we have to dig into the very details of Kafka's metadata management to achieve this, not to mention rebalance and fault-tolerance.
>>> >>> >>>
>>> >>> >>> Thanks
>>> >>> >>> Jerry
>>> >>> >>>
>>> >>> >>> -----Original Message-----
>>> >>> >>> From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
>>> >>> >>> Sent: Friday, December 19, 2014 5:57 AM
>>> >>> >>> To: Cody Koeninger
>>> >>> >>> Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
>>> >>> >>> Subject: Re: Which committers care about Kafka?
>>> >>> >>>
>>> >>> >>> But idempotency is not that easy to achieve sometimes. A strong once-only semantic through a proper API would be super useful; but I'm not implying this is easy to achieve.
>>> >>> >>> On 18 Dec 2014 21:52, "Cody Koeninger" <c...@koeninger.org> wrote:
>>> >>> >>>
>>> >>> >>>> If the downstream store for the output data is idempotent or transactional, and that downstream store also is the system of record for kafka offsets, then you have exactly-once semantics. Commit offsets with / after the data is stored. On any failure, restart from the last committed offsets.
>>> >>> >>>>
>>> >>> >>>> Yes, this approach is biased towards the etl-like use cases rather than near-realtime-analytics use cases.
>>> >>> >>>>
>>> >>> >>>> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <hshreedha...@cloudera.com> wrote:
>>> >>> >>>>>
>>> >>> >>>>> I get what you are saying. But getting exactly-once right is an extremely hard problem - especially in the presence of failure. The issue is failures can happen in a bunch of places.
>>> >>> >>>>> For example, before the notification of the downstream store being successful reaches the receiver that updates the offsets, the node fails. The store was successful, but duplicates came in either way. This is something worth discussing by itself - but without uuids etc. this might not really be solved even when you think it is.
>>> >>> >>>>>
>>> >>> >>>>> Anyway, I will look at the links. I too am interested in all of the features you mentioned - no HDFS WAL for Kafka and once-only delivery - but I doubt the latter is really possible to guarantee, though I really would love to have that!
>>> >>> >>>>>
>>> >>> >>>>> Thanks,
>>> >>> >>>>> Hari
>>> >>> >>>>>
>>> >>> >>>>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>> >>> >>>>>
>>> >>> >>>>>> Thanks for the replies.
>>> >>> >>>>>>
>>> >>> >>>>>> Regarding skipping WAL, it's not just about optimization. If you actually want exactly-once semantics, you need control of kafka offsets as well, including the ability to not use zookeeper as the system of record for offsets. Kafka already is a reliable system that has strong ordering guarantees (within a partition) and does not mandate the use of zookeeper to store offsets. I think there should be a spark api that acts as a very simple intermediary between Kafka and the user's choice of downstream store.
>>> >>> >>>>>>
>>> >>> >>>>>> Take a look at the links I posted - if there have already been 2 independent implementations of the idea, chances are it's something people need.
>>> >>> >>>>>>
>>> >>> >>>>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <hshreedha...@cloudera.com> wrote:
>>> >>> >>>>>>>
>>> >>> >>>>>>> Hi Cody,
>>> >>> >>>>>>>
>>> >>> >>>>>>> I am an absolute +1 on SPARK-3146. I think we can implement something pretty simple and lightweight for that one.
>>> >>> >>>>>>>
>>> >>> >>>>>>> For the Kafka DStream skipping the WAL implementation - this is something I discussed with TD a few weeks ago. Though it is a good idea to implement this to avoid unnecessary HDFS writes, it is an optimization. For that reason, we must be careful in the implementation. There are a couple of issues that we need to ensure work properly - specifically ordering. To ensure we pull messages from different topics and partitions in the same order after failure, we’d still have to persist the metadata to HDFS (or some other system) - this metadata must contain the order of messages consumed, so we know how to re-read the messages. I am planning to explore this once I have some time (probably in Jan). In addition, we must also ensure bucketing functions work fine as well.
>>> >>> >>>>>>> I will file a placeholder jira for this one.
>>> >>> >>>>>>>
>>> >>> >>>>>>> I also wrote an API to write data back to Kafka a while back - https://github.com/apache/spark/pull/2994 . I am hoping that this will get pulled in soon, as this is something I know people want. I am open to feedback on that - anything that I can do to make it better.
>>> >>> >>>>>>>
>>> >>> >>>>>>> Thanks,
>>> >>> >>>>>>> Hari
>>> >>> >>>>>>>
>>> >>> >>>>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell <pwend...@gmail.com> wrote:
>>> >>> >>>>>>>>
>>> >>> >>>>>>>> Hey Cody,
>>> >>> >>>>>>>>
>>> >>> >>>>>>>> Thanks for reaching out with this. The lead on streaming is TD - he is traveling this week though, so I can respond a bit. To the high-level point of whether Kafka is important - it definitely is. Something like 80% of Spark Streaming deployments (anecdotally) ingest data from Kafka. Also, good support for Kafka is something we generally want in Spark and not in a library. In some cases IIRC there were user libraries that used unstable Kafka API's, and we were somewhat waiting on Kafka to stabilize them before merging things upstream. Otherwise users wouldn't be able to use newer Kafka versions. This is a high-level impression only though; I haven't talked to TD about this recently, so it's worth revisiting given the developments in Kafka.
>>> >>> >>>>>>>>
>>> >>> >>>>>>>> Please do bring things like this up on the dev list if there are blockers for your usage - thanks for pinging it.
>>> >>> >>>>>>>>
>>> >>> >>>>>>>> - Patrick
>>> >>> >>>>>>>>
>>> >>> >>>>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>> >>> >>>>>>>>> Now that 1.2 is finalized... who are the go-to people to get some long-standing Kafka-related issues resolved?
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> The existing api is not sufficiently safe nor flexible for our production use. I don't think we're alone in this viewpoint, because I've seen several different patches and libraries to fix the same things we've been running into.
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> Regarding flexibility
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> https://issues.apache.org/jira/browse/SPARK-3146
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> has been outstanding since August, and IMHO an equivalent of this is absolutely necessary. We wrote a similar patch ourselves, then found that PR and have been running it in production. We wouldn't be able to get our jobs done without it. It also allows users to solve a whole class of problems for themselves (e.g. SPARK-2388, arbitrary delay of messages, etc).
>>> >>> >>>>>>>>> Regarding safety, I understand the motivation behind WriteAheadLog as a general solution for streaming unreliable sources, but Kafka already is a reliable source. I think there's a need for an api that treats it as such. Even aside from the performance issues of duplicating the write-ahead log in kafka into another write-ahead log in hdfs, I need exactly-once semantics in the face of failure (I've had failures that prevented reloading a spark streaming checkpoint, for instance).
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> I've got an implementation I've been using:
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> Tresata has something similar at https://github.com/tresata/spark-kafka, and I know there were earlier attempts based on Storm code.
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> Trying to distribute these kinds of fixes as libraries rather than patches to Spark is problematic, because large portions of the implementation are private[spark].
>>> >>> >>>>>>>>>
>>> >>> >>>>>>>>> I'd like to help, but I need to know whose attention to get.
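
For comparison with the offset-based approach, Sean's updateStateByKey de-duplication suggestion from earlier in the thread could be sketched roughly as below. This is illustrative only: the input DStream is assumed to already be keyed by a per-message id (a hash of the event, or a uuid plucked off the message), and the names here are invented, not an existing API.

object UpdateStateDedup {
  import org.apache.spark.streaming.StreamingContext._   // pair-DStream operations on older Spark versions
  import org.apache.spark.streaming.dstream.DStream

  // Requires checkpointing to be enabled on the StreamingContext,
  // since updateStateByKey is a stateful operation.
  def dedup(messages: DStream[(String, String)]): DStream[(String, String)] = {
    // State per key: the payload plus a flag marking whether it was already emitted.
    val updateFunc = (newValues: Seq[String], state: Option[(String, Boolean)]) =>
      state match {
        case Some((payload, _)) => Some((payload, true))                      // seen before: suppress
        case None               => newValues.headOption.map(v => (v, false))  // first sighting: emit once
      }

    messages.updateStateByKey(updateFunc)
      .filter { case (_, (_, alreadySeen)) => !alreadySeen }
      .map { case (id, (payload, _)) => (id, payload) }
    // State grows with the number of distinct keys; a real job would expire old keys
    // (return None from updateFunc) or bound memory with a sketch, as Sean notes.
  }
}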