Hey Chris,

I am not sure I understand your suggestion about the ErrorTask.  What would
this new function's method signature be?  I would assume it would take in
the byte[] from the fromBytes function.  It seems like that ties the Serde
implementation to the StreamTask implementation.  Unless you are suggesting
that it should be notified without giving the input byte[].

In our case we are using the JsonSerde, so the byte[] for the JSON data
would be given to onDeserializationError, and then our task would have to
decode the bytes?
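
To make the concern concrete, here is a rough sketch of what such a callback
might look like. Only the names ErrorTask and onDeserializationError come from
this thread; the parameter list and the LoggingErrorTask class are assumptions
made for illustration, not an agreed API.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback interface. The signature is assumed, not agreed on.
interface ErrorTask {
    void onDeserializationError(byte[] rawMessage, Exception cause);
}

// Hypothetical task that receives the bytes the Serde could not decode.
class LoggingErrorTask implements ErrorTask {
    final List<String> rejected = new ArrayList<>();

    @Override
    public void onDeserializationError(byte[] rawMessage, Exception cause) {
        // The task has to know the payload was meant to be UTF-8 JSON,
        // which is knowledge that otherwise lives only in the Serde.
        String text = new String(rawMessage, StandardCharsets.UTF_8);
        rejected.add(text + " (" + cause.getMessage() + ")");
    }
}
```

With a signature like this, the task ends up decoding the raw bytes itself,
which is the coupling between the Serde and the StreamTask described above.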

Just to clarify my suggestion, I was not thinking that we would have a
predefined set of behaviors.  I was thinking that I would have an interface
(that would not be part of the StreamTask), maybe ErrorHandler.  With this
option there would be implementations of that interface for dropping the
message or redirecting it to a new queue.  But this would require extra
configuration, as you mentioned.
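
A minimal sketch of that idea, assuming a single-method interface. The name
ErrorHandler is the one floated above; the handle signature, the two
implementation classes, and the in-memory list standing in for a real output
queue are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical handler, kept separate from StreamTask and picked via config.
interface ErrorHandler {
    void handle(byte[] rawMessage, Exception cause);
}

// Drops the bad message and only counts how many were discarded.
class DropErrorHandler implements ErrorHandler {
    int dropped = 0;

    @Override
    public void handle(byte[] rawMessage, Exception cause) {
        dropped++;
    }
}

// Redirects the raw bytes elsewhere. The list is a stand-in for a real queue.
class RedirectErrorHandler implements ErrorHandler {
    final List<byte[]> redirected = new ArrayList<>();

    @Override
    public void handle(byte[] rawMessage, Exception cause) {
        redirected.add(rawMessage);
    }
}
```

Which implementation gets used would have to come from job configuration,
which is the extra configuration cost mentioned above.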

I am not invested in my approach; I just wanted to make sure that I
understand the suggestions/options.


Thanks


Danny


On Tue, Feb 4, 2014 at 9:46 AM, Chris Riccomini <[email protected]> wrote:

> Hey Danny,
>
> I can think of two ways to accomplish this.
>
> The first way is essentially what you've described. Allow the framework to
> have a pre-defined set of options defined via config (drop the message,
> re-route to another topic, etc).
>
> The second way is to catch the serialization issues, and still pass the
> failed message to the StreamTask. The way in which the StreamTask would be
> notified of the failure is up for debate. One option would be to have an
> ErrorTask interface, which has an onDeserializationError callback. No new
> configuration would be required in this case--simply implementing the
> ErrorTask means you get notified of any serialization errors (rather than
> failing the container, which would be the default). Another option would
> be to have the IncomingMessageEnvelope have an error flag, which we could
> use to denote the serialization failure.
>
> I like the second approach because it's more generic. Instead of
> pre-defining exact behavior when a failure occurs, it seems more flexible
> to let the StreamTask know, and let the developer decide what the
> appropriate action is. I hadn't even thought of the re-route case you
> brought up, and I'm sure there are many other possible actions that we're
> not thinking of right now. Of the second approach's potential
> implementation options, I favor the ErrorTask approach right now, but I
> haven't dug into it too much.
>
> Regardless of which way we choose, I think the default should be to fail
> the container. This is the safest behavior, as it means there will be no
> data loss, and developers will be alerted of the serialization error.
>
> As far as implementation goes, the four classes that are probably of most
> interest to you are SamzaContainer, TaskInstance, StreamConsumers, and
> SerdeManager. You'll need to catch the serde exceptions somewhere in the
> StreamConsumer or SerdeManager class, and implement your new logic there.
> I think the best approach is to read over these four classes, and ask us
> any questions you might have.
>
> Cheers,
> Chris
>
> On 2/3/14 3:42 PM, "Danny Antonetti" <[email protected]> wrote:
>
> >We have discussed this, and it is something that we want to look into.
> >
> >Do you have any thoughts on how to implement this feature?
> >I assume you would want the failure behavior to be configurable.
> >
> >Like
> >Drop the message,
> >Send a message to a new queue, and drop.
> >Fail the container (is that ever appropriate?)
> >Anything else?
> >
> >I am not familiar with this code base.
> >Do you have a suggestion on what classes I should be looking to modify?
> >
> >Is there someone who I should bounce ideas off of?
> >
> >
> >Thanks
> >
> >
> >Danny
> >
> >
> >
> >On Tue, Jan 21, 2014 at 9:52 PM, Jakob Homan <[email protected]> wrote:
> >
> >> It's not intentional, error handling just hasn't been added yet.  If
> >> you're interested, we'd love to have the contribution.  In particular,
> >> take a look at SAMZA-59
> >> (https://issues.apache.org/jira/browse/SAMZA-59), which also touches on
> >> how serdes should handle error conditions.
> >>
> >> Thanks,
> >> Jakob
> >>
> >>
> >> On Tue, Jan 21, 2014 at 6:14 PM, Danny Antonetti
> >> <[email protected]> wrote:
> >>
> >> > I am currently using the JsonSerde/JsonSerdeFactory classes for
> >> > serializing kafka messages.
> >> >
> >> > I have noticed that if there is bad json input coming in through
> >> > kafka, the samza container seems to crash.
> >> >
> >> > I was looking at JsonSerde.scala, which does not seem to have any
> >> > error handling.
> >> >
> >> > So I was curious if this was intentional?
> >> > Or if there is a different way to handle these types of input errors?
> >> >
> >> >
> >> > Thanks
> >> >
> >> >
> >> > Danny
> >> >
> >>
>
>
