Hey Guys,

Thinking on this more. Given that we don't have a good grasp on this, and
it seems fairly complicated for now, I'm proposing that we hold off on all
this complex work, and just support the ability to drop messages for now.
This should be pretty straightforward to implement, and is generally
pretty useful, and not too invasive.
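
For illustration, here is a minimal sketch of what "just drop the message" could look like. The Deserializer interface, the DroppingDeserializer wrapper, and the dropOnError flag are hypothetical names made up for this example, not existing Samza APIs. It assumes the serde signals failure by throwing a RuntimeException, and that failing stays the default unless dropping is switched on.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical stand-in for a serde's read side; not Samza's actual interface.
interface Deserializer<T> {
  T fromBytes(byte[] bytes);
}

// Wraps a deserializer so a failure either propagates (default) or drops the message.
class DroppingDeserializer<T> {
  private final Deserializer<T> delegate;
  private final boolean dropOnError; // hypothetical config flag
  private long dropped = 0;

  DroppingDeserializer(Deserializer<T> delegate, boolean dropOnError) {
    this.delegate = delegate;
    this.dropOnError = dropOnError;
  }

  // Returns the decoded message, or empty if the message was dropped.
  Optional<T> deserialize(byte[] bytes) {
    try {
      return Optional.ofNullable(delegate.fromBytes(bytes));
    } catch (RuntimeException e) {
      if (!dropOnError) {
        throw e; // keep the current behavior: the failure propagates
      }
      dropped++;
      return Optional.empty();
    }
  }

  long droppedCount() {
    return dropped;
  }
}

class DropDemo {
  // Decodes each string as an integer, skipping anything that fails when dropOnError is set.
  static List<Integer> run(List<String> raw, boolean dropOnError) {
    DroppingDeserializer<Integer> serde = new DroppingDeserializer<>(
        bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)),
        dropOnError);
    List<Integer> out = new ArrayList<>();
    for (String s : raw) {
      serde.deserialize(s.getBytes(StandardCharsets.UTF_8)).ifPresent(out::add);
    }
    return out;
  }

  public static void main(String[] args) {
    System.out.println(run(Arrays.asList("1", "oops", "3"), true));
  }
}
```

A drop counter is kept so that silently discarded messages can at least be surfaced as a metric.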

Does that sound cool with folks? Danny?

Cheers,
Chris

On 2/5/14 11:43 AM, "Chris Riccomini" <[email protected]> wrote:

>Hey Martin,
>
>I agree with you about ordering. Ideally, we would invoke the error()
>callback at exactly the point where we would normally have called
>process() on the message, had the serde error not prevented it. I think
>this is do-able if we tweak things in SystemConsumers a bit.
>
>The problem I'm having trouble resolving is how to handle the
>MessageChooser. The MessageChooser chooses the processing order between
>partitions ("I have a message from stream/partition X, and
>stream/partition Y, which one do I process next?"). These choosers
>occasionally look at the actual message payload (e.g. the timestamp field
>of an incoming message) when doing the choosing. The chooser won't know
>how to handle an IncomingMessageEnvelope where the key and value are both
>byte[], when it's expecting Avro, ProtoBuf, JSON, etc. One solution I can
>think of would be to add an error() call to the MessageChooser interface.
>The other solution would be to just give the MessageChooser everything,
>and in cases where it is looking at the message payload, it just has to be
>careful. I think there might be other solutions here as well.
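
A rough sketch of the second option (give the chooser everything and have it be careful) might look like the following. Envelope, Event, and CarefulChooser are hypothetical, simplified types invented for this example and are not the real IncomingMessageEnvelope or MessageChooser. Surfacing undecodable payloads first is just one arbitrary policy chosen for the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified envelope: message is the deserialized object,
// or the raw byte[] when deserialization failed.
final class Envelope {
  final String partition;
  final Object message;

  Envelope(String partition, Object message) {
    this.partition = partition;
    this.message = message;
  }
}

// Hypothetical payload type that carries a timestamp.
final class Event {
  final long timestamp;

  Event(long timestamp) {
    this.timestamp = timestamp;
  }
}

// Picks the envelope with the smallest timestamp, checking the payload type first.
class CarefulChooser {
  private final List<Envelope> pending = new ArrayList<>();

  void update(Envelope envelope) {
    pending.add(envelope);
  }

  // Undecodable payloads have no timestamp, so this sketch surfaces them first.
  private static long sortKey(Envelope envelope) {
    if (envelope.message instanceof Event) {
      return ((Event) envelope.message).timestamp;
    }
    return Long.MIN_VALUE;
  }

  // Returns the next envelope to process, or null if nothing is pending.
  Envelope choose() {
    Envelope best = null;
    for (Envelope candidate : pending) {
      if (best == null || sortKey(candidate) < sortKey(best)) {
        best = candidate;
      }
    }
    pending.remove(best);
    return best;
  }
}
```

The point is only that the type check happens before the payload is touched, so a byte[] never gets cast to the expected message type.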
>
>More thought required.
>
>Cheers,
>Chris
>
>On 2/4/14 4:13 PM, "Martin Kleppmann" <[email protected]> wrote:
>
>>Ok -- I had originally misunderstood your ErrorTask proposal. The way you
>>described it looks good to me. I agree that a single error method, which
>>takes a generic error envelope object, is better than lots of different
>>methods.
>>
>>Rather than
>>
>>if (DeserializationError.ERROR_CODE.equals(envelope.getErrorCode()))
>>
>>could we just say this?
>>
>>if (envelope.getError() instanceof DeserializationError)
>>
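
To make the instanceof variant concrete, here is a small self-contained sketch. ErrorEnvelope and DeserializationError are modelled on the pseudo-code further down this thread, while BadDataRouter and its in-memory badData list are hypothetical stand-ins for a task that forwards bad messages to another stream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types modelled on the pseudo-code in this thread.
interface ErrorEnvelope {
  Object getError();
}

final class DeserializationError {
  private final byte[] key;
  private final byte[] message;

  DeserializationError(byte[] key, byte[] message) {
    this.key = key;
    this.message = message;
  }

  byte[] getKey() {
    return key;
  }

  byte[] getMessage() {
    return message;
  }
}

class BadDataRouter {
  // Stands in for a "bad-data" output stream.
  final List<byte[]> badData = new ArrayList<>();

  // Returns true if the error type was recognised and handled.
  boolean error(ErrorEnvelope envelope) {
    Object error = envelope.getError();
    if (error instanceof DeserializationError) {
      badData.add(((DeserializationError) error).getMessage());
      return true;
    }
    return false; // unknown error type: leave the decision to the caller
  }
}
```

With this shape no separate error code is needed, since the error object's class identifies the kind of failure.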
>>Regarding ordering: my instinctive reaction is that it would be least
>>surprising if the error is received at the same point in the message
>>sequence as the message would have been received, had it not been an
>>error. But that may need some more thinking.
>>
>>Cheers,
>>Martin
>>
>>On 4 Feb 2014, at 22:23, Chris Riccomini <[email protected]> wrote:
>>> Hey Guys,
>>> 
>>> One other thing to consider here, which didn't strike me at first, is
>>> how to handle ordering when a serialization error occurs.
>>> 
>>> Right now, the SystemConsumers class reads messages in, buffers them,
>>> and feeds them slowly to the MessageChooser. The MessageChooser takes
>>> only IncomingMessageEnvelope right now. So the question is, if we have
>>> a serde error, when does the StreamTask see it? It shouldn't see it
>>> before other messages for the same stream partition, since this would
>>> mean we've broken the ordering of the messages. Arguably, the
>>> MessageChooser should also get a chance to see it, to choose in what
>>> order to process it.
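
One way to picture the ordering requirement: keep a single FIFO buffer per stream partition and store failures in it alongside good messages, so that an error is delivered at the position the message would have had. Slot and OrderedBuffer are hypothetical names for this sketch only, and the "process:"/"error:" strings merely stand in for the callbacks a task would receive.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Function;

// One buffered entry: either a decoded message or the raw bytes that failed to decode.
final class Slot {
  final Object message; // null when decoding failed
  final byte[] raw;     // null when decoding succeeded

  private Slot(Object message, byte[] raw) {
    this.message = message;
    this.raw = raw;
  }

  static Slot ok(Object message) {
    return new Slot(message, null);
  }

  static Slot failed(byte[] raw) {
    return new Slot(null, raw);
  }

  boolean isError() {
    return raw != null;
  }
}

// Hypothetical per-partition buffer that keeps failures at their original position.
class OrderedBuffer {
  private final Queue<Slot> queue = new ArrayDeque<>();
  private final Function<byte[], Object> serde;

  OrderedBuffer(Function<byte[], Object> serde) {
    this.serde = serde;
  }

  // Decodes on arrival; a failure is buffered in place instead of being thrown.
  void add(byte[] raw) {
    Slot slot;
    try {
      slot = Slot.ok(serde.apply(raw));
    } catch (RuntimeException e) {
      slot = Slot.failed(raw);
    }
    queue.add(slot);
  }

  // Drains in arrival order, recording which callback each entry would trigger.
  List<String> drain() {
    List<String> calls = new ArrayList<>();
    Slot slot;
    while ((slot = queue.poll()) != null) {
      calls.add(slot.isError()
          ? "error:" + new String(slot.raw, StandardCharsets.UTF_8)
          : "process:" + slot.message);
    }
    return calls;
  }
}
```

This says nothing about how a chooser should rank such entries across partitions; it only shows that order within one partition can be preserved.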
>>> 
>>> One could make the argument, though, that a message that can't be
>>> deserialized is invalid, and you're looking at potential data loss
>>> anyway, so ordering doesn't matter. I'm not sure if this assumption is
>>> true in all cases though.
>>> 
>>> I don't really have a good answer for this at the moment. I think we
>>> should think about it.
>>> 
>>> Cheers,
>>> Chris
>>> 
>>> On 2/4/14 11:36 AM, "Danny Antonetti" <[email protected]>
>>>wrote:
>>> 
>>>> OK Great, that looks good.
>>>> 
>>>> I will look into this as I have time and get back to you with
>>>> questions.
>>>> 
>>>> 
>>>> Thanks for the help.
>>>> 
>>>> 
>>>> Danny
>>>> 
>>>> 
>>>> On Tue, Feb 4, 2014 at 11:29 AM, Chris Riccomini
>>>> <[email protected]>wrote:
>>>> 
>>>>> Hey Guys,
>>>>> 
>>>>> Here's a pseudo-code proposal for an error API.
>>>>> 
>>>>> interface ErrorTask {
>>>>>  public void error(ErrorEnvelope envelope, MessageCollector collector,
>>>>>      TaskCoordinator coordinator);
>>>>> }
>>>>> 
>>>>> interface ErrorEnvelope {
>>>>>  ErrorCode getErrorCode();
>>>>>  Object getError();
>>>>> }
>>>>> 
>>>>> class DeserializationError {
>>>>>  byte[] getKey();
>>>>> 
>>>>>  byte[] getMessage();
>>>>>  boolean keyFailed();
>>>>>  boolean valueFailed();
>>>>> }
>>>>> 
>>>>> Then you could implement something like:
>>>>> 
>>>>> class MyStreamTask implements StreamTask, ErrorTask {
>>>>>  public void process(...) {
>>>>>    // do stuff
>>>>>  }
>>>>> 
>>>>>  public void error(ErrorEnvelope envelope, MessageCollector collector,
>>>>>      TaskCoordinator coordinator) {
>>>>>    if (DeserializationError.ERROR_CODE.equals(envelope.getErrorCode())) {
>>>>>      DeserializationError error =
>>>>>          (DeserializationError) envelope.getError();
>>>>>      // Re-route the raw bytes to a "bad-data" stream.
>>>>>      collector.send("kafka", "bad-data", error.getKey(),
>>>>>          error.getMessage());
>>>>>    }
>>>>>  }
>>>>> }
>>>>> 
>>>>> Arguably, we've just moved our big if-statement block from the
>>>>> process() method into the ErrorTask.error() method, but the
>>>>> alternative, as I see it, is to have one callback per error type.
>>>>> That moves us into a state where every time we want to add a new
>>>>> callback, we break all existing implementations (since they must now
>>>>> implement the new method as well). I chose the single callback and
>>>>> generic object approach because it matches what our process() method
>>>>> does when multiple input streams are defined (if the envelope is from
>>>>> stream A, do X; if the envelope is from stream B, do Y), but I'm open
>>>>> to suggestions if people have them.
>>>>> 
>>>>> Regarding your JSON use case, yes, I think you'd be given the raw
>>>>> bytes for the key and message, and it'd be up to you to decide what
>>>>> to do with them. Samza allows you to define systems with no serde.
>>>>> Systems without a serde default to byte[] for both key and value,
>>>>> which means that you could define a system (or stream) with a
>>>>> pass-through serde, and send it the raw bytes for the key and
>>>>> message. Alternatively, you could try decoding the data using some
>>>>> other serde, posting the message to a DB, logging the message in
>>>>> Log4j, discarding the message, etc.
>>>>> 
>>>>> Cheers,
>>>>> Chris
>>>>> 
>>>>> On 2/4/14 10:48 AM, "Danny Antonetti" <[email protected]>
>>>>>wrote:
>>>>> 
>>>>>> Hey Chris,
>>>>>> 
>>>>>> I am not sure I understand your suggestion about the ErrorTask.
>>>>>> What would this new function's method signature be?  I would assume
>>>>>> it would take in the byte[] from the fromBytes function.  It seems
>>>>>> like that ties the Serde implementation to the StreamTask
>>>>>> implementation.  Unless you are suggesting that it should be
>>>>>> notified without giving the input byte[].
>>>>>> 
>>>>>> In our case we are using the JsonSerde, so the byte[] for the json
>>>>>> data would be given to onDeserializationError and then our task
>>>>>> would have to decode the bytes?
>>>>>> 
>>>>>> Just to clarify my suggestion, I was not thinking that we would have
>>>>>> a predefined set of behaviors.  I was thinking that I would have an
>>>>>> interface (that would not be part of the StreamTask), maybe
>>>>>> ErrorHandler.  With this option there would be implementations of
>>>>>> that interface for dropping the message, redirecting it to a new
>>>>>> queue, and so on.  But this would require extra configuration, as
>>>>>> you mentioned.
>>>>>> 
>>>>>> I am not invested in my approach, I just wanted to make sure that I
>>>>>> understand the suggestions/options.
>>>>>> 
>>>>>> 
>>>>>> Thanks
>>>>>> 
>>>>>> 
>>>>>> Danny
>>>>>> 
>>>>>> 
>>>>>> On Tue, Feb 4, 2014 at 9:46 AM, Chris Riccomini
>>>>>> <[email protected]>wrote:
>>>>>> 
>>>>>>> Hey Danny,
>>>>>>> 
>>>>>>> I can think of two ways to accomplish this.
>>>>>>> 
>>>>>>> The first way is essentially what you've described. Allow the
>>>>>>> framework to have a pre-defined set of options defined via config
>>>>>>> (drop the message, re-route to another topic, etc).
>>>>>>> 
>>>>>>> The second way is to catch the serialization issues, and still pass
>>>>>>> the failed message to the StreamTask. The way in which the
>>>>>>> StreamTask would be notified of the failure is up for debate. One
>>>>>>> option would be to have an ErrorTask interface, which has an
>>>>>>> onDeserializationError callback. No new configuration would be
>>>>>>> required in this case: simply implementing the ErrorTask means you
>>>>>>> get notified of any serialization errors (rather than failing the
>>>>>>> container, which would be the default). Another option would be to
>>>>>>> have the IncomingMessageEnvelope have an error flag, which we could
>>>>>>> use to denote the serialization failure.
>>>>>>> 
>>>>>>> I like the second approach because it's more generic. Instead of
>>>>>>> pre-defining exact behavior when a failure occurs, it seems more
>>>>>>> flexible to let the StreamTask know, and let the developer decide
>>>>>>> what the appropriate action is. I hadn't even thought of the
>>>>>>> re-route case you brought up, and I'm sure there are many other
>>>>>>> possible actions that we're not thinking of right now. Of the
>>>>>>> second approach's potential implementation options, I favor the
>>>>>>> ErrorTask approach right now, but I haven't dug into it too much.
>>>>>>> 
>>>>>>> Regardless of which way we choose, I think the default should be to
>>>>>>> fail the container. This is the safest behavior, as it means there
>>>>>>> will be no data loss, and developers will be alerted of the
>>>>>>> serialization error.
>>>>>>> 
>>>>>>> As far as implementation goes, the four classes that are probably
>>>>>>> of most interest to you are SamzaContainer, TaskInstance,
>>>>>>> SystemConsumers, and SerdeManager. You'll need to catch the serde
>>>>>>> exceptions somewhere in the SystemConsumers or SerdeManager class,
>>>>>>> and implement your new logic there. I think the best approach is to
>>>>>>> read over these four classes, and ask us any questions you might
>>>>>>> have.
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Chris
>>>>>>> 
>>>>>>> On 2/3/14 3:42 PM, "Danny Antonetti" <[email protected]>
>>>>> wrote:
>>>>>>> 
>>>>>>>> We have discussed this, and it is something that we want to look
>>>>>>>> into.
>>>>>>>> 
>>>>>>>> Do you have any thoughts on how to implement this feature?
>>>>>>>> I assume you would want the failure behavior to be configurable.
>>>>>>>> 
>>>>>>>> Like
>>>>>>>> Drop the message,
>>>>>>>> Send a message to a new queue, and drop.
>>>>>>>> Fail the container (is that ever appropriate?)
>>>>>>>> Anything else?
>>>>>>>> 
>>>>>>>> I am not familiar with this code base.
>>>>>>>> Do you have a suggestion on what classes I should be looking to
>>>>>>>> modify?
>>>>>>>> 
>>>>>>>> Is there someone who I should bounce ideas off of?
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Thanks
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Danny
>>>>>>>> 
>>>>>>>> 
>>>>>>>> 
>>>>>>>> On Tue, Jan 21, 2014 at 9:52 PM, Jakob Homan <[email protected]>
>>>>> wrote:
>>>>>>>> 
>>>>>>>>> It's not intentional, error handling just hasn't been added yet.
>>>>>>>>> If you're interested, we'd love to have the contribution.  In
>>>>>>>>> particular, take a look at SAMZA-59
>>>>>>>>> (https://issues.apache.org/jira/browse/SAMZA-59), which also
>>>>>>>>> touches on how serdes should handle error conditions.
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> Jakob
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>>> On Tue, Jan 21, 2014 at 6:14 PM, Danny Antonetti
>>>>>>>>> <[email protected]>wrote:
>>>>>>>>> 
>>>>>>>>>> I am currently using the JsonSerde/JsonSerdeFactory classes for
>>>>>>>>>> serializing kafka messages.
>>>>>>>>>> 
>>>>>>>>>> I have noticed that if there is bad json input coming in through
>>>>>>>>>> kafka, the samza container seems to crash.
>>>>>>>>>> 
>>>>>>>>>> I was looking at JsonSerde.scala, which does not seem to have any
>>>>>>>>>> error handling.
>>>>>>>>>> 
>>>>>>>>>> So I was curious if this was intentional?
>>>>>>>>>> Or if there is a different way to handle these types of input
>>>>>>>>>> errors?
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> Thanks
>>>>>>>>>> 
>>>>>>>>>> 
>>>>>>>>>> Danny
>>>>>>>>>> 
>>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>> 
>>
>
