Ok -- I had originally misunderstood your ErrorTask proposal. The way you 
described it looks good to me. I agree that a single error method, which takes 
a generic error envelope object, is better than lots of different methods.
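
For illustration, here is a minimal, self-contained Java sketch of the single-callback design as seen from the framework side. `ErrorTask`, `ErrorEnvelope` and `DeserializationError` borrow their names from Chris's proposal below, but their exact shapes here are assumptions. `Task` and `ErrorRouter` are hypothetical stand-ins, not Samza classes. A task that implements `ErrorTask` receives every kind of error through one method. A task that does not keeps the fail-the-container default discussed further down, which is modelled here as an exception.

```java
// Sketch only: Task, ErrorRouter and the exact shapes of ErrorTask,
// ErrorEnvelope and DeserializationError are assumptions, not Samza's API.

// Hypothetical stand-in for StreamTask.
interface Task {
    void process(Object message);
}

// A single error callback for every kind of error.
interface ErrorTask {
    void error(ErrorEnvelope envelope);
}

// Generic envelope: the error payload is just an Object.
final class ErrorEnvelope {
    private final Object error;

    ErrorEnvelope(Object error) {
        this.error = error;
    }

    Object getError() {
        return error;
    }
}

// One concrete error type. A new kind of error is a new class like this,
// not a new method on ErrorTask.
final class DeserializationError {
    private final byte[] key;
    private final byte[] message;

    DeserializationError(byte[] key, byte[] message) {
        this.key = key;
        this.message = message;
    }

    byte[] getKey() { return key; }
    byte[] getMessage() { return message; }
}

// Hypothetical framework-side hook, called where a serde failure is caught.
final class ErrorRouter {
    private ErrorRouter() {}

    static void route(Task task, ErrorEnvelope envelope) {
        if (task instanceof ErrorTask) {
            // The task opted in: hand it the error and carry on.
            ((ErrorTask) task).error(envelope);
        } else {
            // Default from the thread: fail the container (modelled as an exception).
            throw new IllegalStateException("Unhandled error: " + envelope.getError());
        }
    }
}
```

With this design, adding an error type later does not require any existing `ErrorTask` implementation to change.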

Rather than

if (DeserializationError.ERROR_CODE.equals(envelope.getErrorCode()))

could we just say this?

if (envelope.getError() instanceof DeserializationError)
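
A minimal sketch of the `instanceof` variant follows. It assumes a simplified envelope with no `ErrorCode` field. `RecordingCollector` is a hypothetical stand-in for `MessageCollector` that only records what it is asked to send. Its `send(system, stream, key, message)` shape mirrors the pseudo-code below rather than Samza's real collector API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified versions of the proposed types (no ErrorCode).
final class DeserializationError {
    final byte[] key;
    final byte[] message;

    DeserializationError(byte[] key, byte[] message) {
        this.key = key;
        this.message = message;
    }
}

final class ErrorEnvelope {
    private final Object error;

    ErrorEnvelope(Object error) {
        this.error = error;
    }

    Object getError() {
        return error;
    }
}

// Hypothetical stand-in for a MessageCollector that just records what was sent.
final class RecordingCollector {
    final List<String> sent = new ArrayList<>();

    void send(String system, String stream, byte[] key, byte[] message) {
        sent.add(system + "/" + stream + ": " + new String(message, StandardCharsets.UTF_8));
    }
}

final class MyErrorHandler {
    void error(ErrorEnvelope envelope, RecordingCollector collector) {
        Object error = envelope.getError();
        // Dispatch on the runtime type of the error object
        // instead of comparing a separate error code.
        if (error instanceof DeserializationError) {
            DeserializationError bad = (DeserializationError) error;
            // Forward the undecodable bytes unchanged to a "bad-data" stream.
            collector.send("kafka", "bad-data", bad.key, bad.message);
        } else {
            // Any error type this task does not recognise still fails fast.
            throw new IllegalStateException("Unhandled error: " + error);
        }
    }
}
```

Dropping the separate error code also rules out an envelope whose code disagrees with the type of its error object. One behavioural difference is worth keeping in mind: `instanceof` also matches subclasses, so a more specific subclass of `DeserializationError` would take the same branch.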

Regarding ordering: my instinctive reaction is that it would be least 
surprising if the error is received at the same point in the message sequence 
as the message would have been received, had it not been an error. But that may 
need some more thinking.
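
Here is one way to get that behaviour, sketched under stated assumptions. The serde failure is caught when a message is buffered, and the error is enqueued in the same per-partition FIFO as successfully decoded messages. It is therefore delivered at the offset the message held. `BufferedItem`, `PartitionBuffer` and the integer-parsing "serde" are hypothetical. The sketch covers ordering within a partition only. It leaves open Chris's question of whether the MessageChooser should see errors when choosing across partitions.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical buffered item: either a decoded message or the error that
// replaced it, tagged with its offset in the partition.
final class BufferedItem {
    final long offset;
    final Object message;  // decoded value, or null if decoding failed
    final Exception error; // decoding failure, or null on success

    private BufferedItem(long offset, Object message, Exception error) {
        this.offset = offset;
        this.message = message;
        this.error = error;
    }

    static BufferedItem ok(long offset, Object message) {
        return new BufferedItem(offset, message, null);
    }

    static BufferedItem failed(long offset, Exception error) {
        return new BufferedItem(offset, null, error);
    }

    boolean isError() {
        return error != null;
    }
}

// Hypothetical per-partition buffer. Errors go into the same FIFO as good
// messages, so the task sees each one at the position the message held.
final class PartitionBuffer {
    private final Deque<BufferedItem> queue = new ArrayDeque<>();

    // Stand-in serde: parse UTF-8 text as an int. A real serde (e.g. JSON)
    // would go here; what matters is where the failure is recorded.
    void addRaw(long offset, byte[] bytes) {
        try {
            int value = Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
            queue.addLast(BufferedItem.ok(offset, value));
        } catch (NumberFormatException e) {
            queue.addLast(BufferedItem.failed(offset, e));
        }
    }

    // Drain in arrival order, returning a trace of which callback would fire.
    List<String> drain() {
        List<String> trace = new ArrayList<>();
        while (!queue.isEmpty()) {
            BufferedItem item = queue.pollFirst();
            trace.add((item.isError() ? "error@" : "process@") + item.offset);
        }
        return trace;
    }
}
```

Suppose the inputs are at offsets 0, 1 and 2 and only offset 1 cannot be decoded. In that case `drain()` yields `process@0`, `error@1`, `process@2`.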

Cheers,
Martin

On 4 Feb 2014, at 22:23, Chris Riccomini <[email protected]> wrote:
> Hey Guys,
> 
> One other thing to consider here, which didn't strike me at first, is how to
> handle ordering when a serialization error occurs.
> 
> Right now, the SystemConsumers class reads messages in, buffers them, and
> feeds them slowly to the MessageChooser. The MessageChooser takes only
> IncomingMessageEnvelope right now. So the question is, if we have a serde
> error, when does the StreamTask see it? It shouldn't see it before other
> messages for the same stream partition, since this would mean we've broken
> the ordering of the messages. Arguably, the MessageChooser should also get
> a chance to see it, to choose in what order to process it.
> 
> One could make the argument, though, that a message that can't be
> deserialized is invalid, and you're looking at potential data loss anyway,
> so ordering doesn't matter. I'm not sure if this assumption is true in all
> cases though.
> 
> I don't really have a good answer for this at the moment. I think we
> should think about it.
> 
> Cheers,
> Chris
> 
> On 2/4/14 11:36 AM, "Danny Antonetti" <[email protected]> wrote:
> 
>> OK Great, that looks good.
>> 
>> I will look into this as I have time and get back to you with questions.
>> 
>> 
>> Thanks for the help.
>> 
>> 
>> Danny
>> 
>> 
>> On Tue, Feb 4, 2014 at 11:29 AM, Chris Riccomini <[email protected]> wrote:
>> 
>>> Hey Guys,
>>> 
>>> Here's a pseudo-code proposal for an error API.
>>> 
>>> interface ErrorTask {
>>>  public void error(ErrorEnvelope envelope, MessageCollector collector,
>>>      TaskCoordinator coordinator);
>>> }
>>> 
>>> interface ErrorEnvelope {
>>>  ErrorCode getErrorCode();
>>>  Object getError();
>>> }
>>> 
>>> class DeserializationError {
>>>  byte[] getKey();
>>>  byte[] getMessage();
>>>  boolean keyFailed();
>>>  boolean valueFailed();
>>> }
>>> 
>>> Then you could implement something like:
>>> 
>>> class MyStreamTask implements StreamTask, ErrorTask {
>>>  public void process(...) {
>>>    // do stuff
>>>  }
>>> 
>>>  public void error(ErrorEnvelope envelope, MessageCollector collector,
>>>      TaskCoordinator coordinator) {
>>>    if (DeserializationError.ERROR_CODE.equals(envelope.getErrorCode())) {
>>>      DeserializationError error = (DeserializationError) envelope.getError();
>>>      collector.send("kafka", "bad-data", error.getKey(), error.getMessage());
>>>    }
>>>  }
>>> }
>>> 
>>> Arguably, we've just moved our big if-statement block from the process()
>>> method into the ErrorTask.error method, but the alternative, as I see it,
>>> is to have one callback per error type. That puts us in a state where
>>> every time we want to add a new callback, we break all existing
>>> implementations (since they must now implement the new method as well).
>>> I chose the single callback and generic object approach because it
>>> matches what our process method does when multiple input streams are
>>> defined (if the envelope is from stream A, do X; if it is from stream B,
>>> do Y), but I'm open to suggestions if people have them.
>>> 
>>> Regarding your JSON use case, yes, I think you'd be given the raw bytes
>>> for the key and message, and it'd be up to you to decide what to do with
>>> them. Samza allows you to define systems with no serde. Systems without a
>>> serde default to byte[] for both key and value, which means that you
>>> could define a system (or stream) with a pass-through serde, and send it
>>> the raw bytes for the key and message. Alternatively, you could try
>>> decoding the data with some other serde, post the message to a DB, log
>>> the message with Log4j, discard the message, etc.
>>> 
>>> Cheers,
>>> Chris
>>> 
>>> On 2/4/14 10:48 AM, "Danny Antonetti" <[email protected]> wrote:
>>> 
>>>> Hey Chris,
>>>> 
>>>> I am not sure I understand your suggestion about the ErrorTask. What
>>>> would this new function's method signature be? I would assume it would
>>>> take in the byte[] from the fromBytes function. It seems like that ties
>>>> the Serde implementation to the StreamTask implementation, unless you
>>>> are suggesting that it should be notified without being given the input
>>>> byte[].
>>>> 
>>>> In our case we are using the JsonSerde, so the byte[] for the JSON data
>>>> would be given to onDeserializationError, and then our task would have
>>>> to decode the bytes?
>>>> 
>>>> Just to clarify my suggestion, I was not thinking that we would have a
>>>> predefined set of behaviors. I was thinking that I would have an
>>>> interface (that would not be part of the StreamTask), maybe ErrorHandler.
>>>> With this option there would be implementations of that interface for
>>>> dropping the message, or for redirecting it to a new queue and then
>>>> dropping it. But this would require extra configuration, as you
>>>> mentioned.
>>>> 
>>>> I am not invested in my approach, I just wanted to make sure that I
>>>> understand the suggestions/options.
>>>> 
>>>> 
>>>> Thanks
>>>> 
>>>> 
>>>> Danny
>>>> 
>>>> 
>>>> On Tue, Feb 4, 2014 at 9:46 AM, Chris Riccomini <[email protected]> wrote:
>>>> 
>>>>> Hey Danny,
>>>>> 
>>>>> I can think of two ways to accomplish this.
>>>>> 
>>>>> The first way is essentially what you've described: allow the framework
>>>>> to have a predefined set of options defined via config (drop the
>>>>> message, re-route to another topic, etc.).
>>>>> 
>>>>> The second way is to catch the serialization issues, and still pass the
>>>>> failed message to the StreamTask. The way in which the StreamTask would
>>>>> be notified of the failure is up for debate. One option would be to
>>>>> have an ErrorTask interface, which has an onDeserializationError
>>>>> callback. No new configuration would be required in this case; simply
>>>>> implementing the ErrorTask means you get notified of any serialization
>>>>> errors (rather than failing the container, which would be the default).
>>>>> Another option would be to have the IncomingMessageEnvelope carry an
>>>>> error flag, which we could use to denote the serialization failure.
>>>>> 
>>>>> I like the second approach because it's more generic. Instead of
>>>>> pre-defining exact behavior when a failure occurs, it seems more
>>>>> flexible to let the StreamTask know, and let the developer decide what
>>>>> the appropriate action is. I hadn't even thought of the re-route case
>>>>> you brought up, and I'm sure there are many other possible actions that
>>>>> we're not thinking of right now. Of the second approach's potential
>>>>> implementation options, I favor the ErrorTask approach right now, but I
>>>>> haven't dug into it too much.
>>>>> 
>>>>> Regardless of which way we choose, I think the default should be to
>>>>> fail the container. This is the safest behavior, as it means there will
>>>>> be no data loss, and developers will be alerted to the serialization
>>>>> error.
>>>>> 
>>>>> As far as implementation goes, the four classes that are probably of
>>>>> most interest to you are SamzaContainer, TaskInstance, SystemConsumers,
>>>>> and SerdeManager. You'll need to catch the serde exceptions somewhere in
>>>>> the SystemConsumers or SerdeManager class, and implement your new logic
>>>>> there. I think the best approach is to read over these four classes,
>>>>> and ask us any questions you might have.
>>>>> 
>>>>> Cheers,
>>>>> Chris
>>>>> 
>>>>> On 2/3/14 3:42 PM, "Danny Antonetti" <[email protected]> wrote:
>>>>> 
>>>>>> We have discussed this, and it is something that we want to look into.
>>>>>> 
>>>>>> Do you have any thoughts on how to implement this feature?
>>>>>> I assume you would want the failure behavior to be configurable,
>>>>>> for example:
>>>>>> - Drop the message.
>>>>>> - Send the message to a new queue, then drop it.
>>>>>> - Fail the container (is that ever appropriate?).
>>>>>> - Anything else?
>>>>>> 
>>>>>> I am not familiar with this code base.
>>>>>> Do you have a suggestion for which classes I should look at modifying?
>>>>>> 
>>>>>> Is there someone who I should bounce ideas off of?
>>>>>> 
>>>>>> 
>>>>>> Thanks
>>>>>> 
>>>>>> 
>>>>>> Danny
>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> On Tue, Jan 21, 2014 at 9:52 PM, Jakob Homan <[email protected]> wrote:
>>>>>> 
>>>>>>> It's not intentional; error handling just hasn't been added yet. If
>>>>>>> you're interested, we'd love to have the contribution. In particular,
>>>>>>> take a look at SAMZA-59 (https://issues.apache.org/jira/browse/SAMZA-59),
>>>>>>> which also touches on how serdes should handle error conditions.
>>>>>>> 
>>>>>>> Thanks,
>>>>>>> Jakob
>>>>>>> 
>>>>>>> 
>>>>>>> On Tue, Jan 21, 2014 at 6:14 PM, Danny Antonetti <[email protected]> wrote:
>>>>>>> 
>>>>>>>> I am currently using the JsonSerde/JsonSerdeFactory classes for
>>>>>>>> serializing Kafka messages.
>>>>>>>> 
>>>>>>>> I have noticed that if bad JSON input comes in through Kafka, the
>>>>>>>> Samza container seems to crash.
>>>>>>>> 
>>>>>>>> I was looking at JsonSerde.scala, which does not seem to have any
>>>>>>>> error handling.
>>>>>>>> 
>>>>>>>> So I was curious: is this intentional? Or is there a different way to
>>>>>>>> handle these types of input errors?
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Thanks
>>>>>>>> 
>>>>>>>> 
>>>>>>>> Danny
>>>>>>>> 
>>>>>>> 
>>>>> 
>>>>> 
>>> 
>>> 
> 
