Re: Which committers care about Kafka?

2014-12-29 Thread Tathagata Das
 to achieve this, not to say
  rebalance
   and fault-tolerance.
  
   Thanks
   Jerry
  
   -Original Message-
   From: Luis Ángel Vicente Sánchez [mailto:
 langel.gro...@gmail.com]
   Sent: Friday, December 19, 2014 5:57 AM
   To: Cody Koeninger
   Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
   Subject: Re: Which committers care about Kafka?
  
   But idempotency is not that easy to achieve sometimes. A strong only once
   semantic through a proper API would be super useful; but I'm not implying
   this is easy to achieve.
   On 18 Dec 2014 21:52, Cody Koeninger c...@koeninger.org
 wrote:
  
   If the downstream store for the output data is idempotent or
   transactional, and that downstream store also is the system of
  record
   for kafka offsets, then you have exactly-once semantics.
 Commit
   offsets with / after the data is stored.  On any failure,
 restart
  from
   the last committed offsets.
  
   Yes, this approach is biased towards the etl-like use cases
 rather
   than near-realtime-analytics use cases.
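A minimal sketch of the pattern Cody describes, in Scala against a JDBC-backed downstream store (the table and column names here are hypothetical): the output rows and the Kafka offsets are written in one transaction, so the downstream store is the system of record for offsets and a restart can resume from whatever was last committed.

    import java.sql.DriverManager

    // Hypothetical sketch: the downstream store records both the output and the
    // Kafka offsets in a single transaction, so they succeed or fail together.
    case class OffsetCheckpoint(topic: String, partition: Int, untilOffset: Long)

    def storeBatchTransactionally(
        jdbcUrl: String,
        rows: Seq[(String, Long)],              // hypothetical (key, value) output rows
        offsets: Seq[OffsetCheckpoint]): Unit = {
      val conn = DriverManager.getConnection(jdbcUrl)
      try {
        conn.setAutoCommit(false)
        val insertRow = conn.prepareStatement("INSERT INTO results (k, v) VALUES (?, ?)")
        rows.foreach { case (k, v) =>
          insertRow.setString(1, k)
          insertRow.setLong(2, v)
          insertRow.executeUpdate()
        }
        val saveOffset = conn.prepareStatement(
          "UPDATE kafka_offsets SET until_offset = ? WHERE topic = ? AND kafka_partition = ?")
        offsets.foreach { o =>
          saveOffset.setLong(1, o.untilOffset)
          saveOffset.setString(2, o.topic)
          saveOffset.setInt(3, o.partition)
          saveOffset.executeUpdate()
        }
        conn.commit()   // data and offsets are committed atomically
      } catch {
        case e: Exception => conn.rollback(); throw e
      } finally {
        conn.close()
      }
    }

On restart the job reads kafka_offsets and resumes consuming from the recorded positions rather than trusting ZooKeeper; with an idempotent (upsert-style) insert, replaying the last uncommitted batch is harmless.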
  
   On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan 
   hshreedha...@cloudera.com
   wrote:
  
   I get what you are saying. But getting exactly once right is
 an
   extremely hard problem - especially in presence of failure.
 The
   issue is failures
   can
   happen in a bunch of places. For example, before the
 notification
  of
   downstream store being successful reaches the receiver that
 updates
   the offsets, the node fails. The store was successful, but
   duplicates came in either way. This is something worth
 discussing
  by
   itself - but without uuids etc this might not really be
 solved even
   when you think it is.
  
   Anyway, I will look at the links. Even I am interested in all
 of
  the
   features you mentioned - no HDFS WAL for Kafka and once-only
   delivery,
   but
   I doubt the latter is really possible to guarantee - though I
  really
   would
   love to have that!
  
   Thanks,
   Hari
  
  
   On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
   c...@koeninger.org
   wrote:
  
   Thanks for the replies.
  
   Regarding skipping WAL, it's not just about optimization.
 If you
   actually want exactly-once semantics, you need control of
 kafka
   offsets
   as
   well, including the ability to not use zookeeper as the
 system of
   record for offsets.  Kafka already is a reliable system that
 has
   strong
   ordering
   guarantees (within a partition) and does not mandate the use
 of
   zookeeper
   to store offsets.  I think there should be a spark api that
 acts
  as
   a
   very
   simple intermediary between Kafka and the user's choice of
   downstream
   store.
  
   Take a look at the links I posted - if there's already been 2
   independent
   implementations of the idea, chances are it's something
 people
  need.
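For illustration only, one hypothetical shape for such an intermediary (the names below are invented, not an existing Spark API): each batch exposes its per-partition offset ranges up front, and the caller decides where those offsets get recorded.

    // Hypothetical sketch of a thin Kafka-to-Spark intermediary: offset ranges
    // are known before processing and handed to user code along with the messages.
    case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

    trait KafkaBatch[K, V] {
      def offsetRanges: Seq[OffsetRange]
      def foreachPartition(f: (OffsetRange, Iterator[(K, V)]) => Unit): Unit
    }

    // The caller stores the data and then records range.untilOffset in the same
    // downstream store (ideally in the same transaction), not in ZooKeeper.
    def process(batch: KafkaBatch[String, String]): Unit =
      batch.foreachPartition { (range, messages) =>
        messages.foreach { case (k, v) =>
          println(s"store $k -> $v")          // stand-in for the real downstream write
        }
        println(s"commit ${range.topic}-${range.partition} up to ${range.untilOffset}")
      }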
  
   On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan 
   hshreedha...@cloudera.com wrote:
  
   Hi Cody,
  
   I am an absolute +1 on SPARK-3146. I think we can implement
   something pretty simple and lightweight for that one.
  
   For the Kafka DStream skipping the WAL implementation -
 this is
   something I discussed with TD a few weeks ago. Though it is
 a
  good
   idea to
   implement this to avoid unnecessary HDFS writes, it is an
   optimization. For
   that reason, we must be careful in implementation. There
 are a
   couple
   of
   issues that we need to ensure works properly - specifically
   ordering.
   To
   ensure we pull messages from different topics and
 partitions in
   the
   same
   order after failure, we’d still have to persist the
 metadata to
   HDFS
   (or
   some other system) - this metadata must contain the order of
   messages consumed, so we know how to re-read the messages.
 I am
   planning to
   explore
   this once I have some time (probably in Jan). In addition,
 we
  must
   also ensure bucketing functions work fine as well. I will
 file a
   placeholder jira for this one.
  
   I also wrote an API to write data back to Kafka a while
 back -
   https://github.com/apache/spark/pull/2994 . I am hoping
 that
  this
   will get pulled in soon, as this is something I know people
 want.
   I am open
   to
   feedback on that - anything that I can do to make it better.
  
   Thanks,
   Hari
  
  
   On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
   pwend...@gmail.com
   wrote:
  
   Hey Cody,
  
   Thanks for reaching out with this. The lead on streaming
 is TD -
   he is traveling this week though so I can respond a bit.
 To the
   high level point of whether Kafka is important - it
 definitely
   is. Something like 80% of Spark Streaming deployments
   (anecdotally) ingest data from Kafka. Also, good support
 for
   Kafka is something we generally want in Spark and not a
 library.
   In some cases IIRC there were user libraries that used
 unstable
   Kafka API's and we were somewhat waiting on Kafka to
 stabilize
   them to merge things upstream. Otherwise users wouldn't be
 able

Re: Which committers care about Kafka?

2014-12-29 Thread Cody Koeninger
.
   
As this Low Level consumer has complete control over the Kafka
   Offsets ,
we can implement Trident like feature on top of it like having
   implement a
transaction-id for a given block , and re-emit the same block
  with
   same set
of message during Driver failure.
   
Regards,
Dibyendu
   
   
On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai 
  saisai.s...@intel.com
wrote:
   
Hi all,
   
I agree with Hari that Strong exact-once semantics is very
 hard
  to
guarantee, especially in the failure situation. From my
   understanding even
current implementation of ReliableKafkaReceiver cannot fully
   guarantee the
exact once semantics once failed, first is the ordering of
 data
   replaying
from last checkpoint, this is hard to guarantee when multiple
   partitions
are injected in; second is the design complexity of achieving
  this,
   you can
refer to the Kafka Spout in Trident, we have to dig into the
  very
   details
of Kafka metadata management system to achieve this, not to
 say
   rebalance
and fault-tolerance.
   
Thanks
Jerry
   
-Original Message-
From: Luis Ángel Vicente Sánchez [mailto:
  langel.gro...@gmail.com]
Sent: Friday, December 19, 2014 5:57 AM
To: Cody Koeninger
Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
Subject: Re: Which committers care about Kafka?
   
But idempotency is not that easy to achieve sometimes. A strong only once
semantic through a proper API would be super useful; but I'm not implying
this is easy to achieve.
On 18 Dec 2014 21:52, Cody Koeninger c...@koeninger.org
  wrote:
   
If the downstream store for the output data is idempotent or
transactional, and that downstream store also is the system
 of
   record
for kafka offsets, then you have exactly-once semantics.
  Commit
offsets with / after the data is stored.  On any failure,
  restart
   from
the last committed offsets.
   
Yes, this approach is biased towards the etl-like use cases
  rather
than near-realtime-analytics use cases.
   
On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan 
hshreedha...@cloudera.com
wrote:
   
I get what you are saying. But getting exactly once right
 is
  an
extremely hard problem - especially in presence of failure.
  The
issue is failures
can
happen in a bunch of places. For example, before the
  notification
   of
downstream store being successful reaches the receiver that
  updates
the offsets, the node fails. The store was successful, but
duplicates came in either way. This is something worth
  discussing
   by
itself - but without uuids etc this might not really be
  solved even
when you think it is.
   
Anyway, I will look at the links. Even I am interested in
 all
  of
   the
features you mentioned - no HDFS WAL for Kafka and
 once-only
delivery,
but
I doubt the latter is really possible to guarantee -
 though I
   really
would
love to have that!
   
Thanks,
Hari
   
   
On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
c...@koeninger.org
wrote:
   
Thanks for the replies.
   
Regarding skipping WAL, it's not just about optimization.
  If you
actually want exactly-once semantics, you need control of
  kafka
offsets
as
well, including the ability to not use zookeeper as the
  system of
record for offsets.  Kafka already is a reliable system
 that
  has
strong
ordering
guarantees (within a partition) and does not mandate the
 use
  of
zookeeper
to store offsets.  I think there should be a spark api
 that
  acts
   as
a
very
simple intermediary between Kafka and the user's choice of
downstream
store.
   
Take a look at the links I posted - if there's already
 been 2
independent
implementations of the idea, chances are it's something
  people
   need.
   
On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan 
hshreedha...@cloudera.com wrote:
   
Hi Cody,
   
I am an absolute +1 on SPARK-3146. I think we can
 implement
something pretty simple and lightweight for that one.
   
For the Kafka DStream skipping the WAL implementation -
  this is
something I discussed with TD a few weeks ago. Though it
 is
  a
   good
idea to
implement this to avoid unnecessary HDFS writes, it is an
optimization. For
that reason, we must be careful in implementation. There
  are a
couple
of
issues that we need to ensure works properly -
 specifically
ordering.
To
ensure we pull messages from different topics and
  partitions in
the
same
order after failure, we’d still have to persist the
  metadata to
HDFS
(or
some other system) - this metadata must contain the
 order of
messages consumed, so we know how to re-read the
 messages.
  I am
planning to
explore

RE: Which committers care about Kafka?

2014-12-29 Thread Shao, Saisai
Hi Cody,

From my understanding, rate control is an optional configuration in Spark
Streaming and is disabled by default, so users can reach maximum throughput
without any configuration.

The reason why rate control is so important in stream processing is that
Spark Streaming and other streaming frameworks are easily prone to unexpected
behavior and failure due to network bursts and other uncontrollable
ingest rates.

Especially for Spark Streaming, a large amount of data to process will delay
the processing time, which will further delay ongoing jobs and finally lead
to failure.

Thanks
Jerry

From: Cody Koeninger [mailto:c...@koeninger.org]
Sent: Tuesday, December 30, 2014 6:50 AM
To: Tathagata Das
Cc: Hari Shreedharan; Shao, Saisai; Sean McNamara; Patrick Wendell; Luis Ángel 
Vicente Sánchez; Dibyendu Bhattacharya; dev@spark.apache.org; Koert Kuipers
Subject: Re: Which committers care about Kafka?

Can you give a little more clarification on exactly what is meant by

1. Data rate control

If someone wants to clamp the maximum number of messages per RDD partition in 
my solution, it would be very straightforward to do so.
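For example (a hypothetical sketch, not code from the PR), clamping is just a cap on each partition's offset range before the RDD is built:

    // Hypothetical helper: cap how many messages one RDD partition may contain by
    // shrinking its Kafka offset range.
    case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

    def clamp(range: OffsetRange, maxMessagesPerPartition: Long): OffsetRange =
      range.copy(untilOffset =
        math.min(range.untilOffset, range.fromOffset + maxMessagesPerPartition))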

Regarding the holy grail, I'm pretty certain you can't have end-to-end 
transactional semantics without the client code being in charge of offset 
state.  That means the client code is going to also need to be in charge of 
setting up an initial state for updateStateByKey that makes sense; as long as 
they can do that, the job should be safe to restart from arbitrary failures.

On Mon, Dec 29, 2014 at 4:33 PM, Tathagata Das 
tathagata.das1...@gmail.com wrote:
Hey all,

Some wrap up thoughts on this thread.

Let me first reiterate what Patrick said, that Kafka is super super
important as it forms the largest fraction of Spark Streaming user
base. So we really want to improve the Kafka + Spark Streaming
integration. To this end, some of the things that need to be
considered can be broadly classified into the following, to sort of
facilitate the discussion.

1. Data rate control
2. Receiver failure semantics - partially achieving this gives
at-least once, completely achieving this gives exactly-once
3. Driver failure semantics - partially achieving this gives at-least
once, completely achieving this gives exactly-once

Here is a run down of what is achieved by different implementations
(based on what I think).

1. Prior to WAL in Spark 1.2, the KafkaReceiver could handle 3, could
handle 1 partially (some duplicate data), and could NOT handle 2 (all
previously received data lost).

2. In Spark 1.2 with WAL enabled, Saisai's ReliableKafkaReceiver
can handle 3, and can almost completely handle 1 and 2 (except a few corner
cases which prevent it from completely guaranteeing exactly-once).

3. I believe Dibyendu's solution (correct me if i am wrong) can handle
1 and 2 perfectly. And 3 can be partially solved with WAL, or possibly
completely solved by extending the solution further.

4. Cody's solution (again, correct me if I am wrong) does not use
receivers at all (so eliminates 2). It can handle 3 completely for
simple operations like map and filter, but not sure if it works
completely for stateful ops like windows and updateStateByKey. Also it
does not handle 1.

The real challenge for Kafka is in achieving 3 completely for stateful
operations while also handling 1.  (i.e., use receivers, but still get
driver failure guarantees). Solving this will give us our holy grail
solution, and this is what I want to achieve.

On that note, Cody submitted a PR on his style of achieving
exactly-once semantics - https://github.com/apache/spark/pull/3798 . I
am reviewing it. Please follow the PR if you are interested.

TD

On Wed, Dec 24, 2014 at 11:59 PM, Cody Koeninger 
c...@koeninger.org wrote:
 The conversation was mostly getting TD up to speed on this thread since he
 had just gotten back from his trip and hadn't seen it.

 The jira has a summary of the requirements we discussed, I'm sure TD or
 Patrick can add to the ticket if I missed something.
 On Dec 25, 2014 1:54 AM, Hari Shreedharan 
 hshreedha...@cloudera.com
 wrote:

 In general such discussions happen or are posted on the dev lists. Could
 you please post a summary? Thanks.

 Thanks,
 Hari


 On Wed, Dec 24, 2014 at 11:46 PM, Cody Koeninger 
 c...@koeninger.org
 wrote:

  After a long talk with Patrick and TD (thanks guys), I opened the
 following jira

 https://issues.apache.org/jira/browse/SPARK-4964

 Sample PR has an implementation for the batch and the dstream case, and a
 link to a project with example usage.

 On Fri, Dec 19, 2014 at 4:36 PM, Koert Kuipers 
 ko...@tresata.com wrote:

 yup, we at tresata do the idempotent store the same way. very simple
 approach.

 On Fri, Dec 19, 2014 at 5:32 PM, Cody Koeninger 
 c...@koeninger.org
 wrote

Re: Which committers care about Kafka?

2014-12-29 Thread Cody Koeninger
Assuming you're talking about spark.streaming.receiver.maxRate, I just
updated my PR to configure rate limiting based on that setting.  So
hopefully that's issue 1 sorted.
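For reference, spark.streaming.receiver.maxRate caps the number of records per second each receiver will accept; it is unset (unlimited) by default and can be set on the SparkConf, e.g.:

    import org.apache.spark.SparkConf

    // Limit each receiver to at most 10,000 records per second.
    val conf = new SparkConf()
      .setAppName("kafka-ingest")                            // hypothetical app name
      .set("spark.streaming.receiver.maxRate", "10000")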

Regarding issue 3: as far as I can tell, with respect to the odd semantics of
stateful or windowed operations in the face of failure, my solution is no
worse than existing classes such as FileStream that use InputDStream
directly rather than a receiver.  Can we get some specific cases that are a
concern?

Regarding the WAL solutions TD mentioned, one of the disadvantages of them
is that they rely on checkpointing, unlike my approach.  As I noted in this
thread and in the jira ticket, I need something that can recover even when
a checkpoint is lost, and I've already seen multiple situations in
production where a checkpoint cannot be recovered (e.g. because code needs
to be updated).

On Mon, Dec 29, 2014 at 7:50 PM, Shao, Saisai saisai.s...@intel.com wrote:

  Hi Cody,



 From my understanding, rate control is an optional configuration in Spark
 Streaming and is disabled by default, so users can reach maximum throughput
 without any configuration.



 The reason why rate control is so important in stream processing is
 that Spark Streaming and other streaming frameworks are easily prone to
 unexpected behavior and failure due to network bursts and other
 uncontrollable ingest rates.



 Especially for Spark Streaming, a large amount of data to process will
 delay the processing time, which will further delay ongoing jobs and
 finally lead to failure.



 Thanks

 Jerry



 *From:* Cody Koeninger [mailto:c...@koeninger.org]
 *Sent:* Tuesday, December 30, 2014 6:50 AM
 *To:* Tathagata Das
 *Cc:* Hari Shreedharan; Shao, Saisai; Sean McNamara; Patrick Wendell;
 Luis Ángel Vicente Sánchez; Dibyendu Bhattacharya; dev@spark.apache.org;
 Koert Kuipers

 *Subject:* Re: Which committers care about Kafka?



 Can you give a little more clarification on exactly what is meant by



 1. Data rate control



 If someone wants to clamp the maximum number of messages per RDD partition
 in my solution, it would be very straightforward to do so.



 Regarding the holy grail, I'm pretty certain you can't have end-to-end
 transactional semantics without the client code being in charge of offset
 state.  That means the client code is going to also need to be in charge of
 setting up an initial state for updateStateByKey that makes sense; as long
 as they can do that, the job should be safe to restart from arbitrary
 failures.



 On Mon, Dec 29, 2014 at 4:33 PM, Tathagata Das 
 tathagata.das1...@gmail.com wrote:

 Hey all,

 Some wrap up thoughts on this thread.

 Let me first reiterate what Patrick said, that Kafka is super super
 important as it forms the largest fraction of Spark Streaming user
 base. So we really want to improve the Kafka + Spark Streaming
 integration. To this end, some of the things that need to be
 considered can be broadly classified into the following, to sort of
 facilitate the discussion.

 1. Data rate control
 2. Receiver failure semantics - partially achieving this gives
 at-least once, completely achieving this gives exactly-once
 3. Driver failure semantics - partially achieving this gives at-least
 once, completely achieving this gives exactly-once

 Here is a run down of what is achieved by different implementations
 (based on what I think).

 1. Prior to WAL in Spark 1.2, the KafkaReceiver could handle 3, could
 handle 1 partially (some duplicate data), and could NOT handle 2 (all
 previously received data lost).

 2. In Spark 1.2 with WAL enabled, Saisai's ReliableKafkaReceiver
 can handle 3, and can almost completely handle 1 and 2 (except a few corner
 cases which prevent it from completely guaranteeing exactly-once).

 3. I believe Dibyendu's solution (correct me if i am wrong) can handle
 1 and 2 perfectly. And 3 can be partially solved with WAL, or possibly
 completely solved by extending the solution further.

 4. Cody's solution (again, correct me if I am wrong) does not use
 receivers at all (so eliminates 2). It can handle 3 completely for
 simple operations like map and filter, but not sure if it works
 completely for stateful ops like windows and updateStateByKey. Also it
 does not handle 1.

 The real challenge for Kafka is in achieving 3 completely for stateful
 operations while also handling 1.  (i.e., use receivers, but still get
 driver failure guarantees). Solving this will give us our holy grail
 solution, and this is what I want to achieve.

 On that note, Cody submitted a PR on his style of achieving
 exactly-once semantics - https://github.com/apache/spark/pull/3798 . I
 am reviewing it. Please follow the PR if you are interested.

 TD


 On Wed, Dec 24, 2014 at 11:59 PM, Cody Koeninger c...@koeninger.org
 wrote:
  The conversation was mostly getting TD up to speed on this thread since
 he
  had just gotten back from his trip and hadn't seen it.
 
  The jira has a summary of the requirements we

Re: Which committers care about Kafka?

2014-12-25 Thread Cody Koeninger
 in failure scenarios (for example, the
  transaction
   is processed but before ZK is updated the machine fails, causing a
  new
   node to process it again).
  
   I don't think it is impossible to do this in Spark Streaming as
 well
  and
   I'd be really interested in working on it at some point in the
 near
  future.
  
   On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya 
   dibyendu.bhattach...@gmail.com wrote:
  
   Hi,
  
   Thanks to Jerry for mentioning the Kafka Spout for Trident. The
 Storm
   Trident has done the exact-once guarantee by processing the
 tuple in a
   batch  and assigning same transaction-id for a given batch . The
  replay for
   a given batch with a transaction-id will have exact same set of
  tuples and
   replay of batches happen in exact same order before the failure.
  
   Having this paradigm, if downstream system process data for a
 given
  batch
   for having a given transaction-id , and if during failure if same
  batch is
   again emitted , you can check if same transaction-id is already
  processed
   or not and hence can guarantee exact once semantics.
  
   And this can only be achieved in Spark if we use Low Level Kafka
  consumer
   API to process the offsets. This low level Kafka Consumer (
   https://github.com/dibbhatt/kafka-spark-consumer) has
 implemented the
   Spark Kafka consumer which uses Kafka Low Level APIs . All of the
  Kafka
   related logic has been taken from Storm-Kafka spout and which
 manages
  all
   Kafka re-balance and fault tolerant aspects and Kafka metadata
  managements.
  
   Presently this Consumer maintains that during Receiver failure,
 it
  will
   re-emit the exact same Block with same set of messages . Every
  message have
   the details of its partition, offset and topic related details
 which
  can
   tackle the SPARK-3146.
  
   As this Low Level consumer has complete control over the Kafka
  Offsets ,
   we can implement Trident like feature on top of it like having
  implement a
   transaction-id for a given block , and re-emit the same block
 with
  same set
   of message during Driver failure.
  
   Regards,
   Dibyendu
  
  
   On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai 
 saisai.s...@intel.com
   wrote:
  
   Hi all,
  
   I agree with Hari that Strong exact-once semantics is very hard
 to
   guarantee, especially in the failure situation. From my
  understanding even
   current implementation of ReliableKafkaReceiver cannot fully
  guarantee the
   exact once semantics once failed, first is the ordering of data
  replaying
   from last checkpoint, this is hard to guarantee when multiple
  partitions
   are injected in; second is the design complexity of achieving
 this,
  you can
   refer to the Kafka Spout in Trident, we have to dig into the
 very
  details
   of Kafka metadata management system to achieve this, not to say
  rebalance
   and fault-tolerance.
  
   Thanks
   Jerry
  
   -Original Message-
   From: Luis Ángel Vicente Sánchez [mailto:
 langel.gro...@gmail.com]
   Sent: Friday, December 19, 2014 5:57 AM
   To: Cody Koeninger
   Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
   Subject: Re: Which committers care about Kafka?
  
   But idempotency is not that easy to achieve sometimes. A strong only once
   semantic through a proper API would be super useful; but I'm not implying
   this is easy to achieve.
   On 18 Dec 2014 21:52, Cody Koeninger c...@koeninger.org
 wrote:
  
   If the downstream store for the output data is idempotent or
   transactional, and that downstream store also is the system of
  record
   for kafka offsets, then you have exactly-once semantics.
 Commit
   offsets with / after the data is stored.  On any failure,
 restart
  from
   the last committed offsets.
  
   Yes, this approach is biased towards the etl-like use cases
 rather
   than near-realtime-analytics use cases.
  
   On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan 
   hshreedha...@cloudera.com
   wrote:
  
   I get what you are saying. But getting exactly once right is
 an
   extremely hard problem - especially in presence of failure.
 The
   issue is failures
   can
   happen in a bunch of places. For example, before the
 notification
  of
   downstream store being successful reaches the receiver that
 updates
   the offsets, the node fails. The store was successful, but
   duplicates came in either way. This is something worth
 discussing
  by
   itself - but without uuids etc this might not really be
 solved even
   when you think it is.
  
   Anyway, I will look at the links. Even I am interested in all
 of
  the
   features you mentioned - no HDFS WAL for Kafka and once-only
   delivery,
   but
   I doubt the latter is really possible to guarantee - though I
  really
   would
   love to have that!
  
   Thanks,
   Hari
  
  
   On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
   c...@koeninger.org
   wrote:
  
   Thanks for the replies.
  
   Regarding skipping WAL, it's not just about

Re: Which committers care about Kafka?

2014-12-24 Thread Cody Koeninger
 by processing the tuple
 in a
   batch  and assigning same transaction-id for a given batch . The
  replay for
   a given batch with a transaction-id will have exact same set of
  tuples and
   replay of batches happen in exact same order before the failure.
  
   Having this paradigm, if downstream system process data for a given
  batch
   for having a given transaction-id , and if during failure if same
  batch is
   again emitted , you can check if same transaction-id is already
  processed
   or not and hence can guarantee exact once semantics.
  
   And this can only be achieved in Spark if we use Low Level Kafka
  consumer
   API to process the offsets. This low level Kafka Consumer (
   https://github.com/dibbhatt/kafka-spark-consumer) has implemented
 the
   Spark Kafka consumer which uses Kafka Low Level APIs . All of the
  Kafka
   related logic has been taken from Storm-Kafka spout and which
 manages
  all
   Kafka re-balance and fault tolerant aspects and Kafka metadata
  managements.
  
   Presently this Consumer maintains that during Receiver failure, it
  will
   re-emit the exact same Block with same set of messages . Every
  message have
   the details of its partition, offset and topic related details
 which
  can
   tackle the SPARK-3146.
  
   As this Low Level consumer has complete control over the Kafka
  Offsets ,
   we can implement Trident like feature on top of it like having
  implement a
   transaction-id for a given block , and re-emit the same block with
  same set
   of message during Driver failure.
  
   Regards,
   Dibyendu
  
  
   On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai 
 saisai.s...@intel.com
   wrote:
  
   Hi all,
  
   I agree with Hari that Strong exact-once semantics is very hard to
   guarantee, especially in the failure situation. From my
  understanding even
   current implementation of ReliableKafkaReceiver cannot fully
  guarantee the
   exact once semantics once failed, first is the ordering of data
  replaying
   from last checkpoint, this is hard to guarantee when multiple
  partitions
   are injected in; second is the design complexity of achieving
 this,
  you can
   refer to the Kafka Spout in Trident, we have to dig into the very
  details
   of Kafka metadata management system to achieve this, not to say
  rebalance
   and fault-tolerance.
  
   Thanks
   Jerry
  
   -Original Message-
   From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
   Sent: Friday, December 19, 2014 5:57 AM
   To: Cody Koeninger
   Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
   Subject: Re: Which committers care about Kafka?
  
   But idempotency is not that easy to achieve sometimes. A strong only once
   semantic through a proper API would be super useful; but I'm not implying
   this is easy to achieve.
   On 18 Dec 2014 21:52, Cody Koeninger c...@koeninger.org
 wrote:
  
   If the downstream store for the output data is idempotent or
   transactional, and that downstream store also is the system of
  record
   for kafka offsets, then you have exactly-once semantics.  Commit
   offsets with / after the data is stored.  On any failure, restart
  from
   the last committed offsets.
  
   Yes, this approach is biased towards the etl-like use cases
 rather
   than near-realtime-analytics use cases.
  
   On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan 
   hshreedha...@cloudera.com
   wrote:
  
   I get what you are saying. But getting exactly once right is an
   extremely hard problem - especially in presence of failure. The
   issue is failures
   can
   happen in a bunch of places. For example, before the
 notification
  of
   downstream store being successful reaches the receiver that
 updates
   the offsets, the node fails. The store was successful, but
   duplicates came in either way. This is something worth
 discussing
  by
   itself - but without uuids etc this might not really be solved
 even
   when you think it is.
  
   Anyway, I will look at the links. Even I am interested in all of
  the
   features you mentioned - no HDFS WAL for Kafka and once-only
   delivery,
   but
   I doubt the latter is really possible to guarantee - though I
  really
   would
   love to have that!
  
   Thanks,
   Hari
  
  
   On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
   c...@koeninger.org
   wrote:
  
   Thanks for the replies.
  
   Regarding skipping WAL, it's not just about optimization.  If
 you
   actually want exactly-once semantics, you need control of kafka
   offsets
   as
   well, including the ability to not use zookeeper as the system
 of
   record for offsets.  Kafka already is a reliable system that
 has
   strong
   ordering
   guarantees (within a partition) and does not mandate the use of
   zookeeper
   to store offsets.  I think there should be a spark api that
 acts
  as
   a
   very
   simple intermediary between Kafka and the user's choice of
   downstream
   store.
  
   Take a look at the links I posted

Re: Which committers care about Kafka?

2014-12-24 Thread Hari Shreedharan
:44 AM, Dibyendu Bhattacharya 
   dibyendu.bhattach...@gmail.com wrote:
  
   Hi,
  
   Thanks to Jerry for mentioning the Kafka Spout for Trident. The
 Storm
   Trident has done the exact-once guarantee by processing the tuple
 in a
   batch  and assigning same transaction-id for a given batch . The
  replay for
   a given batch with a transaction-id will have exact same set of
  tuples and
   replay of batches happen in exact same order before the failure.
  
   Having this paradigm, if downstream system process data for a given
  batch
   for having a given transaction-id , and if during failure if same
  batch is
   again emitted , you can check if same transaction-id is already
  processed
   or not and hence can guarantee exact once semantics.
  
   And this can only be achieved in Spark if we use Low Level Kafka
  consumer
   API to process the offsets. This low level Kafka Consumer (
   https://github.com/dibbhatt/kafka-spark-consumer) has implemented
 the
   Spark Kafka consumer which uses Kafka Low Level APIs . All of the
  Kafka
   related logic has been taken from Storm-Kafka spout and which
 manages
  all
   Kafka re-balance and fault tolerant aspects and Kafka metadata
  managements.
  
   Presently this Consumer maintains that during Receiver failure, it
  will
   re-emit the exact same Block with same set of messages . Every
  message have
   the details of its partition, offset and topic related details
 which
  can
   tackle the SPARK-3146.
  
   As this Low Level consumer has complete control over the Kafka
  Offsets ,
   we can implement Trident like feature on top of it like having
  implement a
   transaction-id for a given block , and re-emit the same block with
  same set
   of message during Driver failure.
  
   Regards,
   Dibyendu
  
  
   On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai 
 saisai.s...@intel.com
   wrote:
  
   Hi all,
  
   I agree with Hari that Strong exact-once semantics is very hard to
   guarantee, especially in the failure situation. From my
  understanding even
   current implementation of ReliableKafkaReceiver cannot fully
  guarantee the
   exact once semantics once failed, first is the ordering of data
  replaying
   from last checkpoint, this is hard to guarantee when multiple
  partitions
   are injected in; second is the design complexity of achieving
 this,
  you can
   refer to the Kafka Spout in Trident, we have to dig into the very
  details
   of Kafka metadata management system to achieve this, not to say
  rebalance
   and fault-tolerance.
  
   Thanks
   Jerry
  
   -Original Message-
   From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
   Sent: Friday, December 19, 2014 5:57 AM
   To: Cody Koeninger
   Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
   Subject: Re: Which committers care about Kafka?
  
   But idempotency is not that easy to achieve sometimes. A strong only once
   semantic through a proper API would be super useful; but I'm not implying
   this is easy to achieve.
   On 18 Dec 2014 21:52, Cody Koeninger c...@koeninger.org
 wrote:
  
   If the downstream store for the output data is idempotent or
   transactional, and that downstream store also is the system of
  record
   for kafka offsets, then you have exactly-once semantics.  Commit
   offsets with / after the data is stored.  On any failure, restart
  from
   the last committed offsets.
  
   Yes, this approach is biased towards the etl-like use cases
 rather
   than near-realtime-analytics use cases.
  
   On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan 
   hshreedha...@cloudera.com
   wrote:
  
   I get what you are saying. But getting exactly once right is an
   extremely hard problem - especially in presence of failure. The
   issue is failures
   can
   happen in a bunch of places. For example, before the
 notification
  of
   downstream store being successful reaches the receiver that
 updates
   the offsets, the node fails. The store was successful, but
   duplicates came in either way. This is something worth
 discussing
  by
   itself - but without uuids etc this might not really be solved
 even
   when you think it is.
  
   Anyway, I will look at the links. Even I am interested in all of
  the
   features you mentioned - no HDFS WAL for Kafka and once-only
   delivery,
   but
   I doubt the latter is really possible to guarantee - though I
  really
   would
   love to have that!
  
   Thanks,
   Hari
  
  
   On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
   c...@koeninger.org
   wrote:
  
   Thanks for the replies.
  
   Regarding skipping WAL, it's not just about optimization.  If
 you
   actually want exactly-once semantics, you need control of kafka
   offsets
   as
   well, including the ability to not use zookeeper as the system
 of
   record for offsets.  Kafka already is a reliable system that
 has
   strong
   ordering
   guarantees (within a partition) and does not mandate the use of
   zookeeper
   to store

Re: Which committers care about Kafka?

2014-12-19 Thread Dibyendu Bhattacharya
Hi,

Thanks to Jerry for mentioning the Kafka Spout for Trident. Storm
Trident achieves its exactly-once guarantee by processing tuples in a
batch and assigning the same transaction-id to a given batch. The replay of
a given batch with a transaction-id will have the exact same set of tuples, and
replay of batches happens in the exact same order as before the failure.

With this paradigm, if the downstream system processes data for a given batch
having a given transaction-id, and if during failure the same batch is
emitted again, you can check whether that transaction-id has already been
processed, and hence can guarantee exactly-once semantics.

And this can only be achieved in Spark if we use the low-level Kafka consumer
API to manage the offsets. This low-level Kafka consumer (
https://github.com/dibbhatt/kafka-spark-consumer) implements a Spark
Kafka consumer which uses the Kafka low-level APIs. All of the Kafka-related
logic has been taken from the Storm-Kafka spout, which manages all Kafka
re-balance and fault-tolerance aspects and Kafka metadata management.

Presently this consumer guarantees that during receiver failure, it will
re-emit the exact same block with the same set of messages. Every message has
the details of its partition, offset and topic, which can
address SPARK-3146.

As this low-level consumer has complete control over the Kafka offsets, we
can implement a Trident-like feature on top of it, such as having a
transaction-id for a given block, and re-emitting the same block with the same set
of messages during driver failure.

Regards,
Dibyendu
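A minimal sketch of that transaction-id check, assuming a hypothetical store of already-processed ids (an in-memory set stands in here for a durable store): a replayed block whose id was already processed is simply skipped.

    import scala.collection.mutable

    // Hypothetical stand-in for a durable record of processed transaction ids.
    object ProcessedIds {
      private val seen = mutable.Set[String]()
      def alreadyProcessed(txId: String): Boolean = seen.contains(txId)
      def markProcessed(txId: String): Unit = seen += txId
    }

    // Apply a block at most once per transaction id, even if it is re-emitted after a failure.
    def applyBlock(txId: String, messages: Seq[String])(write: Seq[String] => Unit): Unit =
      if (!ProcessedIds.alreadyProcessed(txId)) {
        write(messages)                   // downstream write for the whole block
        ProcessedIds.markProcessed(txId)  // in practice, committed atomically with the write
      }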


On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai saisai.s...@intel.com wrote:

 Hi all,

 I agree with Hari that Strong exact-once semantics is very hard to
 guarantee, especially in the failure situation. From my understanding even
 current implementation of ReliableKafkaReceiver cannot fully guarantee the
 exact once semantics once failed, first is the ordering of data replaying
 from last checkpoint, this is hard to guarantee when multiple partitions
 are injected in; second is the design complexity of achieving this, you can
 refer to the Kafka Spout in Trident, we have to dig into the very details
 of Kafka metadata management system to achieve this, not to say rebalance
 and fault-tolerance.

 Thanks
 Jerry

 -Original Message-
 From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
 Sent: Friday, December 19, 2014 5:57 AM
 To: Cody Koeninger
 Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
 Subject: Re: Which committers care about Kafka?

 But idempotency is not that easy to achieve sometimes. A strong only once
 semantic through a proper API would be super useful; but I'm not implying
 this is easy to achieve.
 On 18 Dec 2014 21:52, Cody Koeninger c...@koeninger.org wrote:

  If the downstream store for the output data is idempotent or
  transactional, and that downstream store also is the system of record
  for kafka offsets, then you have exactly-once semantics.  Commit
  offsets with / after the data is stored.  On any failure, restart from
 the last committed offsets.
 
  Yes, this approach is biased towards the etl-like use cases rather
  than near-realtime-analytics use cases.
 
  On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan 
  hshreedha...@cloudera.com
   wrote:
  
   I get what you are saying. But getting exactly once right is an
   extremely hard problem - especially in presence of failure. The
   issue is failures
  can
   happen in a bunch of places. For example, before the notification of
   downstream store being successful reaches the receiver that updates
   the offsets, the node fails. The store was successful, but
   duplicates came in either way. This is something worth discussing by
   itself - but without uuids etc this might not really be solved even
 when you think it is.
  
   Anyway, I will look at the links. Even I am interested in all of the
   features you mentioned - no HDFS WAL for Kafka and once-only
   delivery,
  but
   I doubt the latter is really possible to guarantee - though I really
  would
   love to have that!
  
   Thanks,
   Hari
  
  
   On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
   c...@koeninger.org
   wrote:
  
   Thanks for the replies.
  
   Regarding skipping WAL, it's not just about optimization.  If you
   actually want exactly-once semantics, you need control of kafka
   offsets
  as
   well, including the ability to not use zookeeper as the system of
   record for offsets.  Kafka already is a reliable system that has
   strong
  ordering
   guarantees (within a partition) and does not mandate the use of
  zookeeper
   to store offsets.  I think there should be a spark api that acts as
   a
  very
   simple intermediary between Kafka and the user's choice of
   downstream
  store.
  
   Take a look at the links I posted - if there's already been 2
  independent
   implementations of the idea, chances are it's something people need.
  
   On Thu, Dec

Re: Which committers care about Kafka?

2014-12-19 Thread Hari Shreedharan
Hi Dibyendu,

Thanks for the details on the implementation. But I still do not believe
that it gives no duplicates - what they achieve is that the same batch is
processed exactly the same way every time (but note it may be processed more
than once) - so it depends on the operation being idempotent. I believe
Trident uses ZK to keep track of the transactions - a batch can be
processed multiple times in failure scenarios (for example, the transaction
is processed but before ZK is updated the machine fails, causing a new
node to process it again).

I don't think it is impossible to do this in Spark Streaming as well and
I'd be really interested in working on it at some point in the near future.

On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya 
dibyendu.bhattach...@gmail.com wrote:

 Hi,

 Thanks to Jerry for mentioning the Kafka Spout for Trident. The Storm
 Trident has done the exact-once guarantee by processing the tuple in a
 batch  and assigning same transaction-id for a given batch . The replay for
 a given batch with a transaction-id will have exact same set of tuples and
 replay of batches happen in exact same order before the failure.

 Having this paradigm, if downstream system process data for a given batch
 for having a given transaction-id , and if during failure if same batch is
 again emitted , you can check if same transaction-id is already processed
 or not and hence can guarantee exact once semantics.

 And this can only be achieved in Spark if we use Low Level Kafka consumer
 API to process the offsets. This low level Kafka Consumer (
 https://github.com/dibbhatt/kafka-spark-consumer) has implemented the
 Spark Kafka consumer which uses Kafka Low Level APIs . All of the Kafka
 related logic has been taken from Storm-Kafka spout and which manages all
 Kafka re-balance and fault tolerant aspects and Kafka metadata managements.

 Presently this Consumer maintains that during Receiver failure, it will
 re-emit the exact same Block with same set of messages . Every message have
 the details of its partition, offset and topic related details which can
 tackle the SPARK-3146.

 As this Low Level consumer has complete control over the Kafka Offsets ,
 we can implement Trident like feature on top of it like having implement a
 transaction-id for a given block , and re-emit the same block with same set
 of message during Driver failure.

 Regards,
 Dibyendu


 On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai saisai.s...@intel.com
 wrote:

 Hi all,

 I agree with Hari that Strong exact-once semantics is very hard to
 guarantee, especially in the failure situation. From my understanding even
 current implementation of ReliableKafkaReceiver cannot fully guarantee the
 exact once semantics once failed, first is the ordering of data replaying
 from last checkpoint, this is hard to guarantee when multiple partitions
 are injected in; second is the design complexity of achieving this, you can
 refer to the Kafka Spout in Trident, we have to dig into the very details
 of Kafka metadata management system to achieve this, not to say rebalance
 and fault-tolerance.

 Thanks
 Jerry

 -Original Message-
 From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
 Sent: Friday, December 19, 2014 5:57 AM
 To: Cody Koeninger
 Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
 Subject: Re: Which committers care about Kafka?

 But idempotency is not that easy to achieve sometimes. A strong only once
 semantic through a proper API would be super useful; but I'm not implying
 this is easy to achieve.
 On 18 Dec 2014 21:52, Cody Koeninger c...@koeninger.org wrote:

  If the downstream store for the output data is idempotent or
  transactional, and that downstream store also is the system of record
  for kafka offsets, then you have exactly-once semantics.  Commit
  offsets with / after the data is stored.  On any failure, restart from
 the last committed offsets.
 
  Yes, this approach is biased towards the etl-like use cases rather
  than near-realtime-analytics use cases.
 
  On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan 
  hshreedha...@cloudera.com
   wrote:
  
   I get what you are saying. But getting exactly once right is an
   extremely hard problem - especially in presence of failure. The
   issue is failures
  can
   happen in a bunch of places. For example, before the notification of
   downstream store being successful reaches the receiver that updates
   the offsets, the node fails. The store was successful, but
   duplicates came in either way. This is something worth discussing by
   itself - but without uuids etc this might not really be solved even
 when you think it is.
  
   Anyway, I will look at the links. Even I am interested in all of the
   features you mentioned - no HDFS WAL for Kafka and once-only
   delivery,
  but
   I doubt the latter is really possible to guarantee - though I really
  would
   love to have that!
  
   Thanks,
   Hari
  
  
   On Thu, Dec 18, 2014

Re: Which committers care about Kafka?

2014-12-19 Thread Sean McNamara
Please feel free to correct me if I’m wrong, but I think the exactly-once Spark
Streaming semantics problem can easily be solved using updateStateByKey. Make the key
going into updateStateByKey be a hash of the event, or pluck off some uuid from 
the message.  The updateFunc would only emit the message if the key did not 
exist, and the user has complete control over the window of time / state 
lifecycle for detecting duplicates.  It also makes it really easy to detect and 
take action (alert?) when you DO see a duplicate, or make memory tradeoffs 
within an error bound using a sketch algorithm.  The Kafka simple consumer is
insanely complex; if possible, I think it would be better (and vastly more
flexible) to get reliability using the primitives that Spark so elegantly
provides.
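A rough sketch of that approach (assuming checkpointing is enabled, since updateStateByKey requires it; the hash-based key and the emit-once state shape below are just one way to do it):

    import org.apache.spark.streaming.StreamingContext._   // pair DStream ops on Spark 1.x; implicit in later versions
    import org.apache.spark.streaming.dstream.DStream

    // Key each event by a content hash (or a uuid plucked from the message) and keep it
    // in state; an event is emitted only the first time its key is seen. Returning None
    // from the update function would expire a key and bound the state lifecycle.
    def dedupe(events: DStream[String]): DStream[String] = {
      val keyed = events.map(e => (e.hashCode.toString, e))        // stand-in for a proper hash
      val updated = keyed.updateStateByKey[(String, Boolean)] {
        (newEvents: Seq[String], state: Option[(String, Boolean)]) =>
          state match {
            case Some((event, _)) => Some((event, false))          // duplicate: keep state, don't re-emit
            case None             => newEvents.headOption.map(e => (e, true))
          }
      }
      updated.filter { case (_, (_, isNew)) => isNew }
             .map { case (_, (event, _)) => event }
    }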

Cheers,

Sean


 On Dec 19, 2014, at 12:06 PM, Hari Shreedharan hshreedha...@cloudera.com 
 wrote:
 
 Hi Dibyendu,
 
 Thanks for the details on the implementation. But I still do not believe
 that it is no duplicates - what they achieve is that the same batch is
 processed exactly the same way every time (but see it may be processed more
 than once) - so it depends on the operation being idempotent. I believe
 Trident uses ZK to keep track of the transactions - a batch can be
 processed multiple times in failure scenarios (for example, the transaction
 is processed but before ZK is updated the machine fails, causing a new
 node to process it again).
 
 I don't think it is impossible to do this in Spark Streaming as well and
 I'd be really interested in working on it at some point in the near future.
 
 On Fri, Dec 19, 2014 at 1:44 AM, Dibyendu Bhattacharya 
 dibyendu.bhattach...@gmail.com wrote:
 
 Hi,
 
 Thanks to Jerry for mentioning the Kafka Spout for Trident. The Storm
 Trident has done the exact-once guarantee by processing the tuple in a
 batch  and assigning same transaction-id for a given batch . The replay for
 a given batch with a transaction-id will have exact same set of tuples and
 replay of batches happen in exact same order before the failure.
 
 Having this paradigm, if downstream system process data for a given batch
 for having a given transaction-id , and if during failure if same batch is
 again emitted , you can check if same transaction-id is already processed
 or not and hence can guarantee exact once semantics.
 
 And this can only be achieved in Spark if we use Low Level Kafka consumer
 API to process the offsets. This low level Kafka Consumer (
 https://github.com/dibbhatt/kafka-spark-consumer) has implemented the
 Spark Kafka consumer which uses Kafka Low Level APIs . All of the Kafka
 related logic has been taken from Storm-Kafka spout and which manages all
 Kafka re-balance and fault tolerant aspects and Kafka metadata managements.
 
 Presently this Consumer maintains that during Receiver failure, it will
 re-emit the exact same Block with same set of messages . Every message have
 the details of its partition, offset and topic related details which can
 tackle the SPARK-3146.
 
 As this Low Level consumer has complete control over the Kafka Offsets ,
 we can implement Trident like feature on top of it like having implement a
 transaction-id for a given block , and re-emit the same block with same set
 of message during Driver failure.
 
 Regards,
 Dibyendu
 
 
 On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai saisai.s...@intel.com
 wrote:
 
 Hi all,
 
 I agree with Hari that Strong exact-once semantics is very hard to
 guarantee, especially in the failure situation. From my understanding even
 current implementation of ReliableKafkaReceiver cannot fully guarantee the
 exact once semantics once failed, first is the ordering of data replaying
 from last checkpoint, this is hard to guarantee when multiple partitions
 are injected in; second is the design complexity of achieving this, you can
 refer to the Kafka Spout in Trident, we have to dig into the very details
 of Kafka metadata management system to achieve this, not to say rebalance
 and fault-tolerance.
 
 Thanks
 Jerry
 
 -Original Message-
 From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
 Sent: Friday, December 19, 2014 5:57 AM
 To: Cody Koeninger
 Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
 Subject: Re: Which committers care about Kafka?
 
 But idempotency is not that easy to achieve sometimes. A strong only once
 semantic through a proper API would be super useful; but I'm not implying
 this is easy to achieve.
 On 18 Dec 2014 21:52, Cody Koeninger c...@koeninger.org wrote:
 
 If the downstream store for the output data is idempotent or
 transactional, and that downstream store also is the system of record
 for kafka offsets, then you have exactly-once semantics.  Commit
 offsets with / after the data is stored.  On any failure, restart from
 the last committed offsets.
 
 Yes, this approach is biased towards the etl-like use cases rather
 than near-realtime-analytics use cases.
 
 On Thu, Dec 18, 2014

Re: Which committers care about Kafka?

2014-12-19 Thread Cody Koeninger
  Subject: Re: Which committers care about Kafka?
 
  But idempotency is not that easy to achieve sometimes. A strong only once
  semantic through a proper API would be super useful; but I'm not implying
  this is easy to achieve.
  On 18 Dec 2014 21:52, Cody Koeninger c...@koeninger.org wrote:
 
  If the downstream store for the output data is idempotent or
  transactional, and that downstream store also is the system of record
  for kafka offsets, then you have exactly-once semantics.  Commit
  offsets with / after the data is stored.  On any failure, restart from
  the last committed offsets.
 
  Yes, this approach is biased towards the etl-like use cases rather
  than near-realtime-analytics use cases.
 
  On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan 
  hshreedha...@cloudera.com
  wrote:
 
  I get what you are saying. But getting exactly once right is an
  extremely hard problem - especially in presence of failure. The
  issue is failures
  can
  happen in a bunch of places. For example, before the notification of
  downstream store being successful reaches the receiver that updates
  the offsets, the node fails. The store was successful, but
  duplicates came in either way. This is something worth discussing by
  itself - but without uuids etc this might not really be solved even
  when you think it is.
 
  Anyway, I will look at the links. Even I am interested in all of the
  features you mentioned - no HDFS WAL for Kafka and once-only
  delivery,
  but
  I doubt the latter is really possible to guarantee - though I really
  would
  love to have that!
 
  Thanks,
  Hari
 
 
  On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
  c...@koeninger.org
  wrote:
 
  Thanks for the replies.
 
  Regarding skipping WAL, it's not just about optimization.  If you
  actually want exactly-once semantics, you need control of kafka
  offsets
  as
  well, including the ability to not use zookeeper as the system of
  record for offsets.  Kafka already is a reliable system that has
  strong
  ordering
  guarantees (within a partition) and does not mandate the use of
  zookeeper
  to store offsets.  I think there should be a spark api that acts as
  a
  very
  simple intermediary between Kafka and the user's choice of
  downstream
  store.
 
  Take a look at the links I posted - if there's already been 2
  independent
  implementations of the idea, chances are it's something people need.
 
  On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan 
  hshreedha...@cloudera.com wrote:
 
  Hi Cody,
 
  I am an absolute +1 on SPARK-3146. I think we can implement
  something pretty simple and lightweight for that one.
 
  For the Kafka DStream skipping the WAL implementation - this is
  something I discussed with TD a few weeks ago. Though it is a good
  idea to
  implement this to avoid unnecessary HDFS writes, it is an
  optimization. For
  that reason, we must be careful in implementation. There are a
  couple
  of
  issues that we need to ensure works properly - specifically
  ordering.
  To
  ensure we pull messages from different topics and partitions in
  the
  same
  order after failure, we’d still have to persist the metadata to
  HDFS
  (or
  some other system) - this metadata must contain the order of
  messages consumed, so we know how to re-read the messages. I am
  planning to
  explore
  this once I have some time (probably in Jan). In addition, we must
  also ensure bucketing functions work fine as well. I will file a
  placeholder jira for this one.
 
  I also wrote an API to write data back to Kafka a while back -
  https://github.com/apache/spark/pull/2994 . I am hoping that this
  will get pulled in soon, as this is something I know people want.
  I am open
  to
  feedback on that - anything that I can do to make it better.
 
  Thanks,
  Hari
 
 
  On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
  pwend...@gmail.com
  wrote:
 
  Hey Cody,
 
  Thanks for reaching out with this. The lead on streaming is TD -
  he is traveling this week though so I can respond a bit. To the
  high level point of whether Kafka is important - it definitely
  is. Something like 80% of Spark Streaming deployments
  (anecdotally) ingest data from Kafka. Also, good support for
  Kafka is something we generally want in Spark and not a library.
  In some cases IIRC there were user libraries that used unstable
  Kafka API's and we were somewhat waiting on Kafka to stabilize
  them to merge things upstream. Otherwise users wouldn't be able
  to use newer Kakfa versions. This is a high level impression only
  though, I haven't talked to TD about this recently so it's worth
  revisiting given the developments in Kafka.
 
  Please do bring things up like this on the dev list if there are
  blockers for your usage - thanks for pinging it.
 
  - Patrick
 
  On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger
  c...@koeninger.org
  wrote:
  Now that 1.2 is finalized... who are the go-to people to get
  some long-standing Kafka related issues resolved

Re: Which committers care about Kafka?

2014-12-19 Thread Hari Shreedharan
 metadata management system to achieve this, not to say
 rebalance
  and fault-tolerance.
 
  Thanks
  Jerry
 
  -Original Message-
  From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
  Sent: Friday, December 19, 2014 5:57 AM
  To: Cody Koeninger
  Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
  Subject: Re: Which committers care about Kafka?
 
  But idempotency is not that easy to achieve sometimes. A strong only once
  semantic through a proper API would be super useful; but I'm not implying
  this is easy to achieve.
  On 18 Dec 2014 21:52, Cody Koeninger c...@koeninger.org wrote:
 
  If the downstream store for the output data is idempotent or
  transactional, and that downstream store also is the system of record
  for kafka offsets, then you have exactly-once semantics.  Commit
  offsets with / after the data is stored.  On any failure, restart from
  the last committed offsets.
 
  Yes, this approach is biased towards the etl-like use cases rather
  than near-realtime-analytics use cases.
 
  On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan 
  hshreedha...@cloudera.com
  wrote:
 
  I get what you are saying. But getting exactly once right is an
  extremely hard problem - especially in presence of failure. The
  issue is failures
  can
  happen in a bunch of places. For example, before the notification of
  downstream store being successful reaches the receiver that updates
  the offsets, the node fails. The store was successful, but
  duplicates came in either way. This is something worth discussing by
  itself - but without uuids etc this might not really be solved even
  when you think it is.
 
  Anyway, I will look at the links. Even I am interested in all of the
  features you mentioned - no HDFS WAL for Kafka and once-only
  delivery,
  but
  I doubt the latter is really possible to guarantee - though I really
  would
  love to have that!
 
  Thanks,
  Hari
 
 
  On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
  c...@koeninger.org
  wrote:
 
  Thanks for the replies.
 
  Regarding skipping WAL, it's not just about optimization.  If you
  actually want exactly-once semantics, you need control of kafka
  offsets
  as
  well, including the ability to not use zookeeper as the system of
  record for offsets.  Kafka already is a reliable system that has
  strong
  ordering
  guarantees (within a partition) and does not mandate the use of
  zookeeper
  to store offsets.  I think there should be a spark api that acts as
  a
  very
  simple intermediary between Kafka and the user's choice of
  downstream
  store.
 
  Take a look at the links I posted - if there's already been 2
  independent
  implementations of the idea, chances are it's something people need.
 
  On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan 
  hshreedha...@cloudera.com wrote:
 
  Hi Cody,
 
  I am an absolute +1 on SPARK-3146. I think we can implement
  something pretty simple and lightweight for that one.
 
  For the Kafka DStream skipping the WAL implementation - this is
  something I discussed with TD a few weeks ago. Though it is a good
  idea to
  implement this to avoid unnecessary HDFS writes, it is an
  optimization. For
  that reason, we must be careful in implementation. There are a
  couple
  of
  issues that we need to ensure works properly - specifically
  ordering.
  To
  ensure we pull messages from different topics and partitions in
  the
  same
  order after failure, we’d still have to persist the metadata to
  HDFS
  (or
  some other system) - this metadata must contain the order of
  messages consumed, so we know how to re-read the messages. I am
  planning to
  explore
  this once I have some time (probably in Jan). In addition, we must
  also ensure bucketing functions work fine as well. I will file a
  placeholder jira for this one.
 
  I also wrote an API to write data back to Kafka a while back -
  https://github.com/apache/spark/pull/2994 . I am hoping that this
  will get pulled in soon, as this is something I know people want.
  I am open
  to
  feedback on that - anything that I can do to make it better.
 
  Thanks,
  Hari
 
 
  On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell
  pwend...@gmail.com
  wrote:
 
  Hey Cody,
 
  Thanks for reaching out with this. The lead on streaming is TD -
  he is traveling this week though so I can respond a bit. To the
  high level point of whether Kafka is important - it definitely
  is. Something like 80% of Spark Streaming deployments
  (anecdotally) ingest data from Kafka. Also, good support for
  Kafka is something we generally want in Spark and not a library.
  In some cases IIRC there were user libraries that used unstable
  Kafka API's and we were somewhat waiting on Kafka to stabilize
  them to merge things upstream. Otherwise users wouldn't be able
   to use newer Kafka versions. This is a high level impression only
  though, I haven't talked to TD about this recently so it's worth
  revisiting given the developments in Kafka

Re: Which committers care about Kafka?

2014-12-19 Thread Cody Koeninger
 Kafka
 consumer
   API to process the offsets. This low-level Kafka Consumer (
   https://github.com/dibbhatt/kafka-spark-consumer) has implemented the
   Spark Kafka consumer using the Kafka low-level APIs. All of the
   Kafka-related logic has been taken from the Storm-Kafka spout, which
   manages all Kafka re-balance and fault-tolerance aspects and Kafka
   metadata management.
 
   Presently this Consumer guarantees that during a Receiver failure, it
   will re-emit the exact same Block with the same set of messages. Every
   message carries its partition, offset and topic details, which can
   tackle SPARK-3146.
 
   As this low-level consumer has complete control over the Kafka offsets,
   we can implement a Trident-like feature on top of it, such as a
   transaction-id for a given block, and re-emit the same block with the
   same set of messages during a Driver failure.
 
  Regards,
  Dibyendu
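
   A rough Scala sketch of the transaction-id idea described above; the
   KafkaBatch and DownstreamStore types are hypothetical placeholders for
   illustration, not part of the kafka-spark-consumer code:

   // Hypothetical types: the downstream store records which transaction-ids have
   // already been written, so a re-emitted block can be detected and skipped.
   case class KafkaBatch(transactionId: String, messages: Seq[Array[Byte]])

   trait DownstreamStore {
     def alreadyProcessed(transactionId: String): Boolean                 // lookup keyed by transaction-id
     def write(transactionId: String, messages: Seq[Array[Byte]]): Unit   // store data and id together
   }

   def processBatch(batch: KafkaBatch, store: DownstreamStore): Unit = {
     // If the driver fails and the same block is re-emitted, the id check skips the duplicate.
     if (!store.alreadyProcessed(batch.transactionId)) {
       store.write(batch.transactionId, batch.messages)
     }
   }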
 
 
  On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai saisai.s...@intel.com
  wrote:
 
  Hi all,
 
   I agree with Hari that strong exactly-once semantics is very hard to
  guarantee, especially in the failure situation. From my
 understanding even
  current implementation of ReliableKafkaReceiver cannot fully
 guarantee the
   exactly-once semantics after a failure; first is the ordering of data
 replaying
  from last checkpoint, this is hard to guarantee when multiple
 partitions
  are injected in; second is the design complexity of achieving this,
 you can
  refer to the Kafka Spout in Trident, we have to dig into the very
 details
  of Kafka metadata management system to achieve this, not to say
 rebalance
  and fault-tolerance.
 
  Thanks
  Jerry
 
  -Original Message-
  From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
  Sent: Friday, December 19, 2014 5:57 AM
  To: Cody Koeninger
  Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
  Subject: Re: Which committers care about Kafka?
 
   But idempotency is not that easy to achieve sometimes. A strong only
  once
   semantic through a proper API would be super useful; but I'm not
 implying
  this is easy to achieve.
  On 18 Dec 2014 21:52, Cody Koeninger c...@koeninger.org wrote:
 
  If the downstream store for the output data is idempotent or
  transactional, and that downstream store also is the system of
 record
  for kafka offsets, then you have exactly-once semantics.  Commit
  offsets with / after the data is stored.  On any failure, restart
 from
  the last committed offsets.
 
  Yes, this approach is biased towards the etl-like use cases rather
  than near-realtime-analytics use cases.
 
  On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan 
  hshreedha...@cloudera.com
  wrote:
 
  I get what you are saying. But getting exactly once right is an
  extremely hard problem - especially in presence of failure. The
  issue is failures
  can
  happen in a bunch of places. For example, before the notification
 of
  downstream store being successful reaches the receiver that updates
  the offsets, the node fails. The store was successful, but
  duplicates came in either way. This is something worth discussing
 by
  itself - but without uuids etc this might not really be solved even
  when you think it is.
 
  Anyway, I will look at the links. Even I am interested in all of
 the
  features you mentioned - no HDFS WAL for Kafka and once-only
  delivery,
  but
  I doubt the latter is really possible to guarantee - though I
 really
  would
  love to have that!
 
  Thanks,
  Hari
 
 
  On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
  c...@koeninger.org
  wrote:
 
  Thanks for the replies.
 
  Regarding skipping WAL, it's not just about optimization.  If you
  actually want exactly-once semantics, you need control of kafka
  offsets
  as
  well, including the ability to not use zookeeper as the system of
  record for offsets.  Kafka already is a reliable system that has
  strong
  ordering
  guarantees (within a partition) and does not mandate the use of
  zookeeper
  to store offsets.  I think there should be a spark api that acts
 as
  a
  very
  simple intermediary between Kafka and the user's choice of
  downstream
  store.
 
  Take a look at the links I posted - if there's already been 2
  independent
  implementations of the idea, chances are it's something people
 need.
 
  On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan 
  hshreedha...@cloudera.com wrote:
 
  Hi Cody,
 
  I am an absolute +1 on SPARK-3146. I think we can implement
  something pretty simple and lightweight for that one.
 
  For the Kafka DStream skipping the WAL implementation - this is
  something I discussed with TD a few weeks ago. Though it is a
 good
  idea to
  implement this to avoid unnecessary HDFS writes, it is an
  optimization. For
  that reason, we must be careful in implementation. There are a
  couple
  of
  issues that we need to ensure works properly - specifically
  ordering.
  To
  ensure we pull messages from different topics

Re: Which committers care about Kafka?

2014-12-19 Thread Koert Kuipers
 for a given
  batch
   having a given transaction-id, and if during a failure the same batch is
   emitted again, you can check whether the same transaction-id has already
   been processed, and hence can guarantee exactly-once semantics.
  
   And this can only be achieved in Spark if we use Low Level Kafka
  consumer
   API to process the offsets. This low level Kafka Consumer (
   https://github.com/dibbhatt/kafka-spark-consumer) has implemented
 the
    Spark Kafka consumer using the Kafka low-level APIs. All of the
    Kafka-related logic has been taken from the Storm-Kafka spout, which
    manages all Kafka re-balance and fault-tolerance aspects and Kafka
    metadata management.
  
    Presently this Consumer guarantees that during a Receiver failure, it
    will re-emit the exact same Block with the same set of messages. Every
    message carries its partition, offset and topic details, which can
    tackle SPARK-3146.
  
    As this low-level consumer has complete control over the Kafka offsets,
    we can implement a Trident-like feature on top of it, such as a
    transaction-id for a given block, and re-emit the same block with the
    same set of messages during a Driver failure.
  
   Regards,
   Dibyendu
  
  
   On Fri, Dec 19, 2014 at 7:33 AM, Shao, Saisai 
 saisai.s...@intel.com
   wrote:
  
   Hi all,
  
    I agree with Hari that strong exactly-once semantics is very hard to
   guarantee, especially in the failure situation. From my
  understanding even
   current implementation of ReliableKafkaReceiver cannot fully
  guarantee the
    exactly-once semantics after a failure; first is the ordering of data
  replaying
   from last checkpoint, this is hard to guarantee when multiple
  partitions
   are injected in; second is the design complexity of achieving this,
  you can
   refer to the Kafka Spout in Trident, we have to dig into the very
  details
   of Kafka metadata management system to achieve this, not to say
  rebalance
   and fault-tolerance.
  
   Thanks
   Jerry
  
   -Original Message-
   From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com]
   Sent: Friday, December 19, 2014 5:57 AM
   To: Cody Koeninger
   Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
   Subject: Re: Which committers care about Kafka?
  
    But idempotency is not that easy to achieve sometimes. A strong only
   once
    semantic through a proper API would be super useful; but I'm not
  implying
   this is easy to achieve.
   On 18 Dec 2014 21:52, Cody Koeninger c...@koeninger.org wrote:
  
   If the downstream store for the output data is idempotent or
   transactional, and that downstream store also is the system of
  record
   for kafka offsets, then you have exactly-once semantics.  Commit
   offsets with / after the data is stored.  On any failure, restart
  from
   the last committed offsets.
  
   Yes, this approach is biased towards the etl-like use cases rather
   than near-realtime-analytics use cases.
  
   On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan 
   hshreedha...@cloudera.com
   wrote:
  
   I get what you are saying. But getting exactly once right is an
   extremely hard problem - especially in presence of failure. The
   issue is failures
   can
   happen in a bunch of places. For example, before the notification
  of
   downstream store being successful reaches the receiver that
 updates
   the offsets, the node fails. The store was successful, but
   duplicates came in either way. This is something worth discussing
  by
   itself - but without uuids etc this might not really be solved
 even
   when you think it is.
  
   Anyway, I will look at the links. Even I am interested in all of
  the
   features you mentioned - no HDFS WAL for Kafka and once-only
   delivery,
   but
   I doubt the latter is really possible to guarantee - though I
  really
   would
   love to have that!
  
   Thanks,
   Hari
  
  
   On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger
   c...@koeninger.org
   wrote:
  
   Thanks for the replies.
  
   Regarding skipping WAL, it's not just about optimization.  If
 you
   actually want exactly-once semantics, you need control of kafka
   offsets
   as
   well, including the ability to not use zookeeper as the system
 of
   record for offsets.  Kafka already is a reliable system that has
   strong
   ordering
   guarantees (within a partition) and does not mandate the use of
   zookeeper
   to store offsets.  I think there should be a spark api that acts
  as
   a
   very
   simple intermediary between Kafka and the user's choice of
   downstream
   store.
  
   Take a look at the links I posted - if there's already been 2
   independent
   implementations of the idea, chances are it's something people
  need.
  
   On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan 
   hshreedha...@cloudera.com wrote:
  
   Hi Cody,
  
   I am an absolute +1 on SPARK-3146. I think we can implement
   something pretty simple and lightweight

Re: Which committers care about Kafka?

2014-12-18 Thread Patrick Wendell
Hey Cody,

Thanks for reaching out with this. The lead on streaming is TD - he is
traveling this week though so I can respond a bit. To the high level
point of whether Kafka is important - it definitely is. Something like
80% of Spark Streaming deployments (anecdotally) ingest data from
Kafka. Also, good support for Kafka is something we generally want in
Spark and not a library. In some cases IIRC there were user libraries
that used unstable Kafka APIs and we were somewhat waiting on Kafka
to stabilize them to merge things upstream. Otherwise users wouldn't
be able to use newer Kafka versions. This is a high level impression
only though, I haven't talked to TD about this recently so it's worth
revisiting given the developments in Kafka.

Please do bring things up like this on the dev list if there are
blockers for your usage - thanks for pinging it.

- Patrick

On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger c...@koeninger.org wrote:
 Now that 1.2 is finalized...  who are the go-to people to get some
 long-standing Kafka related issues resolved?

 The existing api is not sufficiently safe nor flexible for our production
 use.  I don't think we're alone in this viewpoint, because I've seen
 several different patches and libraries to fix the same things we've been
 running into.

 Regarding flexibility

 https://issues.apache.org/jira/browse/SPARK-3146

 has been outstanding since August, and IMHO an equivalent of this is
 absolutely necessary.  We wrote a similar patch ourselves, then found that
 PR and have been running it in production.  We wouldn't be able to get our
 jobs done without it.  It also allows users to solve a whole class of
 problems for themselves (e.g. SPARK-2388, arbitrary delay of messages, etc).

 Regarding safety, I understand the motivation behind WriteAheadLog as a
 general solution for streaming unreliable sources, but Kafka already is a
 reliable source.  I think there's a need for an api that treats it as
 such.  Even aside from the performance issues of duplicating the
 write-ahead log in kafka into another write-ahead log in hdfs, I need
 exactly-once semantics in the face of failure (I've had failures that
 prevented reloading a spark streaming checkpoint, for instance).

 I've got an implementation i've been using

 https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka
 /src/main/scala/org/apache/spark/rdd/kafka

 Tresata has something similar at https://github.com/tresata/spark-kafka,
 and I know there were earlier attempts based on Storm code.

 Trying to distribute these kinds of fixes as libraries rather than patches
 to Spark is problematic, because large portions of the implementation are
 private[spark].

  I'd like to help, but i need to know whose attention to get.




Re: Which committers care about Kafka?

2014-12-18 Thread Cody Koeninger
Thanks for the replies.

Regarding skipping WAL, it's not just about optimization.  If you actually
want exactly-once semantics, you need control of kafka offsets as well,
including the ability to not use zookeeper as the system of record for
offsets.  Kafka already is a reliable system that has strong ordering
guarantees (within a partition) and does not mandate the use of zookeeper
to store offsets.  I think there should be a spark api that acts as a very
simple intermediary between Kafka and the user's choice of downstream store.

Take a look at the links I posted - if there's already been 2 independent
implementations of the idea, chances are it's something people need.
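
 To make the shape of such an api concrete, here is a sketch of what an
 offset-range-driven Kafka RDD could look like; the OffsetRange case class and
 the createKafkaRDD signature are illustrative assumptions, not the code in the
 branches linked above:

 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD

 // The caller supplies explicit offset ranges, so the caller's own store (not
 // ZooKeeper) is the system of record for offsets.
 case class OffsetRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

 // Hypothetical signature: each OffsetRange would map to one Spark partition, so the
 // RDD's contents are deterministic and can be replayed exactly after a failure.
 def createKafkaRDD(sc: SparkContext,
                    kafkaParams: Map[String, String],
                    ranges: Seq[OffsetRange]): RDD[(Array[Byte], Array[Byte])] = ???

 Given an RDD like this, storing the ranges alongside the output is what makes
 the exactly-once recipe discussed later in the thread possible.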

On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan hshreedha...@cloudera.com
 wrote:

 Hi Cody,

 I am an absolute +1 on SPARK-3146. I think we can implement something
 pretty simple and lightweight for that one.

 For the Kafka DStream skipping the WAL implementation - this is something
 I discussed with TD a few weeks ago. Though it is a good idea to implement
 this to avoid unnecessary HDFS writes, it is an optimization. For that
 reason, we must be careful in implementation. There are a couple of issues
 that we need to ensure work properly - specifically ordering. To ensure we
 pull messages from different topics and partitions in the same order after
 failure, we’d still have to persist the metadata to HDFS (or some other
 system) - this metadata must contain the order of messages consumed, so we
 know how to re-read the messages. I am planning to explore this once I have
 some time (probably in Jan). In addition, we must also ensure bucketing
 functions work fine. I will file a placeholder jira for this one.
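
 As a rough sketch of the per-batch metadata that would need to be persisted so a
 batch can be re-read in the same order, assuming a made-up ConsumedRange record
 and persistBatchMetadata helper (an illustration only, not an agreed design):

 import java.io.{BufferedWriter, OutputStreamWriter}
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{FileSystem, Path}

 // Assumed layout: for each batch, the ordered (topic, partition, fromOffset,
 // untilOffset) ranges consumed, so the same messages can be re-read in the same order.
 case class ConsumedRange(topic: String, partition: Int, fromOffset: Long, untilOffset: Long)

 def persistBatchMetadata(batchTime: Long, ranges: Seq[ConsumedRange], dir: String): Unit = {
   val fs = FileSystem.get(new Configuration())
   val out = new BufferedWriter(new OutputStreamWriter(fs.create(new Path(s"$dir/batch-$batchTime"))))
   try {
     // One line per range, written in consumption order.
     ranges.foreach(r => out.write(s"${r.topic},${r.partition},${r.fromOffset},${r.untilOffset}\n"))
   } finally {
     out.close()
   }
 }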

 I also wrote an API to write data back to Kafka a while back -
 https://github.com/apache/spark/pull/2994 . I am hoping that this will
 get pulled in soon, as this is something I know people want. I am open to
 feedback on that - anything that I can do to make it better.

 Thanks,
 Hari


 On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell pwend...@gmail.com
 wrote:

 Hey Cody,

 Thanks for reaching out with this. The lead on streaming is TD - he is
 traveling this week though so I can respond a bit. To the high level
 point of whether Kafka is important - it definitely is. Something like
 80% of Spark Streaming deployments (anecdotally) ingest data from
 Kafka. Also, good support for Kafka is something we generally want in
 Spark and not a library. In some cases IIRC there were user libraries
 that used unstable Kafka API's and we were somewhat waiting on Kafka
 to stabilize them to merge things upstream. Otherwise users wouldn't
 be able to use newer Kafka versions. This is a high level impression
 only though, I haven't talked to TD about this recently so it's worth
 revisiting given the developments in Kafka.

 Please do bring things up like this on the dev list if there are
 blockers for your usage - thanks for pinging it.

 - Patrick

 On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger c...@koeninger.org
 wrote:
  Now that 1.2 is finalized... who are the go-to people to get some
  long-standing Kafka related issues resolved?
 
  The existing api is not sufficiently safe nor flexible for our
 production
  use. I don't think we're alone in this viewpoint, because I've seen
  several different patches and libraries to fix the same things we've
 been
  running into.
 
  Regarding flexibility
 
  https://issues.apache.org/jira/browse/SPARK-3146
 
  has been outstanding since August, and IMHO an equivalent of this is
  absolutely necessary. We wrote a similar patch ourselves, then found
 that
  PR and have been running it in production. We wouldn't be able to get
 our
  jobs done without it. It also allows users to solve a whole class of
  problems for themselves (e.g. SPARK-2388, arbitrary delay of messages,
 etc).
 
  Regarding safety, I understand the motivation behind WriteAheadLog as a
  general solution for streaming unreliable sources, but Kafka already is
 a
  reliable source. I think there's a need for an api that treats it as
  such. Even aside from the performance issues of duplicating the
  write-ahead log in kafka into another write-ahead log in hdfs, I need
  exactly-once semantics in the face of failure (I've had failures that
  prevented reloading a spark streaming checkpoint, for instance).
 
  I've got an implementation i've been using
 
  https://github.com/koeninger/spark-1/tree/kafkaRdd/external/kafka
  /src/main/scala/org/apache/spark/rdd/kafka
 
  Tresata has something similar at https://github.com/tresata/spark-kafka,

  and I know there were earlier attempts based on Storm code.
 
  Trying to distribute these kinds of fixes as libraries rather than
 patches
  to Spark is problematic, because large portions of the implementation
 are
  private[spark].
 
  I'd like to help, but i need to know whose attention to get.


Re: Which committers care about Kafka?

2014-12-18 Thread Hari Shreedharan
I get what you are saying. But getting exactly-once right is an extremely hard 
problem - especially in the presence of failure. The issue is that failures can 
happen in a bunch of places. For example, before the notification that the 
downstream store succeeded reaches the receiver that updates the offsets, the 
node fails. The store was successful, but duplicates come in anyway. This is 
something worth discussing by itself - but without UUIDs etc. this might not 
really be solved even when you think it is.




Anyway, I will look at the links. Even I am interested in all of the features 
you mentioned - no HDFS WAL for Kafka and once-only delivery, but I doubt the 
latter is really possible to guarantee - though I really would love to have 
that!




Thanks, Hari

On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger c...@koeninger.org
wrote:

 Thanks for the replies.
 Regarding skipping WAL, it's not just about optimization.  If you actually
 want exactly-once semantics, you need control of kafka offsets as well,
 including the ability to not use zookeeper as the system of record for
 offsets.  Kafka already is a reliable system that has strong ordering
 guarantees (within a partition) and does not mandate the use of zookeeper
 to store offsets.  I think there should be a spark api that acts as a very
 simple intermediary between Kafka and the user's choice of downstream store.
 Take a look at the links I posted - if there's already been 2 independent
 implementations of the idea, chances are it's something people need.
 On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan hshreedha...@cloudera.com
 wrote:

 Hi Cody,

 I am an absolute +1 on SPARK-3146. I think we can implement something
 pretty simple and lightweight for that one.

 For the Kafka DStream skipping the WAL implementation - this is something
 I discussed with TD a few weeks ago. Though it is a good idea to implement
 this to avoid unnecessary HDFS writes, it is an optimization. For that
 reason, we must be careful in implementation. There are a couple of issues
 that we need to ensure work properly - specifically ordering. To ensure we
 pull messages from different topics and partitions in the same order after
 failure, we’d still have to persist the metadata to HDFS (or some other
 system) - this metadata must contain the order of messages consumed, so we
 know how to re-read the messages. I am planning to explore this once I have
 some time (probably in Jan). In addition, we must also ensure bucketing
 functions work fine. I will file a placeholder jira for this one.

 I also wrote an API to write data back to Kafka a while back -
 https://github.com/apache/spark/pull/2994 . I am hoping that this will
 get pulled in soon, as this is something I know people want. I am open to
 feedback on that - anything that I can do to make it better.

 Thanks,
 Hari
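
 Independent of that pull request, the usual pattern for writing an RDD's contents
 to Kafka is to create one producer per partition inside foreachPartition; a rough
 sketch, assuming the standard Kafka producer client (not the code in the PR):

 import java.util.Properties
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.spark.rdd.RDD

 def writeToKafka(rdd: RDD[String], brokers: String, topic: String): Unit = {
   rdd.foreachPartition { records =>
     // One producer per partition, created on the executor, not on the driver.
     val props = new Properties()
     props.put("bootstrap.servers", brokers)
     props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
     props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
     val producer = new KafkaProducer[String, String](props)
     try {
       records.foreach(r => producer.send(new ProducerRecord[String, String](topic, r)))
     } finally {
       producer.close()
     }
   }
 }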


 On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell pwend...@gmail.com
 wrote:

 Hey Cody,

 Thanks for reaching out with this. The lead on streaming is TD - he is
 traveling this week though so I can respond a bit. To the high level
 point of whether Kafka is important - it definitely is. Something like
 80% of Spark Streaming deployments (anecdotally) ingest data from
 Kafka. Also, good support for Kafka is something we generally want in
 Spark and not a library. In some cases IIRC there were user libraries
 that used unstable Kafka API's and we were somewhat waiting on Kafka
 to stabilize them to merge things upstream. Otherwise users wouldn't
 be able to use newer Kafka versions. This is a high level impression
 only though, I haven't talked to TD about this recently so it's worth
 revisiting given the developments in Kafka.

 Please do bring things up like this on the dev list if there are
 blockers for your usage - thanks for pinging it.

 - Patrick

 On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger c...@koeninger.org
 wrote:
  Now that 1.2 is finalized... who are the go-to people to get some
  long-standing Kafka related issues resolved?
 
  The existing api is not sufficiently safe nor flexible for our
 production
  use. I don't think we're alone in this viewpoint, because I've seen
  several different patches and libraries to fix the same things we've
 been
  running into.
 
  Regarding flexibility
 
  https://issues.apache.org/jira/browse/SPARK-3146
 
  has been outstanding since August, and IMHO an equivalent of this is
  absolutely necessary. We wrote a similar patch ourselves, then found
 that
  PR and have been running it in production. We wouldn't be able to get
 our
  jobs done without it. It also allows users to solve a whole class of
  problems for themselves (e.g. SPARK-2388, arbitrary delay of messages,
 etc).
 
  Regarding safety, I understand the motivation behind WriteAheadLog as a
  general solution for streaming unreliable sources, but Kafka already is
 a
  reliable source. I think there's a need for an api that treats it as
  such. Even aside from the performance 

Re: Which committers care about Kafka?

2014-12-18 Thread Cody Koeninger
If the downstream store for the output data is idempotent or transactional,
and that downstream store also is the system of record for kafka offsets,
then you have exactly-once semantics.  Commit offsets with / after the data
is stored.  On any failure, restart from the last committed offsets.

Yes, this approach is biased towards the etl-like use cases rather than
near-realtime-analytics use cases.
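
A minimal sketch of that recipe, assuming a hypothetical JDBC-backed store where
made-up output and kafka_offsets tables live in the same database (illustration
only, not a reference implementation):

import java.sql.DriverManager

// The same database transaction stores the output rows and the Kafka offsets, so a
// batch is either fully applied (data + offsets) or not applied at all.
def storeBatchExactlyOnce(jdbcUrl: String,
                          rows: Seq[(String, String)],                 // (key, value) output rows
                          offsets: Seq[(String, Int, Long)]): Unit = { // (topic, partition, untilOffset)
  val conn = DriverManager.getConnection(jdbcUrl)
  try {
    conn.setAutoCommit(false)
    val insertRow = conn.prepareStatement("INSERT INTO output(k, v) VALUES (?, ?)")
    rows.foreach { case (k, v) =>
      insertRow.setString(1, k); insertRow.setString(2, v); insertRow.executeUpdate()
    }
    // Assumes one pre-seeded row per (kafka_topic, kafka_partition) in kafka_offsets.
    val updateOffset = conn.prepareStatement(
      "UPDATE kafka_offsets SET until_offset = ? WHERE kafka_topic = ? AND kafka_partition = ?")
    offsets.foreach { case (t, p, o) =>
      updateOffset.setLong(1, o); updateOffset.setString(2, t); updateOffset.setInt(3, p); updateOffset.executeUpdate()
    }
    conn.commit() // offsets are committed with / after the data, in one transaction
  } catch {
    case e: Exception => conn.rollback(); throw e
  } finally {
    conn.close()
  }
}

On restart, a driver would read kafka_offsets first and resume from the stored
until_offset values - the "restart from the last committed offsets" step.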

On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan hshreedha...@cloudera.com
 wrote:

 I get what you are saying. But getting exactly once right is an extremely
 hard problem - especially in presence of failure. The issue is failures can
 happen in a bunch of places. For example, before the notification of
 downstream store being successful reaches the receiver that updates the
 offsets, the node fails. The store was successful, but duplicates came in
 either way. This is something worth discussing by itself - but without
 uuids etc this might not really be solved even when you think it is.

 Anyway, I will look at the links. Even I am interested in all of the
 features you mentioned - no HDFS WAL for Kafka and once-only delivery, but
 I doubt the latter is really possible to guarantee - though I really would
 love to have that!

 Thanks,
 Hari


 On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger c...@koeninger.org
 wrote:

 Thanks for the replies.

 Regarding skipping WAL, it's not just about optimization.  If you
 actually want exactly-once semantics, you need control of kafka offsets as
 well, including the ability to not use zookeeper as the system of record
 for offsets.  Kafka already is a reliable system that has strong ordering
 guarantees (within a partition) and does not mandate the use of zookeeper
 to store offsets.  I think there should be a spark api that acts as a very
 simple intermediary between Kafka and the user's choice of downstream store.

 Take a look at the links I posted - if there's already been 2 independent
 implementations of the idea, chances are it's something people need.

 On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan 
 hshreedha...@cloudera.com wrote:

 Hi Cody,

 I am an absolute +1 on SPARK-3146. I think we can implement something
 pretty simple and lightweight for that one.

 For the Kafka DStream skipping the WAL implementation - this is
 something I discussed with TD a few weeks ago. Though it is a good idea to
 implement this to avoid unnecessary HDFS writes, it is an optimization. For
 that reason, we must be careful in implementation. There are a couple of
 issues that we need to ensure works properly - specifically ordering. To
 ensure we pull messages from different topics and partitions in the same
 order after failure, we’d still have to persist the metadata to HDFS (or
 some other system) - this metadata must contain the order of messages
 consumed, so we know how to re-read the messages. I am planning to explore
 this once I have some time (probably in Jan). In addition, we must also
 ensure bucketing functions work fine as well. I will file a placeholder
 jira for this one.

 I also wrote an API to write data back to Kafka a while back -
 https://github.com/apache/spark/pull/2994 . I am hoping that this will
 get pulled in soon, as this is something I know people want. I am open to
 feedback on that - anything that I can do to make it better.

 Thanks,
 Hari


 On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell pwend...@gmail.com
 wrote:

  Hey Cody,

 Thanks for reaching out with this. The lead on streaming is TD - he is
 traveling this week though so I can respond a bit. To the high level
 point of whether Kafka is important - it definitely is. Something like
 80% of Spark Streaming deployments (anecdotally) ingest data from
 Kafka. Also, good support for Kafka is something we generally want in
 Spark and not a library. In some cases IIRC there were user libraries
 that used unstable Kafka API's and we were somewhat waiting on Kafka
 to stabilize them to merge things upstream. Otherwise users wouldn't
 be able to use newer Kafka versions. This is a high level impression
 only though, I haven't talked to TD about this recently so it's worth
 revisiting given the developments in Kafka.

 Please do bring things up like this on the dev list if there are
 blockers for your usage - thanks for pinging it.

 - Patrick

 On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger c...@koeninger.org
 wrote:
  Now that 1.2 is finalized... who are the go-to people to get some
  long-standing Kafka related issues resolved?
 
  The existing api is not sufficiently safe nor flexible for our
 production
  use. I don't think we're alone in this viewpoint, because I've seen
  several different patches and libraries to fix the same things we've
 been
  running into.
 
  Regarding flexibility
 
  https://issues.apache.org/jira/browse/SPARK-3146
 
  has been outstanding since August, and IMHO an equivalent of this is
  absolutely necessary. We wrote a similar patch ourselves, then found
 

Re: Which committers care about Kafka?

2014-12-18 Thread Luis Ángel Vicente Sánchez
But idempotency is not that easy to achieve sometimes. A strong only-once
semantic through a proper API would be super useful; but I'm not implying
this is easy to achieve.
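
One common way to approximate it when a transaction is not available is to key
every write by its Kafka coordinates, so that a replayed message simply overwrites
itself; the KeyValueStore trait below is an assumption, purely for illustration:

// Sketch of idempotent writes: (topic, partition, offset) forms the key, so
// re-processing the same message after a failure lands on the same row instead
// of creating a duplicate.
trait KeyValueStore {
  def put(key: String, value: Array[Byte]): Unit // assumed to behave as an upsert
}

def writeIdempotently(store: KeyValueStore,
                      topic: String, partition: Int, offset: Long,
                      payload: Array[Byte]): Unit = {
  val key = s"$topic-$partition-$offset"
  store.put(key, payload) // replay-safe: the same message always maps to the same key
}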
On 18 Dec 2014 21:52, Cody Koeninger c...@koeninger.org wrote:

 If the downstream store for the output data is idempotent or transactional,
 and that downstream store also is the system of record for kafka offsets,
 then you have exactly-once semantics.  Commit offsets with / after the data
 is stored.  On any failure, restart from the last committed offsets.

 Yes, this approach is biased towards the etl-like use cases rather than
 near-realtime-analytics use cases.

 On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan 
 hshreedha...@cloudera.com
  wrote:
 
  I get what you are saying. But getting exactly once right is an extremely
  hard problem - especially in presence of failure. The issue is failures
 can
  happen in a bunch of places. For example, before the notification of
  downstream store being successful reaches the receiver that updates the
  offsets, the node fails. The store was successful, but duplicates came in
  either way. This is something worth discussing by itself - but without
  uuids etc this might not really be solved even when you think it is.
 
  Anyway, I will look at the links. Even I am interested in all of the
  features you mentioned - no HDFS WAL for Kafka and once-only delivery,
 but
  I doubt the latter is really possible to guarantee - though I really
 would
  love to have that!
 
  Thanks,
  Hari
 
 
  On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger c...@koeninger.org
  wrote:
 
  Thanks for the replies.
 
  Regarding skipping WAL, it's not just about optimization.  If you
  actually want exactly-once semantics, you need control of kafka offsets
 as
  well, including the ability to not use zookeeper as the system of record
  for offsets.  Kafka already is a reliable system that has strong
 ordering
  guarantees (within a partition) and does not mandate the use of
 zookeeper
  to store offsets.  I think there should be a spark api that acts as a
 very
  simple intermediary between Kafka and the user's choice of downstream
 store.
 
  Take a look at the links I posted - if there's already been 2
 independent
  implementations of the idea, chances are it's something people need.
 
  On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan 
  hshreedha...@cloudera.com wrote:
 
  Hi Cody,
 
  I am an absolute +1 on SPARK-3146. I think we can implement something
  pretty simple and lightweight for that one.
 
  For the Kafka DStream skipping the WAL implementation - this is
  something I discussed with TD a few weeks ago. Though it is a good
 idea to
  implement this to avoid unnecessary HDFS writes, it is an
 optimization. For
  that reason, we must be careful in implementation. There are a couple
 of
  issues that we need to ensure works properly - specifically ordering.
 To
  ensure we pull messages from different topics and partitions in the
 same
  order after failure, we’d still have to persist the metadata to HDFS
 (or
  some other system) - this metadata must contain the order of messages
  consumed, so we know how to re-read the messages. I am planning to
 explore
  this once I have some time (probably in Jan). In addition, we must also
  ensure bucketing functions work fine as well. I will file a placeholder
  jira for this one.
 
  I also wrote an API to write data back to Kafka a while back -
  https://github.com/apache/spark/pull/2994 . I am hoping that this will
  get pulled in soon, as this is something I know people want. I am open
 to
  feedback on that - anything that I can do to make it better.
 
  Thanks,
  Hari
 
 
  On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell pwend...@gmail.com
  wrote:
 
   Hey Cody,
 
  Thanks for reaching out with this. The lead on streaming is TD - he is
  traveling this week though so I can respond a bit. To the high level
  point of whether Kafka is important - it definitely is. Something like
  80% of Spark Streaming deployments (anecdotally) ingest data from
  Kafka. Also, good support for Kafka is something we generally want in
  Spark and not a library. In some cases IIRC there were user libraries
  that used unstable Kafka API's and we were somewhat waiting on Kafka
  to stabilize them to merge things upstream. Otherwise users wouldn't
   be able to use newer Kafka versions. This is a high level impression
  only though, I haven't talked to TD about this recently so it's worth
  revisiting given the developments in Kafka.
 
  Please do bring things up like this on the dev list if there are
  blockers for your usage - thanks for pinging it.
 
  - Patrick
 
  On Thu, Dec 18, 2014 at 7:07 AM, Cody Koeninger c...@koeninger.org
  wrote:
   Now that 1.2 is finalized... who are the go-to people to get some
   long-standing Kafka related issues resolved?
  
   The existing api is not sufficiently safe nor flexible for our
  production
   use. I don't think we're alone 

RE: Which committers care about Kafka?

2014-12-18 Thread Shao, Saisai
Hi all,

I agree with Hari that strong exactly-once semantics is very hard to guarantee, 
especially in the failure situation. From my understanding, even the current 
implementation of ReliableKafkaReceiver cannot fully guarantee exactly-once 
semantics after a failure. First is the ordering of data replayed from the last 
checkpoint, which is hard to guarantee when multiple partitions are involved; 
second is the design complexity of achieving this - you can refer to the Kafka 
Spout in Trident, where we have to dig into the very details of the Kafka metadata 
management system to achieve this, not to mention rebalance and fault-tolerance. 

Thanks
Jerry

-Original Message-
From: Luis Ángel Vicente Sánchez [mailto:langel.gro...@gmail.com] 
Sent: Friday, December 19, 2014 5:57 AM
To: Cody Koeninger
Cc: Hari Shreedharan; Patrick Wendell; dev@spark.apache.org
Subject: Re: Which committers care about Kafka?

But idempotency is not that easy to achieve sometimes. A strong only once 
semantic through a proper API would be super useful; but I'm not implying this 
is easy to achieve.
On 18 Dec 2014 21:52, Cody Koeninger c...@koeninger.org wrote:

 If the downstream store for the output data is idempotent or 
 transactional, and that downstream store also is the system of record 
 for kafka offsets, then you have exactly-once semantics.  Commit 
 offsets with / after the data is stored.  On any failure, restart from the 
 last committed offsets.

 Yes, this approach is biased towards the etl-like use cases rather 
 than near-realtime-analytics use cases.

 On Thu, Dec 18, 2014 at 3:27 PM, Hari Shreedharan  
 hshreedha...@cloudera.com
  wrote:
 
  I get what you are saying. But getting exactly once right is an 
  extremely hard problem - especially in presence of failure. The 
  issue is failures
 can
  happen in a bunch of places. For example, before the notification of 
  downstream store being successful reaches the receiver that updates 
  the offsets, the node fails. The store was successful, but 
  duplicates came in either way. This is something worth discussing by 
  itself - but without uuids etc this might not really be solved even when 
  you think it is.
 
  Anyway, I will look at the links. Even I am interested in all of the 
  features you mentioned - no HDFS WAL for Kafka and once-only 
  delivery,
 but
  I doubt the latter is really possible to guarantee - though I really
 would
  love to have that!
 
  Thanks,
  Hari
 
 
  On Thu, Dec 18, 2014 at 12:26 PM, Cody Koeninger 
  c...@koeninger.org
  wrote:
 
  Thanks for the replies.
 
  Regarding skipping WAL, it's not just about optimization.  If you 
  actually want exactly-once semantics, you need control of kafka 
  offsets
 as
  well, including the ability to not use zookeeper as the system of 
  record for offsets.  Kafka already is a reliable system that has 
  strong
 ordering
  guarantees (within a partition) and does not mandate the use of
 zookeeper
  to store offsets.  I think there should be a spark api that acts as 
  a
 very
  simple intermediary between Kafka and the user's choice of 
  downstream
 store.
 
  Take a look at the links I posted - if there's already been 2
 independent
  implementations of the idea, chances are it's something people need.
 
  On Thu, Dec 18, 2014 at 1:44 PM, Hari Shreedharan  
  hshreedha...@cloudera.com wrote:
 
  Hi Cody,
 
  I am an absolute +1 on SPARK-3146. I think we can implement 
  something pretty simple and lightweight for that one.
 
  For the Kafka DStream skipping the WAL implementation - this is 
  something I discussed with TD a few weeks ago. Though it is a good
 idea to
  implement this to avoid unnecessary HDFS writes, it is an
 optimization. For
  that reason, we must be careful in implementation. There are a 
  couple
 of
  issues that we need to ensure works properly - specifically ordering.
 To
  ensure we pull messages from different topics and partitions in 
  the
 same
  order after failure, we’d still have to persist the metadata to 
  HDFS
 (or
  some other system) - this metadata must contain the order of 
  messages consumed, so we know how to re-read the messages. I am 
  planning to
 explore
  this once I have some time (probably in Jan). In addition, we must 
  also ensure bucketing functions work fine as well. I will file a 
  placeholder jira for this one.
 
  I also wrote an API to write data back to Kafka a while back -
  https://github.com/apache/spark/pull/2994 . I am hoping that this 
  will get pulled in soon, as this is something I know people want. 
  I am open
 to
  feedback on that - anything that I can do to make it better.
 
  Thanks,
  Hari
 
 
  On Thu, Dec 18, 2014 at 11:14 AM, Patrick Wendell 
  pwend...@gmail.com
  wrote:
 
   Hey Cody,
 
  Thanks for reaching out with this. The lead on streaming is TD - 
  he is traveling this week though so I can respond a bit. To the 
  high level point of whether Kafka is important - it definitely 
  is. Something like 80% of Spark