OK, great, that looks good.

I will look into this as I have time and get back to you with questions.


Thanks for the help.


Danny


On Tue, Feb 4, 2014 at 11:29 AM, Chris Riccomini <[email protected]> wrote:

> Hey Guys,
>
> Here's a pseudo-code proposal for an error API.
>
> interface ErrorTask {
>   public void error(ErrorEnvelope envelope, MessageCollector collector,
>       TaskCoordinator coordinator);
> }
>
> interface ErrorEnvelope {
>   ErrorCode getErrorCode();
>   Object getError();
> }
>
> class DeserializationError {
>   byte[] getKey();
>
>   byte[] getMessage();
>   boolean keyFailed();
>   boolean valueFailed();
> }
>
> Then you could implement something like:
>
> class MyStreamTask implements StreamTask, ErrorTask {
>   public void process(...) {
>     // do stuff
>   }
>
>   public void error(ErrorEnvelope envelope, MessageCollector collector,
>       TaskCoordinator coordinator) {
>     if (DeserializationError.ERROR_CODE.equals(envelope.getErrorCode())) {
>       DeserializationError error =
>           (DeserializationError) envelope.getError();
>       collector.send("kafka", "bad-data", error.getKey(),
>           error.getMessage());
>     }
>   }
> }
>
> Arguably, we've just moved our big if-statement block from the process()
> method into the ErrorTask.error method, but the alternative, as I see it,
> is to have one callback per error message. This moves us into a state
> where every time we want to add a new callback, we break all existing
> implementations (since they must now implement the new method, as well). I
> chose the single callback and generic object approach because it matches
> what our process method does when multiple input streams are defined (if
> envelope is from stream A, do X, if envelope is from stream B, do Y), but
> I'm open to suggestions if people have them.
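
To make the single-callback trade-off concrete, here is a minimal compilable sketch of the proposal above. Everything in it is a stand-in written for illustration, not Samza's real API: ErrorCode is assumed to be an enum, RecordingCollector replaces MessageCollector, the TaskCoordinator argument is left out, and ERROR_CODE is assumed to be a constant on DeserializationError. The point it shows is that a task ignores error codes it does not recognize, so adding a new code later would not break existing implementations.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; not Samza's real types.
enum ErrorCode { DESERIALIZATION, UNKNOWN }

interface ErrorEnvelope {
  ErrorCode getErrorCode();
  Object getError();
}

final class DeserializationError {
  static final ErrorCode ERROR_CODE = ErrorCode.DESERIALIZATION;
  private final byte[] key;
  private final byte[] message;

  DeserializationError(byte[] key, byte[] message) {
    this.key = key;
    this.message = message;
  }

  byte[] getKey() { return key; }
  byte[] getMessage() { return message; }
}

// Simplified collector that only records where each message was sent.
final class RecordingCollector {
  final List<String> sent = new ArrayList<>();

  void send(String system, String stream, byte[] key, byte[] message) {
    sent.add(system + "." + stream + ":" + message.length);
  }
}

// Single callback for every kind of error (coordinator omitted for brevity).
interface ErrorTask {
  void error(ErrorEnvelope envelope, RecordingCollector collector);
}

final class RerouteBadDataTask implements ErrorTask {
  @Override
  public void error(ErrorEnvelope envelope, RecordingCollector collector) {
    if (DeserializationError.ERROR_CODE.equals(envelope.getErrorCode())) {
      DeserializationError error = (DeserializationError) envelope.getError();
      collector.send("kafka", "bad-data", error.getKey(), error.getMessage());
    }
    // Any other error code falls through untouched, so a code added later
    // does not force a change in this class.
  }
}

final class ErrorApiSketch {
  static ErrorEnvelope envelope(ErrorCode code, Object error) {
    return new ErrorEnvelope() {
      @Override public ErrorCode getErrorCode() { return code; }
      @Override public Object getError() { return error; }
    };
  }

  // Feeds one deserialization error and one unrecognized error to the task.
  static List<String> run() {
    RecordingCollector collector = new RecordingCollector();
    ErrorTask task = new RerouteBadDataTask();
    DeserializationError bad =
        new DeserializationError(new byte[] {1}, new byte[] {2, 3});
    task.error(envelope(DeserializationError.ERROR_CODE, bad), collector);
    task.error(envelope(ErrorCode.UNKNOWN, "something else"), collector);
    return collector.sent;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

Only the deserialization error is re-routed here; the unrecognized one is silently skipped, which is the same "if envelope is from stream A, do X" shape as process().
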
>
> Regarding your JSON use case, yes, I think you'd be given the raw bytes
> for the key and message, and it'd be up to you to decide what to do with
> them. Samza allows you to define systems with no serde. Systems without a
> serde default to byte[] for both key and value, which means that you could
> define a system (or stream) with a pass-through serde, and send it the raw
> bytes for the key and message. Alternatively, you could try decoding the
> data using some other serde, posting the message to a DB, logging the
> message in Log4j, discarding the message, etc.
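
As a rough sketch of the "try some other decoding, otherwise pass the raw bytes along" idea above: the class and method names below are made up for illustration, and the returned strings only describe the action a task might take. The only real API used is the strict UTF-8 decoder from java.nio.charset, which reports malformed input instead of silently replacing it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical helper; not part of Samza.
final class RawBytesFallback {
  // Strictly decodes the bytes as UTF-8; returns empty on malformed input.
  static Optional<String> tryDecodeUtf8(byte[] raw) {
    try {
      return Optional.of(
          StandardCharsets.UTF_8.newDecoder()
              .onMalformedInput(CodingErrorAction.REPORT)
              .onUnmappableCharacter(CodingErrorAction.REPORT)
              .decode(ByteBuffer.wrap(raw))
              .toString());
    } catch (CharacterCodingException e) {
      return Optional.empty();
    }
  }

  // Describes what to do with a message whose original serde failed:
  // log it if it is at least readable text, otherwise forward the raw bytes.
  static String decide(byte[] rawMessage) {
    return tryDecodeUtf8(rawMessage)
        .map(text -> "log: " + text)
        .orElse("forward " + rawMessage.length + " raw bytes");
  }
}
```

In a real task the two branches would call a logger or collector.send(...) on a stream with no serde configured, rather than returning a description.
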
>
> Cheers,
> Chris
>
> On 2/4/14 10:48 AM, "Danny Antonetti" <[email protected]> wrote:
>
> >Hey Chris,
> >
> >I am not sure I understand your suggestion about the ErrorTask.  What would
> >this new function's method signature be?  I would assume it would take in
> >the byte[] from the fromBytes function.  It seems like that ties the Serde
> >implementation to the StreamTask implementation.  Unless you are suggesting
> >that it should be notified without giving the input byte[].
> >
> >In our case we are using the JsonSerde, so the byte[] for the json data
> >would be given to  onDeserializationError and then our task would have to
> >decode the bytes?
> >
> >Just to clarify my suggestion, I was not thinking that we would have a
> >predefined set of behaviors.  I was thinking that I would have an interface
> >(that would not be part of the StreamTask), maybe ErrorHandler.  With this
> >option there would be implementations of that interface for dropping the
> >message or redirecting it to a new queue.  But this would require extra
> >configuration, as you mentioned.
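
For comparison, the pluggable-handler idea described above might look roughly like the sketch below. All of it is hypothetical: the ErrorHandler signature, the three implementations, and the policy strings "drop", "redirect", and "fail" are assumptions made for illustration, and no such Samza config key exists as far as this thread shows.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical interface, kept separate from StreamTask.
interface ErrorHandler {
  void handle(byte[] key, byte[] message, RuntimeException cause);
}

// Discards the failed message and only counts it.
final class DropHandler implements ErrorHandler {
  int dropped = 0;

  @Override
  public void handle(byte[] key, byte[] message, RuntimeException cause) {
    dropped++;
  }
}

// Keeps the raw bytes in an in-memory queue that stands in for a new topic.
final class RedirectHandler implements ErrorHandler {
  final Queue<byte[]> redirected = new ArrayDeque<>();

  @Override
  public void handle(byte[] key, byte[] message, RuntimeException cause) {
    redirected.add(message);
  }
}

// Rethrows the original failure, which would take the container down.
final class FailHandler implements ErrorHandler {
  @Override
  public void handle(byte[] key, byte[] message, RuntimeException cause) {
    throw cause;
  }
}

final class ErrorHandlers {
  // Maps an assumed config value to a handler implementation.
  static ErrorHandler fromConfig(String policy) {
    switch (policy) {
      case "drop":
        return new DropHandler();
      case "redirect":
        return new RedirectHandler();
      case "fail":
        return new FailHandler();
      default:
        throw new IllegalArgumentException("Unknown policy: " + policy);
    }
  }
}
```

The extra configuration mentioned above shows up here as the policy string: the framework, not the task, would pick the handler.
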
> >
> >I am not invested in my approach, I just wanted to make sure that I
> >understand the suggestions/options.
> >
> >
> >Thanks
> >
> >
> >Danny
> >
> >
> >On Tue, Feb 4, 2014 at 9:46 AM, Chris Riccomini
> ><[email protected]> wrote:
> >
> >> Hey Danny,
> >>
> >> I can think of two ways to accomplish this.
> >>
> >> The first way is essentially what you've described. Allow the framework to
> >> have a pre-defined set of options defined via config (drop the message,
> >> re-route to another topic, etc).
> >>
> >> The second way is to catch the serialization issues, and still pass the
> >> failed message to the StreamTask. The way in which the StreamTask would be
> >> notified of the failure is up for debate. One option would be to have an
> >> ErrorTask interface, which has an onDeserializationError callback. No new
> >> configuration would be required in this case--simply implementing the
> >> ErrorTask means you get notified of any serialization errors (rather than
> >> failing the container, which would be the default). Another option would
> >> be to have the IncomingMessageEnvelope have an error flag, which we could
> >> use to denote the serialization failure.
> >>
> >> I like the second approach because it's more generic. Instead of
> >> pre-defining exact behavior when a failure occurs, it seems more flexible
> >> to let the StreamTask know, and let the developer decide what the
> >> appropriate action is. I hadn't even thought of the re-route case you
> >> brought up, and I'm sure there are many other possible actions that we're
> >> not thinking of right now. Of the second approach's potential
> >> implementation options, I favor the ErrorTask approach right now, but I
> >> haven't dug into it too much.
> >>
> >> Regardless of which way we choose, I think the default should be to fail
> >> the container. This is the safest behavior, as it means there will be no
> >> data loss, and developers will be alerted of the serialization error.
> >>
> >> As far as implementation goes, the four classes that are probably of most
> >> interest to you are SamzaContainer, TaskInstance, StreamConsumers, and
> >> SerdeManager. You'll need to catch the serde exceptions somewhere in the
> >> StreamConsumers or SerdeManager class, and implement your new logic there.
> >> I think the best approach is to read over these four classes, and ask us
> >> any questions you might have.
> >>
> >> Cheers,
> >> Chris
> >>
> >> On 2/3/14 3:42 PM, "Danny Antonetti" <[email protected]> wrote:
> >>
> >> >We have discussed this, and it is something that we want to look into.
> >> >
> >> >Do you have any thoughts on how to implement this feature?
> >> >I assume you would want the failure behavior to be configurable.
> >> >
> >> >Like
> >> >Drop the message,
> >> >Send a message to a new queue, and drop.
> >> >Fail the container (is that ever appropriate?)
> >> >Anything else?
> >> >
> >> >I am not familiar with this code base.
> >> >Do you have a suggestion on what classes I should be looking to modify?
> >> >
> >> >Is there someone who I should bounce ideas off of?
> >> >
> >> >
> >> >Thanks
> >> >
> >> >
> >> >Danny
> >> >
> >> >
> >> >
> >> >On Tue, Jan 21, 2014 at 9:52 PM, Jakob Homan <[email protected]> wrote:
> >> >
> >> >> It's not intentional, error handling just hasn't been added yet.  If you're
> >> >> interested, we'd love to have the contribution.  In particular, take a look
> >> >> at SAMZA-59 (https://issues.apache.org/jira/browse/SAMZA-59), which also
> >> >> touches on how serdes should handle error conditions.
> >> >>
> >> >> Thanks,
> >> >> Jakob
> >> >>
> >> >>
> >> >> On Tue, Jan 21, 2014 at 6:14 PM, Danny Antonetti
> >> >> <[email protected]> wrote:
> >> >>
> >> >> > I am currently using the JsonSerde/JsonSerdeFactory classes for
> >> >> > serializing kafka messages.
> >> >> >
> >> >> > I have noticed that if there is bad json input coming in through kafka,
> >> >> > the samza container seems to crash.
> >> >> >
> >> >> > I was looking at JsonSerde.scala, which does not seem to have any error
> >> >> > handling.
> >> >> >
> >> >> > So I was curious if this was intentional?
> >> >> > Or if there is a different way to handle these types of input errors?
> >> >> >
> >> >> >
> >> >> > Thanks
> >> >> >
> >> >> >
> >> >> > Danny
> >> >> >
> >> >>
> >>
> >>
>
>
