Comments inline:

> On 5 Jun 2017, at 18:19, Jan Filipiak <jan.filip...@trivago.com> wrote:
> 
> Hi
> 
> just my few thoughts
> 
> On 05.06.2017 11:44, Eno Thereska wrote:
>> Hi there,
>> 
>> Sorry for the late reply, I was out this past week. Looks like good progress 
>> was made with the discussions either way. Let me recap a couple of points I 
>> saw into one big reply:
>> 
>> 1. Jan mentioned CRC errors. I think this is a good point. As these happen 
>> in Kafka, before Kafka Streams gets a chance to inspect anything, I'd like 
>> to hear the opinion of more Kafka folks like Ismael or Jason on this one. 
>> Currently the documentation is not great with what to do once a CRC check 
>> has failed. From looking at the code, it looks like the client gets a 
>> KafkaException (bubbled up from the fetcher) and currently we in streams 
>> catch this as part of poll() and fail. It might be advantageous to treat CRC 
>> handling in a similar way to serialisation handling (e.g., have the option 
>> to fail/skip). Let's see what the other folks say. Worst-case we can do a 
>> separate KIP for that if it proved too hard to do in one go.
> there is no reasonable way to "skip" a CRC error. How can you know the length 
> you read was anything reasonable? You might be completely lost inside your 
> response.

On the client side, every record received is checked for validity. As it 
happens, if the CRC check fails the exception is wrapped with a KafkaException 
that is thrown all the way to poll(). Assuming we change that and poll() throws 
a CRC exception, I was thinking we could treat it similarly to a deserialization 
exception and pass it to the exception handler to decide what to do. Default 
would be to fail. This might need a Kafka KIP btw and can be done separately 
from this KIP, but Jan, would you find this useful?
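
To make that concrete, here is a rough sketch (not the actual Streams internals) of 
where such a decision point could sit. The Response enum and the guarded poll are 
made up for illustration, and the open question Jan raises -- how to safely resume 
after a failed CRC check -- is exactly what a follow-up KIP would have to define:

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.common.KafkaException;

    final class FetchErrorSketch {

        enum Response { FAIL, CONTINUE }   // placeholder for a pluggable handler's decision

        static ConsumerRecords<byte[], byte[]> pollGuarded(final Consumer<byte[], byte[]> consumer,
                                                           final Response decisionOnFetchError) {
            try {
                return consumer.poll(100);
            } catch (final KafkaException fetchError) {
                // Today Streams fails here unconditionally; FAIL would stay the default.
                if (decisionOnFetchError == Response.FAIL) {
                    throw fetchError;
                }
                // CONTINUE would need a well-defined way to seek past the corrupt
                // batch -- the part that still requires its own client-side KIP.
                return ConsumerRecords.empty();
            }
        }
    }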

>> 
>> 
>> At a minimum, handling this type of exception will need to involve the 
>> exactly-once (EoS) logic. We'd still allow the option of failing or 
>> skipping, but EoS would need to clean up by rolling back all the side 
>> effects from the processing so far. Matthias, how does this sound?
> EoS will not help; the record might be 5 or 6 repartitions down into the 
> topology. I haven't followed but I pray you made EoS optional! We don't need 
> this and we don't want this and we will turn it off if it comes. So I 
> wouldn't recommend relying on it. The option to turn it off is better than 
> forcing it and still being unable to roll back bad pills (as explained before).
>> 

Yeah as Matthias mentioned EoS is optional.
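
For reference, a minimal sketch of the opt-in, assuming the KIP-129 work ships with 
the proposed processing.guarantee config (at-least-once remains the default):

    // Excerpt from the app's StreamsConfig properties.
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
    // Leave this line out (or set "at_least_once") and EoS stays off.
    props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once");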

Thanks,
Eno


>> 6. Will add an end-to-end example as Michael suggested.
>> 
>> Thanks
>> Eno
>> 
>> 
>> 
>>> On 4 Jun 2017, at 02:35, Matthias J. Sax <matth...@confluent.io> wrote:
>>> 
>>> What I don't understand is this:
>>> 
>>>> From there on it's the easiest way forward: fix, redeploy, start => done
>>> If you have many producers that work fine and a new "bad" producer
>>> starts up and writes bad data into your input topic, your Streams app
>>> dies but all your producers, including the bad one, keep writing.
>>> 
>>> Thus, how would you fix this, as you cannot "remove" the corrupted data
>>> from the topic? It might take some time to identify the root cause and
>>> stop the bad producer. Up to this point you get good and bad data into
>>> your Streams input topic. If the Streams app is not able to skip over those
>>> bad records, how would you get all the good data from the topic? Not
>>> saying it's not possible, but it's extra work copying the data with a
>>> new non-Streams consumer-producer-app into a new topic and then feed
>>> your Streams app from this new topic -- you also need to update all your
>>> upstream producers to write to the new topic.
>>> 
>>> Thus, if you want to fail fast, you can still do this. And after you
>>> detected and fixed the bad producer you might just reconfigure your app
>>> to skip bad records until it reaches the good part of the data.
>>> Afterwards, you could redeploy with fail-fast again.
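
To make the reconfigure step concrete, a sketch of how the toggle could look, 
assuming the KIP ends up exposing a default handler config plus a fail and a 
continue implementation (the config key and class names below are placeholders 
from the current draft and may still change):

    Properties props = new Properties();
    // Normal operation: fail fast on any deserialization error.
    props.put("default.deserialization.exception.handler",
              "org.apache.kafka.streams.errors.LogAndFailExceptionHandler");
    // While skipping over the stretch of bad records from the rogue producer,
    // redeploy with the continuing handler, then switch back afterwards:
    // props.put("default.deserialization.exception.handler",
    //           "org.apache.kafka.streams.errors.LogAndContinueExceptionHandler");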
>>> 
>>> 
>>> Thus, for this pattern, I actually don't see any reason to stop the
>>> Streams app at all. If you have a callback, and use the callback to
>>> raise an alert (and maybe get the bad data into a bad record queue), it
>>> will not take longer to identify and stop the "bad" producer. But for
>>> this case, you have zero downtime for your Streams app.
>>> 
>>> This seems to be much simpler. Or do I miss anything?
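
As a sketch of that callback idea, a handler that raises the alert and keeps the 
app running. The DeserializationExceptionHandler interface and its response enum 
are taken from the KIP draft and are placeholders here, not a released API:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.ProcessorContext;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;

    public class AlertAndContinueHandler implements DeserializationExceptionHandler {

        private static final Logger log = LoggerFactory.getLogger(AlertAndContinueHandler.class);

        @Override
        public void configure(final Map<String, ?> configs) {
            // nothing to configure in this sketch
        }

        @Override
        public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                     final ConsumerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
            // Raise the alert (a metrics counter or pager hook would go here) ...
            log.warn("Skipping corrupted record {}-{}@{}",
                     record.topic(), record.partition(), record.offset(), exception);
            // ... and keep processing, so the Streams app itself has zero downtime.
            return DeserializationHandlerResponse.CONTINUE;
        }
    }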
>>> 
>>> 
>>> Having said this, I agree that the "threshold based callback" might be
>>> questionable. But as you argue for strict "fail-fast", I want to argue
>>> that this need not always be the best pattern to apply and that the
>>> overall KIP idea is super useful from my point of view.
>>> 
>>> 
>>> -Matthias
>>> 
>>> 
>>> On 6/3/17 11:57 AM, Jan Filipiak wrote:
>>>> Could not agree more!
>>>> 
>>>> But then I think the easiest is still: print exception and die.
>>>> From there on it's the easiest way forward: fix, redeploy, start => done
>>>> 
>>>> All the other ways to recover a pipeline that was processing partially
>>>> all the time
>>>> and suddenly went over an "I can't take it anymore" threshold are not
>>>> straightforward IMO.
>>>> 
>>>> How to find the offset, when it became too bad, when it is not the latest
>>>> committed one?
>>>> How to reset there? With some reasonable stuff in your RocksDB stores?
>>>> 
>>>> If one would do the following: the continuing handler would measure
>>>> against a threshold and
>>>> would terminate after a certain threshold has been passed (per task). Then
>>>> one can use offset commit / flush intervals
>>>> to make a reasonable assumption of how much is slipping by + you get an
>>>> easy recovery when it gets too bad
>>>> + you could also account for "in processing" records.
>>>> 
>>>> Setting this threshold to zero would cover all cases with 1
>>>> implementation. It is still beneficial to have it pluggable
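
A sketch of that threshold idea, again assuming the handler interface from the 
KIP draft; a production version would count per task and probably per time window 
rather than keeping one global counter, and the config key below is made up:

    import java.util.Map;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.streams.processor.ProcessorContext;

    public class ThresholdExceptionHandler implements DeserializationExceptionHandler {

        private long maxSkipped;   // 0 == plain fail-fast, as described above
        private long skipped;

        @Override
        public void configure(final Map<String, ?> configs) {
            final Object threshold = configs.get("bad.pill.threshold");   // hypothetical key
            maxSkipped = threshold == null ? 0L : Long.parseLong(threshold.toString());
        }

        @Override
        public DeserializationHandlerResponse handle(final ProcessorContext context,
                                                     final ConsumerRecord<byte[], byte[]> record,
                                                     final Exception exception) {
            skipped++;
            // Below the threshold we skip and keep going; once it is crossed we fail,
            // so the usual lag-based monitoring still catches the incident.
            return skipped <= maxSkipped
                    ? DeserializationHandlerResponse.CONTINUE
                    : DeserializationHandlerResponse.FAIL;
        }
    }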
>>>> 
>>>> Again CRC-Errors are the only bad pills we saw in production for now.
>>>> 
>>>> Best Jan
>>>> 
>>>> 
>>>> On 02.06.2017 17:37, Jay Kreps wrote:
>>>>> Jan, I agree with you philosophically. I think one practical challenge
>>>>> has
>>>>> to do with data formats. Many people use untyped events, so there is
>>>>> simply
>>>>> no guarantee on the form of the input. E.g. many companies use JSON
>>>>> without
>>>>> any kind of schema so it becomes very hard to assert anything about the
>>>>> input which makes these programs very fragile to the "one accidental
>>>>> message publication that creates an unsolvable problem".
>>>>> 
>>>>> For that reason I do wonder if limiting to just serialization actually
>>>>> gets
>>>>> you a useful solution. For JSON it will help with the problem of
>>>>> non-parseable JSON, but sounds like it won't help in the case where the
>>>>> JSON is well-formed but does not have any of the fields you expect and
>>>>> depend on for your processing. I expect the reason for limiting the scope
>>>>> is it is pretty hard to reason about correctness for anything that
>>>>> stops in
>>>>> the middle of processing an operator DAG?
>>>>> 
>>>>> -Jay
>>>>> 
>>>>> On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak <jan.filip...@trivago.com>
>>>>> wrote:
>>>>> 
>>>>>> IMHO you're doing it wrong then. + building too much support into the Kafka
>>>>>> ecosystem is very counterproductive in fostering a happy userbase
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On 02.06.2017 13:15, Damian Guy wrote:
>>>>>> 
>>>>>>> Jan, you have a choice to Fail fast if you want. This is about giving
>>>>>>> people options and there are times when you don't want to fail fast.
>>>>>>> 
>>>>>>> 
>>>>>>> On Fri, 2 Jun 2017 at 11:00 Jan Filipiak <jan.filip...@trivago.com>
>>>>>>> wrote:
>>>>>>> 
>>>>>>> Hi
>>>>>>>> 1.
>>>>>>>> That greatly complicates monitoring. Fail fast gives you that: when
>>>>>>>> you
>>>>>>>> monitor only the lag of all your apps
>>>>>>>> you are completely covered. With that sort of new application,
>>>>>>>> monitoring
>>>>>>>> is very much more complicated, as
>>>>>>>> you now need to monitor the fail % of some special apps as well. In my
>>>>>>>> opinion that is a huge downside already.
>>>>>>>> 
>>>>>>>> 2.
>>>>>>>> Using a schema registry like the Avro stuff, it might not even be the
>>>>>>>> record
>>>>>>>> that is broken, it might be just your app
>>>>>>>> being unable to fetch a schema it now needs to know. Maybe you got
>>>>>>>> partitioned away from that registry.
>>>>>>>> 
>>>>>>>> 3. When you get alerted because of a too-high fail percentage, what
>>>>>>>> are the
>>>>>>>> steps you are going to take?
>>>>>>>> Shut it down to buy time. Fix the problem. Spend way too much time to
>>>>>>>> find a good reprocess offset.
>>>>>>>> Your time windows are in bad shape anyway, and you have pretty much lost.
>>>>>>>> This routine is nonsense.
>>>>>>>> 
>>>>>>>> Dead letter queues would be the worst possible addition to the Kafka
>>>>>>>> toolkit that I can think of. It just doesn't fit the architecture,
>>>>>>>> where having clients fall behind is a valid option.
>>>>>>>> 
>>>>>>>> Further, I already mentioned the only bad pills I've seen so far are CRC
>>>>>>>> errors. Any plans for those?
>>>>>>>> 
>>>>>>>> Best Jan
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On 02.06.2017 11:34, Damian Guy wrote:
>>>>>>>> 
>>>>>>>>> I agree with what Matthias has said w.r.t failing fast. There are
>>>>>>>>> plenty
>>>>>>>>> 
>>>>>>>> of
>>>>>>>> 
>>>>>>>>> times when you don't want to fail-fast and must attempt to  make
>>>>>>>>> 
>>>>>>>> progress.
>>>>>>>> 
>>>>>>>>> The dead-letter queue is exactly for these circumstances. Of
>>>>>>>>> course if
>>>>>>>>> every record is failing, then you probably do want to give up.
>>>>>>>>> 
>>>>>>>>> On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax <matth...@confluent.io>
>>>>>>>>> 
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>>> First a meta comment. KIP discussion should take place on the dev
>>>>>>>>> list
>>>>>>>>>> -- if user list is cc'ed please make sure to reply to both lists.
>>>>>>>>>> 
>>>>>>>>> Thanks.
>>>>>>>>> Thanks for making the scope of the KIP clear. Makes a lot of sense to
>>>>>>>>>> focus on deserialization exceptions for now.
>>>>>>>>>> 
>>>>>>>>>> With regard to corrupted state stores, would it make sense to fail a
>>>>>>>>>> task and wipe out the store to repair it via recreation from the
>>>>>>>>>> changelog? That's of course a quite advanced pattern, but I want to
>>>>>>>>>> bring
>>>>>>>>>> it up to design the first step in a way such that we can get
>>>>>>>>>> there (if
>>>>>>>>>> we think it's a reasonable idea).
>>>>>>>>>> 
>>>>>>>>>> I also want to comment about fail fast vs making progress. I
>>>>>>>>>> think that
>>>>>>>>>> fail-fast need not always be the best option. The scenario I have in
>>>>>>>>>> mind is like this: you got a bunch of producers that feed the
>>>>>>>>>> Streams
>>>>>>>>>> input topic. Most producers work fine, but maybe one producer
>>>>>>>>>> misbehaves and the data it writes is corrupted. You might not even
>>>>>>>>>> be able
>>>>>>>>>> to recover this lost data at any point -- thus, there is no
>>>>>>>>>> reason to
>>>>>>>>>> stop processing but you just skip over those records. Of course, you
>>>>>>>>>> need to fix the root cause, and thus you need to alert (either
>>>>>>>>>> via logs
>>>>>>>>>> or the exception handler directly) and you need to start to
>>>>>>>>>> investigate
>>>>>>>>>> to find the bad producer, shut it down and fix it.
>>>>>>>>>> 
>>>>>>>>>> Here the dead letter queue comes into place. From my
>>>>>>>>>> understanding, the
>>>>>>>>>> purpose of this feature is solely to enable post-mortem debugging. I don't
>>>>>>>>>> think
>>>>>>>>>> those record would be fed back at any point in time (so I don't
>>>>>>>>>> see any
>>>>>>>>>> ordering issue -- a skipped record, with this regard, is just "fully
>>>>>>>>>> processed"). Thus, the dead letter queue should actually encode the
>>>>>>>>>> original record's metadata (topic, partition, offset, etc.) to enable
>>>>>>>>>> such
>>>>>>>>>> debugging. I guess, this might also be possible if you just log
>>>>>>>>>> the bad
>>>>>>>>>> records, but it would be harder to access (you first must find the
>>>>>>>>>> Streams instance that did write the log and extract the information
>>>>>>>>>> from
>>>>>>>>>> there). Reading it from topic is much simpler.
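
A sketch of the "encode the original record's metadata" part, assuming a plain 
producer with byte[] serializers and the record headers coming with KIP-82; the 
topic and header names are made up for illustration:

    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    final class DeadLetterPublisher {

        // 'producer' is assumed to be a KafkaProducer<byte[], byte[]> created elsewhere.
        static void publish(final Producer<byte[], byte[]> producer,
                            final ConsumerRecord<byte[], byte[]> failed,
                            final Exception error) {
            final ProducerRecord<byte[], byte[]> dead =
                    new ProducerRecord<>("my-app-dead-letters", failed.key(), failed.value());
            // Carry the provenance needed for post-mortem debugging.
            dead.headers().add("source.topic", failed.topic().getBytes(StandardCharsets.UTF_8));
            dead.headers().add("source.partition", Integer.toString(failed.partition()).getBytes(StandardCharsets.UTF_8));
            dead.headers().add("source.offset", Long.toString(failed.offset()).getBytes(StandardCharsets.UTF_8));
            dead.headers().add("error.class", error.getClass().getName().getBytes(StandardCharsets.UTF_8));
            producer.send(dead);
        }
    }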
>>>>>>>>>> 
>>>>>>>>>> I also want to mention the following. Assume you have such a
>>>>>>>>>> topic with
>>>>>>>>>> some bad records and some good records. If we always fail-fast, it's
>>>>>>>>>> going to be super hard to process the good data. You would need to
>>>>>>>>>> write
>>>>>>>>>> an extra app that copied the data into a new topic filtering out the
>>>>>>>>>> bad
>>>>>>>>>> records (or apply the map() workaround within Streams). So I don't
>>>>>>>>>> think
>>>>>>>>>> that failing fast being the best option in production is
>>>>>>>>>> necessarily true.
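
For reference, the map() workaround mentioned above looks roughly like this: read 
keys and values as raw bytes and deserialize inside the topology, so a poison pill 
can be dropped instead of killing the thread. MyEvent and MyEventDeserializer are 
placeholders for the application's real types:

    import java.util.Collections;
    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.kstream.KStream;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    final KStreamBuilder builder = new KStreamBuilder();
    final Deserializer<MyEvent> deserializer = new MyEventDeserializer();

    final KStream<byte[], byte[]> raw =
            builder.stream(Serdes.ByteArray(), Serdes.ByteArray(), "input-topic");

    final KStream<byte[], MyEvent> parsed = raw.flatMapValues(value -> {
        try {
            return Collections.singletonList(deserializer.deserialize("input-topic", value));
        } catch (final Exception poisonPill) {
            // Corrupted record: drop it (and count/log it here if needed).
            return Collections.<MyEvent>emptyList();
        }
    });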
>>>>>>>>>> 
>>>>>>>>>> Or do you think there are scenarios, for which you can recover the
>>>>>>>>>> corrupted records successfully? And even if this is possible, it
>>>>>>>>>> might
>>>>>>>>>> be a case for reprocessing instead of failing the whole application?
>>>>>>>>>> Also, if you think you can "repair" a corrupted record, should the
>>>>>>>>>> handler allow returning a "fixed" record? This would solve the
>>>>>>>>>> ordering
>>>>>>>>>> problem.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> -Matthias
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On 5/30/17 1:47 AM, Michael Noll wrote:
>>>>>>>>>> 
>>>>>>>>>>> Thanks for your work on this KIP, Eno -- much appreciated!
>>>>>>>>>>> 
>>>>>>>>>>> - I think it would help to improve the KIP by adding an end-to-end
>>>>>>>>>>> code
>>>>>>>>>>> example that demonstrates, with the DSL and with the Processor API,
>>>>>>>>>>> how
>>>>>>>>>>> 
>>>>>>>>>> the
>>>>>>>>>> 
>>>>>>>>>>> user would write a simple application that would then be augmented
>>>>>>>>>>> with
>>>>>>>>>>> 
>>>>>>>>>> the
>>>>>>>>>> 
>>>>>>>>>>> proposed KIP changes to handle exceptions.  It should also
>>>>>>>>>>> become much
>>>>>>>>>>> clearer then that e.g. the KIP would lead to different code
>>>>>>>>>>> paths for
>>>>>>>>>>> 
>>>>>>>>>> the
>>>>>>>>> happy case and any failure scenarios.
>>>>>>>>>>> - Do we have sufficient information available to make informed
>>>>>>>>>>> 
>>>>>>>>>> decisions
>>>>>>>>> on
>>>>>>>>>>> what to do next?  For example, do we know in which part of the
>>>>>>>>>>> topology
>>>>>>>>>>> 
>>>>>>>>>> the
>>>>>>>>>> 
>>>>>>>>>>> record failed? `ConsumerRecord` gives us access to topic,
>>>>>>>>>>> partition,
>>>>>>>>>>> offset, timestamp, etc., but what about topology-related
>>>>>>>>>>> information
>>>>>>>>>>> 
>>>>>>>>>> (e.g.
>>>>>>>>>> 
>>>>>>>>>>> what is the associated state store, if any)?
>>>>>>>>>>> 
>>>>>>>>>>> - Only partly on-topic for the scope of this KIP, but this is about
>>>>>>>>>>> the
>>>>>>>>>>> bigger picture: This KIP would give users the option to send
>>>>>>>>>>> corrupted
>>>>>>>>>>> records to dead letter queue (quarantine topic).  But, what pattern
>>>>>>>>>>> 
>>>>>>>>>> would
>>>>>>>>> we advocate to process such a dead letter queue then, e.g. how to
>>>>>>>>> allow
>>>>>>>>>> for
>>>>>>>>>> 
>>>>>>>>>>> retries with backoff ("If the first record in the dead letter queue
>>>>>>>>>>> 
>>>>>>>>>> fails
>>>>>>>>> again, then try the second record for the time being and go back
>>>>>>>>> to the
>>>>>>>>>>> first record at a later time").  Jay and Jan already alluded to
>>>>>>>>>>> 
>>>>>>>>>> ordering
>>>>>>>>> problems that will be caused by dead letter queues. As I said,
>>>>>>>>> retries
>>>>>>>>>>> might be out of scope but perhaps the implications should be
>>>>>>>>>>> considered
>>>>>>>>>>> 
>>>>>>>>>> if
>>>>>>>>>> 
>>>>>>>>>>> possible?
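
Retries are probably out of scope, but as a very rough sketch of the pattern 
described above: a small reprocessing app that consumes the dead letter topic and, 
if a record still cannot be handled, re-appends it to the back of the queue and 
moves on. A real version would add an attempt counter and a not-before timestamp 
to get actual backoff; the topic name and the reprocess() hook are made up:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class DeadLetterReprocessor {

        public static void main(final String[] args) {
            final Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", "broker:9092");
            consumerProps.put("group.id", "dead-letter-reprocessor");
            consumerProps.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            consumerProps.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

            final Properties producerProps = new Properties();
            producerProps.put("bootstrap.servers", "broker:9092");
            producerProps.put("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
            producerProps.put("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(consumerProps);
                 KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
                consumer.subscribe(Collections.singletonList("my-app-dead-letters"));
                while (true) {
                    final ConsumerRecords<byte[], byte[]> records = consumer.poll(1000);
                    for (final ConsumerRecord<byte[], byte[]> record : records) {
                        try {
                            reprocess(record);   // placeholder for the real re-ingestion logic
                        } catch (final Exception stillBroken) {
                            // Still failing: push it to the back of the queue and keep going,
                            // so one stubborn record does not block the rest. Note that no
                            // ordering guarantee survives this, as discussed above.
                            producer.send(new ProducerRecord<>("my-app-dead-letters",
                                                               record.key(), record.value()));
                        }
                    }
                }
            }
        }

        private static void reprocess(final ConsumerRecord<byte[], byte[]> record) {
            // re-ingest / re-publish to the original pipeline here
        }
    }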
>>>>>>>>>>> 
>>>>>>>>>>> Also, I wrote the text below before reaching the point in the
>>>>>>>>>>> 
>>>>>>>>>> conversation
>>>>>>>>>> 
>>>>>>>>>>> that this KIP's scope will be limited to exceptions in the
>>>>>>>>>>> category of
>>>>>>>>>>> poison pills / deserialization errors.  But since Jay brought up
>>>>>>>>>>> user
>>>>>>>>>>> 
>>>>>>>>>> code
>>>>>>>>>> 
>>>>>>>>>>> errors again, I decided to include it again.
>>>>>>>>>>> 
>>>>>>>>>>> ----------------------------snip----------------------------
>>>>>>>>>>> A meta comment: I am not sure about this split between the code for
>>>>>>>>>>> the
>>>>>>>>>>> happy path (e.g. map/filter/... in the DSL) from the failure path
>>>>>>>>>>> 
>>>>>>>>>> (using
>>>>>>>>> exception handlers).  In Scala, for example, we can do:
>>>>>>>>>>>       scala> val computation = scala.util.Try(1 / 0)
>>>>>>>>>>>       computation: scala.util.Try[Int] =
>>>>>>>>>>> Failure(java.lang.ArithmeticException: / by zero)
>>>>>>>>>>> 
>>>>>>>>>>>       scala> computation.getOrElse(42)
>>>>>>>>>>>       res2: Int = 42
>>>>>>>>>>> 
>>>>>>>>>>> Another example with Scala's pattern matching, which is similar to
>>>>>>>>>>> `KStream#branch()`:
>>>>>>>>>>> 
>>>>>>>>>>>       computation match {
>>>>>>>>>>>         case scala.util.Success(x) => x * 5
>>>>>>>>>>>         case scala.util.Failure(_) => 42
>>>>>>>>>>>       }
>>>>>>>>>>> 
>>>>>>>>>>> (The above isn't the most idiomatic way to handle this in Scala,
>>>>>>>>>>> but
>>>>>>>>>>> 
>>>>>>>>>> that's
>>>>>>>>>> 
>>>>>>>>>>> not the point I'm trying to make here.)
>>>>>>>>>>> 
>>>>>>>>>>> Hence the question I'm raising here is: Do we want to have an API
>>>>>>>>>>> where
>>>>>>>>>>> 
>>>>>>>>>> you
>>>>>>>>>> 
>>>>>>>>>>> code "the happy path", and then have a different code path for
>>>>>>>>>>> failures
>>>>>>>>>>> (using exceptions and handlers);  or should we treat both
>>>>>>>>>>> Success and
>>>>>>>>>>> Failure in the same way?
>>>>>>>>>>> 
>>>>>>>>>>> I think the failure/exception handling approach (as proposed in
>>>>>>>>>>> this
>>>>>>>>>>> 
>>>>>>>>>> KIP)
>>>>>>>>> is well-suited for errors in the category of deserialization problems
>>>>>>>>>> aka
>>>>>>>>> poison pills, partly because the (default) serdes are defined through
>>>>>>>>>>> configuration (explicit serdes however are defined through API
>>>>>>>>>>> calls).
>>>>>>>>>>> 
>>>>>>>>>>> However, I'm not yet convinced that the failure/exception handling
>>>>>>>>>>> 
>>>>>>>>>> approach
>>>>>>>>>> 
>>>>>>>>>>> is the best idea for user code exceptions, e.g. if you fail to
>>>>>>>>>>> guard
>>>>>>>>>>> against NPE in your lambdas or divide a number by zero.
>>>>>>>>>>> 
>>>>>>>>>>>       scala> val stream = Seq(1, 2, 3, 4, 5)
>>>>>>>>>>>       stream: Seq[Int] = List(1, 2, 3, 4, 5)
>>>>>>>>>>> 
>>>>>>>>>>>       // Here: Fall back to a sane default when encountering failed records
>>>>>>>>>>>       scala> stream.map(x => Try(1/(3 - x))).flatMap(t => Seq(t.getOrElse(42)))
>>>>>>>>>>>       res19: Seq[Int] = List(0, 1, 42, -1, 0)
>>>>>>>>>>> 
>>>>>>>>>>>       // Here: Skip over failed records
>>>>>>>>>>>       scala> stream.map(x => Try(1/(3 - x))).collect{ case Success(s) => s }
>>>>>>>>>>>       res20: Seq[Int] = List(0, 1, -1, 0)
>>>>>>>>>>> 
>>>>>>>>>>> The above is more natural to me than using error handlers to define
>>>>>>>>>>> how
>>>>>>>>>>> 
>>>>>>>>>> to
>>>>>>>>>> 
>>>>>>>>>>> deal with failed records (here, the value `3` causes an arithmetic
>>>>>>>>>>> exception).  Again, it might help the KIP if we added an end-to-end
>>>>>>>>>>> 
>>>>>>>>>> example
>>>>>>>>>> 
>>>>>>>>>>> for such user code errors.
>>>>>>>>>>> ----------------------------snip----------------------------
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak <
>>>>>>>>>>> 
>>>>>>>>>> jan.filip...@trivago.com>
>>>>>>>>> wrote:
>>>>>>>>>>> Hi Jay,
>>>>>>>>>>>> Eno mentioned that he will narrow down the scope to only
>>>>>>>>>>>> 
>>>>>>>>>>> ConsumerRecord
>>>>>>>>> deserialisation.
>>>>>>>>>>>> I am working with database changelogs only. I would really not like
>>>>>>>>>>>> to see a dead letter queue or something
>>>>>>>>>>>> similar. How am I expected to get these back in order? Just grind
>>>>>>>>>>>> to a halt and call me on the weekend. I'll fix it
>>>>>>>>>>>> then in a few minutes rather than spend 2 weeks ordering dead letters
>>>>>>>>>>>> (where reprocessing might be even the faster fix).
>>>>>>>>>>>> Best Jan
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> 
>>>>>>>>>>>> On 29.05.2017 20:23, Jay Kreps wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>>       - I think we should hold off on retries unless we have worked
>>>>>>>>>>>>>       out the full usage pattern, people can always implement their own.
>>>>>>>>>>>>>       I think the idea is that you send the message to some kind of dead
>>>>>>>>>>>>>       letter queue and then replay these later. This obviously destroys
>>>>>>>>>>>>>       all semantic guarantees we are working hard to provide right now,
>>>>>>>>>>>>>       which may be okay.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
> 
