Hey Danny,

I can think of two ways to accomplish this.

The first way is essentially what you've described: let the framework offer a pre-defined set of failure behaviors, selected via config (drop the message, re-route it to another topic, etc.).

The second way is to catch the serialization errors and still pass the failed message to the StreamTask. How the StreamTask would be notified of the failure is up for debate. One option would be an ErrorTask interface with an onDeserializationError callback. No new configuration would be required in this case: simply implementing ErrorTask means you get notified of any serialization errors (rather than the container failing, which would be the default). Another option would be to give IncomingMessageEnvelope an error flag, which we could use to denote the serialization failure.

I like the second approach because it's more generic. Instead of pre-defining exact behavior for when a failure occurs, it seems more flexible to let the StreamTask know and let the developer decide what the appropriate action is. I hadn't even thought of the re-route case you brought up, and I'm sure there are many other possible actions that we're not thinking of right now. Of the second approach's two implementation options, I currently favor the ErrorTask approach, but I haven't dug into it much.

Regardless of which way we choose, I think the default should be to fail the container. This is the safest behavior: there will be no data loss, and developers will be alerted to the serialization error.

As far as implementation goes, the four classes that are probably of most interest to you are SamzaContainer, TaskInstance, StreamConsumers, and SerdeManager. You'll need to catch the serde exceptions somewhere in the StreamConsumers or SerdeManager class and implement your new logic there. I think the best approach is to read over these four classes and ask us any questions you have.
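To make the ErrorTask option more concrete, here is a rough, self-contained Java sketch. ErrorTask and onDeserializationError are only the names proposed in this thread, not an existing Samza API, and Serde, StreamTask, and deliver below are hypothetical, simplified stand-ins (the real Samza interfaces have different signatures). The deliver method marks where the try/catch might go, somewhere like SerdeManager or StreamConsumers: a task that implements ErrorTask receives the raw bytes and the exception, and for any other task the exception is rethrown, which keeps failing the container as the default.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class ErrorTaskSketch {
    // Hypothetical stand-in for a serde: decodes raw bytes, throwing on bad input.
    interface Serde<T> {
        T fromBytes(byte[] bytes);
    }

    // Hypothetical, simplified stand-in for StreamTask.
    interface StreamTask<T> {
        void process(T message);
    }

    // Hypothetical opt-in callback interface, as proposed in this thread.
    interface ErrorTask {
        void onDeserializationError(byte[] rawMessage, Exception cause);
    }

    // Hypothetical dispatch logic, roughly what SerdeManager/StreamConsumers might do.
    static <T> void deliver(byte[] raw, Serde<T> serde, StreamTask<T> task) {
        T message;
        try {
            message = serde.fromBytes(raw);
        } catch (RuntimeException e) {
            if (!(task instanceof ErrorTask)) {
                // Default behavior: propagate the error, which fails the container.
                throw e;
            }
            // Opt-in behavior: hand the undecodable message to the task.
            ((ErrorTask) task).onDeserializationError(raw, e);
            return;
        }
        task.process(message);
    }

    // Example task that opts in to error notifications.
    static class CountingTask implements StreamTask<Integer>, ErrorTask {
        final List<Integer> processed = new ArrayList<>();
        final List<String> failed = new ArrayList<>();

        public void process(Integer message) {
            processed.add(message);
        }

        public void onDeserializationError(byte[] rawMessage, Exception cause) {
            failed.add(new String(rawMessage, StandardCharsets.UTF_8));
        }
    }

    // Example task that does not opt in, so bad input still fails it.
    static class PlainTask implements StreamTask<Integer> {
        public void process(Integer message) {
        }
    }

    public static void main(String[] args) {
        // Toy serde: parses a UTF-8 integer; non-numeric input throws NumberFormatException.
        Serde<Integer> intSerde = bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));

        CountingTask task = new CountingTask();
        for (String s : new String[] {"1", "oops", "3"}) {
            deliver(s.getBytes(StandardCharsets.UTF_8), intSerde, task);
        }
        System.out.println(task.processed + " " + task.failed); // prints [1, 3] [oops]

        try {
            deliver("oops".getBytes(StandardCharsets.UTF_8), intSerde, new PlainTask());
        } catch (NumberFormatException e) {
            System.out.println("container would fail");
        }
    }
}
```

Passing the raw bytes rather than a decoded object leaves the task free to drop, log, or re-route the message, which is the flexibility argued for above.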
Cheers,
Chris

On 2/3/14 3:42 PM, "Danny Antonetti" <[email protected]> wrote:

>We have discussed this, and it is something that we want to look into.
>
>Do you have any thoughts on how to implement this feature?
>I assume you would want the failure behavior to be configurable.
>
>Like
>Drop the message,
>Send a message to a new queue, and drop.
>Fail the container (is that ever appropriate?)
>Anything else?
>
>I am not familiar with this code base.
>Do you have a suggestion on what classes I should be looking to modify?
>
>Is there someone who I should bounce ideas off of?
>
>Thanks
>
>Danny
>
>On Tue, Jan 21, 2014 at 9:52 PM, Jakob Homan <[email protected]> wrote:
>
>> It's not intentional, error handling just hasn't been added yet. If you're
>> interested, we'd love to have the contribution. In particular, take a look
>> at SAMZA-59 (https://issues.apache.org/jira/browse/SAMZA-59), which also
>> touches on how serdes should handle error conditions.
>>
>> Thanks,
>> Jakob
>>
>> On Tue, Jan 21, 2014 at 6:14 PM, Danny Antonetti
>> <[email protected]> wrote:
>>
>> > I am currently using the JsonSerde/JsonSerdeFactory classes for serializing
>> > kafka messages.
>> >
>> > I have noticed that if there is bad json input coming in through kafka, the
>> > samza container seems to crash.
>> >
>> > I was looking at JsonSerde.scala, which does not seem to have any error
>> > handling.
>> >
>> > So I was curious if this was intentional?
>> > Or if there is a different way to handle these types of input errors?
>> >
>> > Thanks
>> >
>> > Danny
