So it sounds like this would just be a try catch around this line
in SystemConsumers.scala

unprocessedMessages(envelope.getSystemStreamPartition).enqueue(serdeManager.fromBytes(envelope))

And just do some logging in the catch statement?


On Wed, Feb 5, 2014 at 12:32 PM, Chris Riccomini <[email protected]>wrote:

> Hey Guys,
>
> Thinking on this more. Given that we don't have a good grasp on this, and
> it seems fairly complicated for now, I'm proposing that we hold off on all
> this complex work, and just support the ability to drop messages for now.
> This should be pretty straightforward to implement, and is generally
> pretty useful, and not too invasive.
>
> Does that sound cool with folks? Danny?
>
> Cheers,
> Chris
>
> On 2/5/14 11:43 AM, "Chris Riccomini" <[email protected]> wrote:
>
> >Hey Martin,
> >
> >I agree with you about ordering. It would be ideal to give the error()
> >callback exactly when we would normally call process() on the message, but
> >can't because of the serde error. I think this is do-able if we tweak
> >things in SystemConsumers a bit.
> >
> >The problem I'm having trouble resolving is how to handle the
> >MessageChooser. The MessageChooser chooses the processing order between
> >partitions ("I have a message from stream/partition X, and
> >stream/partition Y, which one do I process next?"). These choosers
> >occasionally look at the actual message payload (e.g. The timestamp field
> >of an incoming message) when doing the choosing. The chooser won't know
> >how to handle an IncomingMessageEnvelope where the key and value are both
> >byte[], when it's expecting Avro, ProtoBuf, JSON, etc. One solution I can
> >think of would be to add an error() call to the MessageChooser interface.
> >The other solution would be to just give the MessageChooser everything,
> >and in cases where it is looking at the message payload, it just has to be
> >careful. I think there might be other solutions here as well.
> >
> >More thought required.
> >
> >Cheers,
> >Chris
> >
> >On 2/4/14 4:13 PM, "Martin Kleppmann" <[email protected]> wrote:
> >
> >>Ok -- I had originally misunderstood your ErrorTask proposal. The way you
> >>described it looks good to me. I agree that a single error method, which
> >>takes a generic error envelope object, is better than lots of different
> >>methods.
> >>
> >>Rather than
> >>
> >>if (DeserializationError.ERROR_CODE.equals(envelope.getErrorCode())
> >>
> >>could we just say this?
> >>
> >>if (envelope.getError() instanceof DeserializationError)
> >>
> >>Regarding ordering: my instinctive reaction is that it would be least
> >>surprising if the error is received at the same point in the message
> >>sequence as the message would have been received, had it not been an
> >>error. But that may need some more thinking.
> >>
> >>Cheers,
> >>Martin
> >>
> >>On 4 Feb 2014, at 22:23, Chris Riccomini <[email protected]>
> wrote:
> >>> Hey Guys,
> >>>
> >>> One other thing to consider here, that didn't strike me at first is how
> >>>to
> >>> handle ordering when a serialization error occurs.
> >>>
> >>> Right now, the SystemConsumers class reads messages in, buffers them,
> >>>and
> >>> feeds them slowly to the MessageChooser. The MessageChooser takes only
> >>> IncomingMessageEnvelope right now. So the question is, if we have a
> >>>serde
> >>> error, when does the StreamTask see it? It shouldn't see it before
> >>>other
> >>> messages for the same stream partition, since this would mean we've
> >>>broken
> >>> the ordering of the messages. Arguably, the MessageChooser should also
> >>>get
> >>> a chance to see it, to choose in what order to process it.
> >>>
> >>> One could make the argument, though, that a message that can't be
> >>> deserialized is invalid, and you're looking at potential data loss
> >>>anyway,
> >>> so ordering doesn't matter. I'm not sure if this assumption is true in
> >>>all
> >>> cases though.
> >>>
> >>> I don't really have a good answer for this at the moment. I think we
> >>> should think about it.
> >>>
> >>> Cheers,
> >>> Chris
> >>>
> >>> On 2/4/14 11:36 AM, "Danny Antonetti" <[email protected]>
> >>>wrote:
> >>>
> >>>> OK Great, that looks good.
> >>>>
> >>>> I will look into this as I have time and get back to you with
> >>>>questions.
> >>>>
> >>>>
> >>>> Thanks for the help.
> >>>>
> >>>>
> >>>> Danny
> >>>>
> >>>>
> >>>> On Tue, Feb 4, 2014 at 11:29 AM, Chris Riccomini
> >>>> <[email protected]>wrote:
> >>>>
> >>>>> Hey Guys,
> >>>>>
> >>>>> Here's a pseudo-code proposal for an error API.
> >>>>>
> >>>>> interface ErrorTask {
> >>>>>  public void error(ErrorEnvelope envelope, MessageCollector
> >>>>>collector,
> >>>>> TaskCoordinator coordinator);
> >>>>> }
> >>>>>
> >>>>> interface ErrorEnvelope {
> >>>>>  ErrorCode getErrorCode();
> >>>>>  Object getError();
> >>>>> }
> >>>>>
> >>>>> class DeserializationError {
> >>>>>  byte[] getKey();
> >>>>>
> >>>>>  byte[] getMessage();
> >>>>>  boolean keyFailed();
> >>>>>  boolean valueFailed();
> >>>>> }
> >>>>>
> >>>>> Then you could implement something like:
> >>>>>
> >>>>> class MyStreamTask implements StreamTask, ErrorTask {
> >>>>>  public void proces(Š) {
> >>>>>    // do stuff
> >>>>>  }
> >>>>>
> >>>>>  public void error(ErrorEnvelope envelope, MessageCollector
> >>>>>collector,
> >>>>> TaskCoordinator coordinator) {
> >>>>>    if
> >>>>>(DeserializationError.ERROR_CODE.equals(envelope.getErrorCode())
> >>>>> {
> >>>>>      DeserializationError error = (DeserializationError)
> >>>>> envelope.getError();
> >>>>>      collector.send("kafka", "bad-data", error.getKey(),
> >>>>> error.getValue());
> >>>>>    }
> >>>>>  }
> >>>>> }
> >>>>>
> >>>>> Arguably, we've just moved our big if-statement block from the
> >>>>>process()
> >>>>> method into the ErrorTask.error method, but the alternative, as I see
> >>>>> it,
> >>>>> is to have on call back per error message. This moves us into a state
> >>>>> where every time we want to add a new callback, we break all existing
> >>>>> implementations (since they must now implement the new method, as
> >>>>> well). I
> >>>>> chose the single callback and generic object approach because it
> >>>>>matches
> >>>>> what our process method does when multiple input streams are defined
> >>>>>(if
> >>>>> envelope is from stream A, do X, if envelope is from stream B, do Y),
> >>>>> but
> >>>>> I'm open to suggestions if people have them.
> >>>>>
> >>>>> Regarding your JSON use case, yes, I think you'd be given the raw
> >>>>>bytes
> >>>>> for the key and message, and it'd be up to you to decide what to do
> >>>>>with
> >>>>> them. Samza allows you to define systems with no serde. Systems
> >>>>>without
> >>>>> a
> >>>>> serde default to byte[] for both key and value, which means that you
> >>>>> could
> >>>>> define a system (or stream) with a pass-through serde, and send it
> >>>>>the
> >>>>> raw
> >>>>> bytes for the key and message. Alternatively, you could try decoding
> >>>>>the
> >>>>> data using some other serde, posting the message, to a DB, logging
> >>>>>the
> >>>>> message in Log4j, discarding the message, etc.
> >>>>>
> >>>>> Cheers,
> >>>>> Chris
> >>>>>
> >>>>> On 2/4/14 10:48 AM, "Danny Antonetti" <[email protected]>
> >>>>>wrote:
> >>>>>
> >>>>>> Hey Chris,
> >>>>>>
> >>>>>> I am not sure I understand your suggestion about the ErrorTask.
> >>>>>>What
> >>>>>> would
> >>>>>> this new functions method signature be?  I would assume it would
> >>>>>>take
> >>>>> in
> >>>>>> the byte[] from the fromBytes function.  It seems like that ties the
> >>>>> Serde
> >>>>>> implementation to the StreamTask implementation.  Unless you are
> >>>>>> suggesting
> >>>>>> that it should be notified without giving the input byte[].
> >>>>>>
> >>>>>> In our case we are using the JsonSerde, so the byte[] for the json
> >>>>>>data
> >>>>>> would be given to  onDeserializationError and then our task would
> >>>>>>have
> >>>>> to
> >>>>>> decode the bytes?
> >>>>>>
> >>>>>> Just to clarify my suggestion, I was not thinking that we would have
> >>>>>>a
> >>>>>> predefined set of behaviors.  I was thinking that I would have an
> >>>>>> interface
> >>>>>> (That would not be part of the StreamTask), maybe ErrorHandler.
> >>>>>>With
> >>>>> this
> >>>>>> option there would be implementations of that interface for Dropping
> >>>>> the
> >>>>>> message, redirect to a new Queue, or drop it.  But this would
> >>>>>>require
> >>>>>> extra
> >>>>>> configuration as you mentioned.
> >>>>>>
> >>>>>> I am not invested in my approach, I just wanted to make sure that I
> >>>>>> understand the suggestions/options.
> >>>>>>
> >>>>>>
> >>>>>> Thanks
> >>>>>>
> >>>>>>
> >>>>>> Danny
> >>>>>>
> >>>>>>
> >>>>>> On Tue, Feb 4, 2014 at 9:46 AM, Chris Riccomini
> >>>>>> <[email protected]>wrote:
> >>>>>>
> >>>>>>> Hey Danny,
> >>>>>>>
> >>>>>>> I can think of two ways to accomplish this.
> >>>>>>>
> >>>>>>> The first way is essentially what you've described. Allow the
> >>>>> framework
> >>>>>>> to
> >>>>>>> have a pre-defined set of options defined via config (drop the
> >>>>> message,
> >>>>>>> re-route to another topic, etc).
> >>>>>>>
> >>>>>>> The second way is to catch the serialization issues, and still pass
> >>>>> the
> >>>>>>> failed message to the StreamTask. The way in which the StreamTask
> >>>>> would
> >>>>>>> be
> >>>>>>> notified of the failure is up for debate. One option would be to
> >>>>> have a
> >>>>>>> ErrorTask interface, which has an onDeserializationError call back.
> >>>>> No
> >>>>>>> new
> >>>>>>> configuration would be required in this case--simply implementing
> >>>>>>>the
> >>>>>>> ErrorTask means you get notified of any serialization errors
> >>>>>>>(rather
> >>>>>>> than
> >>>>>>> failing the container, which would be the default). Another option
> >>>>> would
> >>>>>>> be to have the IncomingMessageEnvelope have an error flag, which we
> >>>>>>> could
> >>>>>>> use to denote the serialization failure.
> >>>>>>>
> >>>>>>> I like the second approach because it's more generic. Instead of
> >>>>>>> pre-defining exact behavior when a failure occurs, it seems more
> >>>>>>> flexible
> >>>>>>> to let the StreamTask know, and let the developer decide what the
> >>>>>>> appropriate action is. I hadn't even thought of the re-route case
> >>>>>>>you
> >>>>>>> brought up, and I'm sure there are many other possible actions that
> >>>>>>> we're
> >>>>>>> not thinking of right now. Of the second approach's potential
> >>>>>>> implementation options, I favor the ErrorTask approach right now,
> >>>>> but I
> >>>>>>> haven't dug into it too much.
> >>>>>>>
> >>>>>>> Regardless of which way we choose, I think the default should be to
> >>>>> fail
> >>>>>>> the container. This is the safest behavior, as it means there will
> >>>>> be no
> >>>>>>> data loss, and developers will be alerted of the serialization
> >>>>>>>error.
> >>>>>>>
> >>>>>>> As far as implementation goes, The four classes that are probably
> >>>>>>>of
> >>>>>>> most
> >>>>>>> interest to you are SamzaContainer, TaskInstance, StreamConsumers,
> >>>>> and
> >>>>>>> SerdeManager. You'll need to catch the serde exceptions somewhere
> >>>>>>>in
> >>>>> the
> >>>>>>> StreamConsumer or SerdeManager class, and implement your new logic
> >>>>>>> there.
> >>>>>>> I think the best approach is to read over these four classes, and
> >>>>> ask us
> >>>>>>> any questions you might have.
> >>>>>>>
> >>>>>>> Cheers,
> >>>>>>> Chris
> >>>>>>>
> >>>>>>> On 2/3/14 3:42 PM, "Danny Antonetti" <[email protected]>
> >>>>> wrote:
> >>>>>>>
> >>>>>>>> We have discussed this, and it is something that we want to look
> >>>>> into.
> >>>>>>>>
> >>>>>>>> Do you have any thoughts on how to implement this feature?
> >>>>>>>> I assume you would want the failure behavior to be configurable.
> >>>>>>>>
> >>>>>>>> Like
> >>>>>>>> Drop the message,
> >>>>>>>> Send a message to a new queue, and drop.
> >>>>>>>> Fail the container (is that ever appropriate?)
> >>>>>>>> Anything else?
> >>>>>>>>
> >>>>>>>> I am not familiar with this code base.
> >>>>>>>> Do you have a suggestion on what classes I should be looking to
> >>>>> modify?
> >>>>>>>>
> >>>>>>>> Is there someone who I should bounce ideas off of?
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Thanks
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> Danny
> >>>>>>>>
> >>>>>>>>
> >>>>>>>>
> >>>>>>>> On Tue, Jan 21, 2014 at 9:52 PM, Jakob Homan <[email protected]>
> >>>>> wrote:
> >>>>>>>>
> >>>>>>>>> It's not intentional, error handling just hasn't been added yet.
> >>>>> If
> >>>>>>>>> you're
> >>>>>>>>> interested, we'd love to have the contribution.  In particular,
> >>>>> take
> >>>>>>> a
> >>>>>>>>> look
> >>>>>>>>> at SAMZA-59 (https://issues.apache.org/jira/browse/SAMZA-59),
> >>>>> which
> >>>>>>> also
> >>>>>>>>> touches on how serdes should handle error conditions.
> >>>>>>>>>
> >>>>>>>>> Thanks,
> >>>>>>>>> Jakob
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On Tue, Jan 21, 2014 at 6:14 PM, Danny Antonetti
> >>>>>>>>> <[email protected]>wrote:
> >>>>>>>>>
> >>>>>>>>>> I am currently using the JsonSerde/JsonSerdeFactory classes for
> >>>>>>>>> serializing
> >>>>>>>>>> kafka messages.
> >>>>>>>>>>
> >>>>>>>>>> I have noticed that if there is bad json input coming in through
> >>>>>>>>> kafka,
> >>>>>>>>> the
> >>>>>>>>>> samza container seems to crash.
> >>>>>>>>>>
> >>>>>>>>>> I was looking at JsonSerde.scala, which does not seem to have
> >>>>> any
> >>>>>>>>> error
> >>>>>>>>>> handling.
> >>>>>>>>>>
> >>>>>>>>>> So I was curious if this was intentional?
> >>>>>>>>>> Or if there is a different way to handle these types of input
> >>>>>>> errors?
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Thanks
> >>>>>>>>>>
> >>>>>>>>>>
> >>>>>>>>>> Danny
> >>>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>>>
> >>>>>
> >>>
> >>
> >
>
>

Reply via email to