Alaa, what is the status of this KIP? Are you still working on it?
It would also be good to get feedback from others about the open questions.

-Matthias

On 9/17/19 5:33 PM, Matthias J. Sax wrote:
> Thanks for picking up this KIP.
>
> My personal take on repartition topics is that it seems to be ok to
> apply the handler for those, too (in addition to output topics). It
> seems to be more flexible and also simplifies the code. In the end, the
> topic name is passed via `ProducerRecord` into the handler and thus
> users can decide on a per-topic basis what to do.
>
> About stores and changelogs: yes, serialization happens first. Hence,
> when we put() into RocksDB and also send() to the changelog topic (in
> that case we use `ByteArraySerializer`) no serialization error should
> happen (if there had been a problem, it would have happened earlier).
>
> However, in KIP-210, we did not consider the case that a send() might
> fail for a changelog topic while the put() into the store was already
> successfully applied. Hence, it's possible atm to skip a failed write
> into the changelog topic, even if the put() into the store was
> successful. This seems to be a bug to me and we might want to create a
> separate Jira for it -- it's related to this KIP but should not be
> mangled into the KIP IMHO.
>
> For the KIP itself, what we _could_ do is to apply the handler to the
> serialization that happens before the data is put() into the store.
> However, I am not sure if we should allow this -- atm, I tend to think
> we should not allow it and exclude store serialization from the handler.
>
> -Matthias
>
> On 9/10/19 1:46 AM, Alaa Zbair wrote:
>> Hi,
>>
>> I have checked the KIP-399 and the discussion and also KIP-210.
>>
>> So the question we need to answer is whether it's okay to also skip
>> writing the record in the internal topics. The current implementation of
>> `ProductionExceptionHandler` is applied for all topics, and if we decide
>> to keep it that way, how do we ensure that there will be no divergence
>> between the local store and the changelog topic?
>>
>> I would like to get input from others on what they think about this.
>>
>> There is a point that I don't understand: why, in case of a
>> serialization error, do we have the choice of either skipping it or
>> putting it in the store? Shouldn't the record be correctly serialized
>> before putting it into the store?
>>
>> On 13/12/2018 14:13, Matthias J. Sax wrote:
>>> For store updates, records are first serialized and afterwards put into
>>> the store and written into the changelog topic.
>>>
>>> In the current implementation, if the send() into the changelog topic
>>> produces an error and the handler skips over it, the local store content
>>> and the changelog topic diverge. This seems to be a correctness issue.
>>>
>>> For serialization errors, it would not happen that store and changelog
>>> diverge, because serialization happens before put/send. Thus, with
>>> this KIP we could skip both put() and send(). However, I am still
>>> wondering if it would be ok to skip a store update for this case? (Btw:
>>> the current PR does not address this atm, and a serialization error for
>>> a store write would not be covered but kill the instance).
>>>
>>> IIRC, the original idea of the KIP was to allow skipping over records for
>>> output topics only. That's why I am wondering if it's ok to allow
>>> skipping over records in repartition topics, too.
>>>
>>> In the end, it's some data loss for all 3 cases, so maybe it's ok to
>>> allow skipping for all 3 cases. However, we should not allow the local
>>> store and changelog topic to diverge IMHO (which might be an orthogonal
>>> bug, though).
>>>
>>> I also don't have an answer or preference. I just think it's important to
>>> touch on those cases and get input on how people think about it.
>>>
>>> -Matthias
>>>
>>> On 12/11/18 11:43 AM, Kamal Chandraprakash wrote:
>>>> Matthias,
>>>>
>>>> For changelog topics, I think it does not make sense to allow skipping
>>>> records if serialization fails? For internal repartition topics, I am
>>>> not sure if we should allow it or not. Would you agree with this? We
>>>> should discuss the implication to derive a sound design.
>>>>
>>>> Can you explain the issue that happens when records are skipped for
>>>> changelog / internal-repartition topics, so that I can look into it?
>>>>
>>>> On Fri, Dec 7, 2018 at 12:07 AM Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>>>> To accept different types of records from multiple topologies, I
>>>>>>> have to define the ProducerRecord without generics.
>>>>> Yes. It does make sense. My point was that the KIP should
>>>>> mention/explain this explicitly to allow others not familiar with the
>>>>> code base to understand it more easily :)
>>>>>
>>>>> About `ClassCastException`: seems to be an implementation detail. No
>>>>> need to make it part of the KIP discussion.
>>>>>
>>>>> One more thing that came to my mind. We use the `RecordCollector` to
>>>>> write into all topics, ie, user output topics and internal repartition
>>>>> and changelog topics.
>>>>>
>>>>> For changelog topics, I think it does not make sense to allow skipping
>>>>> records if serialization fails? For internal repartition topics, I am
>>>>> not sure if we should allow it or not. Would you agree with this? We
>>>>> should discuss the implication to derive a sound design.
>>>>>
>>>>> I was also just double checking the code, and it seems that the current
>>>>> `ProductionExceptionHandler` is applied for all topics. This seems
>>>>> to be incorrect to me.
>>>>> Seems we missed this case when doing KIP-210? (Or did
>>>>> we discuss this and I cannot remember? Might be worth double checking.)
>>>>>
>>>>> Last thought: of course, the handler will know which topic is affected
>>>>> and can provide a corresponding implementation. Was just wondering
>>>>> if we should be more strict?
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 12/6/18 10:01 AM, Kamal Chandraprakash wrote:
>>>>>> Matt,
>>>>>> I agree with Matthias on not altering the serializer as
>>>>>> it's used by multiple components.
>>>>>>
>>>>>> Matthias,
>>>>>>
>>>>>> - the proposed method accepts a `ProducerRecord` -- it might be
>>>>>> good to explain why this cannot be done in a type safe way (ie,
>>>>>> missing generics)
>>>>>>
>>>>>> To accept different types of records from multiple topologies, I
>>>>>> have to define the ProducerRecord without generics.
>>>>>>
>>>>>> - `AlwaysProductionExceptionHandler` ->
>>>>>> `AlwaysContinueProductionExceptionHandler`
>>>>>>
>>>>>> Fixed the typo in the KIP.
>>>>>>
>>>>>> - `DefaultProductionExceptionHandler` is not mentioned
>>>>>>
>>>>>> The `handleSerializationException` method in the
>>>>>> `ProductionExceptionHandler` interface will have a default
>>>>>> implementation that is set to FAIL by default.
>>>>>> This is done to avoid any changes in the user implementation. So, I
>>>>>> didn't mention the `DefaultProductionExceptionHandler` class. Updated
>>>>>> the KIP.
>>>>>>
>>>>>> - Why do you distinguish between `ClassCastException` and "any other
>>>>>> unchecked exception"? The second case seems to include the first one?
>>>>>>
>>>>>> In SinkNode.java#93
>>>>>> <https://github.com/apache/kafka/blob/87cc31c4e7ea36e7e832a1d02d71480a91a75293/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java#L93>
>>>>>> on hitting `ClassCastException`, we are halting the streams as it's a
>>>>>> fatal error.
>>>>>> To keep the original behavior, I have to distinguish the exceptions.
>>>>>>
>>>>>> On Thu, Dec 6, 2018 at 10:44 PM Matthias J. Sax
>>>>>> <matth...@confluent.io> wrote:
>>>>>>
>>>>>>> Well, that's exactly the point. The serializer should not be altered
>>>>>>> IMHO because this would have impact on other components. Also,
>>>>>>> applications that use KafkaProducer directly can catch any
>>>>>>> serialization exception and react to it. Hence, I don't see a
>>>>>>> reason to change the serializer interface.
>>>>>>>
>>>>>>> Instead, it seems better to solve this issue in Streams by
>>>>>>> allowing to skip over a record for this case.
>>>>>>>
>>>>>>> Some more comments on the KIP:
>>>>>>>
>>>>>>> - the proposed method accepts a `ProducerRecord` -- it might be
>>>>>>> good to explain why this cannot be done in a type safe way (ie,
>>>>>>> missing generics)
>>>>>>>
>>>>>>> - `AlwaysProductionExceptionHandler` ->
>>>>>>> `AlwaysContinueProductionExceptionHandler`
>>>>>>>
>>>>>>> - `DefaultProductionExceptionHandler` is not mentioned
>>>>>>>
>>>>>>> - Why do you distinguish between `ClassCastException` and "any
>>>>>>> other unchecked exception"? The second case seems to include the
>>>>>>> first one?
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 12/6/18 8:35 AM, Matt Farmer wrote:
>>>>>>>> Ah, good point.
>>>>>>>>
>>>>>>>> Should we consider altering the serializer interface to permit not
>>>>>>>> sending the record?
>>>>>>>>
>>>>>>>> On Wed, Dec 5, 2018 at 9:23 PM Kamal Chandraprakash <
>>>>>>>> kamal.chandraprak...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Matt,
>>>>>>>>>
>>>>>>>>> That's a good point. If these cases are handled in the
>>>>>>>>> serializer, then one cannot continue the stream processing by
>>>>>>>>> skipping the record.
>>>>>>>>> To continue, you may have to send an empty serialized record
>>>>>>>>> key/value (new byte[0]) downstream on hitting the error, which
>>>>>>>>> may cause unintended results.
>>>>>>>>>
>>>>>>>>> On Wed, Dec 5, 2018 at 8:41 PM Matt Farmer <m...@frmr.me> wrote:
>>>>>>>>>
>>>>>>>>>> Hi there,
>>>>>>>>>>
>>>>>>>>>> Thanks for this KIP.
>>>>>>>>>>
>>>>>>>>>> What’s the thinking behind doing this in
>>>>>>>>>> ProductionExceptionHandler versus handling these cases in your
>>>>>>>>>> serializer implementation?
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 3, 2018 at 1:09 AM Kamal Chandraprakash <
>>>>>>>>>> kamal.chandraprak...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello dev,
>>>>>>>>>>>
>>>>>>>>>>> I hope to initiate the discussion for KIP-399: Extend
>>>>>>>>>>> ProductionExceptionHandler to cover serialization exceptions.
>>>>>>>>>>>
>>>>>>>>>>> KIP:
>>>>>>>>>>> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions>
>>>>>>>>>>> JIRA: https://issues.apache.org/jira/browse/KAFKA-7499
>>>>>>>>>>>
>>>>>>>>>>> All feedback will be highly appreciated.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Kamal Chandraprakash
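
For readers following the thread, here is a minimal sketch of the per-topic decision discussed above: the handler receives the record (and hence the topic name) and chooses between continuing and failing, while the interface method defaults to FAIL. This is only an illustration under stated assumptions, not the KIP's implementation. `Record`, `Response`, `SerializationErrorHandler` and `PerTopicHandler` are hypothetical stand-ins, not the real Kafka classes, and the `-changelog` suffix check assumes the usual naming of Streams internal changelog topics.

```java
import java.util.Objects;

// Illustrative sketch only: every type here is a hypothetical stand-in,
// not the real Kafka Streams API.
final class PerTopicHandlerSketch {

    /** Stand-in for the handler's two possible decisions. */
    enum Response { CONTINUE, FAIL }

    /** Simplified stand-in for an untyped producer record (no generics). */
    static final class Record {
        final String topic;
        final Object key;
        final Object value;

        Record(String topic, Object key, Object value) {
            this.topic = Objects.requireNonNull(topic, "topic");
            this.key = key;
            this.value = value;
        }
    }

    /** Stand-in interface: the default keeps existing implementations failing fast. */
    interface SerializationErrorHandler {
        default Response handleSerializationException(Record record, Exception exception) {
            return Response.FAIL;
        }
    }

    /**
     * Decides per topic: never skip records for changelog topics, so a store
     * update is not silently dropped; skip for every other topic.
     * Assumption: changelog topic names end with "-changelog".
     */
    static final class PerTopicHandler implements SerializationErrorHandler {
        @Override
        public Response handleSerializationException(Record record, Exception exception) {
            if (record.topic.endsWith("-changelog")) {
                return Response.FAIL;
            }
            return Response.CONTINUE;
        }
    }

    public static void main(String[] args) {
        SerializationErrorHandler handler = new PerTopicHandler();
        Exception error = new RuntimeException("simulated serialization failure");
        // Topic names below are made up for the example.
        String[] topics = {"orders-out", "app-agg-repartition", "app-agg-changelog"};
        for (String topic : topics) {
            Response decision =
                handler.handleSerializationException(new Record(topic, "k", "v"), error);
            System.out.println(topic + " -> " + decision);
        }
    }
}
```

Whether repartition topics should be skippable is exactly the open question in this thread; the sketch treats them like output topics only to keep the example short.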