If the downstream store for the output data is idempotent or transactional, and that downstream store is also the system of record for Kafka offsets, then you have exactly-once semantics. Commit offsets in the same transaction as the data, or after the data is stored if the writes are idempotent. On any failure, restart from the last committed offsets.
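A minimal sketch of the transactional case of that pattern follows. Python's built-in `sqlite3` stands in for the downstream store and a plain list stands in for one Kafka partition, so no Kafka or Spark API is used. The table names, `process_batch`, `SimulatedCrash`, and `run_demo` are hypothetical illustrations. Results and the next offset to read are written in one transaction, so a crash partway through a batch leaves neither behind, and a restart re-reads from the last committed offset.

```python
import sqlite3

# Hypothetical stand-in for a single Kafka topic-partition: offset i holds LOG[i].
TOPIC, PARTITION = "events", 0
LOG = [f"msg-{i}" for i in range(10)]


class SimulatedCrash(Exception):
    """Raised to mimic the job dying partway through a batch."""


def open_store():
    # In-memory SQLite database standing in for the downstream transactional store.
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE results (topic TEXT, part INTEGER, off INTEGER, payload TEXT)")
    conn.execute(
        "CREATE TABLE offsets (topic TEXT, part INTEGER, next_off INTEGER, "
        "PRIMARY KEY (topic, part))"
    )
    conn.commit()
    return conn


def last_committed(conn):
    # The store itself is the system of record for offsets (not ZooKeeper).
    row = conn.execute(
        "SELECT next_off FROM offsets WHERE topic = ? AND part = ?", (TOPIC, PARTITION)
    ).fetchone()
    return row[0] if row else 0


def process_batch(conn, batch_size, crash_after=None):
    """Store one batch and its ending offset atomically; return how many messages were read."""
    start = last_committed(conn)
    batch = list(enumerate(LOG[start:start + batch_size], start))
    # `with conn:` commits if the block finishes and rolls back if an exception escapes,
    # so the results and the new offset become visible together or not at all.
    with conn:
        for i, (off, payload) in enumerate(batch):
            if crash_after is not None and i == crash_after:
                raise SimulatedCrash(f"failed before storing offset {off}")
            conn.execute(
                "INSERT INTO results VALUES (?, ?, ?, ?)", (TOPIC, PARTITION, off, payload)
            )
        conn.execute(
            "INSERT OR REPLACE INTO offsets VALUES (?, ?, ?)",
            (TOPIC, PARTITION, start + len(batch)),
        )
    return len(batch)


def run_demo():
    conn = open_store()
    process_batch(conn, 4)  # offsets 0-3 stored, next offset is 4
    try:
        process_batch(conn, 4, crash_after=2)  # inserts offsets 4 and 5, then fails
    except SimulatedCrash:
        pass  # restart: the rollback discarded offsets 4 and 5 along with the offset update
    while process_batch(conn, 4):  # resume from the last committed offset until caught up
        pass
    stored = [row[0] for row in conn.execute("SELECT off FROM results ORDER BY off")]
    return stored, last_committed(conn)


print(run_demo())  # prints ([0, 1, 2, 3, 4, 5, 6, 7, 8, 9], 10)
```

Because the offset update shares a transaction with the results, the window Hari describes below (data stored, but the offset update lost) does not exist in this design. For the idempotent case instead, one common variant is to key each result by (topic, partition, offset) so that a replayed write overwrites rather than duplicates, and to commit offsets after the data.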
Yes, this approach is biased towards ETL-like use cases rather than near-real-time analytics use cases.

On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <hshreedha...@cloudera.com> wrote:
>
> I get what you are saying. But getting exactly-once right is an extremely hard problem, especially in the presence of failure. The issue is that failures can happen in a bunch of places. For example, the node fails before the notification that the downstream store succeeded reaches the receiver that updates the offsets. The store was successful, but duplicates came in either way. This is something worth discussing by itself, but without UUIDs etc. this might not really be solved even when you think it is.
>
> Anyway, I will look at the links. I am also interested in all of the features you mentioned (no HDFS WAL for Kafka, and once-only delivery), but I doubt the latter is really possible to guarantee, though I really would love to have that!
>
> Thanks,
> Hari
>
>
> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger <c...@koeninger.org> wrote:
>> Thanks for the replies.
>>
>> Regarding skipping the WAL, it's not just about optimization. If you actually want exactly-once semantics, you need control of Kafka offsets as well, including the ability to not use ZooKeeper as the system of record for offsets. Kafka already is a reliable system that has strong ordering guarantees (within a partition) and does not mandate the use of ZooKeeper to store offsets. I think there should be a Spark API that acts as a very simple intermediary between Kafka and the user's choice of downstream store.
>>
>> Take a look at the links I posted: if there have already been two independent implementations of the idea, chances are it's something people need.
>>
>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <hshreedha...@cloudera.com> wrote:
>>>
>>> Hi Cody,
>>>
>>> I am an absolute +1 on SPARK-3146.
>>> I think we can implement something pretty simple and lightweight for that one.
>>>
>>> For the Kafka DStream implementation that skips the WAL: this is something I discussed with TD a few weeks ago. Though it is a good idea to implement this to avoid unnecessary HDFS writes, it is an optimization, so we must be careful in the implementation. There are a couple of issues that we need to ensure work properly, specifically ordering. To ensure we pull messages from different topics and partitions in the same order after a failure, we'd still have to persist metadata to HDFS (or some other system); this metadata must contain the order of the messages consumed, so we know how to re-read them. I am planning to explore this once I have some time (probably in January). In addition, we must ensure that bucketing functions work correctly as well. I will file a placeholder JIRA for this one.
>>>
>>> I also wrote an API to write data back to Kafka a while back: https://github.com/apache/spark/pull/2994. I am hoping that this will get pulled in soon, as this is something I know people want. I am open to feedback on it, anything that I can do to make it better.
>>>
>>> Thanks,
>>> Hari
>>>
>>>
>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell <pwend...@gmail.com> wrote:
>>>
>>>> Hey Cody,
>>>>
>>>> Thanks for reaching out with this. The lead on streaming is TD; he is traveling this week, though, so I can respond a bit. To the high-level point of whether Kafka is important: it definitely is. Something like 80% of Spark Streaming deployments (anecdotally) ingest data from Kafka. Also, good support for Kafka is something we generally want in Spark and not in a library. In some cases, IIRC, there were user libraries that used unstable Kafka APIs, and we were somewhat waiting on Kafka to stabilize them before merging things upstream.
>>>> Otherwise users wouldn't be able to use newer Kafka versions. This is a high-level impression only, though; I haven't talked to TD about this recently, so it's worth revisiting given the developments in Kafka.
>>>>
>>>> Please do bring things like this up on the dev list if there are blockers for your usage. Thanks for pinging it.
>>>>
>>>> - Patrick
>>>>
>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>> > Now that 1.2 is finalized... who are the go-to people to get some long-standing Kafka-related issues resolved?
>>>> >
>>>> > The existing API is neither safe enough nor flexible enough for our production use. I don't think we're alone in this viewpoint, because I've seen several different patches and libraries to fix the same things we've been running into.
>>>> >
>>>> > Regarding flexibility:
>>>> >
>>>> > https://issues.apache.org/jira/browse/SPARK-3146
>>>> >
>>>> > has been outstanding since August, and IMHO an equivalent of this is absolutely necessary. We wrote a similar patch ourselves, then found that PR and have been running it in production. We wouldn't be able to get our jobs done without it. It also allows users to solve a whole class of problems for themselves (e.g. SPARK-2388, arbitrary delay of messages, etc.).
>>>> >
>>>> > Regarding safety, I understand the motivation behind WriteAheadLog as a general solution for streaming unreliable sources, but Kafka already is a reliable source. I think there's a need for an API that treats it as such. Even aside from the performance issues of duplicating the write-ahead log in Kafka into another write-ahead log in HDFS, I need exactly-once semantics in the face of failure (I've had failures that prevented reloading a Spark Streaming checkpoint, for instance).
>>>> >
>>>> > I've got an implementation I've been using:
>>>> >
>>>> > https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka
>>>> >
>>>> > Tresata has something similar at https://github.com/tresata/spark-kafka, and I know there were earlier attempts based on Storm code.
>>>> >
>>>> > Trying to distribute these kinds of fixes as libraries rather than patches to Spark is problematic, because large portions of the implementation are private[spark].
>>>> >
>>>> > I'd like to help, but I need to know whose attention to get.
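Hari's quoted point about ordering can be made concrete with a small sketch, assuming one particular design: each batch is defined up front by an explicit list of offset ranges, and that list is persisted before the batch is processed, so a recovered job can re-read exactly the same messages in exactly the same order. `OffsetRange`, `plan_batch`, the JSON record format, and the in-memory `logs` dictionary (standing in for Kafka partitions) are all hypothetical, and where the record lives (HDFS, the downstream store, or elsewhere) is left open, as it is in the thread.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)
class OffsetRange:
    """Hypothetical batch metadata: messages [from_offset, until_offset) of one partition."""
    topic: str
    partition: int
    from_offset: int  # inclusive
    until_offset: int  # exclusive


def plan_batch(committed, latest, max_per_partition):
    """Choose the next range per partition, in a fixed (sorted) partition order."""
    ranges = []
    for topic, partition in sorted(latest):
        start = committed.get((topic, partition), 0)
        end = min(latest[(topic, partition)], start + max_per_partition)
        if end > start:
            ranges.append(OffsetRange(topic, partition, start, end))
    return ranges


def to_record(batch_id, ranges):
    # This string is what would be persisted *before* the batch is processed.
    return json.dumps({"batch": batch_id, "ranges": [asdict(r) for r in ranges]})


def from_record(text):
    doc = json.loads(text)
    return doc["batch"], [OffsetRange(**r) for r in doc["ranges"]]


def read_ranges(logs, ranges):
    """Re-read exactly the recorded ranges, in the recorded order."""
    out = []
    for r in ranges:
        out.extend(logs[(r.topic, r.partition)][r.from_offset:r.until_offset])
    return out


# In-memory stand-ins for three Kafka partitions across two topics.
logs = {
    ("clicks", 0): ["c0-0", "c0-1", "c0-2"],
    ("clicks", 1): ["c1-0"],
    ("views", 0): ["v0-0", "v0-1", "v0-2", "v0-3"],
}
latest = {tp: len(msgs) for tp, msgs in logs.items()}

ranges = plan_batch({}, latest, max_per_partition=2)
record = to_record(7, ranges)  # persisted first
first_run = read_ranges(logs, ranges)

# After a failure: rebuild the batch from the record instead of re-planning it.
batch_id, recovered = from_record(record)
replayed = read_ranges(logs, recovered)
print(batch_id, replayed == first_run, replayed)
```

Re-planning after a failure could pick different ranges if new messages had arrived in the meantime, which is why, in this design, the persisted record rather than the planner is the source of truth for a batch that has already started.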