Hey Danny,

I can think of two ways to accomplish this.

The first way is essentially what you've described: let the framework
offer a predefined set of behaviors, selected via config (drop the message,
re-route it to another topic, etc.).
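
To make the first option concrete, here is a rough sketch of what a config-driven policy could look like. Everything in it is made up for illustration: the FailurePolicy enum, the FailureHandler class, and the "task.serde.failure.policy" key are assumptions, not existing Samza classes or config names, and the in-memory list only stands in for a real re-route topic.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Demo entry point. Every type in this file is hypothetical, not part of Samza.
class FailurePolicySketch {
    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put(FailurePolicy.CONFIG_KEY, "reroute");

        FailureHandler handler = new FailureHandler(FailurePolicy.fromConfig(config));
        handler.handle("{bad json".getBytes(StandardCharsets.UTF_8),
                new IllegalArgumentException("unparseable"));

        System.out.println("rerouted=" + handler.rerouted.size());
    }
}

// The predefined behaviors the framework would offer.
enum FailurePolicy {
    FAIL, DROP, REROUTE;

    // Hypothetical config key, invented for this sketch.
    static final String CONFIG_KEY = "task.serde.failure.policy";

    // Falls back to FAIL when the key is absent, so nothing is lost silently.
    static FailurePolicy fromConfig(Map<String, String> config) {
        String value = config.get(CONFIG_KEY);
        if (value == null) {
            return FAIL;
        }
        return valueOf(value.trim().toUpperCase(Locale.ROOT));
    }
}

// Applies the configured policy to a message that could not be deserialized.
final class FailureHandler {
    private final FailurePolicy policy;

    // Stand-in for a re-route topic; a real version would send to a stream.
    final List<byte[]> rerouted = new ArrayList<>();

    FailureHandler(FailurePolicy policy) {
        this.policy = policy;
    }

    void handle(byte[] rawMessage, Exception cause) {
        switch (policy) {
            case DROP:
                return; // discard the bad message and move on
            case REROUTE:
                rerouted.add(rawMessage); // keep the raw bytes for later inspection
                return;
            default:
                throw new RuntimeException("Deserialization failed; failing the container", cause);
        }
    }
}
```

The drawback shows up in the switch statement: every new behavior means a new enum value and a new branch inside the framework.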

The second way is to catch the serialization issues, and still pass the
failed message to the StreamTask. The way in which the StreamTask would be
notified of the failure is up for debate. One option would be to have an
ErrorTask interface, which has an onDeserializationError callback. No new
configuration would be required in this case--simply implementing the
ErrorTask means you get notified of any serialization errors (rather than
failing the container, which would be the default). Another option would
be to have the IncomingMessageEnvelope have an error flag, which we could
use to denote the serialization failure.
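
A minimal sketch of the ErrorTask idea, to show the shape I have in mind. None of these types exist in Samza today: SimpleTask stands in for StreamTask, Deserializer for a serde, and Dispatcher for wherever the container hands messages to the task. The callback signature is also only a guess.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Demo entry point. All types in this file are illustrative stand-ins, not Samza's real API.
class ErrorTaskSketch {
    public static void main(String[] args) {
        RecordingTask task = new RecordingTask();
        // Toy serde: parses the payload as a UTF-8 integer, throws on bad input.
        Deserializer intSerde =
                bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));

        Dispatcher.deliver(task, intSerde, "42".getBytes(StandardCharsets.UTF_8));
        Dispatcher.deliver(task, intSerde, "oops".getBytes(StandardCharsets.UTF_8));

        System.out.println("processed=" + task.processed.size()
                + " errors=" + task.errors.size());
    }
}

// Stand-in for StreamTask.
interface SimpleTask {
    void process(Object message);
}

// Opt-in interface: implementing it means the task wants to hear about bad messages.
interface ErrorTask {
    void onDeserializationError(byte[] rawMessage, Exception cause);
}

// Stand-in for a serde's deserialization half.
interface Deserializer {
    Object fromBytes(byte[] bytes) throws Exception;
}

// Stand-in for the container code that feeds messages to a task.
final class Dispatcher {
    static void deliver(SimpleTask task, Deserializer serde, byte[] rawMessage) {
        Object message;
        try {
            message = serde.fromBytes(rawMessage);
        } catch (Exception e) {
            if (task instanceof ErrorTask) {
                // The task opted in, so it decides what to do with the bad message.
                ((ErrorTask) task).onDeserializationError(rawMessage, e);
                return;
            }
            // Default: fail loudly rather than lose data silently.
            throw new RuntimeException("Deserialization failed; failing the container", e);
        }
        task.process(message);
    }
}

// Example task that records what it sees instead of acting on it.
final class RecordingTask implements SimpleTask, ErrorTask {
    final List<Object> processed = new ArrayList<>();
    final List<Exception> errors = new ArrayList<>();

    @Override
    public void process(Object message) {
        processed.add(message);
    }

    @Override
    public void onDeserializationError(byte[] rawMessage, Exception cause) {
        errors.add(cause);
    }
}
```

A task that does not implement ErrorTask gets the exception thrown as before, so existing jobs keep their current behavior.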

I like the second approach because it's more generic. Instead of
pre-defining exact behavior when a failure occurs, it seems more flexible
to let the StreamTask know, and let the developer decide what the
appropriate action is. I hadn't even thought of the re-route case you
brought up, and I'm sure there are many other possible actions that we're
not thinking of right now. Of the second approach's potential
implementation options, I favor the ErrorTask approach right now, but I
haven't dug into it too much.

Regardless of which way we choose, I think the default should be to fail
the container. This is the safest behavior, as it means there will be no
data loss, and developers will be alerted to the serialization error.

As far as implementation goes, the four classes that are probably of most
interest to you are SamzaContainer, TaskInstance, StreamConsumers, and
SerdeManager. You'll need to catch the serde exceptions somewhere in the
StreamConsumers or SerdeManager class, and implement your new logic there.
I think the best approach is to read over these four classes, and ask us
any questions you might have.

Cheers,
Chris

On 2/3/14 3:42 PM, "Danny Antonetti" <[email protected]> wrote:

>We have discussed this, and it is something that we want to look into.
>
>Do you have any thoughts on how to implement this feature?
>I assume you would want the failure behavior to be configurable.
>
>Like
>Drop the message,
>Send a message to a new queue, and drop.
>Fail the container (is that ever appropriate?)
>Anything else?
>
>I am not familiar with this code base.
>Do you have a suggestion on what classes I should be looking to modify?
>
>Is there someone who I should bounce ideas off of?
>
>
>Thanks
>
>
>Danny
>
>
>
>On Tue, Jan 21, 2014 at 9:52 PM, Jakob Homan <[email protected]> wrote:
>
>> It's not intentional, error handling just hasn't been added yet.  If you're
>> interested, we'd love to have the contribution.  In particular, take a look
>> at SAMZA-59 (https://issues.apache.org/jira/browse/SAMZA-59), which also
>> touches on how serdes should handle error conditions.
>>
>> Thanks,
>> Jakob
>>
>>
>> On Tue, Jan 21, 2014 at 6:14 PM, Danny Antonetti
>> <[email protected]> wrote:
>>
>> > I am currently using the JsonSerde/JsonSerdeFactory classes for serializing
>> > kafka messages.
>> >
>> > I have noticed that if there is bad json input coming in through kafka, the
>> > samza container seems to crash.
>> >
>> > I was looking at JsonSerde.scala, which does not seem to have any error
>> > handling.
>> >
>> > So I was curious if this was intentional?
>> > Or if there is a different way to handle these types of input errors?
>> >
>> >
>> > Thanks
>> >
>> >
>> > Danny
>> >
>>
