IMHO your doing it wrong then. + building to much support into the kafka eco system is very counterproductive in fostering a happy userbase

On 02.06.2017 13:15, Damian Guy wrote:
Jan, you have a choice to Fail fast if you want. This is about giving
people options and there are times when you don't want to fail fast.


On Fri, 2 Jun 2017 at 11:00 Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi

1.
That greatly complicates monitoring.  Fail Fast gives you that when you
monitor only the lag of all your apps
you are completely covered. With that sort of new application Monitoring
is very much more complicated as
you know need to monitor fail % of some special apps aswell. In my
opinion that is a huge downside already.

2.
using a schema regerstry like Avrostuff it might not even be the record
that is broken, it might be just your app
unable to fetch a schema it needs now know. Maybe you got partitioned
away from that registry.

3. When you get alerted because of to high fail percentage. what are the
steps you gonna do?
shut it down to buy time. fix the problem. spend way to much time to
find a good reprocess offset.
Your timewindows are in bad shape anyways, and you pretty much lost.
This routine is nonsense.

Dead letter queues would be the worst possible addition to the kafka
toolkit that I can think of. It just doesn't fit the architecture
of having clients falling behind is a valid option.

Further. I mentioned already the only bad pill ive seen so far is crc
errors. any plans for those?

Best Jan






On 02.06.2017 11:34, Damian Guy wrote:
I agree with what Matthias has said w.r.t failing fast. There are plenty
of
times when you don't want to fail-fast and must attempt to  make
progress.
The dead-letter queue is exactly for these circumstances. Of course if
every record is failing, then you probably do want to give up.

On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax <matth...@confluent.io>
wrote:
First a meta comment. KIP discussion should take place on the dev list
-- if user list is cc'ed please make sure to reply to both lists.
Thanks.
Thanks for making the scope of the KIP clear. Makes a lot of sense to
focus on deserialization exceptions for now.

With regard to corrupted state stores, would it make sense to fail a
task and wipe out the store to repair it via recreation from the
changelog? That's of course a quite advance pattern, but I want to bring
it up to design the first step in a way such that we can get there (if
we think it's a reasonable idea).

I also want to comment about fail fast vs making progress. I think that
fail-fast must not always be the best option. The scenario I have in
mind is like this: you got a bunch of producers that feed the Streams
input topic. Most producers work find, but maybe one producer miss
behaves and the data it writes is corrupted. You might not even be able
to recover this lost data at any point -- thus, there is no reason to
stop processing but you just skip over those records. Of course, you
need to fix the root cause, and thus you need to alert (either via logs
of the exception handler directly) and you need to start to investigate
to find the bad producer, shut it down and fix it.

Here the dead letter queue comes into place. From my understanding, the
purpose of this feature is solely enable post debugging. I don't think
those record would be fed back at any point in time (so I don't see any
ordering issue -- a skipped record, with this regard, is just "fully
processed"). Thus, the dead letter queue should actually encode the
original records metadata (topic, partition offset etc) to enable such
debugging. I guess, this might also be possible if you just log the bad
records, but it would be harder to access (you first must find the
Streams instance that did write the log and extract the information from
there). Reading it from topic is much simpler.

I also want to mention the following. Assume you have such a topic with
some bad records and some good records. If we always fail-fast, it's
going to be super hard to process the good data. You would need to write
an extra app that copied the data into a new topic filtering out the bad
records (or apply the map() workaround withing stream). So I don't think
that failing fast is most likely the best option in production is
necessarily, true.

Or do you think there are scenarios, for which you can recover the
corrupted records successfully? And even if this is possible, it might
be a case for reprocessing instead of failing the whole application?
Also, if you think you can "repair" a corrupted record, should the
handler allow to return a "fixed" record? This would solve the ordering
problem.



-Matthias




On 5/30/17 1:47 AM, Michael Noll wrote:
Thanks for your work on this KIP, Eno -- much appreciated!

- I think it would help to improve the KIP by adding an end-to-end code
example that demonstrates, with the DSL and with the Processor API, how
the
user would write a simple application that would then be augmented with
the
proposed KIP changes to handle exceptions.  It should also become much
clearer then that e.g. the KIP would lead to different code paths for
the
happy case and any failure scenarios.

- Do we have sufficient information available to make informed
decisions
on
what to do next?  For example, do we know in which part of the topology
the
record failed? `ConsumerRecord` gives us access to topic, partition,
offset, timestamp, etc., but what about topology-related information
(e.g.
what is the associated state store, if any)?

- Only partly on-topic for the scope of this KIP, but this is about the
bigger picture: This KIP would give users the option to send corrupted
records to dead letter queue (quarantine topic).  But, what pattern
would
we advocate to process such a dead letter queue then, e.g. how to allow
for
retries with backoff ("If the first record in the dead letter queue
fails
again, then try the second record for the time being and go back to the
first record at a later time").  Jay and Jan already alluded to
ordering
problems that will be caused by dead letter queues. As I said, retries
might be out of scope but perhaps the implications should be considered
if
possible?

Also, I wrote the text below before reaching the point in the
conversation
that this KIP's scope will be limited to exceptions in the category of
poison pills / deserialization errors.  But since Jay brought up user
code
errors again, I decided to include it again.

----------------------------snip----------------------------
A meta comment: I am not sure about this split between the code for the
happy path (e.g. map/filter/... in the DSL) from the failure path
(using
exception handlers).  In Scala, for example, we can do:

      scala> val computation = scala.util.Try(1 / 0)
      computation: scala.util.Try[Int] =
Failure(java.lang.ArithmeticException: / by zero)

      scala> computation.getOrElse(42)
      res2: Int = 42

Another example with Scala's pattern matching, which is similar to
`KStream#branch()`:

      computation match {
        case scala.util.Success(x) => x * 5
        case scala.util.Failure(_) => 42
      }

(The above isn't the most idiomatic way to handle this in Scala, but
that's
not the point I'm trying to make here.)

Hence the question I'm raising here is: Do we want to have an API where
you
code "the happy path", and then have a different code path for failures
(using exceptions and handlers);  or should we treat both Success and
Failure in the same way?

I think the failure/exception handling approach (as proposed in this
KIP)
is well-suited for errors in the category of deserialization problems
aka
poison pills, partly because the (default) serdes are defined through
configuration (explicit serdes however are defined through API calls).

However, I'm not yet convinced that the failure/exception handling
approach
is the best idea for user code exceptions, e.g. if you fail to guard
against NPE in your lambdas or divide a number by zero.

      scala> val stream = Seq(1, 2, 3, 4, 5)
      stream: Seq[Int] = List(1, 2, 3, 4, 5)

      // Here: Fallback to a sane default when encountering failed
records
      scala>     stream.map(x => Try(1/(3 - x))).flatMap(t =>
Seq(t.getOrElse(42)))
      res19: Seq[Int] = List(0, 1, 42, -1, 0)

      // Here: Skip over failed records
      scala> stream.map(x => Try(1/(3 - x))).collect{ case Success(s)
=> s
}
      res20: Seq[Int] = List(0, 1, -1, 0)

The above is more natural to me than using error handlers to define how
to
deal with failed records (here, the value `3` causes an arithmetic
exception).  Again, it might help the KIP if we added an end-to-end
example
for such user code errors.
----------------------------snip----------------------------




On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak <
jan.filip...@trivago.com>
wrote:

Hi Jay,

Eno mentioned that he will narrow down the scope to only
ConsumerRecord
deserialisation.

I am working with Database Changelogs only. I would really not like to
see
a dead letter queue or something
similliar. how am I expected to get these back in order. Just grind to
hold an call me on the weekend. I'll fix it
then in a few minutes rather spend 2 weeks ordering dead letters.
(where
reprocessing might be even the faster fix)

Best Jan




On 29.05.2017 20:23, Jay Kreps wrote:

      - I think we should hold off on retries unless we have worked
out
the
      full usage pattern, people can always implement their own. I
think
the idea
      is that you send the message to some kind of dead letter queue
and
then
      replay these later. This obviously destroys all semantic
guarantees
we are
      working hard to provide right now, which may be okay.



Reply via email to