But idempotency is not always easy to achieve. Strong exactly-once semantics through a proper API would be super useful, though I'm not implying that this is easy to achieve either.

On 18 Dec 2014 21:52, "Cody Koeninger" <c...@koeninger.org> wrote:
> If the downstream store for the output data is idempotent or transactional,
> and that downstream store also is the system of record for Kafka offsets,
> then you have exactly-once semantics. Commit offsets with / after the data
> is stored. On any failure, restart from the last committed offsets.
>
> Yes, this approach is biased towards ETL-like use cases rather than
> near-realtime-analytics use cases.
>
> On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <hshreedha...@cloudera.com> wrote:
>
>> I get what you are saying. But getting exactly-once right is an extremely
>> hard problem, especially in the presence of failure. The issue is that
>> failures can happen in a bunch of places. For example, the node fails
>> before the notification that the downstream store succeeded reaches the
>> receiver that updates the offsets. The store was successful, but duplicates
>> come in either way. This is something worth discussing by itself, but
>> without UUIDs etc. this might not really be solved even when you think it is.
>>
>> Anyway, I will look at the links. I am also interested in all of the
>> features you mentioned (no HDFS WAL for Kafka and once-only delivery), but
>> I doubt the latter is really possible to guarantee, though I would really
>> love to have that!
>>
>> Thanks,
>> Hari
>>
>> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>>> Thanks for the replies.
>>>
>>> Regarding skipping WAL, it's not just about optimization. If you
>>> actually want exactly-once semantics, you need control of Kafka offsets as
>>> well, including the ability to not use ZooKeeper as the system of record
>>> for offsets. Kafka already is a reliable system that has strong ordering
>>> guarantees (within a partition) and does not mandate the use of ZooKeeper
>>> to store offsets.
>>> I think there should be a Spark API that acts as a very simple
>>> intermediary between Kafka and the user's choice of downstream store.
>>>
>>> Take a look at the links I posted: if there have already been 2
>>> independent implementations of the idea, chances are it's something
>>> people need.
>>>
>>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <hshreedha...@cloudera.com> wrote:
>>>
>>>> Hi Cody,
>>>>
>>>> I am an absolute +1 on SPARK-3146. I think we can implement something
>>>> pretty simple and lightweight for that one.
>>>>
>>>> For the Kafka DStream skipping the WAL implementation, this is
>>>> something I discussed with TD a few weeks ago. Though it is a good idea to
>>>> implement this to avoid unnecessary HDFS writes, it is an optimization. For
>>>> that reason, we must be careful in implementation. There are a couple of
>>>> issues that we need to make sure work properly, specifically ordering. To
>>>> ensure we pull messages from different topics and partitions in the same
>>>> order after failure, we'd still have to persist the metadata to HDFS (or
>>>> some other system). This metadata must contain the order of messages
>>>> consumed, so we know how to re-read the messages. I am planning to explore
>>>> this once I have some time (probably in Jan). In addition, we must also
>>>> ensure bucketing functions work fine as well. I will file a placeholder
>>>> JIRA for this one.
>>>>
>>>> I also wrote an API to write data back to Kafka a while back:
>>>> https://github.com/apache/spark/pull/2994 . I am hoping that this will
>>>> get pulled in soon, as this is something I know people want. I am open to
>>>> feedback on that, anything that I can do to make it better.
>>>>
>>>> Thanks,
>>>> Hari
>>>>
>>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell <pwend...@gmail.com> wrote:
>>>>
>>>>> Hey Cody,
>>>>>
>>>>> Thanks for reaching out with this.
>>>>> The lead on streaming is TD; he is
>>>>> traveling this week, though, so I can respond a bit. To the high-level
>>>>> point of whether Kafka is important: it definitely is. Something like
>>>>> 80% of Spark Streaming deployments (anecdotally) ingest data from
>>>>> Kafka. Also, good support for Kafka is something we generally want in
>>>>> Spark and not in a library. In some cases, IIRC, there were user libraries
>>>>> that used unstable Kafka APIs, and we were somewhat waiting on Kafka
>>>>> to stabilize them before merging things upstream. Otherwise users wouldn't
>>>>> be able to use newer Kafka versions. This is only a high-level impression,
>>>>> though; I haven't talked to TD about this recently, so it's worth
>>>>> revisiting given the developments in Kafka.
>>>>>
>>>>> Please do bring things like this up on the dev list if there are
>>>>> blockers for your usage. Thanks for pinging it.
>>>>>
>>>>> - Patrick
>>>>>
>>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>>>>> Now that 1.2 is finalized... who are the go-to people to get some
>>>>>> long-standing Kafka-related issues resolved?
>>>>>>
>>>>>> The existing API is neither sufficiently safe nor flexible for our
>>>>>> production use. I don't think we're alone in this viewpoint, because I've
>>>>>> seen several different patches and libraries to fix the same things we've
>>>>>> been running into.
>>>>>>
>>>>>> Regarding flexibility:
>>>>>>
>>>>>> https://issues.apache.org/jira/browse/SPARK-3146
>>>>>>
>>>>>> has been outstanding since August, and IMHO an equivalent of this is
>>>>>> absolutely necessary. We wrote a similar patch ourselves, then found that
>>>>>> PR and have been running it in production. We wouldn't be able to get our
>>>>>> jobs done without it. It also allows users to solve a whole class of
>>>>>> problems for themselves (e.g. SPARK-2388, arbitrary delay of messages, etc).
>>>>>>
>>>>>> Regarding safety, I understand the motivation behind WriteAheadLog as a
>>>>>> general solution for streaming unreliable sources, but Kafka already is a
>>>>>> reliable source. I think there's a need for an API that treats it as
>>>>>> such. Even aside from the performance issues of duplicating the
>>>>>> write-ahead log in Kafka into another write-ahead log in HDFS, I need
>>>>>> exactly-once semantics in the face of failure (I've had failures that
>>>>>> prevented reloading a Spark Streaming checkpoint, for instance).
>>>>>>
>>>>>> I've got an implementation I've been using:
>>>>>>
>>>>>> https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka/src/main/scala/org/apache/spark/rdd/kafka
>>>>>>
>>>>>> Tresata has something similar at https://github.com/tresata/spark-kafka,
>>>>>> and I know there were earlier attempts based on Storm code.
>>>>>>
>>>>>> Trying to distribute these kinds of fixes as libraries rather than patches
>>>>>> to Spark is problematic, because large portions of the implementation are
>>>>>> private[spark].
>>>>>>
>>>>>> I'd like to help, but I need to know whose attention to get.
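A minimal sketch of the approach Cody describes in this thread, where the downstream store is also the system of record for Kafka offsets and output plus offsets are committed together. It is not Cody's kafkaRdd code and not a Spark or Kafka API: Python's built-in sqlite3 stands in for the transactional downstream store, a plain list stands in for a single Kafka partition, and the table layout and helper names (open_store, committed_offset, store_batch, run_once) are hypothetical. Output rows are keyed by (topic, partition, offset), so replaying a batch, as in Hari's "stored but the acknowledgement was lost" case, cannot create duplicates.

```python
import sqlite3

# Sketch only: sqlite3 stands in for the downstream store, and a Python list
# stands in for one Kafka partition. Table and function names are hypothetical.


def open_store(path=":memory:"):
    conn = sqlite3.connect(path)
    # Output rows are keyed by their Kafka coordinates, so writing the same
    # message twice cannot create a duplicate row.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS results ("
        " topic TEXT, partition_id INTEGER, msg_offset INTEGER, payload TEXT,"
        " PRIMARY KEY (topic, partition_id, msg_offset))"
    )
    # The store itself is the system of record for offsets (no ZooKeeper).
    conn.execute(
        "CREATE TABLE IF NOT EXISTS offsets ("
        " topic TEXT, partition_id INTEGER, next_offset INTEGER,"
        " PRIMARY KEY (topic, partition_id))"
    )
    conn.commit()
    return conn


def committed_offset(conn, topic, partition_id):
    """Offset to resume from after a restart (0 if nothing was committed)."""
    row = conn.execute(
        "SELECT next_offset FROM offsets WHERE topic = ? AND partition_id = ?",
        (topic, partition_id),
    ).fetchone()
    return row[0] if row else 0


def store_batch(conn, topic, partition_id, batch):
    """Store (offset, payload) pairs and advance the offset in one transaction."""
    if not batch:
        return
    with conn:  # commits on success, rolls back if anything inside raises
        conn.executemany(
            "INSERT OR IGNORE INTO results VALUES (?, ?, ?, ?)",
            [(topic, partition_id, off, payload) for off, payload in batch],
        )
        # Never move the committed offset backwards, even if an old batch is replayed.
        new_next = max(committed_offset(conn, topic, partition_id), batch[-1][0] + 1)
        conn.execute(
            "INSERT OR REPLACE INTO offsets VALUES (?, ?, ?)",
            (topic, partition_id, new_next),
        )


def run_once(conn, log, topic="events", partition_id=0, max_batch=2):
    """Read the next batch starting from the offset recorded in the store."""
    start = committed_offset(conn, topic, partition_id)
    batch = [(off, log[off]) for off in range(start, min(start + max_batch, len(log)))]
    store_batch(conn, topic, partition_id, batch)
    return batch


if __name__ == "__main__":
    log = ["a", "b", "c", "d", "e"]  # offsets 0..4 of a single partition
    conn = open_store()
    run_once(conn, log)  # stores offsets 0-1
    run_once(conn, log)  # stores offsets 2-3
    # A write that succeeded but whose acknowledgement was lost gets replayed:
    # the primary key turns the replay into a no-op.
    store_batch(conn, "events", 0, [(2, "c"), (3, "d")])
    run_once(conn, log)  # a restart resumes at offset 4
    count = conn.execute("SELECT COUNT(*) FROM results").fetchone()[0]
    print(count, committed_offset(conn, "events", 0))  # 5 5
```

Keeping the offset row in the same transaction as the output is what removes the window Hari describes (data stored, offset update lost). The primary key is a second line of defense for replays coming from anywhere else, such as offsets tracked outside the store. This sketch assumes one output row per message; aggregated output would need its own idempotency key.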