OK, great, that looks good. I will look into this as I have time and get
back to you with questions.

Thanks for the help.

Danny

On Tue, Feb 4, 2014 at 11:29 AM, Chris Riccomini <[email protected]> wrote:

> Hey Guys,
>
> Here's a pseudo-code proposal for an error API.
>
> interface ErrorTask {
>   public void error(ErrorEnvelope envelope, MessageCollector collector,
>       TaskCoordinator coordinator);
> }
>
> interface ErrorEnvelope {
>   ErrorCode getErrorCode();
>   Object getError();
> }
>
> class DeserializationError {
>   byte[] getKey();
>   byte[] getMessage();
>   boolean keyFailed();
>   boolean valueFailed();
> }
>
> Then you could implement something like:
>
> class MyStreamTask implements StreamTask, ErrorTask {
>   public void process(...) {
>     // do stuff
>   }
>
>   public void error(ErrorEnvelope envelope, MessageCollector collector,
>       TaskCoordinator coordinator) {
>     if (DeserializationError.ERROR_CODE.equals(envelope.getErrorCode())) {
>       DeserializationError error = (DeserializationError) envelope.getError();
>       collector.send("kafka", "bad-data", error.getKey(), error.getMessage());
>     }
>   }
> }
>
> Arguably, we've just moved our big if-statement block from the process()
> method into the ErrorTask.error method, but the alternative, as I see it,
> is to have one callback per error type. That moves us into a state where
> every time we want to add a new callback, we break all existing
> implementations (since they must now implement the new method as well). I
> chose the single callback and generic object approach because it matches
> what our process method does when multiple input streams are defined (if
> envelope is from stream A, do X; if envelope is from stream B, do Y), but
> I'm open to suggestions if people have them.
>
> Regarding your JSON use case, yes, I think you'd be given the raw bytes
> for the key and message, and it'd be up to you to decide what to do with
> them. Samza allows you to define systems with no serde. Systems without a
> serde default to byte[] for both key and value, which means that you could
> define a system (or stream) with a pass-through serde, and send it the raw
> bytes for the key and message. Alternatively, you could try decoding the
> data using some other serde, posting the message to a DB, logging the
> message in Log4j, discarding the message, etc.
>
> Cheers,
> Chris
>
> On 2/4/14 10:48 AM, "Danny Antonetti" <[email protected]> wrote:
>
> >Hey Chris,
> >
> >I am not sure I understand your suggestion about the ErrorTask. What
> >would this new function's method signature be? I would assume it would
> >take in the byte[] from the fromBytes function. It seems like that ties
> >the Serde implementation to the StreamTask implementation, unless you
> >are suggesting that it should be notified without being given the input
> >byte[].
> >
> >In our case we are using the JsonSerde, so the byte[] for the JSON data
> >would be given to onDeserializationError and then our task would have to
> >decode the bytes?
> >
> >Just to clarify my suggestion, I was not thinking that we would have a
> >predefined set of behaviors. I was thinking that I would have an
> >interface (that would not be part of the StreamTask), maybe ErrorHandler.
> >With this option there would be implementations of that interface for
> >dropping the message or redirecting it to a new queue. But this would
> >require extra configuration, as you mentioned.
> >
> >I am not invested in my approach, I just wanted to make sure that I
> >understand the suggestions/options.
> >
> >Thanks
> >
> >Danny
> >
> >On Tue, Feb 4, 2014 at 9:46 AM, Chris Riccomini <[email protected]> wrote:
> >
> >> Hey Danny,
> >>
> >> I can think of two ways to accomplish this.
> >>
> >> The first way is essentially what you've described. Allow the framework
> >> to have a pre-defined set of options defined via config (drop the
> >> message, re-route to another topic, etc).
> >>
> >> The second way is to catch the serialization issues, and still pass the
> >> failed message to the StreamTask. The way in which the StreamTask would
> >> be notified of the failure is up for debate. One option would be to
> >> have an ErrorTask interface, which has an onDeserializationError
> >> callback. No new configuration would be required in this case--simply
> >> implementing the ErrorTask means you get notified of any serialization
> >> errors (rather than failing the container, which would be the default).
> >> Another option would be to have the IncomingMessageEnvelope have an
> >> error flag, which we could use to denote the serialization failure.
> >>
> >> I like the second approach because it's more generic. Instead of
> >> pre-defining exact behavior when a failure occurs, it seems more
> >> flexible to let the StreamTask know, and let the developer decide what
> >> the appropriate action is. I hadn't even thought of the re-route case
> >> you brought up, and I'm sure there are many other possible actions that
> >> we're not thinking of right now. Of the second approach's potential
> >> implementation options, I favor the ErrorTask approach right now, but I
> >> haven't dug into it too much.
> >>
> >> Regardless of which way we choose, I think the default should be to fail
> >> the container. This is the safest behavior, as it means there will be no
> >> data loss, and developers will be alerted to the serialization error.
> >>
> >> As far as implementation goes, the four classes that are probably of
> >> most interest to you are SamzaContainer, TaskInstance, StreamConsumers,
> >> and SerdeManager. You'll need to catch the serde exceptions somewhere in
> >> the StreamConsumer or SerdeManager class, and implement your new logic
> >> there. I think the best approach is to read over these four classes, and
> >> ask us any questions you might have.
> >>
> >> Cheers,
> >> Chris
> >>
> >> On 2/3/14 3:42 PM, "Danny Antonetti" <[email protected]> wrote:
> >>
> >> >We have discussed this, and it is something that we want to look into.
> >> >
> >> >Do you have any thoughts on how to implement this feature?
> >> >I assume you would want the failure behavior to be configurable.
> >> >
> >> >Like
> >> >Drop the message,
> >> >Send a message to a new queue, and drop.
> >> >Fail the container (is that ever appropriate?)
> >> >Anything else?
> >> >
> >> >I am not familiar with this code base.
> >> >Do you have a suggestion on what classes I should be looking to modify?
> >> >
> >> >Is there someone who I should bounce ideas off of?
> >> >
> >> >Thanks
> >> >
> >> >Danny
> >> >
> >> >On Tue, Jan 21, 2014 at 9:52 PM, Jakob Homan <[email protected]> wrote:
> >> >
> >> >> It's not intentional, error handling just hasn't been added yet. If
> >> >> you're interested, we'd love to have the contribution. In particular,
> >> >> take a look at SAMZA-59
> >> >> (https://issues.apache.org/jira/browse/SAMZA-59), which also touches
> >> >> on how serdes should handle error conditions.
> >> >>
> >> >> Thanks,
> >> >> Jakob
> >> >>
> >> >> On Tue, Jan 21, 2014 at 6:14 PM, Danny Antonetti
> >> >> <[email protected]> wrote:
> >> >>
> >> >> > I am currently using the JsonSerde/JsonSerdeFactory classes for
> >> >> > serializing kafka messages.
> >> >> >
> >> >> > I have noticed that if there is bad json input coming in through
> >> >> > kafka, the samza container seems to crash.
> >> >> >
> >> >> > I was looking at JsonSerde.scala, which does not seem to have any
> >> >> > error handling.
> >> >> >
> >> >> > So I was curious if this was intentional?
> >> >> > Or if there is a different way to handle these types of input
> >> >> > errors?
> >> >> >
> >> >> > Thanks
> >> >> >
> >> >> > Danny

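For anyone reading the thread later, here is a minimal, compilable Java sketch of the single-callback error API proposed in Chris's Feb 4 message. It is only an illustration under stated assumptions, not Samza's actual API: every type below is a hypothetical stand-in defined locally, the ErrorCode type from the proposal is simplified to a String, the TaskCoordinator argument is dropped, and SketchCollector merely records what a real MessageCollector would send. The ERROR_CODE value, RerouteTask, and ErrorApiSketch are made-up names for this example.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the types in the proposal; not Samza's real API.
interface ErrorEnvelope {
    String getErrorCode(); // the proposal uses an ErrorCode type; a String keeps the sketch small
    Object getError();
}

interface ErrorTask {
    void error(ErrorEnvelope envelope, SketchCollector collector);
}

// Simplified replacement for MessageCollector: it only records what would be sent.
final class SketchCollector {
    final List<String> sent = new ArrayList<>();

    void send(String system, String stream, byte[] key, byte[] message) {
        sent.add(system + "." + stream + ":" + new String(message, StandardCharsets.UTF_8));
    }
}

// Carries the raw bytes that the serde could not decode.
final class DeserializationError {
    static final String ERROR_CODE = "deserialization-error"; // assumed value

    private final byte[] key;
    private final byte[] message;

    DeserializationError(byte[] key, byte[] message) {
        this.key = key;
        this.message = message;
    }

    byte[] getKey() { return key; }
    byte[] getMessage() { return message; }
}

// Re-routes undecodable messages to a "bad-data" stream and ignores other error codes.
final class RerouteTask implements ErrorTask {
    @Override
    public void error(ErrorEnvelope envelope, SketchCollector collector) {
        if (DeserializationError.ERROR_CODE.equals(envelope.getErrorCode())) {
            DeserializationError error = (DeserializationError) envelope.getError();
            collector.send("kafka", "bad-data", error.getKey(), error.getMessage());
        }
    }
}

final class ErrorApiSketch {
    // Simulates the container handing one failed message to the task.
    static List<String> demo() {
        byte[] raw = "{not json".getBytes(StandardCharsets.UTF_8);
        DeserializationError failure = new DeserializationError(null, raw);
        ErrorEnvelope envelope = new ErrorEnvelope() {
            @Override public String getErrorCode() { return DeserializationError.ERROR_CODE; }
            @Override public Object getError() { return failure; }
        };
        SketchCollector collector = new SketchCollector();
        new RerouteTask().error(envelope, collector);
        return collector.sent;
    }
}
```

Calling ErrorApiSketch.demo() passes one undecodable payload through the callback and returns the list of recorded sends. A new error type would add a new code and payload class without changing the ErrorTask interface, which is the trade-off argued for in the thread.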