Hi Eno,

On 07.06.2017 22:49, Eno Thereska wrote:
Comments inline:

On 5 Jun 2017, at 18:19, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi

just my few thoughts

On 05.06.2017 11:44, Eno Thereska wrote:
Hi there,

Sorry for the late reply, I was out this past week. Looks like good progress 
was made with the discussions either way. Let me recap a couple of points I saw 
into one big reply:

1. Jan mentioned CRC errors. I think this is a good point. As these happen in 
Kafka, before Kafka Streams gets a chance to inspect anything, I'd like to hear 
the opinion of more Kafka folks like Ismael or Jason on this one. Currently the 
documentation is not great with what to do once a CRC check has failed. From 
looking at the code, it looks like the client gets a KafkaException (bubbled up 
from the fetcher) and currently we in streams catch this as part of poll() and 
fail. It might be advantageous to treat CRC handling in a similar way to 
serialisation handling (e.g., have the option to fail/skip). Let's see what the 
other folks say. Worst case, we can do a separate KIP for that if it proves too
hard to do in one go.
There is no reasonable way to "skip" a CRC error. How can you know the length
you read was anything reasonable? You might be completely lost inside your response.
On the client side, every record received is checked for validity. As it 
happens, if the CRC check fails the exception is wrapped with a KafkaException 
that is thrown all the way to poll(). Assuming we change that and poll() throws 
a CRC exception, I was thinking we could treat it similarly to a deserialize 
exception and pass it to the exception handler to decide what to do. Default 
would be to fail. This might need a Kafka KIP btw and can be done separately 
from this KIP, but Jan, would you find this useful?
I don't think so. IMO you cannot reasonably continue parsing when the checksum of a message is not correct. If you are not sure you got the correct length, how can you be sure to find the next record? I would always fail straight away in all cases. It's too hard for me to understand why one would try to continue. I mentioned CRCs because those are the only bad pills I have ever seen so far. But I am happy that it just stopped and I could check what was going on. This will also be invasive in the client code then.

If you ask me, I am always going to vote for "grind to a halt": let the developers see what happened and let them fix it. It helps build good Kafka experiences and better software and architectures. For me this is "force the user to do the right thing" (https://youtu.be/aAb7hSCtvGw?t=374), e.g. not letting unexpected input slip by. Letting unexpected input slip by is what bought us 15+ years of war with all sorts of ingestion attacks. I don't even dare to estimate how many missing-records search teams are going to be formed, maybe even some HackerOne for stream apps :D

Best Jan


At a minimum, handling this type of exception will need to involve the 
exactly-once (EoS) logic. We'd still allow the option of failing or skipping, 
but EoS would need to clean up by rolling back all the side effects from the 
processing so far. Matthias, how does this sound?
EoS will not help; the record might be 5 or 6 repartitions down in the topology.
I haven't followed closely, but I pray you made EoS optional! We don't need this and we
don't want this and we will turn it off if it comes. So I wouldn't recommend
relying on it. The option to turn it off is better than forcing it and still
being unable to roll back bad pills (as explained before).
Yeah as Matthias mentioned EoS is optional.

Thanks,
Eno


6. Will add an end-to-end example as Michael suggested.

Thanks
Eno



On 4 Jun 2017, at 02:35, Matthias J. Sax <matth...@confluent.io> wrote:

What I don't understand is this:

From there on it's the easiest way forward: fix, redeploy, start => done
If you have many producers that work fine and a new "bad" producer
starts up and writes bad data into your input topic, your Streams app
dies but all your producers, including the bad one, keep writing.

Thus, how would you fix this, as you cannot "remove" the corrupted data
from the topic? It might take some time to identify the root cause and
stop the bad producer. Up to this point you get good and bad data into
your Streams input topic. If the Streams app is not able to skip over those
bad records, how would you get all the good data from the topic? Not
saying it's not possible, but it's extra work to copy the data with a
new non-Streams consumer-producer app into a new topic and then feed
your Streams app from this new topic -- you also need to update all your
upstream producers to write to the new topic.

Thus, if you want to fail fast, you can still do this. And after you
have detected and fixed the bad producer you might just reconfigure your app
to skip bad records until it reaches the good part of the data.
Afterwards, you could redeploy with fail-fast again.


Thus, for this pattern, I actually don't see any reason to stop the
Streams app at all. If you have a callback, and use the callback to
raise an alert (and maybe get the bad data into a bad record queue), it
will not take longer to identify and stop the "bad" producer. But for
this case, you have zero downtime for your Streams app.

This seems to be much simpler. Or am I missing anything?


Having said this, I agree that the "threshold based callback" might be
questionable. But as you argue for strict "fail-fast", I want to argue
that this is not always the best pattern to apply and that the
overall KIP idea is super useful from my point of view.


-Matthias


On 6/3/17 11:57 AM, Jan Filipiak wrote:
Could not agree more!

But then I think the easiest is still: print the exception and die.
From there on it's the easiest way forward: fix, redeploy, start => done

All the other ways to recover a pipeline that was processing partially
all the time and suddenly went over an "I can't take it anymore" threshold
are not straightforward IMO.

How do you find the offset where it became too bad, when it is not the
latest committed one?
How do you reset to it, with some reasonable state in your RocksDB stores?

One could do the following: the continuing handler would count failures
against a threshold and would terminate once that threshold has been passed
(per task). Then one can use the offset commit / flush intervals
to make a reasonable assumption about how much is slipping by, you get an
easy recovery when it gets too bad, and you could also account for
"in processing" records.

Setting this threshold to zero would cover all cases with one
implementation. It is still beneficial to have it pluggable.
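
A minimal sketch of such a threshold handler, in Scala to match the other snippets in this thread. All names here (`HandlerResponse`, `ThresholdHandler`, `onCommit`) are hypothetical and not part of the Kafka Streams API. Resetting the counter on commit is only one possible reading of the "commit / flush interval" idea above.

```scala
// Hypothetical names throughout; this is not the Kafka Streams API.
sealed trait HandlerResponse
case object Continue extends HandlerResponse
case object Fail extends HandlerResponse

// Counts bad records for one task and asks to stop once more than
// `maxBadRecords` have been seen. With maxBadRecords = 0 the very first
// bad record fails the task, which is plain fail-fast.
final class ThresholdHandler(maxBadRecords: Int) {
  private var badRecords = 0

  def handle(error: Throwable): HandlerResponse = {
    badRecords += 1
    if (badRecords > maxBadRecords) Fail else Continue
  }

  // Assumption: called after a successful offset commit, so the count
  // only covers records read since the last committed offset.
  def onCommit(): Unit = {
    badRecords = 0
  }
}
```

With `new ThresholdHandler(0)` this behaves as "print exception and die"; a larger value tolerates that many bad records per commit interval before the task is stopped.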

Again, CRC errors are the only bad pills we have seen in production so far.

Best Jan


On 02.06.2017 17:37, Jay Kreps wrote:
Jan, I agree with you philosophically. I think one practical challenge has
to do with data formats. Many people use untyped events, so there is simply
no guarantee on the form of the input. E.g. many companies use JSON without
any kind of schema so it becomes very hard to assert anything about the
input which makes these programs very fragile to the "one accidental
message publication that creates an unsolvable problem".

For that reason I do wonder if limiting to just serialization actually gets
you a useful solution. For JSON it will help with the problem of
non-parseable JSON, but sounds like it won't help in the case where the
JSON is well-formed but does not have any of the fields you expect and
depend on for your processing. I expect the reason for limiting the scope
is it is pretty hard to reason about correctness for anything that stops in
the middle of processing an operator DAG?

-Jay

On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

IMHO you're doing it wrong then. Also, building too much support into the Kafka
ecosystem is very counterproductive in fostering a happy user base.



On 02.06.2017 13:15, Damian Guy wrote:

Jan, you have a choice to Fail fast if you want. This is about giving
people options and there are times when you don't want to fail fast.


On Fri, 2 Jun 2017 at 11:00 Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi

1. That greatly complicates monitoring. Fail-fast gives you the guarantee
that when you monitor only the lag of all your apps, you are completely
covered. With that sort of new application, monitoring is much more
complicated, as you now need to monitor the fail % of some special apps as
well. In my opinion that is a huge downside already.

2. Using a schema registry, as with the Avro stuff, it might not even be the
record that is broken; it might be just your app being unable to fetch a
schema it now needs to know. Maybe you got partitioned away from that registry.

3. When you get alerted because of a too-high fail percentage, what are the
steps you are going to take? Shut it down to buy time. Fix the problem.
Spend way too much time finding a good reprocess offset. Your time windows
are in bad shape anyway, and you have pretty much lost. This routine is
nonsense.

Dead letter queues would be the worst possible addition to the Kafka
toolkit that I can think of. They just don't fit an architecture in which
having clients fall behind is a valid option.

Furthermore, I mentioned already that the only bad pills I've seen so far
are CRC errors. Any plans for those?

Best Jan






On 02.06.2017 11:34, Damian Guy wrote:

I agree with what Matthias has said w.r.t. failing fast. There are plenty of
times when you don't want to fail-fast and must attempt to make progress.
The dead-letter queue is exactly for these circumstances. Of course if
every record is failing, then you probably do want to give up.

On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax <matth...@confluent.io> wrote:

First a meta comment. KIP discussion should take place on the dev list
-- if user list is cc'ed please make sure to reply to both lists. Thanks.

Thanks for making the scope of the KIP clear. Makes a lot of sense to
focus on deserialization exceptions for now.

With regard to corrupted state stores, would it make sense to fail a
task and wipe out the store to repair it via recreation from the
changelog? That's of course quite an advanced pattern, but I want to bring
it up to design the first step in a way such that we can get there (if
we think it's a reasonable idea).

I also want to comment about fail fast vs making progress. I think that
fail-fast is not always the best option. The scenario I have in
mind is like this: you've got a bunch of producers that feed the Streams
input topic. Most producers work fine, but maybe one producer misbehaves
and the data it writes is corrupted. You might not even be able
to recover this lost data at any point -- thus, there is no reason to
stop processing but you just skip over those records. Of course, you
need to fix the root cause, and thus you need to alert (either via logs
or the exception handler directly) and you need to start to investigate
to find the bad producer, shut it down and fix it.

Here the dead letter queue comes into play. From my understanding, the
purpose of this feature is solely to enable debugging after the fact. I don't
think those records would be fed back at any point in time (so I don't
see any ordering issue -- a skipped record, in this regard, is just "fully
processed"). Thus, the dead letter queue should actually encode the
original record's metadata (topic, partition, offset, etc.) to enable such
debugging. I guess this might also be possible if you just log the bad
records, but it would be harder to access (you first must find the
Streams instance that wrote the log and extract the information from
there). Reading it from a topic is much simpler.
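
To make the metadata point concrete, here is a rough sketch in Scala of what a quarantined record could carry. The types `RawRecord` and `DeadLetter` and the helper `deserializeOrQuarantine` are made up for illustration; nothing here is an existing or proposed Kafka Streams API.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical types for illustration; not part of Kafka Streams.
final case class RawRecord(topic: String, partition: Int, offset: Long,
                           value: Array[Byte])

// What would be written to the dead letter topic: the untouched payload
// plus where it came from and why it was rejected.
final case class DeadLetter(topic: String, partition: Int, offset: Long,
                            value: Array[Byte], reason: String)

// Tries to deserialize; on failure returns a DeadLetter instead of throwing.
def deserializeOrQuarantine[A](record: RawRecord)(
    deserialize: Array[Byte] => A): Either[DeadLetter, A] =
  Try(deserialize(record.value)) match {
    case Success(a) => Right(a)
    case Failure(e) =>
      Left(DeadLetter(record.topic, record.partition, record.offset,
                      record.value, e.toString))
  }
```

A consumer of the quarantine topic could then locate the offending producer from `topic`, `partition` and `offset` without searching the logs of individual Streams instances.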

I also want to mention the following. Assume you have such a topic with
some bad records and some good records. If we always fail-fast, it's
going to be super hard to process the good data. You would need to write
an extra app that copies the data into a new topic, filtering out the bad
records (or apply the map() workaround within Streams). So I don't think
that "failing fast is most likely the best option in production" is
necessarily true.

Or do you think there are scenarios for which you can recover the
corrupted records successfully? And even if this is possible, it might
be a case for reprocessing instead of failing the whole application?
Also, if you think you can "repair" a corrupted record, should the
handler be allowed to return a "fixed" record? This would solve the ordering
problem.



-Matthias




On 5/30/17 1:47 AM, Michael Noll wrote:

Thanks for your work on this KIP, Eno -- much appreciated!

- I think it would help to improve the KIP by adding an end-to-end code
example that demonstrates, with the DSL and with the Processor API, how the
user would write a simple application that would then be augmented with the
proposed KIP changes to handle exceptions.  It should also become much
clearer then that e.g. the KIP would lead to different code paths for the
happy case and any failure scenarios.

- Do we have sufficient information available to make informed decisions on
what to do next?  For example, do we know in which part of the topology the
record failed? `ConsumerRecord` gives us access to topic, partition,
offset, timestamp, etc., but what about topology-related information (e.g.
what is the associated state store, if any)?

- Only partly on-topic for the scope of this KIP, but this is about the
bigger picture: This KIP would give users the option to send corrupted
records to a dead letter queue (quarantine topic).  But, what pattern would
we advocate to process such a dead letter queue then, e.g. how to allow for
retries with backoff ("If the first record in the dead letter queue fails
again, then try the second record for the time being and go back to the
first record at a later time").  Jay and Jan already alluded to ordering
problems that will be caused by dead letter queues. As I said, retries
might be out of scope but perhaps the implications should be considered if
possible?

Also, I wrote the text below before reaching the point in the conversation
that this KIP's scope will be limited to exceptions in the category of
poison pills / deserialization errors.  But since Jay brought up user code
errors again, I decided to include it again.

----------------------------snip----------------------------
A meta comment: I am not sure about this split between the code for the
happy path (e.g. map/filter/... in the DSL) from the failure path (using
exception handlers).  In Scala, for example, we can do:

       scala> val computation = scala.util.Try(1 / 0)
       computation: scala.util.Try[Int] = Failure(java.lang.ArithmeticException: / by zero)

       scala> computation.getOrElse(42)
       res2: Int = 42

Another example with Scala's pattern matching, which is similar to
`KStream#branch()`:

       computation match {
         case scala.util.Success(x) => x * 5
         case scala.util.Failure(_) => 42
       }

(The above isn't the most idiomatic way to handle this in Scala, but that's
not the point I'm trying to make here.)

Hence the question I'm raising here is: Do we want to have an API where you
code "the happy path", and then have a different code path for failures
(using exceptions and handlers);  or should we treat both Success and
Failure in the same way?

I think the failure/exception handling approach (as proposed in this KIP)
is well-suited for errors in the category of deserialization problems aka
poison pills, partly because the (default) serdes are defined through
configuration (explicit serdes however are defined through API calls).

However, I'm not yet convinced that the failure/exception handling approach
is the best idea for user code exceptions, e.g. if you fail to guard
against NPE in your lambdas or divide a number by zero.

       scala> import scala.util.{Success, Try}

       scala> val stream = Seq(1, 2, 3, 4, 5)
       stream: Seq[Int] = List(1, 2, 3, 4, 5)

       // Here: Fallback to a sane default when encountering failed records
       scala> stream.map(x => Try(1/(3 - x))).flatMap(t => Seq(t.getOrElse(42)))
       res19: Seq[Int] = List(0, 1, 42, -1, 0)

       // Here: Skip over failed records
       scala> stream.map(x => Try(1/(3 - x))).collect{ case Success(s) => s }
       res20: Seq[Int] = List(0, 1, -1, 0)

The above is more natural to me than using error handlers to define how to
deal with failed records (here, the value `3` causes an arithmetic
exception).  Again, it might help the KIP if we added an end-to-end example
for such user code errors.
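
Along the same lines, a small sketch of a third option: keep both branches, so that failed inputs can be inspected later instead of being replaced or dropped. This is plain Scala collections code, not Kafka Streams, and the names `attempts`, `results` and `quarantined` are only illustrative.

```scala
import scala.util.{Failure, Success, Try}

val stream = Seq(1, 2, 3, 4, 5)

// Pair every input with the outcome of processing it.
val attempts: Seq[(Int, Try[Int])] = stream.map(x => x -> Try(1 / (3 - x)))

// "Good" branch: the successful results only.
val results: Seq[Int] = attempts.collect { case (_, Success(r)) => r }

// "Bad" branch: the original input plus the error message,
// e.g. as material for a quarantine topic.
val quarantined: Seq[(Int, String)] =
  attempts.collect { case (x, Failure(e)) => (x, e.getMessage) }
```

Here `results` holds the four successful values and `quarantined` holds the single failing input `3` together with its error message.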
----------------------------snip----------------------------




On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak <jan.filip...@trivago.com> wrote:

Hi Jay,

Eno mentioned that he will narrow down the scope to only ConsumerRecord
deserialisation.

I am working with database changelogs only. I would really not like to see
a dead letter queue or something similar. How am I expected to get these
back in order? Just grind to a halt and call me on the weekend. I'll fix it
then in a few minutes rather than spend 2 weeks ordering dead letters (where
reprocessing might even be the faster fix).

Best Jan




On 29.05.2017 20:23, Jay Kreps wrote:

       - I think we should hold off on retries unless we have worked out the
       full usage pattern, people can always implement their own. I think the
       idea is that you send the message to some kind of dead letter queue and
       then replay these later. This obviously destroys all semantic guarantees
       we are working hard to provide right now, which may be okay.


