Hey Chris,

I am not sure I understand your suggestion about the ErrorTask. What would this new function's method signature be? I would assume it would take in the byte[] from the fromBytes function. That seems to tie the Serde implementation to the StreamTask implementation, unless you are suggesting that the task should be notified without being given the input byte[].

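
To make the question concrete, here is a rough sketch of what I imagine the callback might look like. Every name and parameter below is my own guess for illustration (the stream name argument, the exception argument, the CollectingErrorTask class); none of it is an existing Samza API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback interface; an assumption for discussion, not part of Samza.
interface ErrorTask {
    // Called instead of failing the container when a message cannot be deserialized.
    void onDeserializationError(String stream, byte[] rawMessage, Exception cause);
}

// Example task that records each undecodable payload so it can be inspected later.
class CollectingErrorTask implements ErrorTask {
    final List<String> failures = new ArrayList<>();

    @Override
    public void onDeserializationError(String stream, byte[] rawMessage, Exception cause) {
        // The task has to interpret the raw bytes itself, which is the coupling I am asking about.
        String text = new String(rawMessage, StandardCharsets.UTF_8);
        failures.add(stream + ": " + text + " (" + cause.getMessage() + ")");
    }
}
```

With a signature like this, the task only learns what the serde could not decode, so it still has to know the wire format to do anything useful with the bytes.
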
In our case we are using the JsonSerde, so would the byte[] for the JSON data be given to onDeserializationError, leaving our task to decode the bytes?

Just to clarify my suggestion: I was not thinking that we would have a predefined set of behaviors. I was thinking of a separate interface (not part of the StreamTask), maybe called ErrorHandler. With this option there would be implementations of that interface for dropping the message or redirecting it to a new queue. But this would require extra configuration, as you mentioned.

I am not invested in my approach; I just wanted to make sure that I understand the suggestions/options.

Thanks

Danny


On Tue, Feb 4, 2014 at 9:46 AM, Chris Riccomini <[email protected]> wrote:

> Hey Danny,
>
> I can think of two ways to accomplish this.
>
> The first way is essentially what you've described. Allow the framework to
> have a pre-defined set of options defined via config (drop the message,
> re-route to another topic, etc).
>
> The second way is to catch the serialization issues, and still pass the
> failed message to the StreamTask. The way in which the StreamTask would be
> notified of the failure is up for debate. One option would be to have an
> ErrorTask interface, which has an onDeserializationError callback. No new
> configuration would be required in this case--simply implementing the
> ErrorTask means you get notified of any serialization errors (rather than
> failing the container, which would be the default). Another option would
> be to have the IncomingMessageEnvelope have an error flag, which we could
> use to denote the serialization failure.
>
> I like the second approach because it's more generic. Instead of
> pre-defining exact behavior when a failure occurs, it seems more flexible
> to let the StreamTask know, and let the developer decide what the
> appropriate action is.
> I hadn't even thought of the re-route case you
> brought up, and I'm sure there are many other possible actions that we're
> not thinking of right now. Of the second approach's potential
> implementation options, I favor the ErrorTask approach right now, but I
> haven't dug into it too much.
>
> Regardless of which way we choose, I think the default should be to fail
> the container. This is the safest behavior, as it means there will be no
> data loss, and developers will be alerted of the serialization error.
>
> As far as implementation goes, the four classes that are probably of most
> interest to you are SamzaContainer, TaskInstance, StreamConsumers, and
> SerdeManager. You'll need to catch the serde exceptions somewhere in the
> StreamConsumer or SerdeManager class, and implement your new logic there.
> I think the best approach is to read over these four classes, and ask us
> any questions you might have.
>
> Cheers,
> Chris
>
> On 2/3/14 3:42 PM, "Danny Antonetti" <[email protected]> wrote:
>
> >We have discussed this, and it is something that we want to look into.
> >
> >Do you have any thoughts on how to implement this feature?
> >I assume you would want the failure behavior to be configurable.
> >
> >Like
> >Drop the message,
> >Send a message to a new queue, and drop.
> >Fail the container (is that ever appropriate?)
> >Anything else?
> >
> >I am not familiar with this code base.
> >Do you have a suggestion on what classes I should be looking to modify?
> >
> >Is there someone who I should bounce ideas off of?
> >
> >
> >Thanks
> >
> >
> >Danny
> >
> >
> >
> >On Tue, Jan 21, 2014 at 9:52 PM, Jakob Homan <[email protected]> wrote:
> >
> >> It's not intentional, error handling just hasn't been added yet. If
> >> you're interested, we'd love to have the contribution. In particular,
> >> take a look at SAMZA-59
> >> (https://issues.apache.org/jira/browse/SAMZA-59), which also touches
> >> on how serdes should handle error conditions.
> >>
> >> Thanks,
> >> Jakob
> >>
> >>
> >> On Tue, Jan 21, 2014 at 6:14 PM, Danny Antonetti
> >> <[email protected]> wrote:
> >>
> >> > I am currently using the JsonSerde/JsonSerdeFactory classes for
> >> > serializing kafka messages.
> >> >
> >> > I have noticed that if there is bad json input coming in through
> >> > kafka, the samza container seems to crash.
> >> >
> >> > I was looking at JsonSerde.scala, which does not seem to have any
> >> > error handling.
> >> >
> >> > So I was curious if this was intentional?
> >> > Or if there is a different way to handle these types of input errors?
> >> >
> >> >
> >> > Thanks
> >> >
> >> >
> >> > Danny
> >> >
> >>
> >
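
P.S. For comparison, here is a minimal sketch of the ErrorHandler idea from the top of this message, where the handler is a pluggable object separate from the StreamTask. All of the names here (ErrorHandler, DropHandler, RedirectHandler, FailHandler, GuardedDeserializer) are hypothetical, the in-memory list only stands in for a real output queue, and the wrapper is just my assumption about where such a hook might sit around fromBytes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical pluggable strategy, separate from StreamTask; not a Samza API.
interface ErrorHandler {
    void handle(byte[] rawMessage, Exception cause);
}

// Discards the bad message and only counts how many were dropped.
class DropHandler implements ErrorHandler {
    int dropped = 0;

    @Override
    public void handle(byte[] rawMessage, Exception cause) {
        dropped++;
    }
}

// Redirects the bad message; the list is a stand-in for another queue or topic.
class RedirectHandler implements ErrorHandler {
    final List<byte[]> deadLetters = new ArrayList<>();

    @Override
    public void handle(byte[] rawMessage, Exception cause) {
        deadLetters.add(rawMessage);
    }
}

// Rethrows, which mirrors the fail-the-container default Chris prefers.
class FailHandler implements ErrorHandler {
    @Override
    public void handle(byte[] rawMessage, Exception cause) {
        throw new RuntimeException("deserialization failed", cause);
    }
}

// Wraps any decode function and hands failures to the configured handler.
class GuardedDeserializer<T> {
    private final Function<byte[], T> decode;
    private final ErrorHandler handler;

    GuardedDeserializer(Function<byte[], T> decode, ErrorHandler handler) {
        this.decode = decode;
        this.handler = handler;
    }

    // Returns the decoded value, or null when decoding failed and the handler swallowed the error.
    T fromBytes(byte[] bytes) {
        try {
            return decode.apply(bytes);
        } catch (RuntimeException e) {
            handler.handle(bytes, e);
            return null;
        }
    }
}
```

The trade-off versus the ErrorTask callback is the one you pointed out: the handler has to be chosen through configuration, but the StreamTask never sees raw bytes.
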
