Hey Martin,
I'm not sure we're in a position to decide whether a serde error should
be treated the same as an exception in the task's process method. That
decision really belongs to the developer, which is why I favor the
second approach.
Regarding the boilerplate code that intermingles error messages and
regular messages in the same method: I don't like that style, since it
seems clunky and error-prone. This is why I prefer the ErrorTask
interface. With ErrorTask, it would look like:
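public void process(...) {
    // Only successfully deserialized messages reach this method.
    doStuff(message);
}

public void error(...) {
    // Messages that failed to deserialize are routed here instead.
    collector.send("kafka", "bad message", message);
}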
Your point is a good one, though: there are more potential errors than
just serde errors. I think the ErrorTask interface could be extended with
these error types as they come up (though that would be an API-breaking
change). Alternatively, perhaps a single generic ErrorTask.onError()
callback could be used for all error types, with the if() statement that
distinguishes the type of error implemented inside it?
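A minimal Java sketch of that single-callback idea, assuming a hypothetical `ErrorType` enum and an `onError(ErrorType, byte[], Exception)` signature. Neither exists in Samza; both are illustrative. Dispatching on an enum inside one callback means a new error kind can be added as an enum constant without changing the interface's method signature:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical error categories; only serde errors have been discussed so far.
enum ErrorType { DESERIALIZATION, PROCESSING }

// Hypothetical single-callback interface (not part of Samza).
interface ErrorTask {
    void onError(ErrorType type, byte[] rawMessage, Exception cause);
}

// Example task that decides, per error type, what to do with a failed message.
class DeadLetterTask implements ErrorTask {
    // In-memory stand-ins for output topics; a real task would send via its collector.
    final List<byte[]> badMessages = new ArrayList<>();
    final List<byte[]> retries = new ArrayList<>();

    @Override
    public void onError(ErrorType type, byte[] rawMessage, Exception cause) {
        switch (type) {
            case DESERIALIZATION:
                // Undecodable bytes will never succeed, so park them for inspection.
                badMessages.add(rawMessage);
                break;
            case PROCESSING:
                // A processing failure might be transient, so queue it for a retry.
                retries.add(rawMessage);
                break;
            default:
                // Unknown (e.g. newly added) error types fail loudly instead of being ignored.
                throw new IllegalStateException("Unhandled error type: " + type, cause);
        }
    }
}
```

Adding a constant to `ErrorType` still changes the contract in a softer way: existing tasks fall into the `default` branch, which in this sketch fails loudly rather than silently dropping the message.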
Cheers,
Chris
On 2/4/14 10:38 AM, "Martin Kleppmann" wrote:
>A counterpoint thought: should a serialization/deserialization error be
>treated the same as an exception raised in a task's `process` method? A
>dead-letter queue could be a useful mechanism for dealing with all
>message processing failures, not just deserialization errors. In that
>case, the first way (configured options for handling the exception) would
>give a nice symmetry between different kinds of errors.
>
>The second way could achieve the same thing, but would require some
>boilerplate code along the lines of:
>
>public void process(...) {
>    if (message not deserialized correctly) {
>        deadLetter.emit(message);
>    } else {
>        try {
>            doStuff(message);
>        } catch (Exception e) {
>            deadLetter.emit(message, e);
>        }
>    }
>}
>
>
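For comparison, a runnable Java sketch of the boilerplate style described above. The `Envelope` class, its `deserializationError` field, and `DeadLetter` are hypothetical stand-ins; the dead-letter sink only records error messages in a list rather than writing to a topic:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical envelope: deserializationError is set when the serde failed.
class Envelope {
    final byte[] raw;
    final Object payload;
    final Exception deserializationError;

    Envelope(byte[] raw, Object payload, Exception deserializationError) {
        this.raw = raw;
        this.payload = payload;
        this.deserializationError = deserializationError;
    }
}

// Hypothetical dead-letter sink; a real one would write to a Kafka topic.
class DeadLetter {
    final List<String> entries = new ArrayList<>();

    void emit(Envelope envelope, Exception cause) {
        entries.add(cause.getMessage());
    }
}

class IntermingledTask {
    final DeadLetter deadLetter = new DeadLetter();
    final List<Object> processed = new ArrayList<>();

    void process(Envelope envelope) {
        if (envelope.deserializationError != null) {
            // Serde failure: route the message to the dead-letter queue.
            deadLetter.emit(envelope, envelope.deserializationError);
        } else {
            try {
                doStuff(envelope.payload);
            } catch (Exception e) {
                // Processing failure: same destination, giving the symmetry described above.
                deadLetter.emit(envelope, e);
            }
        }
    }

    void doStuff(Object payload) {
        if ("poison".equals(payload)) {
            throw new IllegalArgumentException("cannot handle " + payload);
        }
        processed.add(payload);
    }
}
```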
>On 4 Feb 2014, at 17:46, Chris Riccomini wrote:
>> Hey Danny,
>>
>> I can think of two ways to accomplish this.
>>
>> The first way is essentially what you've described: allow the framework
>> to have a predefined set of options, set via config (drop the message,
>> re-route to another topic, etc.).
>>
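A minimal Java sketch of the config-driven option, assuming hypothetical config keys `task.serde.error.policy` and `task.serde.error.topic` (these are not real Samza settings) and using `java.util.Properties` in place of Samza's own config class. Failing is the fallback when no policy is configured:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Properties;

// Hypothetical policies mirroring the options discussed in this thread.
enum SerdeErrorPolicy { FAIL, DROP, REROUTE }

class SerdeErrorHandler {
    // Hypothetical config keys; these are not real Samza settings.
    static final String POLICY_KEY = "task.serde.error.policy";
    static final String TOPIC_KEY = "task.serde.error.topic";

    final SerdeErrorPolicy policy;
    final String rerouteTopic;
    // Stand-in for a producer: records "topic:size" for each re-routed message.
    final List<String> rerouted = new ArrayList<>();

    SerdeErrorHandler(Properties config) {
        // Fail the container unless a policy is explicitly configured.
        String name = config.getProperty(POLICY_KEY, "fail");
        this.policy = SerdeErrorPolicy.valueOf(name.trim().toUpperCase(Locale.ROOT));
        this.rerouteTopic = config.getProperty(TOPIC_KEY);
        if (policy == SerdeErrorPolicy.REROUTE && rerouteTopic == null) {
            throw new IllegalArgumentException(TOPIC_KEY + " is required when re-routing");
        }
    }

    void onSerdeError(byte[] raw, Exception cause) {
        switch (policy) {
            case DROP:
                break; // Skip the message and keep consuming.
            case REROUTE:
                rerouted.add(rerouteTopic + ":" + raw.length);
                break;
            case FAIL:
            default:
                throw new RuntimeException("Deserialization failed", cause);
        }
    }
}
```

Validating the reroute topic in the constructor means a misconfiguration surfaces at startup rather than on the first bad message.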
>> The second way is to catch the serialization issues and still pass the
>> failed message to the StreamTask. How the StreamTask would be notified
>> of the failure is up for debate. One option would be to have an
>> ErrorTask interface with an onDeserializationError callback. No new
>> configuration would be required in this case: simply implementing
>> ErrorTask means you get notified of any serialization errors (rather
>> than failing the container, which would be the default). Another option
>> would be to give IncomingMessageEnvelope an error flag, which we could
>> use to denote the serialization failure.
>>
>> I like the second approach because it's more generic. Instead of
>> predefining exact behavior for when a failure occurs, it seems more
>> flexible to notify the StreamTask and let the developer decide on the
>> appropriate action. I hadn't even thought of the re-route case you
>> brought up, and I'm sure there are many other possible actions that
>> we're not thinking of right now. Of the second approach's potential
>> implementation options, I currently favor the ErrorTask approach, but I
>> haven't dug into it much.
>>
>> Regardless of which way we choose, I think the default should be to fail
>> the container. This is the safest behavior, as it means there will be no
>> data loss, and developers will be alerted to the serialization error.
>>
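The opt-in notification and fail-by-default behavior could be wired together roughly as follows. This is a Java sketch under stated assumptions: `SimpleStreamTask`, `ErrorTask.onDeserializationError`, `Decoder`, and `Dispatcher` are hypothetical simplifications, not Samza's actual `StreamTask` signature or container internals:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified task interface (not Samza's real StreamTask signature).
interface SimpleStreamTask {
    void process(Object message);
}

// Hypothetical opt-in interface: implementing it replaces the fail-by-default behavior.
interface ErrorTask {
    void onDeserializationError(byte[] raw, Exception cause);
}

// Hypothetical decoder standing in for a serde.
interface Decoder {
    Object decode(byte[] raw) throws Exception;
}

class Dispatcher {
    static void deliver(byte[] raw, Decoder decoder, SimpleStreamTask task) {
        Object message;
        try {
            message = decoder.decode(raw);
        } catch (Exception e) {
            if (task instanceof ErrorTask) {
                // Opt-in: the task decides what to do with the bad message.
                ((ErrorTask) task).onDeserializationError(raw, e);
                return;
            }
            // Default: fail the container so nothing is silently lost.
            throw new RuntimeException("Deserialization failed; failing container", e);
        }
        task.process(message);
    }
}

// Example task that opts in to error notifications.
class TolerantTask implements SimpleStreamTask, ErrorTask {
    final List<Object> good = new ArrayList<>();
    final List<byte[]> bad = new ArrayList<>();

    public void process(Object message) { good.add(message); }

    public void onDeserializationError(byte[] raw, Exception cause) { bad.add(raw); }
}
```

Because the check is a plain `instanceof`, tasks that never implement `ErrorTask` keep the safe default without any new configuration.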
>> As far as implementation goes, the four classes that are probably of most
>> interest to you are SamzaContainer, TaskInstance, StreamConsumers, and
>> SerdeManager. You'll need to catch the serde exceptions somewhere in the
>> StreamConsumers or SerdeManager class and implement your new logic there.
>> I think the best approach is to read over these four classes and ask us
>> any questions you might have.
>>
>> Cheers,
>> Chris
>>
>> On 2/3/14 3:42 PM, "Danny Antonetti" wrote:
>>
>>> We have discussed this, and it is something that we want to look into.
>>>
>>> Do you have any thoughts on how to implement this feature?
>>> I assume you would want the failure behavior to be configurable,
>>> for example:
>>> - Drop the message.
>>> - Send the message to a new queue, then drop it.
>>> - Fail the container (is that ever appropriate?).
>>> - Anything else?
>>>
>>> I am not familiar with this codebase.
>>> Do you have a suggestion on which classes I should be looking to modify?
>>>
>>> Is there someone who I should bounce ideas off of?
>>>
>>>
>>> Thanks
>>>
>>>
>>> Danny
>>>
>>>
>>>
>>> On Tue, Jan 21, 2014 at 9:52 PM, Jakob Homan wrote:
>>>
>>>> It's not intentional; error handling just hasn't been added yet. If
>>>> you're interested, we'd love to have the contribution. In particular,
>>>> take a look at SAMZA-59 (https://issues.apache.org/jira/browse/SAMZA-59),
>>>> which also touches on how serdes should handle error conditions.
>>>>
>>>> Thanks,
>>>> Jakob
>>>>
>>>>
>>>> On Tue, Jan 21, 2014 at 6:14 PM, Danny Antonetti wrote:
>>>>
>>>>> I am currently using the JsonSerde/JsonSerdeFactory classes for
>>>>> serializing Kafka messages.
>>>>>
>>>>> I have noticed that if bad JSON input comes in through Kafka, the
>>>>> Samza container seems to crash.
>>>>>
>>>>> I was looking at JsonSerde.scala, which does not seem to have any
>>>>> error handling.
>>>>>
>>>>> So I was curious: is this intentional, or is there a different way to
>>>>> handle these types of input errors?
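One way to add error handling around an existing serde is to wrap it so that a decoding exception becomes a value the caller can inspect, instead of an exception that crashes the container. In this Java sketch, `Serde` is a local interface with `fromBytes`/`toBytes` methods, assumed here to resemble Samza's serde interface rather than imported from it; `IntSerde` is a toy stand-in for a JSON serde; `GuardedSerde` and `Decoded` are hypothetical names:

```java
import java.nio.charset.StandardCharsets;

// Local interface with fromBytes/toBytes, assumed to resemble a Samza serde; not imported from Samza.
interface Serde<T> {
    T fromBytes(byte[] bytes);
    byte[] toBytes(T value);
}

// Toy serde standing in for a JSON serde: it throws on malformed input.
class IntSerde implements Serde<Integer> {
    public Integer fromBytes(byte[] bytes) {
        // Integer.parseInt throws NumberFormatException for non-numeric text.
        return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8).trim());
    }

    public byte[] toBytes(Integer value) {
        return value.toString().getBytes(StandardCharsets.UTF_8);
    }
}

// Outcome of a guarded decode; error is non-null only if decoding failed.
final class Decoded<T> {
    final T value;
    final RuntimeException error;
    final byte[] raw;

    private Decoded(T value, RuntimeException error, byte[] raw) {
        this.value = value;
        this.error = error;
        this.raw = raw;
    }

    static <T> Decoded<T> ok(T value, byte[] raw) { return new Decoded<>(value, null, raw); }

    static <T> Decoded<T> failed(RuntimeException error, byte[] raw) { return new Decoded<>(null, error, raw); }

    boolean isError() { return error != null; }
}

// Wraps any serde so decode failures become data instead of crashing the caller.
class GuardedSerde<T> {
    private final Serde<T> inner;

    GuardedSerde(Serde<T> inner) { this.inner = inner; }

    Decoded<T> decode(byte[] bytes) {
        try {
            return Decoded.ok(inner.fromBytes(bytes), bytes);
        } catch (RuntimeException e) {
            // Keep the raw bytes so a handler can log, drop, or re-route them.
            return Decoded.failed(e, bytes);
        }
    }
}
```

Keeping the raw bytes alongside the error is what lets any of the policies discussed in this thread (drop, re-route, notify the task) act on the failed message afterwards.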
>>>>>
>>>>>
>>>>> Thanks
>>>>>
>>>>>
>>>>> Danny
>>>>>
>>>>
>>
>