Thanks Guozhang,

I’ve updated the KIP and hopefully addressed all the comments so far. In the
process I also changed the name of the KIP to better reflect its scope:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+deserialization+exception+handlers

Any other feedback is appreciated; otherwise I’ll start the vote soon.

Thanks
Eno

> On Jun 12, 2017, at 6:28 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> 
> Eno, thanks for bringing this proposal up, and sorry for being late on
> this. Here are my two cents:
> 
> 1. First, some meta comments regarding "fail fast" vs. "making progress". I
> agree that in general system design should "force the user to do the right
> thing", but we also need to keep in mind that Kafka is a multi-tenant
> system, i.e. from a Streams app's point of view you probably do not control
> the whole stream processing pipeline end-to-end. E.g. your input data may
> not be controlled by you; it could be written by another app, or another
> team in your company, or even a different organization, and if an error
> happens you may not be able to "do the right thing" by yourself in time. In
> such an environment I think it is important to leave the door open to let
> users be more resilient. So I find the current proposal, which does leave
> the door open for either failing fast or making progress, quite reasonable.
> 
> 2. On the other hand, if the question is whether we should provide a
> built-in "send to bad queue" handler from the library, I think that might
> be overkill: with some tweaks to the API (see my detailed comments below)
> we can allow users to implement such handlers pretty easily. In fact, I
> feel even "LogAndThresholdExceptionHandler" is not necessary as a built-in
> handler, as it would then require users to specify the threshold via
> configs, etc. I think letting people provide such "eco-libraries" may be
> better.
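
(A minimal sketch of how small such a user-side threshold handler could be, in plain Java with no Kafka dependency. `ThresholdHandler`, its `Response` enum and the `handle(taskId, rawValue, error)` signature are hypothetical names chosen for illustration; they are not the KIP's API.)

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical user-side handler: skip bad records until a per-task
// threshold is exceeded, then ask the runtime to fail.
class ThresholdHandler {
    // Hypothetical response type; the KIP's final names may differ.
    enum Response { CONTINUE, FAIL }

    private final int maxSkippedPerTask;
    private final Map<String, Integer> skippedPerTask = new HashMap<>();

    ThresholdHandler(int maxSkippedPerTask) {
        this.maxSkippedPerTask = maxSkippedPerTask;
    }

    Response handle(String taskId, byte[] rawValue, Exception error) {
        // Count the skipped record against the task that saw it.
        int skipped = skippedPerTask.merge(taskId, 1, Integer::sum);
        System.err.println("Bad record in task " + taskId + ": " + error.getMessage());
        // Fail once this task has skipped more records than allowed.
        return skipped > maxSkippedPerTask ? Response.FAIL : Response.CONTINUE;
    }
}
```

With a threshold of 0 this degenerates to fail-fast, which is the "one implementation covers all cases" idea Jan raised earlier in the thread.
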
> 
> 3. Regarding the CRC error: today we validate the CRC both on the broker
> end upon receiving produce requests and on the consumer end upon receiving
> fetch responses; if the CRC validation fails in the former case, the data
> is not appended to the broker logs. So if we do see a CRC failure on the
> consumer side, it has to be a flipped bit either on the broker disks or
> over the wire. The first case is fatal, while the second is retriable.
> Unfortunately we cannot tell which case it is when we see a CRC validation
> failure. But in either case, just skipping and making progress does not
> seem a good choice here, and hence I would personally exclude these errors
> from the general serde errors and NOT leave the door open for making
> progress.
> 
> Currently such errors are thrown as a KafkaException that wraps an
> InvalidRecordException, which may be too general, and we could consider
> just throwing the InvalidRecordException directly. But that could be an
> orthogonal discussion if we agree that CRC failures should not be
> considered in this KIP.
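
(To make the exclusion concrete, here is a rough sketch of the routing decision, assuming the runtime can inspect the cause chain of whatever was thrown. `CorruptRecordError` and `WrappedClientError` are hypothetical stand-ins for InvalidRecordException and the wrapping KafkaException, so the sketch compiles without Kafka on the classpath; `ErrorRouting` is likewise made up for illustration.)

```java
// Hypothetical stand-in for InvalidRecordException (CRC mismatch).
class CorruptRecordError extends RuntimeException {
    CorruptRecordError(String message) { super(message); }
}

// Hypothetical stand-in for the wrapping KafkaException.
class WrappedClientError extends RuntimeException {
    WrappedClientError(String message, Throwable cause) { super(message, cause); }
}

class ErrorRouting {
    enum Route { ALWAYS_FAIL, ASK_HANDLER }

    // Walk the cause chain so that a wrapped CRC failure is still recognised
    // and never reaches the skip/continue handler.
    static Route classify(Throwable error) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (t instanceof CorruptRecordError) {
                return Route.ALWAYS_FAIL;
            }
        }
        // Anything else (e.g. a serde failure) is left to the user's handler.
        return Route.ASK_HANDLER;
    }
}
```
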
> 
> ----------------
> 
> Now some detailed comments:
> 
> 4. Could we consider adding the processor context to the handle() function
> as well? This context would wrap the source node that is about to process
> the record. It could expose more info, such as which task / source node
> sees the error, the timestamp of the message, etc., and would also allow
> users to implement their handlers by exposing some metrics, or by calling
> context.forward() to implement the "send to bad queue" behavior, etc.
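
(A sketch of what a context-aware handler along the lines of point 4 might look like. `HandlerContext` is a hypothetical, cut-down stand-in for the processor context, not the real Kafka Streams interface, and `SendToBadQueueHandler` is an illustrative name. The forwarded key carries the original topic, partition and offset, as Matthias suggested for debugging.)

```java
// Hypothetical, minimal stand-in for the context discussed in point 4.
interface HandlerContext {
    String taskId();
    String topic();
    int partition();
    long offset();
    long timestamp();
    void forward(String key, byte[] value);
}

class SendToBadQueueHandler {
    // Hypothetical response type; the KIP's final names may differ.
    enum Response { CONTINUE, FAIL }

    // A counter that a real handler might expose as a metric.
    private long skipped = 0;

    Response handle(HandlerContext context, byte[] rawValue, Exception error) {
        skipped++;
        // Encode where the bad record came from, then pass the raw bytes on
        // to whatever sits downstream (e.g. a sink for a quarantine topic).
        String origin = context.topic() + "-" + context.partition() + "@" + context.offset();
        context.forward(origin, rawValue);
        return Response.CONTINUE;
    }

    long skippedCount() {
        return skipped;
    }
}
```
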
> 
> 5. Could you add the string name of
> StreamsConfig.DEFAULT_RECORD_EXCEPTION_HANDLER to the KIP as well?
> Personally I find the "default" prefix a bit misleading since we do not yet
> allow users to override it per node. But I'm okay either way, as I can see
> we may extend it in the future and would probably not want to rename the
> config again. Also, from the experience with `default partitioner` and
> `default timestamp extractor`, should we also make sure that the passed-in
> object can be either a string "class name" or a class object?
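
(On the last question in point 5, a small sketch of accepting either form, using plain reflection. The key `example.exception.handler` and the `HandlerConfig` class are placeholders made up for this illustration; the actual config name is for the KIP to define.)

```java
import java.util.Map;

class HandlerConfig {
    // Placeholder key for illustration only; not the KIP's config name.
    static final String HANDLER_KEY = "example.exception.handler";

    // Accepts the handler as a Class object or as a fully qualified class
    // name, and instantiates it through its no-argument constructor.
    static Object instantiate(Map<String, ?> config) {
        Object value = config.get(HANDLER_KEY);
        try {
            Class<?> type;
            if (value instanceof Class) {
                type = (Class<?>) value;
            } else if (value instanceof String) {
                type = Class.forName(((String) value).trim());
            } else {
                throw new IllegalArgumentException(
                    HANDLER_KEY + " must be a class or a class name, got: " + value);
            }
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot create handler from " + value, e);
        }
    }
}
```
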
> 
> 
> Guozhang
> 
> 
> On Wed, Jun 7, 2017 at 2:16 PM, Jan Filipiak <jan.filip...@trivago.com>
> wrote:
> 
>> Hi Eno,
>> 
>> On 07.06.2017 22:49, Eno Thereska wrote:
>> 
>>> Comments inline:
>>> 
>>> On 5 Jun 2017, at 18:19, Jan Filipiak <jan.filip...@trivago.com> wrote:
>>>> 
>>>> Hi
>>>> 
>>>> just my few thoughts
>>>> 
>>>> On 05.06.2017 11:44, Eno Thereska wrote:
>>>> 
>>>>> Hi there,
>>>>> 
>>>>> Sorry for the late reply, I was out this past week. Looks like good
>>>>> progress was made with the discussions either way. Let me recap a couple 
>>>>> of
>>>>> points I saw into one big reply:
>>>>> 
>>>>> 1. Jan mentioned CRC errors. I think this is a good point. As these
>>>>> happen in Kafka, before Kafka Streams gets a chance to inspect anything,
>>>>> I'd like to hear the opinion of more Kafka folks like Ismael or Jason on
>>>>> this one. Currently the documentation is not great with what to do once a
>>>>> CRC check has failed. From looking at the code, it looks like the client
>>>>> gets a KafkaException (bubbled up from the fetcher) and currently we in
>>>>> streams catch this as part of poll() and fail. It might be advantageous to
>>>>> treat CRC handling in a similar way to serialisation handling (e.g., have
>>>>> the option to fail/skip). Let's see what the other folks say. Worst-case 
>>>>> we
>>>>> can do a separate KIP for that if it proved too hard to do in one go.
>>>>> 
>>>> there is no reasonable way to "skip" a crc error. How can you know the
>>>> length you read was anything reasonable? you might be completely lost
>>>> inside your response.
>>>> 
>>> On the client side, every record received is checked for validity. As it
>>> happens, if the CRC check fails the exception is wrapped with a
>>> KafkaException that is thrown all the way to poll(). Assuming we change
>>> that and poll() throws a CRC exception, I was thinking we could treat it
>>> similarly to a deserialize exception and pass it to the exception handler
>>> to decide what to do. Default would be to fail. This might need a Kafka KIP
>>> btw and can be done separately from this KIP, but Jan, would you find this
>>> useful?
>>> 
>> I don't think so. IMO you cannot reasonably continue parsing when the
>> checksum of a message is not correct. If you are not sure you got the
>> correct length, how can you be sure to find the next record? I would always
>> fail straight away in all cases. It's too hard for me to understand why one
>> would try to continue. I mentioned CRCs because those are the only bad
>> pills I have seen so far. But I am happy that it just stopped and I could
>> check what was going on. This will also be invasive in the client code then.
>> 
>> If you ask me, I am always going to vote for "grind to a halt": let the
>> developers see what happened and let them fix it. It helps build good
>> Kafka experiences and better software and architectures. For me this is
>> "force the user to do the right thing". https://youtu.be/aAb7hSCtvGw?t=374
>> E.g. not letting unexpected input slip by. Letting unexpected input slip by
>> is what brought us 15+ years of war against all sorts of ingestion attacks. I
>> don't even dare to estimate how many missing-records search teams are going
>> to be formed, maybe some HackerOne for stream apps :D
>> 
>> Best Jan
>> 
>> 
>>> 
>>>>> At a minimum, handling this type of exception will need to involve the
>>>>> exactly-once (EoS) logic. We'd still allow the option of failing or
>>>>> skipping, but EoS would need to clean up by rolling back all the side
>>>>> effects from the processing so far. Matthias, how does this sound?
>>>>> 
>>>> EoS will not help; the record might be 5 or 6 repartitions down into the
>>>> topology. I haven't followed, but I pray you made EoS optional! We don't
>>>> need this and we don't want this, and we will turn it off if it comes. So I
>>>> wouldn't recommend relying on it. The option to turn it off is better than
>>>> forcing it and still being unable to roll back bad pills (as explained
>>>> before).
>>>> 
>>> Yeah as Matthias mentioned EoS is optional.
>>> 
>>> Thanks,
>>> Eno
>>> 
>>> 
>>> 6. Will add an end-to-end example as Michael suggested.
>>>>> 
>>>>> Thanks
>>>>> Eno
>>>>> 
>>>>> 
>>>>> 
>>>>> On 4 Jun 2017, at 02:35, Matthias J. Sax <matth...@confluent.io> wrote:
>>>>>> 
>>>>>> What I don't understand is this:
>>>>>> 
>>>>>> From there on its the easiest way forward: fix, redeploy, start =>
>>>>>>> done
>>>>>>> 
>>>>>> If you have many producers that work fine and a new "bad" producer
>>>>>> starts up and writes bad data into your input topic, your Streams app
>>>>>> dies but all your producers, including the bad one, keep writing.
>>>>>> 
>>>>>> Thus, how would you fix this, as you cannot "remove" the corrupted data
>>>>>> from the topic? It might take some time to identify the root cause and
>>>>>> stop the bad producer. Up to this point you get good and bad data into
>>>>>> your Streams input topic. If the Streams app is not able to skip over
>>>>>> those bad records, how would you get all the good data from the topic?
>>>>>> Not saying it's not possible, but it's extra work copying the data with
>>>>>> a new non-Streams consumer-producer app into a new topic and then
>>>>>> feeding your Streams app from this new topic -- you also need to update
>>>>>> all your upstream producers to write to the new topic.
>>>>>> 
>>>>>> Thus, if you want to fail fast, you can still do this. And after you
>>>>>> detected and fixed the bad producer you might just reconfigure your app
>>>>>> to skip bad records until it reaches the good part of the data.
>>>>>> Afterwards, you could redeploy with fail-fast again.
>>>>>> 
>>>>>> 
>>>>>> Thus, for this pattern, I actually don't see any reason why to stop the
>>>>>> Streams app at all. If you have a callback, and use the callback to
>>>>>> raise an alert (and maybe get the bad data into a bad record queue), it
>>>>>> will not take longer to identify and stop the "bad" producer. But for
>>>>>> this case, you have zero downtime for your Streams app.
>>>>>> 
>>>>>> This seems to be much simpler. Or do I miss anything?
>>>>>> 
>>>>>> 
>>>>>> Having said this, I agree that the "threshold based callback" might be
>>>>>> questionable. But as you argue for strict "fail-fast", I want to argue
>>>>>> that this need not always be the best pattern to apply and that the
>>>>>> overall KIP idea is super useful from my point of view.
>>>>>> 
>>>>>> 
>>>>>> -Matthias
>>>>>> 
>>>>>> 
>>>>>> On 6/3/17 11:57 AM, Jan Filipiak wrote:
>>>>>> 
>>>>>>> Could not agree more!
>>>>>>> 
>>>>>>> But then I think the easiest is still: print exception and die.
>>>>>>> From there on its the easiest way forward: fix, redeploy, start =>
>>>>>>> done
>>>>>>> 
>>>>>>> All the other ways to recover a pipeline that was processing partially
>>>>>>> all the time and suddenly went over an "I can't take it anymore"
>>>>>>> threshold are not straightforward IMO.
>>>>>>> 
>>>>>>> How to find the offset where it became too bad, when it is not the
>>>>>>> latest committed one?
>>>>>>> How to reset there? With some reasonable stuff in your RocksDB stores?
>>>>>>> 
>>>>>>> Suppose one did the following: the continuing handler would measure
>>>>>>> against a threshold and would terminate after a certain threshold has
>>>>>>> passed (per task). Then one can use offset commit / flush intervals
>>>>>>> to make a reasonable assumption of how much is slipping by + you get an
>>>>>>> easy recovery when it gets too bad
>>>>>>> + you could also account for "in processing" records.
>>>>>>> 
>>>>>>> Setting this threshold to zero would cover all cases with 1
>>>>>>> implementation. It is still beneficial to have it pluggable
>>>>>>> 
>>>>>>> Again CRC-Errors are the only bad pills we saw in production for now.
>>>>>>> 
>>>>>>> Best Jan
>>>>>>> 
>>>>>>> 
>>>>>>> On 02.06.2017 17:37, Jay Kreps wrote:
>>>>>>> 
>>>>>>>> Jan, I agree with you philosophically. I think one practical
>>>>>>>> challenge has to do with data formats. Many people use untyped events,
>>>>>>>> so there is simply no guarantee on the form of the input. E.g. many
>>>>>>>> companies use JSON without any kind of schema, so it becomes very hard
>>>>>>>> to assert anything about the input, which makes these programs very
>>>>>>>> fragile to the "one accidental message publication that creates an
>>>>>>>> unsolvable problem".
>>>>>>>> 
>>>>>>>> For that reason I do wonder if limiting to just serialization actually
>>>>>>>> gets you a useful solution. For JSON it will help with the problem of
>>>>>>>> non-parseable JSON, but it sounds like it won't help in the case where
>>>>>>>> the JSON is well-formed but does not have any of the fields you expect
>>>>>>>> and depend on for your processing. I expect the reason for limiting
>>>>>>>> the scope is that it is pretty hard to reason about correctness for
>>>>>>>> anything that stops in the middle of processing an operator DAG?
>>>>>>>> 
>>>>>>>> -Jay
>>>>>>>> 
>>>>>>>> On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak <
>>>>>>>> jan.filip...@trivago.com>
>>>>>>>> wrote:
>>>>>>>> 
>>>>>>>> IMHO you're doing it wrong then. + building too much support into the
>>>>>>>>> kafka
>>>>>>>>> eco system is very counterproductive in fostering a happy userbase
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On 02.06.2017 13:15, Damian Guy wrote:
>>>>>>>>> 
>>>>>>>>> Jan, you have a choice to Fail fast if you want. This is about
>>>>>>>>>> giving
>>>>>>>>>> people options and there are times when you don't want to fail
>>>>>>>>>> fast.
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> On Fri, 2 Jun 2017 at 11:00 Jan Filipiak <jan.filip...@trivago.com
>>>>>>>>>>> 
>>>>>>>>>> wrote:
>>>>>>>>>> 
>>>>>>>>>> Hi
>>>>>>>>>> 
>>>>>>>>>>> 1.
>>>>>>>>>>> That greatly complicates monitoring. Fail-fast gives you that: when
>>>>>>>>>>> you monitor only the lag of all your apps, you are completely
>>>>>>>>>>> covered. With that new sort of application, monitoring is much more
>>>>>>>>>>> complicated, as you now need to monitor the fail % of some special
>>>>>>>>>>> apps as well. In my opinion that is a huge downside already.
>>>>>>>>>>> 
>>>>>>>>>>> 2.
>>>>>>>>>>> Using a schema registry like the Avro stuff, it might not even be the
>>>>>>>>>>> record that is broken; it might just be your app being unable to
>>>>>>>>>>> fetch a schema it now needs to know. Maybe you got partitioned
>>>>>>>>>>> away from that registry.
>>>>>>>>>>> 
>>>>>>>>>>> 3. When you get alerted because of a too high fail percentage, what
>>>>>>>>>>> steps are you going to take?
>>>>>>>>>>> Shut it down to buy time. Fix the problem. Spend way too much time
>>>>>>>>>>> finding a good reprocess offset.
>>>>>>>>>>> Your time windows are in bad shape anyway, and you have pretty much
>>>>>>>>>>> lost.
>>>>>>>>>>> This routine is nonsense.
>>>>>>>>>>> 
>>>>>>>>>>> Dead letter queues would be the worst possible addition to the
>>>>>>>>>>> Kafka toolkit that I can think of. They just don't fit an
>>>>>>>>>>> architecture in which having clients fall behind is a valid option.
>>>>>>>>>>> 
>>>>>>>>>>> Further, I already mentioned that the only bad pill I've seen so far
>>>>>>>>>>> is CRC errors. Any plans for those?
>>>>>>>>>>> 
>>>>>>>>>>> Best Jan
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> 
>>>>>>>>>>> On 02.06.2017 11:34, Damian Guy wrote:
>>>>>>>>>>> 
>>>>>>>>>>> I agree with what Matthias has said w.r.t failing fast. There are
>>>>>>>>>>>> plenty
>>>>>>>>>>>> 
>>>>>>>>>>>> of
>>>>>>>>>>> 
>>>>>>>>>>> times when you don't want to fail-fast and must attempt to  make
>>>>>>>>>>>> 
>>>>>>>>>>>> progress.
>>>>>>>>>>> 
>>>>>>>>>>> The dead-letter queue is exactly for these circumstances. Of
>>>>>>>>>>>> course if
>>>>>>>>>>>> every record is failing, then you probably do want to give up.
>>>>>>>>>>>> 
>>>>>>>>>>>> On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax <
>>>>>>>>>>>> matth...@confluent.io>
>>>>>>>>>>>> 
>>>>>>>>>>>> wrote:
>>>>>>>>>>> 
>>>>>>>>>>> First a meta comment. KIP discussion should take place on the dev
>>>>>>>>>>>> list
>>>>>>>>>>>> 
>>>>>>>>>>>>> -- if user list is cc'ed please make sure to reply to both
>>>>>>>>>>>>> lists.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks.
>>>>>>>>>>>> Thanks for making the scope of the KIP clear. Makes a lot of
>>>>>>>>>>>> sense to
>>>>>>>>>>>> 
>>>>>>>>>>>>> focus on deserialization exceptions for now.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> With regard to corrupted state stores, would it make sense to
>>>>>>>>>>>>> fail a task and wipe out the store to repair it via recreation
>>>>>>>>>>>>> from the changelog? That's of course quite an advanced pattern,
>>>>>>>>>>>>> but I want to bring it up so we design the first step in a way
>>>>>>>>>>>>> such that we can get there (if we think it's a reasonable idea).
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I also want to comment on fail-fast vs. making progress. I think
>>>>>>>>>>>>> that fail-fast need not always be the best option. The scenario I
>>>>>>>>>>>>> have in mind is like this: you have a bunch of producers that feed
>>>>>>>>>>>>> the Streams input topic. Most producers work fine, but maybe one
>>>>>>>>>>>>> producer misbehaves and the data it writes is corrupted. You might
>>>>>>>>>>>>> not even be able to recover this lost data at any point -- thus,
>>>>>>>>>>>>> there is no reason to stop processing; you just skip over those
>>>>>>>>>>>>> records. Of course, you need to fix the root cause, and thus you
>>>>>>>>>>>>> need to alert (either via logs or the exception handler directly)
>>>>>>>>>>>>> and you need to start investigating to find the bad producer, shut
>>>>>>>>>>>>> it down and fix it.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Here the dead letter queue comes into play. From my
>>>>>>>>>>>>> understanding, the purpose of this feature is solely to enable
>>>>>>>>>>>>> after-the-fact debugging. I don't think those records would be fed
>>>>>>>>>>>>> back at any point in time (so I don't see any ordering issue -- a
>>>>>>>>>>>>> skipped record, in this regard, is just "fully processed"). Thus,
>>>>>>>>>>>>> the dead letter queue should actually encode the original record's
>>>>>>>>>>>>> metadata (topic, partition, offset, etc.) to enable such debugging.
>>>>>>>>>>>>> I guess this might also be possible if you just log the bad
>>>>>>>>>>>>> records, but it would be harder to access (you first must find the
>>>>>>>>>>>>> Streams instance that wrote the log and extract the information
>>>>>>>>>>>>> from there). Reading it from a topic is much simpler.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> I also want to mention the following. Assume you have such a
>>>>>>>>>>>>> topic with some bad records and some good records. If we always
>>>>>>>>>>>>> fail-fast, it's going to be super hard to process the good data.
>>>>>>>>>>>>> You would need to write an extra app that copies the data into a
>>>>>>>>>>>>> new topic, filtering out the bad records (or apply the map()
>>>>>>>>>>>>> workaround within the stream). So I don't think the claim that
>>>>>>>>>>>>> failing fast is most likely the best option in production is
>>>>>>>>>>>>> necessarily true.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Or do you think there are scenarios, for which you can recover
>>>>>>>>>>>>> the
>>>>>>>>>>>>> corrupted records successfully? And even if this is possible, it
>>>>>>>>>>>>> might
>>>>>>>>>>>>> be a case for reprocessing instead of failing the whole
>>>>>>>>>>>>> application?
>>>>>>>>>>>>> Also, if you think you can "repair" a corrupted record, should
>>>>>>>>>>>>> the
>>>>>>>>>>>>> handler allow to return a "fixed" record? This would solve the
>>>>>>>>>>>>> ordering
>>>>>>>>>>>>> problem.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> -Matthias
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> 
>>>>>>>>>>>>> On 5/30/17 1:47 AM, Michael Noll wrote:
>>>>>>>>>>>>> 
>>>>>>>>>>>>> Thanks for your work on this KIP, Eno -- much appreciated!
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> - I think it would help to improve the KIP by adding an
>>>>>>>>>>>>>> end-to-end
>>>>>>>>>>>>>> code
>>>>>>>>>>>>>> example that demonstrates, with the DSL and with the Processor
>>>>>>>>>>>>>> API,
>>>>>>>>>>>>>> how
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> the
>>>>>>>>>>>>> 
>>>>>>>>>>>>> user would write a simple application that would then be
>>>>>>>>>>>>>> augmented
>>>>>>>>>>>>>> with
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> the
>>>>>>>>>>>>> 
>>>>>>>>>>>>> proposed KIP changes to handle exceptions.  It should also
>>>>>>>>>>>>>> become much
>>>>>>>>>>>>>> clearer then that e.g. the KIP would lead to different code
>>>>>>>>>>>>>> paths for
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> the
>>>>>>>>>>>>> 
>>>>>>>>>>>> happy case and any failure scenarios.
>>>>>>>>>>>> 
>>>>>>>>>>>>> - Do we have sufficient information available to make informed
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> decisions
>>>>>>>>>>>>> 
>>>>>>>>>>>> on
>>>>>>>>>>>> 
>>>>>>>>>>>>> what to do next?  For example, do we know in which part of the
>>>>>>>>>>>>>> topology
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> the
>>>>>>>>>>>>> 
>>>>>>>>>>>>> record failed? `ConsumerRecord` gives us access to topic,
>>>>>>>>>>>>>> partition,
>>>>>>>>>>>>>> offset, timestamp, etc., but what about topology-related
>>>>>>>>>>>>>> information
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> (e.g.
>>>>>>>>>>>>> 
>>>>>>>>>>>>> what is the associated state store, if any)?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> - Only partly on-topic for the scope of this KIP, but this is
>>>>>>>>>>>>>> about
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> bigger picture: This KIP would give users the option to send
>>>>>>>>>>>>>> corrupted
>>>>>>>>>>>>>> records to dead letter queue (quarantine topic).  But, what
>>>>>>>>>>>>>> pattern
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> would
>>>>>>>>>>>>> 
>>>>>>>>>>>> we advocate to process such a dead letter queue then, e.g. how to
>>>>>>>>>>>> allow
>>>>>>>>>>>> 
>>>>>>>>>>>>> for
>>>>>>>>>>>>> 
>>>>>>>>>>>>> retries with backoff ("If the first record in the dead letter
>>>>>>>>>>>>>> queue
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> fails
>>>>>>>>>>>>> 
>>>>>>>>>>>> again, then try the second record for the time being and go back
>>>>>>>>>>>> to the
>>>>>>>>>>>> 
>>>>>>>>>>>>> first record at a later time").  Jay and Jan already alluded to
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> ordering
>>>>>>>>>>>>> 
>>>>>>>>>>>> problems that will be caused by dead letter queues. As I said,
>>>>>>>>>>>> retries
>>>>>>>>>>>> 
>>>>>>>>>>>>> might be out of scope but perhaps the implications should be
>>>>>>>>>>>>>> considered
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> if
>>>>>>>>>>>>> 
>>>>>>>>>>>>> possible?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Also, I wrote the text below before reaching the point in the
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> conversation
>>>>>>>>>>>>> 
>>>>>>>>>>>>> that this KIP's scope will be limited to exceptions in the
>>>>>>>>>>>>>> category of
>>>>>>>>>>>>>> poison pills / deserialization errors.  But since Jay brought
>>>>>>>>>>>>>> up
>>>>>>>>>>>>>> user
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> code
>>>>>>>>>>>>> 
>>>>>>>>>>>>> errors again, I decided to include it again.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> ----------------------------snip----------------------------
>>>>>>>>>>>>>> A meta comment: I am not sure about this split between the
>>>>>>>>>>>>>> code for
>>>>>>>>>>>>>> the
>>>>>>>>>>>>>> happy path (e.g. map/filter/... in the DSL) from the failure
>>>>>>>>>>>>>> path
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> (using
>>>>>>>>>>>>> 
>>>>>>>>>>>> exception handlers).  In Scala, for example, we can do:
>>>>>>>>>>>> 
>>>>>>>>>>>>>       scala> val computation = scala.util.Try(1 / 0)
>>>>>>>>>>>>>>       computation: scala.util.Try[Int] =
>>>>>>>>>>>>>> Failure(java.lang.ArithmeticException: / by zero)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>       scala> computation.getOrElse(42)
>>>>>>>>>>>>>>       res2: Int = 42
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Another example with Scala's pattern matching, which is
>>>>>>>>>>>>>> similar to
>>>>>>>>>>>>>> `KStream#branch()`:
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>       computation match {
>>>>>>>>>>>>>>         case scala.util.Success(x) => x * 5
>>>>>>>>>>>>>>         case scala.util.Failure(_) => 42
>>>>>>>>>>>>>>       }
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> (The above isn't the most idiomatic way to handle this in
>>>>>>>>>>>>>> Scala,
>>>>>>>>>>>>>> but
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> that's
>>>>>>>>>>>>> 
>>>>>>>>>>>>> not the point I'm trying to make here.)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> Hence the question I'm raising here is: Do we want to have an
>>>>>>>>>>>>>> API
>>>>>>>>>>>>>> where
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> you
>>>>>>>>>>>>> 
>>>>>>>>>>>>> code "the happy path", and then have a different code path for
>>>>>>>>>>>>>> failures
>>>>>>>>>>>>>> (using exceptions and handlers);  or should we treat both
>>>>>>>>>>>>>> Success and
>>>>>>>>>>>>>> Failure in the same way?
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> I think the failure/exception handling approach (as proposed in
>>>>>>>>>>>>>> this
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> KIP)
>>>>>>>>>>>>> 
>>>>>>>>>>>> is well-suited for errors in the category of deserialization
>>>>>>>>>>>> problems
>>>>>>>>>>>> 
>>>>>>>>>>>>> aka
>>>>>>>>>>>>> 
>>>>>>>>>>>> poison pills, partly because the (default) serdes are defined
>>>>>>>>>>>> through
>>>>>>>>>>>> 
>>>>>>>>>>>>> configuration (explicit serdes however are defined through API
>>>>>>>>>>>>>> calls).
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> However, I'm not yet convinced that the failure/exception
>>>>>>>>>>>>>> handling
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> approach
>>>>>>>>>>>>> 
>>>>>>>>>>>>> is the best idea for user code exceptions, e.g. if you fail to
>>>>>>>>>>>>>> guard
>>>>>>>>>>>>>> against NPE in your lambdas or divide a number by zero.
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>       scala> val stream = Seq(1, 2, 3, 4, 5)
>>>>>>>>>>>>>>       stream: Seq[Int] = List(1, 2, 3, 4, 5)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>       // Here: Fallback to a sane default when encountering
>>>>>>>>>>>>>> failed
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> records
>>>>>>>>>>>>> 
>>>>>>>>>>>>       scala>     stream.map(x => Try(1/(3 - x))).flatMap(t =>
>>>>>>>>>>>> 
>>>>>>>>>>>>> Seq(t.getOrElse(42)))
>>>>>>>>>>>>>>       res19: Seq[Int] = List(0, 1, 42, -1, 0)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>       // Here: Skip over failed records
>>>>>>>>>>>>>>       scala> stream.map(x => Try(1/(3 - x))).collect{ case
>>>>>>>>>>>>>> Success(s)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> => s
>>>>>>>>>>>>> 
>>>>>>>>>>>> }
>>>>>>>>>>>> 
>>>>>>>>>>>>>       res20: Seq[Int] = List(0, 1, -1, 0)
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> The above is more natural to me than using error handlers to
>>>>>>>>>>>>>> define
>>>>>>>>>>>>>> how
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> to
>>>>>>>>>>>>> 
>>>>>>>>>>>>> deal with failed records (here, the value `3` causes an
>>>>>>>>>>>>>> arithmetic
>>>>>>>>>>>>>> exception).  Again, it might help the KIP if we added an
>>>>>>>>>>>>>> end-to-end
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> example
>>>>>>>>>>>>> 
>>>>>>>>>>>>> for such user code errors.
>>>>>>>>>>>>>> ----------------------------snip----------------------------
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak <
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> jan.filip...@trivago.com>
>>>>>>>>>>>>> 
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> 
>>>>>>>>>>>>> Hi Jay,
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> Eno mentioned that he will narrow down the scope to only
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> ConsumerRecord
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> deserialisation.
>>>>>>>>>>>> 
>>>>>>>>>>>>> I am working with Database Changelogs only. I would really not
>>>>>>>>>>>>>>> like
>>>>>>>>>>>>>>> to
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> see
>>>>>>>>>>>>>> a dead letter queue or something
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> similar. How am I expected to get these back in order? Just grind
>>>>>>>>>>>>>>> to a halt and call me on the weekend. I'll fix it then in a few
>>>>>>>>>>>>>>> minutes rather than spend 2 weeks ordering dead letters.
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> (where
>>>>>>>>>>>>>> 
>>>>>>>>>>>>> reprocessing might be even the faster fix)
>>>>>>>>>>>> 
>>>>>>>>>>>>> Best Jan
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> On 29.05.2017 20:23, Jay Kreps wrote:
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>       - I think we should hold off on retries unless we have
>>>>>>>>>>>>>>> worked
>>>>>>>>>>>>>>> out
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> the
>>>>>>>>>>>> 
>>>>>>>>>>>>>       full usage pattern, people can always implement their
>>>>>>>>>>>>>> own. I
>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>> think
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> the idea
>>>>>>>>>>>> 
>>>>>>>>>>>>>       is that you send the message to some kind of dead
>>>>>>>>>>>>>>>> letter queue
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> and
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> then
>>>>>>>>>>>> 
>>>>>>>>>>>>>       replay these later. This obviously destroys all semantic
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> guarantees
>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>> we are
>>>>>>>>>>>> 
>>>>>>>>>>>>>       working hard to provide right now, which may be okay.
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>>>>>>>>>>>>>>>> 
>> 
> 
> 
> -- 
> -- Guozhang
