Hey Danny,

Yep, pretty much.

We should also have a counter metric to measure how many messages were
dropped, and a config setting to toggle the dropping behavior.

Logging is a little tricky. I'm wary of spewing a whole bunch of
"we just dropped your message from system stream partition X" log lines
into the container logs. It's conceivable that folks might have a ton of
bad messages, and logging every one would slow down processing and also
fill the disk. On the other hand, it'd be really useful to know that
messages had been dropped. The best I can come up with is to log at DEBUG,
so it's not verbose by default, but still there if people want it.
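
To make that concrete, here's a rough Java sketch of the shape I have in
mind. It is not the real SystemConsumers code: DroppingDeserializer,
dropOnError, and getDroppedCount are made-up names, the Function stands in
for serdeManager.fromBytes, an AtomicLong stands in for the counter metric,
and java.util.logging's FINE level stands in for DEBUG.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch only; none of these names exist in Samza. */
final class DroppingDeserializer<T> {
    private static final Logger LOG =
        Logger.getLogger(DroppingDeserializer.class.getName());

    // Stands in for serdeManager.fromBytes(envelope).
    private final Function<byte[], T> deserializer;
    // Would be read from a (hypothetical) config setting; false = fail as today.
    private final boolean dropOnError;
    // Stands in for a counter metric of dropped messages.
    private final AtomicLong droppedCount = new AtomicLong();
    // Stands in for the per-partition unprocessed message buffer.
    private final Queue<T> unprocessed = new ArrayDeque<>();

    DroppingDeserializer(Function<byte[], T> deserializer, boolean dropOnError) {
        this.deserializer = deserializer;
        this.dropOnError = dropOnError;
    }

    void enqueue(String partition, byte[] raw) {
        try {
            unprocessed.add(deserializer.apply(raw));
        } catch (RuntimeException e) {
            if (!dropOnError) {
                throw e; // dropping disabled: let the failure propagate
            }
            droppedCount.incrementAndGet();
            // FINE is off by default, so a flood of bad messages stays quiet.
            LOG.log(Level.FINE, e,
                () -> "Dropped undeserializable message from " + partition);
        }
    }

    long getDroppedCount() {
        return droppedCount.get();
    }

    int size() {
        return unprocessed.size();
    }
}
```

With the toggle off, the exception propagates exactly as it does today, so
nothing changes unless someone opts in.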

Cheers,
Chris

On 2/5/14 6:13 PM, "Danny Antonetti" <[email protected]> wrote:

>So it sounds like this would just be a try catch around this line
>in SystemConsumers.scala
>
>unprocessedMessages(envelope.getSystemStreamPartition)
>  .enqueue(serdeManager.fromBytes(envelope))
>
>And just do some logging in the catch statement?
>
>
>On Wed, Feb 5, 2014 at 12:32 PM, Chris Riccomini
><[email protected]>wrote:
>
>> Hey Guys,
>>
>> Thinking on this more. Given that we don't have a good grasp on this,
>>and
>> it seems fairly complicated for now, I'm proposing that we hold off on
>>all
>> this complex work, and just support the ability to drop messages for
>>now.
>> This should be pretty straightforward to implement, and is generally
>> pretty useful, and not too invasive.
>>
>> Does that sound cool with folks? Danny?
>>
>> Cheers,
>> Chris
>>
>> On 2/5/14 11:43 AM, "Chris Riccomini" <[email protected]> wrote:
>>
>> >Hey Martin,
>> >
>> >I agree with you about ordering. It would be ideal to give the error()
>> >callback exactly when we would normally call process() on the message,
>>but
>> >can't because of the serde error. I think this is do-able if we tweak
>> >things in SystemConsumers a bit.
>> >
>> >The problem I'm having trouble resolving is how to handle the
>> >MessageChooser. The MessageChooser chooses the processing order between
>> >partitions ("I have a message from stream/partition X, and
>> >stream/partition Y, which one do I process next?"). These choosers
>> >occasionally look at the actual message payload (e.g. The timestamp
>>field
>> >of an incoming message) when doing the choosing. The chooser won't know
>> >how to handle an IncomingMessageEnvelope where the key and value are
>>both
>> >byte[], when it's expecting Avro, ProtoBuf, JSON, etc. One solution I
>>can
>> >think of would be to add an error() call to the MessageChooser
>>interface.
>> >The other solution would be to just give the MessageChooser everything,
>> >and in cases where it is looking at the message payload, it just has
>>to be
>> >careful. I think there might be other solutions here as well.
>> >
>> >More thought required.
>> >
>> >Cheers,
>> >Chris
>> >
>> >On 2/4/14 4:13 PM, "Martin Kleppmann" <[email protected]> wrote:
>> >
>> >>Ok -- I had originally misunderstood your ErrorTask proposal. The way
>>you
>> >>described it looks good to me. I agree that a single error method,
>>which
>> >>takes a generic error envelope object, is better than lots of
>>different
>> >>methods.
>> >>
>> >>Rather than
>> >>
>> >>if (DeserializationError.ERROR_CODE.equals(envelope.getErrorCode()))
>> >>
>> >>could we just say this?
>> >>
>> >>if (envelope.getError() instanceof DeserializationError)
>> >>
>> >>Regarding ordering: my instinctive reaction is that it would be least
>> >>surprising if the error is received at the same point in the message
>> >>sequence as the message would have been received, had it not been an
>> >>error. But that may need some more thinking.
>> >>
>> >>Cheers,
>> >>Martin
>> >>
>> >>On 4 Feb 2014, at 22:23, Chris Riccomini <[email protected]>
>> wrote:
>> >>> Hey Guys,
>> >>>
>> >>> One other thing to consider here, that didn't strike me at first is
>>how
>> >>>to
>> >>> handle ordering when a serialization error occurs.
>> >>>
>> >>> Right now, the SystemConsumers class reads messages in, buffers
>>them,
>> >>>and
>> >>> feeds them slowly to the MessageChooser. The MessageChooser takes
>>only
>> >>> IncomingMessageEnvelope right now. So the question is, if we have a
>> >>>serde
>> >>> error, when does the StreamTask see it? It shouldn't see it before
>> >>>other
>> >>> messages for the same stream partition, since this would mean we've
>> >>>broken
>> >>> the ordering of the messages. Arguably, the MessageChooser should
>>also
>> >>>get
>> >>> a chance to see it, to choose in what order to process it.
>> >>>
>> >>> One could make the argument, though, that a message that can't be
>> >>> deserialized is invalid, and you're looking at potential data loss
>> >>>anyway,
>> >>> so ordering doesn't matter. I'm not sure if this assumption is true
>>in
>> >>>all
>> >>> cases though.
>> >>>
>> >>> I don't really have a good answer for this at the moment. I think we
>> >>> should think about it.
>> >>>
>> >>> Cheers,
>> >>> Chris
>> >>>
>> >>> On 2/4/14 11:36 AM, "Danny Antonetti" <[email protected]>
>> >>>wrote:
>> >>>
>> >>>> OK Great, that looks good.
>> >>>>
>> >>>> I will look into this as I have time and get back to you with
>> >>>>questions.
>> >>>>
>> >>>>
>> >>>> Thanks for the help.
>> >>>>
>> >>>>
>> >>>> Danny
>> >>>>
>> >>>>
>> >>>> On Tue, Feb 4, 2014 at 11:29 AM, Chris Riccomini
>> >>>> <[email protected]>wrote:
>> >>>>
>> >>>>> Hey Guys,
>> >>>>>
>> >>>>> Here's a pseudo-code proposal for an error API.
>> >>>>>
>> >>>>> interface ErrorTask {
>> >>>>>  public void error(ErrorEnvelope envelope, MessageCollector
>> >>>>>collector,
>> >>>>> TaskCoordinator coordinator);
>> >>>>> }
>> >>>>>
>> >>>>> interface ErrorEnvelope {
>> >>>>>  ErrorCode getErrorCode();
>> >>>>>  Object getError();
>> >>>>> }
>> >>>>>
>> >>>>> class DeserializationError {
>> >>>>>  byte[] getKey();
>> >>>>>
>> >>>>>  byte[] getMessage();
>> >>>>>  boolean keyFailed();
>> >>>>>  boolean valueFailed();
>> >>>>> }
>> >>>>>
>> >>>>> Then you could implement something like:
>> >>>>>
>> >>>>> class MyStreamTask implements StreamTask, ErrorTask {
>> >>>>>  public void process(...) {
>> >>>>>    // do stuff
>> >>>>>  }
>> >>>>>
>> >>>>>  public void error(ErrorEnvelope envelope, MessageCollector
>> >>>>>collector,
>> >>>>> TaskCoordinator coordinator) {
>> >>>>>    if
>> >>>>>(DeserializationError.ERROR_CODE.equals(envelope.getErrorCode()))
>> >>>>> {
>> >>>>>      DeserializationError error = (DeserializationError)
>> >>>>> envelope.getError();
>> >>>>>      collector.send("kafka", "bad-data", error.getKey(),
>> >>>>> error.getMessage());
>> >>>>>    }
>> >>>>>  }
>> >>>>> }
>> >>>>>
>> >>>>> Arguably, we've just moved our big if-statement block from the
>> >>>>>process()
>> >>>>> method into the ErrorTask.error method, but the alternative, as I
>>see
>> >>>>> it,
>> >>>>> is to have one callback per error message. This moves us into a
>>state
>> >>>>> where every time we want to add a new callback, we break all
>>existing
>> >>>>> implementations (since they must now implement the new method, as
>> >>>>> well). I
>> >>>>> chose the single callback and generic object approach because it
>> >>>>>matches
>> >>>>> what our process method does when multiple input streams are
>>defined
>> >>>>>(if
>> >>>>> envelope is from stream A, do X, if envelope is from stream B, do
>>Y),
>> >>>>> but
>> >>>>> I'm open to suggestions if people have them.
>> >>>>>
>> >>>>> Regarding your JSON use case, yes, I think you'd be given the raw
>> >>>>>bytes
>> >>>>> for the key and message, and it'd be up to you to decide what to
>>do
>> >>>>>with
>> >>>>> them. Samza allows you to define systems with no serde. Systems
>> >>>>>without
>> >>>>> a
>> >>>>> serde default to byte[] for both key and value, which means that
>>you
>> >>>>> could
>> >>>>> define a system (or stream) with a pass-through serde, and send it
>> >>>>>the
>> >>>>> raw
>> >>>>> bytes for the key and message. Alternatively, you could try
>>decoding
>> >>>>>the
>> >>>>> data using some other serde, posting the message, to a DB, logging
>> >>>>>the
>> >>>>> message in Log4j, discarding the message, etc.
>> >>>>>
>> >>>>> Cheers,
>> >>>>> Chris
>> >>>>>
>> >>>>> On 2/4/14 10:48 AM, "Danny Antonetti" <[email protected]>
>> >>>>>wrote:
>> >>>>>
>> >>>>>> Hey Chris,
>> >>>>>>
>> >>>>>> I am not sure I understand your suggestion about the ErrorTask.
>> >>>>>>What
>> >>>>>> would
>> >>>>>> this new function's method signature be?  I would assume it would
>> >>>>>>take
>> >>>>> in
>> >>>>>> the byte[] from the fromBytes function.  It seems like that ties
>>the
>> >>>>> Serde
>> >>>>>> implementation to the StreamTask implementation.  Unless you are
>> >>>>>> suggesting
>> >>>>>> that it should be notified without giving the input byte[].
>> >>>>>>
>> >>>>>> In our case we are using the JsonSerde, so the byte[] for the
>>json
>> >>>>>>data
>> >>>>>> would be given to  onDeserializationError and then our task would
>> >>>>>>have
>> >>>>> to
>> >>>>>> decode the bytes?
>> >>>>>>
>> >>>>>> Just to clarify my suggestion, I was not thinking that we would
>>have
>> >>>>>>a
>> >>>>>> predefined set of behaviors.  I was thinking that I would have an
>> >>>>>> interface
>> >>>>>> (That would not be part of the StreamTask), maybe ErrorHandler.
>> >>>>>>With
>> >>>>> this
>> >>>>>> option there would be implementations of that interface for
>>Dropping
>> >>>>> the
>> >>>>>> message, redirect to a new Queue, or drop it.  But this would
>> >>>>>>require
>> >>>>>> extra
>> >>>>>> configuration as you mentioned.
>> >>>>>>
>> >>>>>> I am not invested in my approach, I just wanted to make sure
>>that I
>> >>>>>> understand the suggestions/options.
>> >>>>>>
>> >>>>>>
>> >>>>>> Thanks
>> >>>>>>
>> >>>>>>
>> >>>>>> Danny
>> >>>>>>
>> >>>>>>
>> >>>>>> On Tue, Feb 4, 2014 at 9:46 AM, Chris Riccomini
>> >>>>>> <[email protected]>wrote:
>> >>>>>>
>> >>>>>>> Hey Danny,
>> >>>>>>>
>> >>>>>>> I can think of two ways to accomplish this.
>> >>>>>>>
>> >>>>>>> The first way is essentially what you've described. Allow the
>> >>>>> framework
>> >>>>>>> to
>> >>>>>>> have a pre-defined set of options defined via config (drop the
>> >>>>> message,
>> >>>>>>> re-route to another topic, etc).
>> >>>>>>>
>> >>>>>>> The second way is to catch the serialization issues, and still
>>pass
>> >>>>> the
>> >>>>>>> failed message to the StreamTask. The way in which the
>>StreamTask
>> >>>>> would
>> >>>>>>> be
>> >>>>>>> notified of the failure is up for debate. One option would be to
>> >>>>> have a
>> >>>>>>> ErrorTask interface, which has an onDeserializationError call
>>back.
>> >>>>> No
>> >>>>>>> new
>> >>>>>>> configuration would be required in this case--simply
>>implementing
>> >>>>>>>the
>> >>>>>>> ErrorTask means you get notified of any serialization errors
>> >>>>>>>(rather
>> >>>>>>> than
>> >>>>>>> failing the container, which would be the default). Another
>>option
>> >>>>> would
>> >>>>>>> be to have the IncomingMessageEnvelope have an error flag,
>>which we
>> >>>>>>> could
>> >>>>>>> use to denote the serialization failure.
>> >>>>>>>
>> >>>>>>> I like the second approach because it's more generic. Instead of
>> >>>>>>> pre-defining exact behavior when a failure occurs, it seems more
>> >>>>>>> flexible
>> >>>>>>> to let the StreamTask know, and let the developer decide what
>>the
>> >>>>>>> appropriate action is. I hadn't even thought of the re-route
>>case
>> >>>>>>>you
>> >>>>>>> brought up, and I'm sure there are many other possible actions
>>that
>> >>>>>>> we're
>> >>>>>>> not thinking of right now. Of the second approach's potential
>> >>>>>>> implementation options, I favor the ErrorTask approach right
>>now,
>> >>>>> but I
>> >>>>>>> haven't dug into it too much.
>> >>>>>>>
>> >>>>>>> Regardless of which way we choose, I think the default should
>>be to
>> >>>>> fail
>> >>>>>>> the container. This is the safest behavior, as it means there
>>will
>> >>>>> be no
>> >>>>>>> data loss, and developers will be alerted of the serialization
>> >>>>>>>error.
>> >>>>>>>
>> >>>>>>> As far as implementation goes, The four classes that are
>>probably
>> >>>>>>>of
>> >>>>>>> most
>> >>>>>>> interest to you are SamzaContainer, TaskInstance,
>>SystemConsumers,
>> >>>>> and
>> >>>>>>> SerdeManager. You'll need to catch the serde exceptions
>>somewhere
>> >>>>>>>in
>> >>>>> the
>> >>>>>>> SystemConsumers or SerdeManager class, and implement your new
>>logic
>> >>>>>>> there.
>> >>>>>>> I think the best approach is to read over these four classes,
>>and
>> >>>>> ask us
>> >>>>>>> any questions you might have.
>> >>>>>>>
>> >>>>>>> Cheers,
>> >>>>>>> Chris
>> >>>>>>>
>> >>>>>>> On 2/3/14 3:42 PM, "Danny Antonetti" <[email protected]>
>> >>>>> wrote:
>> >>>>>>>
>> >>>>>>>> We have discussed this, and it is something that we want to
>>look
>> >>>>> into.
>> >>>>>>>>
>> >>>>>>>> Do you have any thoughts on how to implement this feature?
>> >>>>>>>> I assume you would want the failure behavior to be
>>configurable.
>> >>>>>>>>
>> >>>>>>>> Like
>> >>>>>>>> Drop the message,
>> >>>>>>>> Send a message to a new queue, and drop.
>> >>>>>>>> Fail the container (is that ever appropriate?)
>> >>>>>>>> Anything else?
>> >>>>>>>>
>> >>>>>>>> I am not familiar with this code base.
>> >>>>>>>> Do you have a suggestion on what classes I should be looking to
>> >>>>> modify?
>> >>>>>>>>
>> >>>>>>>> Is there someone who I should bounce ideas off of?
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Thanks
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> Danny
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>>
>> >>>>>>>> On Tue, Jan 21, 2014 at 9:52 PM, Jakob Homan
>><[email protected]>
>> >>>>> wrote:
>> >>>>>>>>
>> >>>>>>>>> It's not intentional, error handling just hasn't been added
>>yet.
>> >>>>> If
>> >>>>>>>>> you're
>> >>>>>>>>> interested, we'd love to have the contribution.  In
>>particular,
>> >>>>> take
>> >>>>>>> a
>> >>>>>>>>> look
>> >>>>>>>>> at SAMZA-59 (https://issues.apache.org/jira/browse/SAMZA-59),
>> >>>>> which
>> >>>>>>> also
>> >>>>>>>>> touches on how serdes should handle error conditions.
>> >>>>>>>>>
>> >>>>>>>>> Thanks,
>> >>>>>>>>> Jakob
>> >>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>>> On Tue, Jan 21, 2014 at 6:14 PM, Danny Antonetti
>> >>>>>>>>> <[email protected]>wrote:
>> >>>>>>>>>
>> >>>>>>>>>> I am currently using the JsonSerde/JsonSerdeFactory classes
>>for
>> >>>>>>>>> serializing
>> >>>>>>>>>> kafka messages.
>> >>>>>>>>>>
>> >>>>>>>>>> I have noticed that if there is bad json input coming in
>>through
>> >>>>>>>>> kafka,
>> >>>>>>>>> the
>> >>>>>>>>>> samza container seems to crash.
>> >>>>>>>>>>
>> >>>>>>>>>> I was looking at JsonSerde.scala, which does not seem to have
>> >>>>> any
>> >>>>>>>>> error
>> >>>>>>>>>> handling.
>> >>>>>>>>>>
>> >>>>>>>>>> So I was curious if this was intentional?
>> >>>>>>>>>> Or if there is a different way to handle these types of input
>> >>>>>>> errors?
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> Thanks
>> >>>>>>>>>>
>> >>>>>>>>>>
>> >>>>>>>>>> Danny
>> >>>>>>>>>>
>> >>>>>>>>>
>> >>>>>>>
>> >>>>>>>
>> >>>>>
>> >>>>>
>> >>>
>> >>
>> >
>>
>>
