Hey Guys,

Here's a pseudo-code proposal for an error API.

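import org.apache.samza.system.*;
import org.apache.samza.task.*;

interface ErrorTask {
  void error(ErrorEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator);
}

interface ErrorEnvelope {
  ErrorCode getErrorCode();
  Object getError();
}

interface DeserializationError {
  // Identifies deserialization failures in ErrorEnvelope.getErrorCode().
  ErrorCode ERROR_CODE = new ErrorCode("deserialization");

  byte[] getKey();       // raw key bytes, exactly as read from the stream
  byte[] getValue();     // raw message bytes, exactly as read from the stream
  boolean keyFailed();   // true if the key serde threw
  boolean valueFailed(); // true if the message serde threw
}

// Names a category of error; two codes are equal if their names match.
final class ErrorCode {
  private final String name;
  ErrorCode(String name) { this.name = name; }
  @Override public boolean equals(Object o) {
    return o instanceof ErrorCode && ((ErrorCode) o).name.equals(name);
  }
  @Override public int hashCode() { return name.hashCode(); }
}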

Then you could implement something like:

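class MyStreamTask implements StreamTask, ErrorTask {
  public void process(IncomingMessageEnvelope envelope,
      MessageCollector collector, TaskCoordinator coordinator) {
    // do stuff
  }

  public void error(ErrorEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    if (DeserializationError.ERROR_CODE.equals(envelope.getErrorCode())) {
      DeserializationError error = (DeserializationError) envelope.getError();
      // Forward the undecoded key and message bytes to a "bad-data" stream.
      collector.send(new OutgoingMessageEnvelope(
          new SystemStream("kafka", "bad-data"), error.getKey(), error.getValue()));
    }
  }
}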

Arguably, we've just moved our big if-statement block from the process()
method into the ErrorTask.error method, but the alternative, as I see it,
is to have one callback per error type. That puts us in a state where
every time we want to add a new callback, we break all existing
implementations (since they must now implement the new method as well). I
chose the single-callback, generic-object approach because it matches
what our process method does when multiple input streams are defined (if
the envelope is from stream A, do X; if it is from stream B, do Y), but
I'm open to suggestions if people have them.
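
To make the single-callback trade-off concrete, here is a minimal, self-contained sketch that runs outside Samza. Collector, ErrorEnvelope, DeserializationError, and ErrorCode below are simplified stand-ins for the proposed types (not Samza classes), and TimeoutError is a purely hypothetical second error type. It shows one error() method dispatching on the error code, so introducing a new error type does not break MyErrorTask, which simply ignores codes it does not recognize.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the proposed API; these are NOT Samza classes.
interface Collector {
  void send(String stream, byte[] key, byte[] value);
}

final class ErrorCode {
  private final String name;
  ErrorCode(String name) { this.name = name; }
  @Override public boolean equals(Object o) {
    return o instanceof ErrorCode && ((ErrorCode) o).name.equals(name);
  }
  @Override public int hashCode() { return name.hashCode(); }
}

final class ErrorEnvelope {
  final ErrorCode code;
  final Object error;
  ErrorEnvelope(ErrorCode code, Object error) {
    this.code = code;
    this.error = error;
  }
}

final class DeserializationError {
  static final ErrorCode ERROR_CODE = new ErrorCode("deserialization");
  final byte[] key;
  final byte[] value;
  DeserializationError(byte[] key, byte[] value) {
    this.key = key;
    this.value = value;
  }
}

// Hypothetical error type added after MyErrorTask was written. Adding it
// requires no change to existing ErrorTask implementations.
final class TimeoutError {
  static final ErrorCode ERROR_CODE = new ErrorCode("timeout");
}

interface ErrorTask {
  void error(ErrorEnvelope envelope, Collector collector);
}

class MyErrorTask implements ErrorTask {
  @Override
  public void error(ErrorEnvelope envelope, Collector collector) {
    if (DeserializationError.ERROR_CODE.equals(envelope.code)) {
      DeserializationError e = (DeserializationError) envelope.error;
      collector.send("bad-data", e.key, e.value);
    }
    // Codes this task doesn't recognize are ignored; a stricter task
    // could throw here instead.
  }
}

class ErrorDispatchDemo {
  public static void main(String[] args) {
    List<String> sent = new ArrayList<>();
    Collector collector = (stream, key, value) ->
        sent.add(stream + ":" + new String(value, StandardCharsets.UTF_8));
    ErrorTask task = new MyErrorTask();

    byte[] badJson = "{\"user\": 42".getBytes(StandardCharsets.UTF_8);
    task.error(new ErrorEnvelope(DeserializationError.ERROR_CODE,
        new DeserializationError(null, badJson)), collector);
    task.error(new ErrorEnvelope(TimeoutError.ERROR_CODE, new TimeoutError()), collector);

    System.out.println(sent); // prints [bad-data:{"user": 42]
  }
}
```

The timeout envelope is silently skipped, which is exactly the property that a one-method-per-error interface would lose.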

Regarding your JSON use case, yes, I think you'd be given the raw bytes
for the key and message, and it'd be up to you to decide what to do with
them. Samza allows you to define systems with no serde. Systems without a
serde default to byte[] for both key and value, which means that you could
define a system (or stream) with a pass-through serde and send it the raw
bytes for the key and message. Alternatively, you could try decoding the
data using some other serde, or post the message to a DB, log it with
Log4j, discard it, etc.
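
As one example of deciding what to do with the raw bytes: when JSON deserialization fails, the payload may or may not be valid text. A minimal sketch, assuming the task just wants to log the bad message; RawBytes.forLogging is a hypothetical helper that uses only the JDK. It returns the bytes as a string when they are valid UTF-8 (as truncated or malformed JSON usually is) and falls back to Base64 otherwise, so binary garbage doesn't end up in the log.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for an ErrorTask: turns the raw bytes of a message
// that failed deserialization into a string that is safe to log.
final class RawBytes {
  private RawBytes() {}

  static String forLogging(byte[] raw) {
    if (raw == null) {
      return "<null>";
    }
    try {
      // A fresh decoder reports malformed input instead of replacing it.
      return StandardCharsets.UTF_8.newDecoder()
          .decode(ByteBuffer.wrap(raw))
          .toString();
    } catch (CharacterCodingException e) {
      // Not valid UTF-8: log an unambiguous Base64 form instead.
      return "base64:" + Base64.getEncoder().encodeToString(raw);
    }
  }
}

class RawBytesDemo {
  public static void main(String[] args) {
    // Truncated JSON is still valid UTF-8, so it is logged as text.
    System.out.println(RawBytes.forLogging(
        "{\"user\": 42".getBytes(StandardCharsets.UTF_8)));
    // 0xC3 0x28 is an invalid UTF-8 sequence, so it is Base64-encoded.
    System.out.println(RawBytes.forLogging(new byte[] {(byte) 0xC3, (byte) 0x28}));
  }
}
```

The same bytes could just as well be forwarded unchanged to a pass-through stream; decoding is only needed for the human-readable paths such as logging.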

Cheers,
Chris

On 2/4/14 10:48 AM, "Danny Antonetti" <[email protected]> wrote:

>Hey Chris,
>
>I am not sure I understand your suggestion about the ErrorTask. What would
>this new function's method signature be? I would assume it would take in
>the byte[] from the fromBytes function. It seems like that ties the Serde
>implementation to the StreamTask implementation, unless you are suggesting
>that it should be notified without being given the input byte[].
>
>In our case we are using the JsonSerde, so the byte[] for the JSON data
>would be given to onDeserializationError, and then our task would have to
>decode the bytes?
>
>Just to clarify my suggestion, I was not thinking that we would have a
>predefined set of behaviors. I was thinking that I would have an interface
>(that would not be part of the StreamTask), maybe ErrorHandler. With this
>option there would be implementations of that interface for dropping the
>message, redirecting it to a new queue, etc. But this would require extra
>configuration, as you mentioned.
>
>I am not invested in my approach; I just wanted to make sure that I
>understand the suggestions/options.
>
>
>Thanks
>
>
>Danny
>
>
>On Tue, Feb 4, 2014 at 9:46 AM, Chris Riccomini
><[email protected]>wrote:
>
>> Hey Danny,
>>
>> I can think of two ways to accomplish this.
>>
>> The first way is essentially what you've described: allow the framework
>> to have a predefined set of options defined via config (drop the message,
>> re-route to another topic, etc.).
>>
>> The second way is to catch the serialization issues, and still pass the
>> failed message to the StreamTask. The way in which the StreamTask would be
>> notified of the failure is up for debate. One option would be to have an
>> ErrorTask interface, which has an onDeserializationError callback. No new
>> configuration would be required in this case: simply implementing the
>> ErrorTask means you get notified of any serialization errors (rather than
>> failing the container, which would be the default). Another option would
>> be to give the IncomingMessageEnvelope an error flag, which we could use
>> to denote the serialization failure.
>>
>> I like the second approach because it's more generic. Instead of
>> pre-defining exact behavior when a failure occurs, it seems more flexible
>> to let the StreamTask know, and let the developer decide what the
>> appropriate action is. I hadn't even thought of the re-route case you
>> brought up, and I'm sure there are many other possible actions that we're
>> not thinking of right now. Of the second approach's potential
>> implementation options, I favor the ErrorTask approach right now, but I
>> haven't dug into it too much.
>>
>> Regardless of which way we choose, I think the default should be to fail
>> the container. This is the safest behavior, as it means there will be no
>> data loss, and developers will be alerted of the serialization error.
>>
>> As far as implementation goes, the four classes that are probably of most
>> interest to you are SamzaContainer, TaskInstance, SystemConsumers, and
>> SerdeManager. You'll need to catch the serde exceptions somewhere in the
>> SystemConsumers or SerdeManager class, and implement your new logic there.
>> I think the best approach is to read over these four classes, and ask us
>> any questions you might have.
>>
>> Cheers,
>> Chris
>>
>> On 2/3/14 3:42 PM, "Danny Antonetti" <[email protected]> wrote:
>>
>> >We have discussed this, and it is something that we want to look into.
>> >
>> >Do you have any thoughts on how to implement this feature?
>> >I assume you would want the failure behavior to be configurable.
>> >
>> >For example:
>> >Drop the message.
>> >Send the message to a new queue, and drop it.
>> >Fail the container (is that ever appropriate?).
>> >Anything else?
>> >
>> >I am not familiar with this code base.
>> >Do you have a suggestion on what classes I should be looking to modify?
>> >
>> >Is there someone who I should bounce ideas off of?
>> >
>> >
>> >Thanks
>> >
>> >
>> >Danny
>> >
>> >
>> >
>> >On Tue, Jan 21, 2014 at 9:52 PM, Jakob Homan <[email protected]> wrote:
>> >
>> >> It's not intentional; error handling just hasn't been added yet. If
>> >> you're interested, we'd love to have the contribution. In particular,
>> >> take a look at SAMZA-59 (https://issues.apache.org/jira/browse/SAMZA-59),
>> >> which also touches on how serdes should handle error conditions.
>> >>
>> >> Thanks,
>> >> Jakob
>> >>
>> >>
>> >> On Tue, Jan 21, 2014 at 6:14 PM, Danny Antonetti
>> >> <[email protected]>wrote:
>> >>
>> >> > I am currently using the JsonSerde/JsonSerdeFactory classes for
>> >> > serializing Kafka messages.
>> >> >
>> >> > I have noticed that if there is bad JSON input coming in through
>> >> > Kafka, the Samza container seems to crash.
>> >> >
>> >> > I was looking at JsonSerde.scala, which does not seem to have any
>> >> > error handling.
>> >> >
>> >> > So I was curious: is this intentional? Or is there a different way
>> >> > to handle these types of input errors?
>> >> >
>> >> >
>> >> > Thanks
>> >> >
>> >> >
>> >> > Danny
>> >> >
>> >>
>>
>>
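
For comparison, the config-driven alternative from earlier in the thread (Danny's ErrorHandler interface, which Chris calls the "first way") might look roughly like this. ErrorHandler, Forwarder, the handler classes, and the policy strings "fail", "drop", and "reroute" are hypothetical names, not Samza APIs or config keys. The sketch keeps the default Chris suggests: with no policy configured, the handler throws, which would fail the container.

```java
// Hypothetical destination for re-routed messages (stand-in for a collector).
interface Forwarder {
  void send(String stream, byte[] key, byte[] value);
}

// Hypothetical pluggable policy for messages whose serde threw.
interface ErrorHandler {
  void handle(byte[] key, byte[] value, Forwarder forwarder);
}

// Default policy: fail loudly so nothing is silently lost.
final class FailHandler implements ErrorHandler {
  @Override
  public void handle(byte[] key, byte[] value, Forwarder forwarder) {
    throw new IllegalStateException("Deserialization failed; failing the container");
  }
}

// Discard the message and keep processing.
final class DropHandler implements ErrorHandler {
  @Override
  public void handle(byte[] key, byte[] value, Forwarder forwarder) {
    // Intentionally does nothing.
  }
}

// Send the raw bytes to another stream; the original message is then dropped.
final class RerouteHandler implements ErrorHandler {
  private final String stream;

  RerouteHandler(String stream) {
    this.stream = stream;
  }

  @Override
  public void handle(byte[] key, byte[] value, Forwarder forwarder) {
    forwarder.send(stream, key, value);
  }
}

final class ErrorHandlers {
  private ErrorHandlers() {}

  // Maps a hypothetical config value to a handler; unset means "fail".
  static ErrorHandler fromConfig(String policy, String rerouteStream) {
    if (policy == null || policy.equals("fail")) {
      return new FailHandler();
    }
    switch (policy) {
      case "drop":
        return new DropHandler();
      case "reroute":
        return new RerouteHandler(rerouteStream);
      default:
        throw new IllegalArgumentException("Unknown error policy: " + policy);
    }
  }
}
```

The trade-off discussed in the thread shows up directly here: each new behavior needs another handler class and another config value, whereas the ErrorTask proposal leaves that decision to the task's own code.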
