If the downstream store for the output data is idempotent or transactional,
and that downstream store also is the system of record for kafka offsets,
then you have exactly-once semantics.  Commit offsets with / after the data
is stored.  On any failure, restart from the last committed offsets.

Yes, this approach is biased towards the etl-like use cases rather than
near-realtime-analytics use cases.

On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan <hshreedha...@cloudera.com
> wrote:
>
> I get what you are saying. But getting exactly once right is an extremely
> hard problem - especially in presence of failure. The issue is failures can
> happen in a bunch of places. For example, before the notification of
> downstream store being successful reaches the receiver that updates the
> offsets, the node fails. The store was successful, but duplicates came in
> either way. This is something worth discussing by itself - but without
> uuids etc this might not really be solved even when you think it is.
>
> Anyway, I will look at the links. Even I am interested in all of the
> features you mentioned - no HDFS WAL for Kafka and once-only delivery, but
> I doubt the latter is really possible to guarantee - though I really would
> love to have that!
>
> Thanks,
> Hari
>
>
> On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger <c...@koeninger.org>
> wrote:
>
>> Thanks for the replies.
>>
>> Regarding skipping WAL, it's not just about optimization.  If you
>> actually want exactly-once semantics, you need control of kafka offsets as
>> well, including the ability to not use zookeeper as the system of record
>> for offsets.  Kafka already is a reliable system that has strong ordering
>> guarantees (within a partition) and does not mandate the use of zookeeper
>> to store offsets.  I think there should be a spark api that acts as a very
>> simple intermediary between Kafka and the user's choice of downstream store.
>>
>> Take a look at the links I posted - if there's already been 2 independent
>> implementations of the idea, chances are it's something people need.
>>
>> On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan <
>> hshreedha...@cloudera.com> wrote:
>>>
>>> Hi Cody,
>>>
>>> I am an absolute +1 on SPARK-3146. I think we can implement something
>>> pretty simple and lightweight for that one.
>>>
>>> For the Kafka DStream skipping the WAL implementation - this is
>>> something I discussed with TD a few weeks ago. Though it is a good idea to
>>> implement this to avoid unnecessary HDFS writes, it is an optimization. For
>>> that reason, we must be careful in implementation. There are a couple of
>>> issues that we need to ensure works properly - specifically ordering. To
>>> ensure we pull messages from different topics and partitions in the same
>>> order after failure, we’d still have to persist the metadata to HDFS (or
>>> some other system) - this metadata must contain the order of messages
>>> consumed, so we know how to re-read the messages. I am planning to explore
>>> this once I have some time (probably in Jan). In addition, we must also
>>> ensure bucketing functions work fine as well. I will file a placeholder
>>> jira for this one.
>>>
>>> I also wrote an API to write data back to Kafka a while back -
>>> https://github.com/apache/spark/pull/2994 . I am hoping that this will
>>> get pulled in soon, as this is something I know people want. I am open to
>>> feedback on that - anything that I can do to make it better.
>>>
>>> Thanks,
>>> Hari
>>>
>>>
>>> On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell <pwend...@gmail.com>
>>> wrote:
>>>
>>>>  Hey Cody,
>>>>
>>>> Thanks for reaching out with this. The lead on streaming is TD - he is
>>>> traveling this week though so I can respond a bit. To the high level
>>>> point of whether Kafka is important - it definitely is. Something like
>>>> 80% of Spark Streaming deployments (anecdotally) ingest data from
>>>> Kafka. Also, good support for Kafka is something we generally want in
>>>> Spark and not a library. In some cases IIRC there were user libraries
>>>> that used unstable Kafka API's and we were somewhat waiting on Kafka
>>>> to stabilize them to merge things upstream. Otherwise users wouldn't
>>>> be able to use newer Kakfa versions. This is a high level impression
>>>> only though, I haven't talked to TD about this recently so it's worth
>>>> revisiting given the developments in Kafka.
>>>>
>>>> Please do bring things up like this on the dev list if there are
>>>> blockers for your usage - thanks for pinging it.
>>>>
>>>> - Patrick
>>>>
>>>> On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger <c...@koeninger.org>
>>>> wrote:
>>>> > Now that 1.2 is finalized... who are the go-to people to get some
>>>> > long-standing Kafka related issues resolved?
>>>> >
>>>> > The existing api is not sufficiently safe nor flexible for our
>>>> production
>>>> > use. I don't think we're alone in this viewpoint, because I've seen
>>>> > several different patches and libraries to fix the same things we've
>>>> been
>>>> > running into.
>>>> >
>>>> > Regarding flexibility
>>>> >
>>>> > https://issues.apache.org/jira/browse/SPARK-3146
>>>> >
>>>> > has been outstanding since August, and IMHO an equivalent of this is
>>>> > absolutely necessary. We wrote a similar patch ourselves, then found
>>>> that
>>>> > PR and have been running it in production. We wouldn't be able to get
>>>> our
>>>> > jobs done without it. It also allows users to solve a whole class of
>>>> > problems for themselves (e.g. SPARK-2388, arbitrary delay of
>>>> messages, etc).
>>>> >
>>>> > Regarding safety, I understand the motivation behind WriteAheadLog as
>>>> a
>>>> > general solution for streaming unreliable sources, but Kafka already
>>>> is a
>>>> > reliable source. I think there's a need for an api that treats it as
>>>> > such. Even aside from the performance issues of duplicating the
>>>> > write-ahead log in kafka into another write-ahead log in hdfs, I need
>>>> > exactly-once semantics in the face of failure (I've had failures that
>>>> > prevented reloading a spark streaming checkpoint, for instance).
>>>> >
>>>> > I've got an implementation i've been using
>>>> >
>>>> > https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka
>>>> > /src/main/scala/org/apache/spark/rdd/kafka
>>>> >
>>>> > Tresata has something similar at
>>>> https://github.com/tresata/spark-kafka,
>>>> > and I know there were earlier attempts based on Storm code.
>>>> >
>>>> > Trying to distribute these kinds of fixes as libraries rather than
>>>> patches
>>>> > to Spark is problematic, because large portions of the implementation
>>>> are
>>>> > private[spark].
>>>> >
>>>> > I'd like to help, but i need to know whose attention to get.
>>>>
>>>> ---------------------------------------------------------------------
>>>> To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
>>>> For additional commands, e-mail: dev-h...@spark.apache.org
>>>>
>>>>
>>>
>

Reply via email to