Thanks for the updated KIP, some more comments:

1.The config name is "default.deserialization.exception.handler" while the
interface class name is "RecordExceptionHandler", which is more general
than the intended purpose. Could we rename the class name accordingly?

2. Could you describe the full implementation of "DefaultExceptionHandler",
currently it is not clear to me how it is implemented with the configured
value.

In addition, I think we do not need to include an additional
"DEFAULT_DESERIALIZATION_EXCEPTION_RESPONSE_CONFIG" as the configure()
function is mainly used for users to pass any customized parameters that is
out of the Streams library; plus adding such additional config sounds
over-complicated for a default exception handler. Instead I'd suggest we
just provide two handlers (or three if people feel strong about the
LogAndThresholdExceptionHandler), one for FailOnExceptionHandler and one
for LogAndContinueOnExceptionHandler. And we can set
LogAndContinueOnExceptionHandler
by default.


Guozhang








On Wed, Jun 21, 2017 at 1:39 AM, Eno Thereska <eno.there...@gmail.com>
wrote:

> Thanks Guozhang,
>
> I’ve updated the KIP and hopefully addressed all the comments so far. In
> the process also changed the name of the KIP to reflect its scope better:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-161%3A+streams+
> deserialization+exception+handlers <https://cwiki.apache.org/
> confluence/display/KAFKA/KIP-161:+streams+deserialization+
> exception+handlers>
>
> Any other feedback appreciated, otherwise I’ll start the vote soon.
>
> Thanks
> Eno
>
> > On Jun 12, 2017, at 6:28 AM, Guozhang Wang <wangg...@gmail.com> wrote:
> >
> > Eno, Thanks for bringing this proposal up and sorry for getting late on
> > this. Here are my two cents:
> >
> > 1. First some meta comments regarding "fail fast" v.s. "making
> progress". I
> > agree that in general we should better "enforce user to do the right
> thing"
> > in system design, but we also need to keep in mind that Kafka is a
> > multi-tenant system, i.e. from a Streams app's pov you probably would not
> > control the whole streaming processing pipeline end-to-end. E.g. Your
> input
> > data may not be controlled by yourself; it could be written by another
> app,
> > or another team in your company, or even a different organization, and if
> > an error happens maybe you cannot fix "to do the right thing" just by
> > yourself in time. In such an environment I think it is important to leave
> > the door open to let users be more resilient. So I find the current
> > proposal which does leave the door open for either fail-fast or make
> > progress quite reasonable.
> >
> > 2. On the other hand, if the question is whether we should provide a
> > built-in "send to bad queue" handler from the library, I think that might
> > be an overkill: with some tweaks (see my detailed comments below) on the
> > API we can allow users to implement such handlers pretty easily. In
> fact, I
> > feel even "LogAndThresholdExceptionHandler" is not necessary as a
> built-in
> > handler, as it would then require users to specify the threshold via
> > configs, etc. I think letting people provide such "eco-libraries" may be
> > better.
> >
> > 3. Regarding the CRC error: today we validate CRC on both the broker end
> > upon receiving produce requests and on consumer end upon receiving fetch
> > responses; and if the CRC validation fails in the former case it would
> not
> > be appended to the broker logs. So if we do see a CRC failure on the
> > consumer side it has to be that either we have a flipped bit on the
> broker
> > disks or over the wire. For the first case it is fatal while for the
> second
> > it is retriable. Unfortunately we cannot tell which case it is when
> seeing
> > CRC validation failures. But in either case, just skipping and making
> > progress seems not a good choice here, and hence I would personally
> exclude
> > these errors from the general serde errors to NOT leave the door open of
> > making progress.
> >
> > Currently such errors are thrown as KafkaException that wraps an
> > InvalidRecordException, which may be too general and we could consider
> just
> > throwing the InvalidRecordException directly. But that could be an
> > orthogonal discussion if we agrees that CRC failures should not be
> > considered in this KIP.
> >
> > ----------------
> >
> > Now some detailed comments:
> >
> > 4. Could we consider adding the processor context in the handle()
> function
> > as well? This context will be wrapping as the source node that is about
> to
> > process the record. This could expose more info like which task / source
> > node sees this error, which timestamp of the message, etc, and also can
> > allow users to implement their handlers by exposing some metrics, by
> > calling context.forward() to implement the "send to bad queue" behavior
> etc.
> >
> > 5. Could you add the string name of
> > StreamsConfig.DEFAULT_RECORD_EXCEPTION_HANDLER as well in the KIP?
> > Personally I find "default" prefix a bit misleading since we do not allow
> > users to override it per-node yet. But I'm okay either way as I can see
> we
> > may extend it in the future and probably would like to not rename the
> > config again. Also from the experience of `default partitioner` and
> > `default timestamp extractor` we may also make sure that the passed in
> > object can be either a string "class name" or a class object?
> >
> >
> > Guozhang
> >
> >
> > On Wed, Jun 7, 2017 at 2:16 PM, Jan Filipiak <jan.filip...@trivago.com>
> > wrote:
> >
> >> Hi Eno,
> >>
> >> On 07.06.2017 22:49, Eno Thereska wrote:
> >>
> >>> Comments inline:
> >>>
> >>> On 5 Jun 2017, at 18:19, Jan Filipiak <jan.filip...@trivago.com>
> wrote:
> >>>>
> >>>> Hi
> >>>>
> >>>> just my few thoughts
> >>>>
> >>>> On 05.06.2017 11:44, Eno Thereska wrote:
> >>>>
> >>>>> Hi there,
> >>>>>
> >>>>> Sorry for the late reply, I was out this past week. Looks like good
> >>>>> progress was made with the discussions either way. Let me recap a
> couple of
> >>>>> points I saw into one big reply:
> >>>>>
> >>>>> 1. Jan mentioned CRC errors. I think this is a good point. As these
> >>>>> happen in Kafka, before Kafka Streams gets a chance to inspect
> anything,
> >>>>> I'd like to hear the opinion of more Kafka folks like Ismael or
> Jason on
> >>>>> this one. Currently the documentation is not great with what to do
> once a
> >>>>> CRC check has failed. From looking at the code, it looks like the
> client
> >>>>> gets a KafkaException (bubbled up from the fetcher) and currently we
> in
> >>>>> streams catch this as part of poll() and fail. It might be
> advantageous to
> >>>>> treat CRC handling in a similar way to serialisation handling (e.g.,
> have
> >>>>> the option to fail/skip). Let's see what the other folks say.
> Worst-case we
> >>>>> can do a separate KIP for that if it proved too hard to do in one go.
> >>>>>
> >>>> there is no reasonable way to "skip" a crc error. How can you know the
> >>>> length you read was anything reasonable? you might be completely lost
> >>>> inside your response.
> >>>>
> >>> On the client side, every record received is checked for validity. As
> it
> >>> happens, if the CRC check fails the exception is wrapped with a
> >>> KafkaException that is thrown all the way to poll(). Assuming we change
> >>> that and poll() throws a CRC exception, I was thinking we could treat
> it
> >>> similarly to a deserialize exception and pass it to the exception
> handler
> >>> to decide what to do. Default would be to fail. This might need a
> Kafka KIP
> >>> btw and can be done separately from this KIP, but Jan, would you find
> this
> >>> useful?
> >>>
> >> I don't think so. IMO you can not reasonably continue parsing when the
> >> checksum of a message is not correct. If you are not sure you got the
> >> correct length, how can you be sure to find the next record? I would
> always
> >> straight fail in all cases. Its to hard for me to understand why one
> would
> >> try to continue. I mentioned CRC's because thats the only bad pills I
> ever
> >> saw so far. But I am happy that it just stopped and I could check what
> was
> >> going on. This will also be invasive in the client code then.
> >>
> >> If you ask me, I am always going to vote for "grind to halt" let the
> >> developers see what happened and let them fix it. It helps building good
> >> kafka experiences and better software and architectures. For me this is:
> >> "force the user todo the right thing". https://youtu.be/aAb7hSCtvGw?
> t=374
> >> eg. not letting unexpected input slip by.  Letting unexpected input
> slip by
> >> is what bought us 15+years of war of all sorts of ingestion attacks. I
> >> don't even dare to estimate how many missingrecords-search-teams going
> be
> >> formed, maybe some hackerone for stream apps :D
> >>
> >> Best Jan
> >>
> >>
> >>>
> >>>>> At a minimum, handling this type of exception will need to involve
> the
> >>>>> exactly-once (EoS) logic. We'd still allow the option of failing or
> >>>>> skipping, but EoS would need to clean up by rolling back all the side
> >>>>> effects from the processing so far. Matthias, how does this sound?
> >>>>>
> >>>> Eos will not help the record might be 5,6 repartitions down into the
> >>>> topology. I haven't followed but I pray you made EoS optional! We
> don't
> >>>> need this and we don't want this and we will turn it off if it comes.
> So I
> >>>> wouldn't recommend relying on it. The option to turn it off is better
> than
> >>>> forcing it and still beeing unable to rollback badpills (as explained
> >>>> before)
> >>>>
> >>> Yeah as Matthias mentioned EoS is optional.
> >>>
> >>> Thanks,
> >>> Eno
> >>>
> >>>
> >>> 6. Will add an end-to-end example as Michael suggested.
> >>>>>
> >>>>> Thanks
> >>>>> Eno
> >>>>>
> >>>>>
> >>>>>
> >>>>> On 4 Jun 2017, at 02:35, Matthias J. Sax <matth...@confluent.io>
> wrote:
> >>>>>>
> >>>>>> What I don't understand is this:
> >>>>>>
> >>>>>> From there on its the easiest way forward: fix, redeploy, start =>
> >>>>>>> done
> >>>>>>>
> >>>>>> If you have many producers that work fine and a new "bad" producer
> >>>>>> starts up and writes bad data into your input topic, your Streams
> app
> >>>>>> dies but all your producers, including the bad one, keep writing.
> >>>>>>
> >>>>>> Thus, how would you fix this, as you cannot "remove" the corrupted
> date
> >>>>>> from the topic? It might take some time to identify the root cause
> and
> >>>>>> stop the bad producer. Up to this point you get good and bad data
> into
> >>>>>> your Streams input topic. If Streams app in not able to skip over
> those
> >>>>>> bad records, how would you get all the good data from the topic? Not
> >>>>>> saying it's not possible, but it's extra work copying the data with
> a
> >>>>>> new non-Streams consumer-producer-app into a new topic and than feed
> >>>>>> your Streams app from this new topic -- you also need to update all
> >>>>>> your
> >>>>>> upstream producers to write to the new topic.
> >>>>>>
> >>>>>> Thus, if you want to fail fast, you can still do this. And after you
> >>>>>> detected and fixed the bad producer you might just reconfigure your
> app
> >>>>>> to skip bad records until it reaches the good part of the data.
> >>>>>> Afterwards, you could redeploy with fail-fast again.
> >>>>>>
> >>>>>>
> >>>>>> Thus, for this pattern, I actually don't see any reason why to stop
> the
> >>>>>> Streams app at all. If you have a callback, and use the callback to
> >>>>>> raise an alert (and maybe get the bad data into a bad record
> queue), it
> >>>>>> will not take longer to identify and stop the "bad" producer. But
> for
> >>>>>> this case, you have zero downtime for your Streams app.
> >>>>>>
> >>>>>> This seems to be much simpler. Or do I miss anything?
> >>>>>>
> >>>>>>
> >>>>>> Having said this, I agree that the "threshold based callback" might
> be
> >>>>>> questionable. But as you argue for strict "fail-fast", I want to
> argue
> >>>>>> that this must not always be the best pattern to apply and that the
> >>>>>> overall KIP idea is super useful from my point of view.
> >>>>>>
> >>>>>>
> >>>>>> -Matthias
> >>>>>>
> >>>>>>
> >>>>>> On 6/3/17 11:57 AM, Jan Filipiak wrote:
> >>>>>>
> >>>>>>> Could not agree more!
> >>>>>>>
> >>>>>>> But then I think the easiest is still: print exception and die.
> >>>>>>> From there on its the easiest way forward: fix, redeploy, start =>
> >>>>>>> done
> >>>>>>>
> >>>>>>> All the other ways to recover a pipeline that was processing
> partially
> >>>>>>> all the time
> >>>>>>> and suddenly went over a "I cant take it anymore" threshold is not
> >>>>>>> straight forward IMO.
> >>>>>>>
> >>>>>>> How to find the offset, when it became to bad when it is not the
> >>>>>>> latest
> >>>>>>> commited one?
> >>>>>>> How to reset there? with some reasonable stuff in your rockses?
> >>>>>>>
> >>>>>>> If one would do the following. The continuing Handler would measure
> >>>>>>> for
> >>>>>>> a threshold and
> >>>>>>> would terminate after a certain threshold has passed (per task).
> Then
> >>>>>>> one can use offset commit/ flush intervals
> >>>>>>> to make reasonable assumption of how much is slipping by + you get
> an
> >>>>>>> easy recovery when it gets to bad
> >>>>>>> + you could also account for "in processing" records.
> >>>>>>>
> >>>>>>> Setting this threshold to zero would cover all cases with 1
> >>>>>>> implementation. It is still beneficial to have it pluggable
> >>>>>>>
> >>>>>>> Again CRC-Errors are the only bad pills we saw in production for
> now.
> >>>>>>>
> >>>>>>> Best Jan
> >>>>>>>
> >>>>>>>
> >>>>>>> On 02.06.2017 17:37, Jay Kreps wrote:
> >>>>>>>
> >>>>>>>> Jan, I agree with you philosophically. I think one practical
> >>>>>>>> challenge
> >>>>>>>> has
> >>>>>>>> to do with data formats. Many people use untyped events, so there
> is
> >>>>>>>> simply
> >>>>>>>> no guarantee on the form of the input. E.g. many companies use
> JSON
> >>>>>>>> without
> >>>>>>>> any kind of schema so it becomes very hard to assert anything
> about
> >>>>>>>> the
> >>>>>>>> input which makes these programs very fragile to the "one
> accidental
> >>>>>>>> message publication that creates an unsolvable problem.
> >>>>>>>>
> >>>>>>>> For that reason I do wonder if limiting to just serialization
> >>>>>>>> actually
> >>>>>>>> gets
> >>>>>>>> you a useful solution. For JSON it will help with the problem of
> >>>>>>>> non-parseable JSON, but sounds like it won't help in the case
> where
> >>>>>>>> the
> >>>>>>>> JSON is well-formed but does not have any of the fields you expect
> >>>>>>>> and
> >>>>>>>> depend on for your processing. I expect the reason for limiting
> the
> >>>>>>>> scope
> >>>>>>>> is it is pretty hard to reason about correctness for anything that
> >>>>>>>> stops in
> >>>>>>>> the middle of processing an operator DAG?
> >>>>>>>>
> >>>>>>>> -Jay
> >>>>>>>>
> >>>>>>>> On Fri, Jun 2, 2017 at 4:50 AM, Jan Filipiak <
> >>>>>>>> jan.filip...@trivago.com>
> >>>>>>>> wrote:
> >>>>>>>>
> >>>>>>>> IMHO your doing it wrong then. + building to much support into the
> >>>>>>>>> kafka
> >>>>>>>>> eco system is very counterproductive in fostering a happy
> userbase
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 02.06.2017 13:15, Damian Guy wrote:
> >>>>>>>>>
> >>>>>>>>> Jan, you have a choice to Fail fast if you want. This is about
> >>>>>>>>>> giving
> >>>>>>>>>> people options and there are times when you don't want to fail
> >>>>>>>>>> fast.
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> On Fri, 2 Jun 2017 at 11:00 Jan Filipiak <
> jan.filip...@trivago.com
> >>>>>>>>>>>
> >>>>>>>>>> wrote:
> >>>>>>>>>>
> >>>>>>>>>> Hi
> >>>>>>>>>>
> >>>>>>>>>>> 1.
> >>>>>>>>>>> That greatly complicates monitoring.  Fail Fast gives you that
> >>>>>>>>>>> when
> >>>>>>>>>>> you
> >>>>>>>>>>> monitor only the lag of all your apps
> >>>>>>>>>>> you are completely covered. With that sort of new application
> >>>>>>>>>>> Monitoring
> >>>>>>>>>>> is very much more complicated as
> >>>>>>>>>>> you know need to monitor fail % of some special apps aswell.
> In my
> >>>>>>>>>>> opinion that is a huge downside already.
> >>>>>>>>>>>
> >>>>>>>>>>> 2.
> >>>>>>>>>>> using a schema regerstry like Avrostuff it might not even be
> the
> >>>>>>>>>>> record
> >>>>>>>>>>> that is broken, it might be just your app
> >>>>>>>>>>> unable to fetch a schema it needs now know. Maybe you got
> >>>>>>>>>>> partitioned
> >>>>>>>>>>> away from that registry.
> >>>>>>>>>>>
> >>>>>>>>>>> 3. When you get alerted because of to high fail percentage.
> what
> >>>>>>>>>>> are the
> >>>>>>>>>>> steps you gonna do?
> >>>>>>>>>>> shut it down to buy time. fix the problem. spend way to much
> time
> >>>>>>>>>>> to
> >>>>>>>>>>> find a good reprocess offset.
> >>>>>>>>>>> Your timewindows are in bad shape anyways, and you pretty much
> >>>>>>>>>>> lost.
> >>>>>>>>>>> This routine is nonsense.
> >>>>>>>>>>>
> >>>>>>>>>>> Dead letter queues would be the worst possible addition to the
> >>>>>>>>>>> kafka
> >>>>>>>>>>> toolkit that I can think of. It just doesn't fit the
> architecture
> >>>>>>>>>>> of having clients falling behind is a valid option.
> >>>>>>>>>>>
> >>>>>>>>>>> Further. I mentioned already the only bad pill ive seen so far
> is
> >>>>>>>>>>> crc
> >>>>>>>>>>> errors. any plans for those?
> >>>>>>>>>>>
> >>>>>>>>>>> Best Jan
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>> On 02.06.2017 11:34, Damian Guy wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> I agree with what Matthias has said w.r.t failing fast. There
> are
> >>>>>>>>>>>> plenty
> >>>>>>>>>>>>
> >>>>>>>>>>>> of
> >>>>>>>>>>>
> >>>>>>>>>>> times when you don't want to fail-fast and must attempt to
> make
> >>>>>>>>>>>>
> >>>>>>>>>>>> progress.
> >>>>>>>>>>>
> >>>>>>>>>>> The dead-letter queue is exactly for these circumstances. Of
> >>>>>>>>>>>> course if
> >>>>>>>>>>>> every record is failing, then you probably do want to give up.
> >>>>>>>>>>>>
> >>>>>>>>>>>> On Fri, 2 Jun 2017 at 07:56 Matthias J. Sax <
> >>>>>>>>>>>> matth...@confluent.io>
> >>>>>>>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>
> >>>>>>>>>>> First a meta comment. KIP discussion should take place on the
> dev
> >>>>>>>>>>>> list
> >>>>>>>>>>>>
> >>>>>>>>>>>>> -- if user list is cc'ed please make sure to reply to both
> >>>>>>>>>>>>> lists.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks.
> >>>>>>>>>>>> Thanks for making the scope of the KIP clear. Makes a lot of
> >>>>>>>>>>>> sense to
> >>>>>>>>>>>>
> >>>>>>>>>>>>> focus on deserialization exceptions for now.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> With regard to corrupted state stores, would it make sense to
> >>>>>>>>>>>>> fail a
> >>>>>>>>>>>>> task and wipe out the store to repair it via recreation from
> the
> >>>>>>>>>>>>> changelog? That's of course a quite advance pattern, but I
> want
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>> bring
> >>>>>>>>>>>>> it up to design the first step in a way such that we can get
> >>>>>>>>>>>>> there (if
> >>>>>>>>>>>>> we think it's a reasonable idea).
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I also want to comment about fail fast vs making progress. I
> >>>>>>>>>>>>> think that
> >>>>>>>>>>>>> fail-fast must not always be the best option. The scenario I
> >>>>>>>>>>>>> have in
> >>>>>>>>>>>>> mind is like this: you got a bunch of producers that feed the
> >>>>>>>>>>>>> Streams
> >>>>>>>>>>>>> input topic. Most producers work find, but maybe one producer
> >>>>>>>>>>>>> miss
> >>>>>>>>>>>>> behaves and the data it writes is corrupted. You might not
> even
> >>>>>>>>>>>>> be able
> >>>>>>>>>>>>> to recover this lost data at any point -- thus, there is no
> >>>>>>>>>>>>> reason to
> >>>>>>>>>>>>> stop processing but you just skip over those records. Of
> >>>>>>>>>>>>> course, you
> >>>>>>>>>>>>> need to fix the root cause, and thus you need to alert
> (either
> >>>>>>>>>>>>> via logs
> >>>>>>>>>>>>> of the exception handler directly) and you need to start to
> >>>>>>>>>>>>> investigate
> >>>>>>>>>>>>> to find the bad producer, shut it down and fix it.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Here the dead letter queue comes into place. From my
> >>>>>>>>>>>>> understanding, the
> >>>>>>>>>>>>> purpose of this feature is solely enable post debugging. I
> don't
> >>>>>>>>>>>>> think
> >>>>>>>>>>>>> those record would be fed back at any point in time (so I
> don't
> >>>>>>>>>>>>> see any
> >>>>>>>>>>>>> ordering issue -- a skipped record, with this regard, is just
> >>>>>>>>>>>>> "fully
> >>>>>>>>>>>>> processed"). Thus, the dead letter queue should actually
> encode
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>> original records metadata (topic, partition offset etc) to
> >>>>>>>>>>>>> enable
> >>>>>>>>>>>>> such
> >>>>>>>>>>>>> debugging. I guess, this might also be possible if you just
> log
> >>>>>>>>>>>>> the bad
> >>>>>>>>>>>>> records, but it would be harder to access (you first must
> find
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>> Streams instance that did write the log and extract the
> >>>>>>>>>>>>> information
> >>>>>>>>>>>>> from
> >>>>>>>>>>>>> there). Reading it from topic is much simpler.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> I also want to mention the following. Assume you have such a
> >>>>>>>>>>>>> topic with
> >>>>>>>>>>>>> some bad records and some good records. If we always
> fail-fast,
> >>>>>>>>>>>>> it's
> >>>>>>>>>>>>> going to be super hard to process the good data. You would
> need
> >>>>>>>>>>>>> to
> >>>>>>>>>>>>> write
> >>>>>>>>>>>>> an extra app that copied the data into a new topic filtering
> >>>>>>>>>>>>> out the
> >>>>>>>>>>>>> bad
> >>>>>>>>>>>>> records (or apply the map() workaround withing stream). So I
> >>>>>>>>>>>>> don't
> >>>>>>>>>>>>> think
> >>>>>>>>>>>>> that failing fast is most likely the best option in
> production
> >>>>>>>>>>>>> is
> >>>>>>>>>>>>> necessarily, true.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Or do you think there are scenarios, for which you can
> recover
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>> corrupted records successfully? And even if this is
> possible, it
> >>>>>>>>>>>>> might
> >>>>>>>>>>>>> be a case for reprocessing instead of failing the whole
> >>>>>>>>>>>>> application?
> >>>>>>>>>>>>> Also, if you think you can "repair" a corrupted record,
> should
> >>>>>>>>>>>>> the
> >>>>>>>>>>>>> handler allow to return a "fixed" record? This would solve
> the
> >>>>>>>>>>>>> ordering
> >>>>>>>>>>>>> problem.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> -Matthias
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> On 5/30/17 1:47 AM, Michael Noll wrote:
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> Thanks for your work on this KIP, Eno -- much appreciated!
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> - I think it would help to improve the KIP by adding an
> >>>>>>>>>>>>>> end-to-end
> >>>>>>>>>>>>>> code
> >>>>>>>>>>>>>> example that demonstrates, with the DSL and with the
> Processor
> >>>>>>>>>>>>>> API,
> >>>>>>>>>>>>>> how
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> user would write a simple application that would then be
> >>>>>>>>>>>>>> augmented
> >>>>>>>>>>>>>> with
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> proposed KIP changes to handle exceptions.  It should also
> >>>>>>>>>>>>>> become much
> >>>>>>>>>>>>>> clearer then that e.g. the KIP would lead to different code
> >>>>>>>>>>>>>> paths for
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>
> >>>>>>>>>>>> happy case and any failure scenarios.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> - Do we have sufficient information available to make
> informed
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> decisions
> >>>>>>>>>>>>>
> >>>>>>>>>>>> on
> >>>>>>>>>>>>
> >>>>>>>>>>>>> what to do next?  For example, do we know in which part of
> the
> >>>>>>>>>>>>>> topology
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> record failed? `ConsumerRecord` gives us access to topic,
> >>>>>>>>>>>>>> partition,
> >>>>>>>>>>>>>> offset, timestamp, etc., but what about topology-related
> >>>>>>>>>>>>>> information
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> (e.g.
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> what is the associated state store, if any)?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> - Only partly on-topic for the scope of this KIP, but this
> is
> >>>>>>>>>>>>>> about
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>> bigger picture: This KIP would give users the option to send
> >>>>>>>>>>>>>> corrupted
> >>>>>>>>>>>>>> records to dead letter queue (quarantine topic).  But, what
> >>>>>>>>>>>>>> pattern
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> would
> >>>>>>>>>>>>>
> >>>>>>>>>>>> we advocate to process such a dead letter queue then, e.g.
> how to
> >>>>>>>>>>>> allow
> >>>>>>>>>>>>
> >>>>>>>>>>>>> for
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> retries with backoff ("If the first record in the dead letter
> >>>>>>>>>>>>>> queue
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> fails
> >>>>>>>>>>>>>
> >>>>>>>>>>>> again, then try the second record for the time being and go
> back
> >>>>>>>>>>>> to the
> >>>>>>>>>>>>
> >>>>>>>>>>>>> first record at a later time").  Jay and Jan already alluded
> to
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> ordering
> >>>>>>>>>>>>>
> >>>>>>>>>>>> problems that will be caused by dead letter queues. As I said,
> >>>>>>>>>>>> retries
> >>>>>>>>>>>>
> >>>>>>>>>>>>> might be out of scope but perhaps the implications should be
> >>>>>>>>>>>>>> considered
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> if
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> possible?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Also, I wrote the text below before reaching the point in
> the
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> conversation
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> that this KIP's scope will be limited to exceptions in the
> >>>>>>>>>>>>>> category of
> >>>>>>>>>>>>>> poison pills / deserialization errors.  But since Jay
> brought
> >>>>>>>>>>>>>> up
> >>>>>>>>>>>>>> user
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> code
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> errors again, I decided to include it again.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> ----------------------------snip--------------------------
> --
> >>>>>>>>>>>>>> A meta comment: I am not sure about this split between the
> >>>>>>>>>>>>>> code for
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>>> happy path (e.g. map/filter/... in the DSL) from the failure
> >>>>>>>>>>>>>> path
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> (using
> >>>>>>>>>>>>>
> >>>>>>>>>>>> exception handlers).  In Scala, for example, we can do:
> >>>>>>>>>>>>
> >>>>>>>>>>>>>       scala> val computation = scala.util.Try(1 / 0)
> >>>>>>>>>>>>>>       computation: scala.util.Try[Int] =
> >>>>>>>>>>>>>> Failure(java.lang.ArithmeticException: / by zero)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>       scala> computation.getOrElse(42)
> >>>>>>>>>>>>>>       res2: Int = 42
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Another example with Scala's pattern matching, which is
> >>>>>>>>>>>>>> similar to
> >>>>>>>>>>>>>> `KStream#branch()`:
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>       computation match {
> >>>>>>>>>>>>>>         case scala.util.Success(x) => x * 5
> >>>>>>>>>>>>>>         case scala.util.Failure(_) => 42
> >>>>>>>>>>>>>>       }
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> (The above isn't the most idiomatic way to handle this in
> >>>>>>>>>>>>>> Scala,
> >>>>>>>>>>>>>> but
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> that's
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> not the point I'm trying to make here.)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> Hence the question I'm raising here is: Do we want to have
> an
> >>>>>>>>>>>>>> API
> >>>>>>>>>>>>>> where
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> you
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> code "the happy path", and then have a different code path
> for
> >>>>>>>>>>>>>> failures
> >>>>>>>>>>>>>> (using exceptions and handlers);  or should we treat both
> >>>>>>>>>>>>>> Success and
> >>>>>>>>>>>>>> Failure in the same way?
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> I think the failure/exception handling approach (as
> proposed in
> >>>>>>>>>>>>>> this
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> KIP)
> >>>>>>>>>>>>>
> >>>>>>>>>>>> is well-suited for errors in the category of deserialization
> >>>>>>>>>>>> problems
> >>>>>>>>>>>>
> >>>>>>>>>>>>> aka
> >>>>>>>>>>>>>
> >>>>>>>>>>>> poison pills, partly because the (default) serdes are defined
> >>>>>>>>>>>> through
> >>>>>>>>>>>>
> >>>>>>>>>>>>> configuration (explicit serdes however are defined through
> API
> >>>>>>>>>>>>>> calls).
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> However, I'm not yet convinced that the failure/exception
> >>>>>>>>>>>>>> handling
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> approach
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> is the best idea for user code exceptions, e.g. if you fail
> to
> >>>>>>>>>>>>>> guard
> >>>>>>>>>>>>>> against NPE in your lambdas or divide a number by zero.
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>       scala> val stream = Seq(1, 2, 3, 4, 5)
> >>>>>>>>>>>>>>       stream: Seq[Int] = List(1, 2, 3, 4, 5)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>       // Here: Fallback to a sane default when encountering
> >>>>>>>>>>>>>> failed
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> records
> >>>>>>>>>>>>>
> >>>>>>>>>>>>       scala>     stream.map(x => Try(1/(3 - x))).flatMap(t =>
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Seq(t.getOrElse(42)))
> >>>>>>>>>>>>>>       res19: Seq[Int] = List(0, 1, 42, -1, 0)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>       // Here: Skip over failed records
> >>>>>>>>>>>>>>       scala> stream.map(x => Try(1/(3 - x))).collect{ case
> >>>>>>>>>>>>>> Success(s)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> => s
> >>>>>>>>>>>>>
> >>>>>>>>>>>> }
> >>>>>>>>>>>>
> >>>>>>>>>>>>>       res20: Seq[Int] = List(0, 1, -1, 0)
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> The above is more natural to me than using error handlers to
> >>>>>>>>>>>>>> define
> >>>>>>>>>>>>>> how
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> to
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> deal with failed records (here, the value `3` causes an
> >>>>>>>>>>>>>> arithmetic
> >>>>>>>>>>>>>> exception).  Again, it might help the KIP if we added an
> >>>>>>>>>>>>>> end-to-end
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> example
> >>>>>>>>>>>>>
> >>>>>>>>>>>>> for such user code errors.
> >>>>>>>>>>>>>> ----------------------------snip--------------------------
> --
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> On Tue, May 30, 2017 at 9:24 AM, Jan Filipiak <
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>> jan.filip...@trivago.com>
> >>>>>>>>>>>>>
> >>>>>>>>>>>> wrote:
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Hi Jay,
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> Eno mentioned that he will narrow down the scope to only
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> ConsumerRecord
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>> deserialisation.
> >>>>>>>>>>>>
> >>>>>>>>>>>>> I am working with Database Changelogs only. I would really
> not
> >>>>>>>>>>>>>>> like
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> see
> >>>>>>>>>>>>>> a dead letter queue or something
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> similliar. how am I expected to get these back in order.
> Just
> >>>>>>>>>>>>>>> grind
> >>>>>>>>>>>>>>> to
> >>>>>>>>>>>>>>> hold an call me on the weekend. I'll fix it
> >>>>>>>>>>>>>>> then in a few minutes rather spend 2 weeks ordering dead
> >>>>>>>>>>>>>>> letters.
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> (where
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>> reprocessing might be even the faster fix)
> >>>>>>>>>>>>
> >>>>>>>>>>>>> Best Jan
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> On 29.05.2017 20:23, Jay Kreps wrote:
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>       - I think we should hold off on retries unless we
> have
> >>>>>>>>>>>>>>> worked
> >>>>>>>>>>>>>>> out
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>> the
> >>>>>>>>>>>>
> >>>>>>>>>>>>>       full usage pattern, people can always implement their
> >>>>>>>>>>>>>> own. I
> >>>>>>>>>>>>>>
> >>>>>>>>>>>>>>> think
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>> the idea
> >>>>>>>>>>>>
> >>>>>>>>>>>>>       is that you send the message to some kind of dead
> >>>>>>>>>>>>>>>> letter queue
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> and
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>> then
> >>>>>>>>>>>>
> >>>>>>>>>>>>>       replay these later. This obviously destroys all
> semantic
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>> guarantees
> >>>>>>>>>>>>>>>
> >>>>>>>>>>>>>> we are
> >>>>>>>>>>>>
> >>>>>>>>>>>>>       working hard to provide right now, which may be okay.
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>>>>>>>>>>>>>>>
> >>
> >
> >
> > --
> > -- Guozhang
>
>


-- 
-- Guozhang

Reply via email to