Alaa,

what is the status of this KIP? Are you still working on it?

It would also be good to get feedback from other about the open questions.


-Matthias

On 9/17/19 5:33 PM, Matthias J. Sax wrote:
> Thanks for picking up this KIP.
> 
> My personal take about repartition topics is, that it seems to be ok to
> apply the handler for those, too (in addition to output topics). It
> seems to be more flexible and also simplifies the code. In the end, the
> topic name is passed via `ProducerRecord` into the handler and thus
> users can decide on a per-topic basis what to do.
> 
> About stores and changelogs: yes, serialization happens first. Hence,
> when we put() into RocksDB and also send() to the changelog topic (in
> that case we use `ByteArraySerializer`) no serialization error should
> happen (if there would have been a problem, it would have happened earlier).
> 
> However, in KIP-210, we did not consider the case that a send() might
> fail for a changelog topic while the put() into the store was already
> successfully applies. Hence, it's possible atm, to skip a failed write
> into the changelog topic, even if the put() into the store was
> successful. This seems to be a bug to me and we might want to create a
> separate Jira for it -- it's related to this KIP but should not be
> mangled into the KIP IMHO.
> 
> For the KIP itself, what we _could_ do is, to apply the handler to the
> serialization that happens before the data is put() into the store.
> However, I am not sure if we should allow this -- atm, I tend to think
> we should not allow it and exclude store serialization for the handler.
> 
> 
> 
> -Matthias
> 
> On 9/10/19 1:46 AM, Alaa Zbair wrote:
>> Hi,
>>
>> I have checked the KIP-399 and the discussion and also KIP-210.
>>
>> So the question we need to answer is whether it's okay to also skip
>> writing the record in the internal topics, the current implementation of
>> 'ProductionExceptionHandler' is applied for all topics and if we decided
>> to keep it that way, how to ensure that there will be no divergence in
>> local store and changelog topic ?
>>
>> I would like to get input from others on what they think about this.
>>
>> There is a point that I don't understand which is: why in case of a
>> serialization error do we have the choice of either skipping it or
>> putting in the store ? shouldn't the record be correctly serialized
>> before putting it into the store ?
>>
>> On 13/12/2018 14:13, Matthias J. Sax wrote:
>>> For store updates, records are first serialized and afterwards put into
>>> the store and written into the changelog topic.
>>>
>>> In the current implementation, if the send() into the changelog topic
>>> produces an error and the handler skips over it, the local store content
>>> and the changelog topic diverge. This seems to be a correctness issue.
>>>
>>> For serialization error, it would not happen that store and changelog
>>> diverge, because serialization happens before and put/send. Thus, with
>>> this KIP we could skip both put() and send(). However, I am still
>>> wondering, if it would be ok to skip a store update for this case? (Btw:
>>> the current PR does not address this atm, and a serialization error for
>>> a store write would not be covered but kill the instance).
>>>
>>> IIRC, the original idea of the KIP was to allow skipping over record for
>>> output topics only. That's why I am wondering if it's ok to allow
>>> skipper over record in repartitions topics, too.
>>>
>>> In the end, it's some data loss for all 3 cases, so maybe it's ok to
>>> allow skipping for all 3 cases. However, we should not allow that local
>>> store and changelog topic diverge IMHO (what might been an orthogonal
>>> bug thought).
>>>
>>> I also don't have an answer or preference. Just think, it's important to
>>> touch on those cases and get input how people think about it.
>>>
>>>
>>> -Matthias
>>>
>>>
>>>
>>> On 12/11/18 11:43 AM, Kamal Chandraprakash wrote:
>>>> Matthias,
>>>>
>>>> For changelog topics, I think it does not make sense to allow skipping
>>>> records if serialization fails? For internal repartitions topics, I am
>>>> not sure if we should allow it or not. Would you agree with this? We
>>>> should discuss the implication to derive a sound design.
>>>>
>>>> Can you explain the issue that happens when records are skipped to
>>>> changelog / internal-repartition topics ? So, that I can look into it.
>>>>
>>>> On Fri, Dec 7, 2018 at 12:07 AM Matthias J. Sax <matth...@confluent.io>
>>>> wrote:
>>>>
>>>>>>> To accept different types of records from multiple topologies, I
>>>>>>> have to
>>>>>>> define the ProducerRecord without generics.
>>>>> Yes. It does make sense. My point was, that the KIP should
>>>>> mention/explain this explicitly to allow other not familiar with the
>>>>> code base to understand it more easily :)
>>>>>
>>>>>
>>>>>
>>>>> About `ClassCastException`: seems to be an implementation detail. No
>>>>> need to make it part of the KIP discussion.
>>>>>
>>>>>
>>>>>
>>>>> One more thing that came to my mind. We use the `RecordCollector` to
>>>>> write into all topics, ie, user output topics and internal repartition
>>>>> and changelog topics.
>>>>>
>>>>> For changelog topics, I think it does not make sense to allow skipping
>>>>> records if serialization fails? For internal repartitions topics, I am
>>>>> not sure if we should allow it or not. Would you agree with this? We
>>>>> should discuss the implication to derive a sound design.
>>>>>
>>>>> I was also just double checking the code, and it seems that the current
>>>>> `ProductionExceptionHandler` is applied for all topics. This seems
>>>>> to be
>>>>> incorrect to me. Seems we missed this case when doing KIP-210? (Or did
>>>>> we discuss this and I cannot remember? Might be worth to double check.)
>>>>>
>>>>> Last thought: of course, the handler will know which topic is affected
>>>>> and can provide a corresponding implementation. Was just wondering
>>>>> if we
>>>>> should be more strict?
>>>>>
>>>>>
>>>>> -Matthias
>>>>>
>>>>> On 12/6/18 10:01 AM, Kamal Chandraprakash wrote:
>>>>>> Matt,
>>>>>>      I agree with Matthias on not to altering the serializer as
>>>>>> it's used
>>>>> by
>>>>>> multiple components.
>>>>>>
>>>>>> Matthias,
>>>>>>
>>>>>>   - the proposed method accepts a `ProducerRecord` -- it might be
>>>>>> good to
>>>>>> explain why this cannot be done in a type safe way (ie, missing
>>>>>> generics)
>>>>>>
>>>>>> To accept different types of records from multiple topologies, I
>>>>>> have to
>>>>>> define the ProducerRecord without generics.
>>>>>>
>>>>>> - `AlwaysProductionExceptionHandler` ->
>>>>>> `AlwaysContinueProductionExceptionHandler`
>>>>>>
>>>>>> Updated the typo error in KIP.
>>>>>>
>>>>>>   - `DefaultProductionExceptionHandler` is not mentioned
>>>>>>
>>>>>> The `handleSerializationException` method in the
>>>>>> `ProductionExceptionHandler` interface will have default
>>>>>> implementation
>>>>>> that is set to FAIL by default.
>>>>>> This is done to avoid any changes in the user implementation. So, I
>>>>> didn't
>>>>>> mentioned the `DefaultProductionExceptionHandler` class. Updated
>>>>>> the KIP.
>>>>>>
>>>>>> - Why do you distinguish between `ClassCastException` and "any other
>>>>>> unchecked exception? Both second case seems to include the first one?
>>>>>>
>>>>>> In SinkNode.java#93
>>>>>> <
>>>>> https://github.com/apache/kafka/blob/87cc31c4e7ea36e7e832a1d02d71480a91a75293/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java#L93
>>>>>
>>>>>> on
>>>>>> hitting `ClassCastException`, we are halting the streams as it's a
>>>>>> fatal
>>>>>> error.
>>>>>> To keep the original behavior, I've to distinguish the exceptions.
>>>>>>
>>>>>>
>>>>>> On Thu, Dec 6, 2018 at 10:44 PM Matthias J. Sax
>>>>>> <matth...@confluent.io>
>>>>>> wrote:
>>>>>>
>>>>>>> Well, that's exactly the point. The serializer should not be altered
>>>>>>> IMHO because this would have impact on other components. Also, for
>>>>>>> applications that use KafkaProducer directly, they can catch any
>>>>>>> serialization exception and react to it. Hence, I don't don't see a
>>>>>>> reason to change the serializer interface.
>>>>>>>
>>>>>>> Instead, it seems better to solve this issue in Streams by
>>>>>>> allowing to
>>>>>>> skip over a record for this case.
>>>>>>>
>>>>>>> Some more comments on the KIP:
>>>>>>>
>>>>>>>   - the proposed method accepts a `ProducerRecord` -- it might be
>>>>>>> good to
>>>>>>> explain why this cannot be done in a type safe way (ie, missing
>>>>> generics)
>>>>>>>   - `AlwaysProductionExceptionHandler` ->
>>>>>>> `AlwaysContinueProductionExceptionHandler`
>>>>>>>
>>>>>>>   - `DefaultProductionExceptionHandler` is not mentioned
>>>>>>>
>>>>>>>   - Why do you distinguish between `ClassCastException` and "any
>>>>>>> other
>>>>>>> unchecked exception? Both second case seems to include the first one?
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> -Matthias
>>>>>>>
>>>>>>> On 12/6/18 8:35 AM, Matt Farmer wrote:
>>>>>>>> Ah, good point.
>>>>>>>>
>>>>>>>> Should we consider altering the serializer interface to permit not
>>>>>>> sending
>>>>>>>> the record?
>>>>>>>>
>>>>>>>> On Wed, Dec 5, 2018 at 9:23 PM Kamal Chandraprakash <
>>>>>>>> kamal.chandraprak...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Matt,
>>>>>>>>>
>>>>>>>>>      That's a good point. If these cases are handled in the
>>>>>>>>> serializer,
>>>>>>> then
>>>>>>>>> one cannot continue the stream processing by skipping the record.
>>>>>>>>> To continue, you may have to send a empty record serialized
>>>>>>>>> key/value
>>>>>>> (new
>>>>>>>>> byte[0]) to the downstream on hitting the error which may cause
>>>>>>> un-intended
>>>>>>>>> results.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Wed, Dec 5, 2018 at 8:41 PM Matt Farmer <m...@frmr.me> wrote:
>>>>>>>>>
>>>>>>>>>> Hi there,
>>>>>>>>>>
>>>>>>>>>> Thanks for this KIP.
>>>>>>>>>>
>>>>>>>>>> What’s the thinking behind doing this in
>>>>>>>>>> ProductionExceptionHandler
>>>>>>>>> versus
>>>>>>>>>> handling these cases in your serializer implementation?
>>>>>>>>>>
>>>>>>>>>> On Mon, Dec 3, 2018 at 1:09 AM Kamal Chandraprakash <
>>>>>>>>>> kamal.chandraprak...@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hello dev,
>>>>>>>>>>>
>>>>>>>>>>>    I hope to initiate the discussion for KIP-399: Extend
>>>>>>>>>>> ProductionExceptionHandler to cover serialization exceptions.
>>>>>>>>>>>
>>>>>>>>>>> KIP: <
>>>>>>>>>>>
>>>>>>>>>>>
>>>>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-399%3A+Extend+ProductionExceptionHandler+to+cover+serialization+exceptions
>>>>>
>>>>>>>>>>> JIRA: https://issues.apache.org/jira/browse/KAFKA-7499
>>>>>>>>>>>
>>>>>>>>>>> All feedbacks will be highly appreciated.
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Kamal Chandraprakash
>>>>>>>>>>>
>>>>>>>
>>>>>
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to