Should go to dev list, too.

-------- Forwarded Message --------
Subject: Re: [DISCUSS]: KIP-161: streams record processing exception
handlers
Date: Mon, 5 Jun 2017 19:19:42 +0200
From: Jan Filipiak <jan.filip...@trivago.com>
Reply-To: users@kafka.apache.org
To: users@kafka.apache.org

Hi

Just a few thoughts from my side.

On 05.06.2017 11:44, Eno Thereska wrote:
> Hi there,
>
> Sorry for the late reply, I was out this past week. Looks like good progress 
> was made with the discussions either way. Let me recap a couple of points I 
> saw into one big reply:
>
> 1. Jan mentioned CRC errors. I think this is a good point. As these happen in 
> Kafka, before Kafka Streams gets a chance to inspect anything, I'd like to 
> hear the opinion of more Kafka folks like Ismael or Jason on this one. 
> Currently the documentation is not great with what to do once a CRC check has 
> failed. From looking at the code, it looks like the client gets a 
> KafkaException (bubbled up from the fetcher) and currently we in streams 
> catch this as part of poll() and fail. It might be advantageous to treat CRC 
> handling in a similar way to serialisation handling (e.g., have the option to 
> fail/skip). Let's see what the other folks say. Worst-case we can do a 
> separate KIP for that if it proved too hard to do in one go.
There is no reasonable way to "skip" a CRC error. How can you know that the
length you read was anything reasonable? You might be completely lost
inside your response.
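
A minimal sketch of that problem, assuming a made-up frame layout of
[int length][int crc32][payload]. This is not Kafka's actual wire format, and
FramingDemo, frame and readAll are hypothetical names used only for
illustration. A single flipped bit in a length field makes the "skip" land in
the middle of the next frame, so the intact record behind it is lost as well:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

public class FramingDemo {
    // Hypothetical frame layout, for illustration only: [int length][int crc32][payload].
    public static byte[] frame(String... payloads) {
        int size = 0;
        for (String p : payloads) {
            size += 8 + p.getBytes(StandardCharsets.UTF_8).length;
        }
        ByteBuffer buf = ByteBuffer.allocate(size);
        for (String p : payloads) {
            byte[] bytes = p.getBytes(StandardCharsets.UTF_8);
            buf.putInt(bytes.length).putInt(crcOf(bytes)).put(bytes);
        }
        return buf.array();
    }

    public static int crcOf(byte[] bytes) {
        CRC32 crc = new CRC32();
        crc.update(bytes);
        return (int) crc.getValue();
    }

    // Reads frames in order. With skipOnCrcError, a frame that fails its CRC check is
    // dropped and reading continues at whatever position its length field implied.
    public static List<String> readAll(byte[] data, boolean skipOnCrcError) {
        ByteBuffer buf = ByteBuffer.wrap(data);
        List<String> out = new ArrayList<>();
        while (buf.remaining() >= 8) {
            int length = buf.getInt();
            int expectedCrc = buf.getInt();
            if (length < 0 || length > buf.remaining()) {
                throw new IllegalStateException("lost: implausible length " + length);
            }
            byte[] bytes = new byte[length];
            buf.get(bytes);
            if (crcOf(bytes) != expectedCrc) {
                if (skipOnCrcError) {
                    continue; // trusts a length field that may itself be corrupt
                }
                throw new IllegalStateException("CRC mismatch");
            }
            out.add(new String(bytes, StandardCharsets.UTF_8));
        }
        return out;
    }

    public static void main(String[] args) {
        byte[] data = frame("first", "second");
        System.out.println(readAll(data, true));
        data[3] ^= 0x02; // flip one bit in the first length field: 5 becomes 7
        try {
            System.out.println(readAll(data, true));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

In this toy format the reader skips seven bytes instead of five, reads the
next "length" from the middle of the second frame's header, and gives up even
though the second record itself was never damaged.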
> 2. Damian has convinced me that the KIP should just be for deserialisation 
> from the network, not from local state store DBs. For the latter we'll follow 
> the current way of failing since the DB is likely corrupt.
>
> 3. Dead letter queue option. There was never any intention here to do 
> anything super clever like attempt to re-inject the failed records from the 
> dead letter queue back into the system. Reasoning about when that'd be useful 
> in light of all sorts of semantic breakings would be hard (arguably 
> impossible). The idea was to just have a place to have all these dead records 
> to help with subsequent debugging. We could also just log a whole bunch of 
> info for a poison pill record and not have a dead letter queue at all. 
> Perhaps that's a better, simpler, starting point.
+1
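
A rough sketch of what "just log it and then skip or fail" could look like.
All names here (PoisonPillDemo, RawRecord, Handler, Response, logAnd) are
hypothetical and are not the API proposed in the KIP; the "log" is a plain
list so the example stays self-contained, and "deserialisation" is simply
parsing the value as an integer:

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PoisonPillDemo {
    // Hypothetical stand-in for a raw record plus the metadata needed for debugging.
    public static final class RawRecord {
        final String topic;
        final int partition;
        final long offset;
        final byte[] value;

        public RawRecord(String topic, int partition, long offset, byte[] value) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
            this.value = value;
        }
    }

    public enum Response { CONTINUE, FAIL }

    // Hypothetical handler contract: look at the failure, decide whether to skip or stop.
    public interface Handler {
        Response handle(RawRecord record, Exception error);
    }

    // Logs where the bad record came from, then returns the configured response.
    public static Handler logAnd(Response response, List<String> log) {
        return (record, error) -> {
            log.add(String.format("bad record at %s-%d@%d: %s",
                    record.topic, record.partition, record.offset, error.getMessage()));
            return response;
        };
    }

    // "Deserialises" each value as an integer and consults the handler on failure.
    public static List<Integer> process(List<RawRecord> input, Handler handler) {
        List<Integer> out = new ArrayList<>();
        for (RawRecord r : input) {
            try {
                out.add(Integer.parseInt(new String(r.value, StandardCharsets.UTF_8)));
            } catch (NumberFormatException e) {
                if (handler.handle(r, e) == Response.FAIL) {
                    throw new IllegalStateException("stopping at offset " + r.offset, e);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<RawRecord> input = Arrays.asList(
                new RawRecord("in", 0, 0L, "1".getBytes(StandardCharsets.UTF_8)),
                new RawRecord("in", 0, 1L, "oops".getBytes(StandardCharsets.UTF_8)),
                new RawRecord("in", 0, 2L, "3".getBytes(StandardCharsets.UTF_8)));
        List<String> log = new ArrayList<>();
        System.out.println(process(input, logAnd(Response.CONTINUE, log)));
        System.out.println(log);
    }
}
```

Switching the configured response to FAIL gives the fail-fast behaviour with
the same log line, so no dead letter queue is needed to find the offending
offset.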
>
> 4. Agree with Jay on style, a DefaultHandler with some config options. Will 
> add options to KIP. Also as part of this let's remove the threshold logger 
> since it gets complex and arguably the ROI is low.
>
> 5. Jay's JSON example, where serialisation passes but the JSON message 
> doesn't have the expected fields, is an interesting one. It's a bit 
> complicated to handle this in the middle of processing. For example, some 
> operators in the DAG might actually find the needed JSON fields and make 
> progress, but other operators, for the same record, might not find their 
> fields and will throw an exception.
>
> At a minimum, handling this type of exception will need to involve the 
> exactly-once (EoS) logic. We'd still allow the option of failing or skipping, 
> but EoS would need to clean up by rolling back all the side effects from the 
> processing so far. Matthias, how does this sound?
EoS will not help: the record might be 5 or 6 repartitions down into the
topology. I haven't followed the discussion, but I pray you made EoS
optional! We don't need this and we don't want this, and we will turn it off
if it comes. So I wouldn't recommend relying on it. The option to turn it off
is better than forcing it and still being unable to roll back bad pills (as
explained before).
>
> 6. Will add an end-to-end example as Michael suggested.
>
> Thanks
> Eno
>
>
>
>> On 4 Jun 2017, at 02:35, Matthias J. Sax <matth...@confluent.io> wrote:
>>
>> What I don't understand is this:
>>
>>>  From there on it's the easiest way forward: fix, redeploy, start => done
>> If you have many producers that work fine and a new "bad" producer
>> starts up and writes bad data into your input topic, your Streams app
>> dies but all your producers, including the bad one, keep writing.
>>
>> Thus, how would you fix this, as you cannot "remove" the corrupted data
>> from the topic? It might take some time to identify the root cause and
>> stop the bad producer. Up to this point you get good and bad data into
>> your Streams input topic. If the Streams app is not able to skip over those
>> bad records, how would you get all the good data from the topic? Not
>> saying it's not possible, but it's extra work copying the data with a
>> new non-Streams consumer-producer-app into a new topic and then feeding
>> your Streams app from this new topic -- you also need to update all your
>> upstream producers to write to the new topic.
>>
>> Thus, if you want to fail fast, you can still do this. And after you
>> detected and fixed the bad producer you might just reconfigure your app
>> to skip bad records until it reaches the good part of the data.
>> Afterwards, you could redeploy with fail-fast again.
>>
>>
>> Thus, for this pattern, I actually don't see any reason to stop the
>> Streams app at all. If you have a callback, and use the callback to
>> raise an alert (and maybe get the bad data into a bad record queue), it
>> will not take longer to identify and stop the "bad" producer. But for
>> this case, you have zero downtime for your Streams app.
>>
>> This seems to be much simpler. Or am I missing anything?
>>
>>
>> Having said this, I agree that the "threshold based callback" might be
>> questionable. But as you argue for strict "fail-fast", I want to argue
>> that this is not always the best pattern to apply and that the
>> overall KIP idea is super useful from my point of view.
>>
>>
>> -Matthias
>>
>>
>> On 6/3/17 11:57 AM, Jan Filipiak wrote:
>>> Could not agree more!
>>>
>>> But then I think the easiest is still: print exception and die.
>>>  From there on it's the easiest way forward: fix, redeploy, start => done
>>>
>>> All the other ways to recover a pipeline that was processing partially
>>> all the time
>>> and suddenly went over an "I can't take it anymore" threshold are not
>>> straightforward IMO.
>>>
>>> How do you find the offset at which it became too bad, when it is not the
>>> latest committed one?
>>> How do you reset to it, with some reasonable state in your RocksDB stores?
>>>
>>> One could do the following: the continuing handler would count failures
>>> and terminate once a certain threshold has been passed (per task). Then
>>> one can use the offset commit/flush intervals
>>> to make a reasonable assumption about how much is slipping by, you get an
>>> easy recovery when it gets too bad,
>>> and you could also account for "in processing" records.
>>>
>>> Setting this threshold to zero would cover all cases with one
>>> implementation. It is still beneficial to have it pluggable.
>>>
>>> Again, CRC errors are the only bad pills we have seen in production so far.
>>>
>>> Best Jan
>>>
>>>
>>> On 02.06.2017 17:37, Jay Kreps wrote:
>>>> Jan, I agree with you philosophically. I think one practical challenge
>>>> has to do with data formats. Many people use untyped events, so there is
>>>> simply no guarantee on the form of the input. E.g. many companies use
>>>> JSON without any kind of schema so it becomes very hard to assert
>>>> anything about the input, which makes these programs very fragile to the
>>>> "one accidental message publication" that creates an unsolvable problem.
>>>>
>>>> For that reason I do wonder if limiting to just serialization actually
>>>> gets you a useful solution. For JSON it will help with the problem of
>>>> non-parseable JSON, but sounds like it won't help in the case where the
>>>> JSON is well-formed but does not have any of the fields you expect and
>>>> depend on for your processing. I expect the reason for limiting the scope
>>>> is it is pretty hard to reason about correctness for anything that
>>>> stops in the middle of processing an operator DAG?
>>>>
>>>> -Jay
>>>>
>>>> On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak <jan.filip...@trivago.com>
>>>> wrote:
>>>>
>>>>> IMHO you're doing it wrong then. Plus, building too much support into the
>>>>> Kafka ecosystem is very counterproductive in fostering a happy user base.
>>>>>
>>>>>
>>>>>
>>>>> On 02.06.2017 13:15, Damian Guy wrote:
>>>>>
>>>>>> Jan, you have a choice to Fail fast if you want. This is about giving
>>>>>> people options and there are times when you don't want to fail fast.
>>>>>>
>>>>>>
>>>>>> On Fri, 2 Jun 2017 at 11:00 Jan Filipiak <jan.filip...@trivago.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Hi
>>>>>>> 1.
>>>>>>> That greatly complicates monitoring. Fail fast gives you the guarantee
>>>>>>> that when you monitor only the lag of all your apps, you are completely
>>>>>>> covered. With that sort of new application, monitoring is very much more
>>>>>>> complicated, as you now need to monitor the fail % of some special apps
>>>>>>> as well. In my opinion that is a huge downside already.
>>>>>>>
>>>>>>> 2.
>>>>>>> Using a schema registry like the Avro stuff, it might not even be the
>>>>>>> record that is broken; it might just be your app being unable to fetch
>>>>>>> a schema it now needs to know. Maybe you got partitioned away from that
>>>>>>> registry.
>>>>>>>
>>>>>>> 3. When you get alerted because of too high a fail percentage, what are
>>>>>>> the steps you are going to take?
>>>>>>> Shut it down to buy time. Fix the problem. Spend way too much time to
>>>>>>> find a good reprocess offset.
>>>>>>> Your time windows are in bad shape anyway, and you have pretty much lost.
>>>>>>> This routine is nonsense.
>>>>>>>
>>>>>>> Dead letter queues would be the worst possible addition to the Kafka
>>>>>>> toolkit that I can think of. They just don't fit an architecture
>>>>>>> in which having clients fall behind is a valid option.
>>>>>>>
>>>>>>> Further, I already mentioned that the only bad pills I've seen so far
>>>>>>> are CRC errors. Any plans for those?
>>>>>>>
>>>>>>> Best Jan
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On 02.06.2017 11:34, Damian Guy wrote:
>>>>>>>
>>>>>>>> I agree with what Matthias has said w.r.t. failing fast. There are
>>>>>>>> plenty of times when you don't want to fail fast and must attempt to
>>>>>>>> make progress. The dead-letter queue is exactly for these
>>>>>>>> circumstances. Of course, if every record is failing, then you
>>>>>>>> probably do want to give up.
>>>>>>>>
>>>>>>>> On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax <matth...@confluent.io>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> First a meta comment. KIP discussion should take place on the dev
>>>>>>>>> list -- if the user list is cc'ed, please make sure to reply to both
>>>>>>>>> lists. Thanks.
>>>>>>>>>
>>>>>>>>> Thanks for making the scope of the KIP clear. Makes a lot of sense to
>>>>>>>>> focus on deserialization exceptions for now.
>>>>>>>>>
>>>>>>>>> With regard to corrupted state stores, would it make sense to fail a
>>>>>>>>> task and wipe out the store to repair it via recreation from the
>>>>>>>>> changelog? That's of course quite an advanced pattern, but I want to
>>>>>>>>> bring it up to design the first step in a way such that we can get
>>>>>>>>> there (if we think it's a reasonable idea).
>>>>>>>>>
>>>>>>>>> I also want to comment about fail fast vs making progress. I think
>>>>>>>>> that fail-fast is not always the best option. The scenario I have in
>>>>>>>>> mind is like this: you've got a bunch of producers that feed the
>>>>>>>>> Streams input topic. Most producers work fine, but maybe one producer
>>>>>>>>> misbehaves and the data it writes is corrupted. You might not even be
>>>>>>>>> able to recover this lost data at any point -- thus, there is no
>>>>>>>>> reason to stop processing but you just skip over those records. Of
>>>>>>>>> course, you need to fix the root cause, and thus you need to alert
>>>>>>>>> (either via logs or the exception handler directly) and you need to
>>>>>>>>> start to investigate to find the bad producer, shut it down and fix
>>>>>>>>> it.
>>>>>>>>>
>>>>>>>>> Here the dead letter queue comes into play. From my understanding,
>>>>>>>>> the purpose of this feature is solely to enable debugging after the
>>>>>>>>> fact. I don't think those records would be fed back at any point in
>>>>>>>>> time (so I don't see any ordering issue -- a skipped record, in this
>>>>>>>>> regard, is just "fully processed"). Thus, the dead letter queue
>>>>>>>>> should actually encode the original record's metadata (topic,
>>>>>>>>> partition, offset, etc.) to enable such debugging. I guess this might
>>>>>>>>> also be possible if you just log the bad records, but it would be
>>>>>>>>> harder to access (you first must find the Streams instance that wrote
>>>>>>>>> the log and extract the information from there). Reading it from a
>>>>>>>>> topic is much simpler.
>>>>>>>>>
>>>>>>>>> I also want to mention the following. Assume you have such a topic
>>>>>>>>> with some bad records and some good records. If we always fail-fast,
>>>>>>>>> it's going to be super hard to process the good data. You would need
>>>>>>>>> to write an extra app that copies the data into a new topic,
>>>>>>>>> filtering out the bad records (or apply the map() workaround within
>>>>>>>>> Streams). So I don't think the claim that failing fast is most likely
>>>>>>>>> the best option in production is necessarily true.
>>>>>>>>>
>>>>>>>>> Or do you think there are scenarios for which you can recover the
>>>>>>>>> corrupted records successfully? And even if this is possible, it
>>>>>>>>> might be a case for reprocessing instead of failing the whole
>>>>>>>>> application? Also, if you think you can "repair" a corrupted record,
>>>>>>>>> should the handler be allowed to return a "fixed" record? This would
>>>>>>>>> solve the ordering problem.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> -Matthias
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 5/30/17 1:47 AM, Michael Noll wrote:
>>>>>>>>>
>>>>>>>>>> Thanks for your work on this KIP, Eno -- much appreciated!
>>>>>>>>>>
>>>>>>>>>> - I think it would help to improve the KIP by adding an end-to-end
>>>>>>>>>> code example that demonstrates, with the DSL and with the Processor
>>>>>>>>>> API, how the user would write a simple application that would then be
>>>>>>>>>> augmented with the proposed KIP changes to handle exceptions.  It
>>>>>>>>>> should also become much clearer then that e.g. the KIP would lead to
>>>>>>>>>> different code paths for the happy case and any failure scenarios.
>>>>>>>>>>
>>>>>>>>>> - Do we have sufficient information available to make informed
>>>>>>>>>> decisions on what to do next?  For example, do we know in which part
>>>>>>>>>> of the topology the record failed? `ConsumerRecord` gives us access to
>>>>>>>>>> topic, partition, offset, timestamp, etc., but what about
>>>>>>>>>> topology-related information (e.g. what is the associated state
>>>>>>>>>> store, if any)?
>>>>>>>>>>
>>>>>>>>>> - Only partly on-topic for the scope of this KIP, but this is about
>>>>>>>>>> the bigger picture: This KIP would give users the option to send
>>>>>>>>>> corrupted records to dead letter queue (quarantine topic).  But, what
>>>>>>>>>> pattern would we advocate to process such a dead letter queue then,
>>>>>>>>>> e.g. how to allow for retries with backoff ("If the first record in
>>>>>>>>>> the dead letter queue fails again, then try the second record for the
>>>>>>>>>> time being and go back to the first record at a later time").  Jay and
>>>>>>>>>> Jan already alluded to ordering problems that will be caused by dead
>>>>>>>>>> letter queues. As I said, retries might be out of scope but perhaps
>>>>>>>>>> the implications should be considered if possible?
>>>>>>>>>>
>>>>>>>>>> Also, I wrote the text below before reaching the point in the
>>>>>>>>>> conversation that this KIP's scope will be limited to exceptions in
>>>>>>>>>> the category of poison pills / deserialization errors.  But since Jay
>>>>>>>>>> brought up user code errors again, I decided to include it again.
>>>>>>>>>>
>>>>>>>>>> ----------------------------snip----------------------------
>>>>>>>>>> A meta comment: I am not sure about this split between the code for
>>>>>>>>>> the happy path (e.g. map/filter/... in the DSL) and the failure path
>>>>>>>>>> (using exception handlers).  In Scala, for example, we can do:
>>>>>>>>>>
>>>>>>>>>>        scala> val computation = scala.util.Try(1 / 0)
>>>>>>>>>>        computation: scala.util.Try[Int] =
>>>>>>>>>> Failure(java.lang.ArithmeticException: / by zero)
>>>>>>>>>>
>>>>>>>>>>        scala> computation.getOrElse(42)
>>>>>>>>>>        res2: Int = 42
>>>>>>>>>>
>>>>>>>>>> Another example with Scala's pattern matching, which is similar to
>>>>>>>>>> `KStream#branch()`:
>>>>>>>>>>
>>>>>>>>>>        computation match {
>>>>>>>>>>          case scala.util.Success(x) => x * 5
>>>>>>>>>>          case scala.util.Failure(_) => 42
>>>>>>>>>>        }
>>>>>>>>>>
>>>>>>>>>> (The above isn't the most idiomatic way to handle this in Scala,
>>>>>>>>>> but that's not the point I'm trying to make here.)
>>>>>>>>>>
>>>>>>>>>> Hence the question I'm raising here is: Do we want to have an API
>>>>>>>>>> where you code "the happy path", and then have a different code path
>>>>>>>>>> for failures (using exceptions and handlers); or should we treat both
>>>>>>>>>> Success and Failure in the same way?
>>>>>>>>>>
>>>>>>>>>> I think the failure/exception handling approach (as proposed in this
>>>>>>>>>> KIP) is well-suited for errors in the category of deserialization
>>>>>>>>>> problems aka poison pills, partly because the (default) serdes are
>>>>>>>>>> defined through configuration (explicit serdes however are defined
>>>>>>>>>> through API calls).
>>>>>>>>>>
>>>>>>>>>> However, I'm not yet convinced that the failure/exception handling
>>>>>>>>>> approach is the best idea for user code exceptions, e.g. if you fail
>>>>>>>>>> to guard against NPE in your lambdas or divide a number by zero.
>>>>>>>>>>
>>>>>>>>>>        scala> import scala.util.{Try, Success}
>>>>>>>>>>
>>>>>>>>>>        scala> val stream = Seq(1, 2, 3, 4, 5)
>>>>>>>>>>        stream: Seq[Int] = List(1, 2, 3, 4, 5)
>>>>>>>>>>
>>>>>>>>>>        // Here: Fallback to a sane default when encountering failed records
>>>>>>>>>>        scala> stream.map(x => Try(1/(3 - x))).flatMap(t => Seq(t.getOrElse(42)))
>>>>>>>>>>        res19: Seq[Int] = List(0, 1, 42, -1, 0)
>>>>>>>>>>
>>>>>>>>>>        // Here: Skip over failed records
>>>>>>>>>>        scala> stream.map(x => Try(1/(3 - x))).collect{ case Success(s) => s }
>>>>>>>>>>        res20: Seq[Int] = List(0, 1, -1, 0)
>>>>>>>>>>
>>>>>>>>>> The above is more natural to me than using error handlers to define
>>>>>>>>>> how to deal with failed records (here, the value `3` causes an
>>>>>>>>>> arithmetic exception).  Again, it might help the KIP if we added an
>>>>>>>>>> end-to-end example for such user code errors.
>>>>>>>>>> ----------------------------snip----------------------------
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak
>>>>>>>>>> <jan.filip...@trivago.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Jay,
>>>>>>>>>>>
>>>>>>>>>>> Eno mentioned that he will narrow down the scope to only
>>>>>>>>>>> ConsumerRecord deserialisation.
>>>>>>>>>>>
>>>>>>>>>>> I am working with database changelogs only. I would really not like
>>>>>>>>>>> to see a dead letter queue or something similar. How am I expected
>>>>>>>>>>> to get these back in order? Just grind to a halt and call me on the
>>>>>>>>>>> weekend. I'll fix it then in a few minutes rather than spend 2 weeks
>>>>>>>>>>> ordering dead letters (where reprocessing might even be the faster
>>>>>>>>>>> fix).
>>>>>>>>>>>
>>>>>>>>>>> Best Jan
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On 29.05.2017 20:23, Jay Kreps wrote:
>>>>>>>>>>>
>>>>>>>>>>>>        - I think we should hold off on retries unless we have worked
>>>>>>>>>>>>        out the full usage pattern, people can always implement their
>>>>>>>>>>>>        own. I think the idea is that you send the message to some
>>>>>>>>>>>>        kind of dead letter queue and then replay these later. This
>>>>>>>>>>>>        obviously destroys all semantic guarantees we are working
>>>>>>>>>>>>        hard to provide right now, which may be okay.
>>>>>>>>>>>>
>>>>>>>>>>>>

