Thanks Chris,

I have put up a review board here:
https://reviews.apache.org/r/17869/

I am new to Scala/Samza, but I tried to follow the style guide and other examples in the code.

2) I could not get the configuration working, and when I looked at the other Config classes they did not seem to be referenced anywhere except in an import. I am not sure what I was doing wrong there.

4/6) I have moved the deserialization before the updateFetchMap() function call, because I thought that was better than incrementing and then decrementing. If there is a reason I should not have done it that way I can fix it.

Thanks

Danny


On Fri, Feb 7, 2014 at 9:56 AM, Chris Riccomini <[email protected]> wrote:

> Hey Danny,
>
> Nice! A few comments.
>
> 1. Could you please use JIRA (https://issues.apache.org/jira/browse/SAMZA-59) and review board to upload your patch, and instigate discussion on the ticket? Have a look at http://samza.incubator.apache.org/contribute/rules.html and http://wiki.apache.org/samza/ContributorsCorner.
> 2. Regarding enum, we use a config pattern instead. You should create a SystemConsumersConfig object in org.apache.samza.config (see other classes in there for examples). You should then use this class in SamzaContainer to get the drop on failure setting when instantiating the SystemConsumers object. Something like config.getDropIncomingMessagesOnSerdeFailure.
> 3. I don't think we need to distinguish between drop and log. A boolean is fine. We should just debug() if drop is enabled.
> 4. It's safer to isolate the serdeManager.fromBytes(envelope) call in the try/catch block, and move the unprocessedMessages(envelope.getSystemStreamPartition).enqueue(…) call outside of it, just so we don't actually catch object not found exceptions when accessing the map, and treat them as serde exceptions.
> 5. You are correct in assuming that you can add the param to the SystemConsumers, and have SamzaContainer set it.
> 6. I don't think the "add a null message" logic is the correct way to handle your "queue empty" issue. This exception is being thrown because the SystemConsumer's fetch map is getting out of sync with reality when we drop a message. If you look at the poll() method, you'll see that we call updateFetchMap(systemStreamPartition, -1), and then try to enqueue the message. The updateFetchMap call is updating a count for the number of messages in the SystemConsumers' buffers. If you then fail, the message never actually goes into the buffer. You need to then go and update the buffers AGAIN, this time incrementing the buffers by one, since the failed message never actually made it into the buffer. Try adding updateFetchMap(systemStreamPartition) to your catch clause, rather than adding a null message. The logic would then be, "decrement the number of messages we need to fetch by one since we got a message and are adding it to our buffer; add the message; if the message fails to add, increment the number of messages we need to fetch by one, since we never actually added our failed message."
>
> Cheers,
> Chris
>
> From: Danny Antonetti <[email protected]>
> Date: Thursday, February 6, 2014 5:49 PM
> To: "[email protected]" <[email protected]>, Chris Riccomini <[email protected]>
> Subject: Re: JsonSerde error handling
>
> Hey Chris
>
> This is my first stab at a diff for this.
>
> I am new to scala and samza, and I am not clear on the best practices here.
>
> I used an enumeration for the flag, but I was not sure if that is what you wanted or just a boolean. I have a metric that is incremented, and support Drop, Fail, and Log.
>
> I could not find a good place to configure this flag, because there is nothing else for this class that is configured from the property file (that I could tell).
>
> I was assuming that I wanted to add the flag to the constructor which is called in SamzaContainer.scala?
>
> I had to add an enqueue with a blank Envelope or I was getting the following Exception.
> So since I have to enqueue anyway (or am I missing something?)
>
> Since I have to create a new Envelope I am not sure what to put for the Key/Message.
>
> Exception in thread "main" java.util.NoSuchElementException: queue empty
>     at scala.collection.mutable.Queue.dequeue(Queue.scala:47)
>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$3.apply(SystemConsumers.scala:240)
>     at org.apache.samza.system.SystemConsumers$$anonfun$refresh$3.apply(SystemConsumers.scala:237)
>     at scala.collection.immutable.Set$Set1.foreach(Set.scala:81)
>     at org.apache.samza.system.SystemConsumers.refresh(SystemConsumers.scala:237)
>     at org.apache.samza.system.SystemConsumers.choose(SystemConsumers.scala:226)
>     at org.apache.samza.container.SamzaContainer.process(SamzaContainer.scala:556)
>     at org.apache.samza.container.SamzaContainer.run(SamzaContainer.scala:457)
>     at org.apache.samza.container.SamzaContainer$.main(SamzaContainer.scala:78)
>     at org.apache.samza.container.SamzaContainer.main(SamzaContainer.scala)
>
> On Wed, Feb 5, 2014 at 6:33 PM, Chris Riccomini <[email protected]> wrote:
>
>> Hey Danny,
>>
>> Yep, pretty much.
>>
>> We should also have a counter metric to measure how many messages were dropped, and a config setting to toggle the dropping behavior.
>>
>> Logging is a little tricky. I'm kind of sensitive to spewing a whole bunch of "we just dropped your message from system stream partition X" log lines in the container logs. It's conceivable that folks might have a ton of bad messages, and logging every one would slow down the processing, and also fill the disk. On the other hand, it'd be really useful to know that messages had been dropped. The best I can come up with is to set it at DEBUG, so it's not verbose, but still there if people want it.
>>
>> Cheers,
>> Chris
>>
>> On 2/5/14 6:13 PM, "Danny Antonetti" <[email protected]> wrote:
>>
>> >So it sounds like this would just be a try catch around this line in SystemConsumers.scala
>> >
>> >unprocessedMessages(envelope.getSystemStreamPartition).enqueue(serdeManager.fromBytes(envelope))
>> >
>> >And just do some logging in the catch statement?
>> >
>> >
>> >On Wed, Feb 5, 2014 at 12:32 PM, Chris Riccomini <[email protected]> wrote:
>> >
>> >> Hey Guys,
>> >>
>> >> Thinking on this more. Given that we don't have a good grasp on this, and it seems fairly complicated for now, I'm proposing that we hold off on all this complex work, and just support the ability to drop messages for now. This should be pretty straightforward to implement, and is generally pretty useful, and not too invasive.
>> >>
>> >> Does that sound cool with folks? Danny?
>> >>
>> >> Cheers,
>> >> Chris
>> >>
>> >> On 2/5/14 11:43 AM, "Chris Riccomini" <[email protected]> wrote:
>> >>
>> >> >Hey Martin,
>> >> >
>> >> >I agree with you about ordering. It would be ideal to give the error() callback exactly when we would normally call process() on the message, but can't because of the serde error. I think this is do-able if we tweak things in SystemConsumers a bit.
>> >> >
>> >> >The problem I'm having trouble resolving is how to handle the MessageChooser. The MessageChooser chooses the processing order between partitions ("I have a message from stream/partition X, and stream/partition Y, which one do I process next?"). These choosers occasionally look at the actual message payload (e.g. the timestamp field of an incoming message) when doing the choosing. The chooser won't know how to handle an IncomingMessageEnvelope where the key and value are both byte[], when it's expecting Avro, ProtoBuf, JSON, etc.
>> >> >One solution I can think of would be to add an error() call to the MessageChooser interface. The other solution would be to just give the MessageChooser everything, and in cases where it is looking at the message payload, it just has to be careful. I think there might be other solutions here as well.
>> >> >
>> >> >More thought required.
>> >> >
>> >> >Cheers,
>> >> >Chris
>> >> >
>> >> >On 2/4/14 4:13 PM, "Martin Kleppmann" <[email protected]> wrote:
>> >> >
>> >> >>Ok -- I had originally misunderstood your ErrorTask proposal. The way you described it looks good to me. I agree that a single error method, which takes a generic error envelope object, is better than lots of different methods.
>> >> >>
>> >> >>Rather than
>> >> >>
>> >> >>if (DeserializationError.ERROR_CODE.equals(envelope.getErrorCode()))
>> >> >>
>> >> >>could we just say this?
>> >> >>
>> >> >>if (envelope.getError() instanceof DeserializationError)
>> >> >>
>> >> >>Regarding ordering: my instinctive reaction is that it would be least surprising if the error is received at the same point in the message sequence as the message would have been received, had it not been an error. But that may need some more thinking.
>> >> >>
>> >> >>Cheers,
>> >> >>Martin
>> >> >>
>> >> >>On 4 Feb 2014, at 22:23, Chris Riccomini <[email protected]> wrote:
>> >> >>> Hey Guys,
>> >> >>>
>> >> >>> One other thing to consider here, that didn't strike me at first, is how to handle ordering when a serialization error occurs.
>> >> >>>
>> >> >>> Right now, the SystemConsumers class reads messages in, buffers them, and feeds them slowly to the MessageChooser. The MessageChooser takes only IncomingMessageEnvelope right now. So the question is, if we have a serde error, when does the StreamTask see it?
>> >> >>> It shouldn't see it before other messages for the same stream partition, since this would mean we've broken the ordering of the messages. Arguably, the MessageChooser should also get a chance to see it, to choose in what order to process it.
>> >> >>>
>> >> >>> One could make the argument, though, that a message that can't be deserialized is invalid, and you're looking at potential data loss anyway, so ordering doesn't matter. I'm not sure if this assumption is true in all cases though.
>> >> >>>
>> >> >>> I don't really have a good answer for this at the moment. I think we should think about it.
>> >> >>>
>> >> >>> Cheers,
>> >> >>> Chris
>> >> >>>
>> >> >>> On 2/4/14 11:36 AM, "Danny Antonetti" <[email protected]> wrote:
>> >> >>>
>> >> >>>> OK Great, that looks good.
>> >> >>>>
>> >> >>>> I will look into this as I have time and get back to you with questions.
>> >> >>>>
>> >> >>>> Thanks for the help.
>> >> >>>>
>> >> >>>> Danny
>> >> >>>>
>> >> >>>> On Tue, Feb 4, 2014 at 11:29 AM, Chris Riccomini <[email protected]> wrote:
>> >> >>>>
>> >> >>>>> Hey Guys,
>> >> >>>>>
>> >> >>>>> Here's a pseudo-code proposal for an error API.
>> >> >>>>>
>> >> >>>>> interface ErrorTask {
>> >> >>>>>   public void error(ErrorEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator);
>> >> >>>>> }
>> >> >>>>>
>> >> >>>>> interface ErrorEnvelope {
>> >> >>>>>   ErrorCode getErrorCode();
>> >> >>>>>   Object getError();
>> >> >>>>> }
>> >> >>>>>
>> >> >>>>> class DeserializationError {
>> >> >>>>>   byte[] getKey();
>> >> >>>>>   byte[] getMessage();
>> >> >>>>>   boolean keyFailed();
>> >> >>>>>   boolean valueFailed();
>> >> >>>>> }
>> >> >>>>>
>> >> >>>>> Then you could implement something like:
>> >> >>>>>
>> >> >>>>> class MyStreamTask implements StreamTask, ErrorTask {
>> >> >>>>>   public void process(…) {
>> >> >>>>>     // do stuff
>> >> >>>>>   }
>> >> >>>>>
>> >> >>>>>   public void error(ErrorEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
>> >> >>>>>     if (DeserializationError.ERROR_CODE.equals(envelope.getErrorCode())) {
>> >> >>>>>       DeserializationError error = (DeserializationError) envelope.getError();
>> >> >>>>>       collector.send("kafka", "bad-data", error.getKey(), error.getMessage());
>> >> >>>>>     }
>> >> >>>>>   }
>> >> >>>>> }
>> >> >>>>>
>> >> >>>>> Arguably, we've just moved our big if-statement block from the process() method into the ErrorTask.error method, but the alternative, as I see it, is to have one callback per error type. This moves us into a state where every time we want to add a new callback, we break all existing implementations (since they must now implement the new method, as well).
>> >> >>>>> I chose the single callback and generic object approach because it matches what our process method does when multiple input streams are defined (if envelope is from stream A, do X, if envelope is from stream B, do Y), but I'm open to suggestions if people have them.
>> >> >>>>>
>> >> >>>>> Regarding your JSON use case, yes, I think you'd be given the raw bytes for the key and message, and it'd be up to you to decide what to do with them. Samza allows you to define systems with no serde. Systems without a serde default to byte[] for both key and value, which means that you could define a system (or stream) with a pass-through serde, and send it the raw bytes for the key and message. Alternatively, you could try decoding the data using some other serde, posting the message to a DB, logging the message in Log4j, discarding the message, etc.
>> >> >>>>>
>> >> >>>>> Cheers,
>> >> >>>>> Chris
>> >> >>>>>
>> >> >>>>> On 2/4/14 10:48 AM, "Danny Antonetti" <[email protected]> wrote:
>> >> >>>>>
>> >> >>>>>> Hey Chris,
>> >> >>>>>>
>> >> >>>>>> I am not sure I understand your suggestion about the ErrorTask. What would this new function's method signature be? I would assume it would take in the byte[] from the fromBytes function. It seems like that ties the Serde implementation to the StreamTask implementation. Unless you are suggesting that it should be notified without giving the input byte[].
>> >> >>>>>>
>> >> >>>>>> In our case we are using the JsonSerde, so the byte[] for the json data would be given to onDeserializationError and then our task would have to decode the bytes?
>> >> >>>>>>
>> >> >>>>>> Just to clarify my suggestion, I was not thinking that we would have a predefined set of behaviors. I was thinking that I would have an interface (that would not be part of the StreamTask), maybe ErrorHandler. With this option there would be implementations of that interface for dropping the message or redirecting it to a new queue. But this would require extra configuration as you mentioned.
>> >> >>>>>>
>> >> >>>>>> I am not invested in my approach, I just wanted to make sure that I understand the suggestions/options.
>> >> >>>>>>
>> >> >>>>>> Thanks
>> >> >>>>>>
>> >> >>>>>> Danny
>> >> >>>>>>
>> >> >>>>>> On Tue, Feb 4, 2014 at 9:46 AM, Chris Riccomini <[email protected]> wrote:
>> >> >>>>>>
>> >> >>>>>>> Hey Danny,
>> >> >>>>>>>
>> >> >>>>>>> I can think of two ways to accomplish this.
>> >> >>>>>>>
>> >> >>>>>>> The first way is essentially what you've described. Allow the framework to have a pre-defined set of options defined via config (drop the message, re-route to another topic, etc).
>> >> >>>>>>>
>> >> >>>>>>> The second way is to catch the serialization issues, and still pass the failed message to the StreamTask. The way in which the StreamTask would be notified of the failure is up for debate.
>> >> >>>>>>> One option would be to have an ErrorTask interface, which has an onDeserializationError callback. No new configuration would be required in this case--simply implementing the ErrorTask means you get notified of any serialization errors (rather than failing the container, which would be the default). Another option would be to have the IncomingMessageEnvelope have an error flag, which we could use to denote the serialization failure.
>> >> >>>>>>>
>> >> >>>>>>> I like the second approach because it's more generic. Instead of pre-defining exact behavior when a failure occurs, it seems more flexible to let the StreamTask know, and let the developer decide what the appropriate action is. I hadn't even thought of the re-route case you brought up, and I'm sure there are many other possible actions that we're not thinking of right now. Of the second approach's potential implementation options, I favor the ErrorTask approach right now, but I haven't dug into it too much.
>> >> >>>>>>>
>> >> >>>>>>> Regardless of which way we choose, I think the default should be to fail the container. This is the safest behavior, as it means there will be no data loss, and developers will be alerted of the serialization error.
>> >> >>>>>>>
>> >> >>>>>>> As far as implementation goes, the four classes that are probably of most interest to you are SamzaContainer, TaskInstance, SystemConsumers, and SerdeManager.
>> >> >>>>>>> You'll need to catch the serde exceptions somewhere in the SystemConsumers or SerdeManager class, and implement your new logic there. I think the best approach is to read over these four classes, and ask us any questions you might have.
>> >> >>>>>>>
>> >> >>>>>>> Cheers,
>> >> >>>>>>> Chris
>> >> >>>>>>>
>> >> >>>>>>> On 2/3/14 3:42 PM, "Danny Antonetti" <[email protected]> wrote:
>> >> >>>>>>>
>> >> >>>>>>>> We have discussed this, and it is something that we want to look into.
>> >> >>>>>>>>
>> >> >>>>>>>> Do you have any thoughts on how to implement this feature?
>> >> >>>>>>>> I assume you would want the failure behavior to be configurable.
>> >> >>>>>>>>
>> >> >>>>>>>> Like
>> >> >>>>>>>> Drop the message,
>> >> >>>>>>>> Send a message to a new queue, and drop.
>> >> >>>>>>>> Fail the container (is that ever appropriate?)
>> >> >>>>>>>> Anything else?
>> >> >>>>>>>>
>> >> >>>>>>>> I am not familiar with this code base.
>> >> >>>>>>>> Do you have a suggestion on what classes I should be looking to modify?
>> >> >>>>>>>>
>> >> >>>>>>>> Is there someone who I should bounce ideas off of?
>> >> >>>>>>>>
>> >> >>>>>>>> Thanks
>> >> >>>>>>>>
>> >> >>>>>>>> Danny
>> >> >>>>>>>>
>> >> >>>>>>>> On Tue, Jan 21, 2014 at 9:52 PM, Jakob Homan <[email protected]> wrote:
>> >> >>>>>>>>
>> >> >>>>>>>>> It's not intentional, error handling just hasn't been added yet. If you're interested, we'd love to have the contribution. In particular, take a look at SAMZA-59 (https://issues.apache.org/jira/browse/SAMZA-59), which also touches on how serdes should handle error conditions.
>> >> >>>>>>>>>
>> >> >>>>>>>>> Thanks,
>> >> >>>>>>>>> Jakob
>> >> >>>>>>>>>
>> >> >>>>>>>>> On Tue, Jan 21, 2014 at 6:14 PM, Danny Antonetti <[email protected]> wrote:
>> >> >>>>>>>>>
>> >> >>>>>>>>>> I am currently using the JsonSerde/JsonSerdeFactory classes for serializing kafka messages.
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> I have noticed that if there is bad json input coming in through kafka, the samza container seems to crash.
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> I was looking at JsonSerde.scala, which does not seem to have any error handling.
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> So I was curious if this was intentional?
>> >> >>>>>>>>>> Or if there is a different way to handle these types of input errors?
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> Thanks
>> >> >>>>>>>>>>
>> >> >>>>>>>>>> Danny

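For anyone skimming the thread, the drop-on-failure behavior it converges on can be sketched roughly as follows. This is only an illustration under stated assumptions, not the actual patch: DroppingBuffer, offer, droppedCount, and the toy integer "serde" are hypothetical names invented here, and none of them are Samza APIs. The sketch keeps only the deserialization inside the try block, enqueues outside it, counts dropped messages, and fails by default unless dropping is switched on.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Function;

// Hypothetical stand-in for the buffering discussed in the thread; not Samza's API.
class DroppingBuffer<T> {
    private final Function<byte[], T> deserializer; // plays the role of serdeManager.fromBytes
    private final boolean dropOnFailure;            // false means fail, the safe default
    private final Queue<T> unprocessed = new ArrayDeque<>();
    private long dropped = 0;                       // counter for dropped messages

    DroppingBuffer(Function<byte[], T> deserializer, boolean dropOnFailure) {
        this.deserializer = deserializer;
        this.dropOnFailure = dropOnFailure;
    }

    // Returns true if the message was buffered, false if it was dropped.
    boolean offer(byte[] raw) {
        T message;
        try {
            // Only deserialization is inside the try block, so unrelated
            // exceptions are not mistaken for serde failures.
            message = deserializer.apply(raw);
        } catch (RuntimeException e) {
            if (!dropOnFailure) {
                throw e; // default behavior: propagate and let the caller fail
            }
            dropped++;
            return false; // nothing was enqueued, so no bookkeeping to undo
        }
        // Deserialization succeeded; enqueue outside the try block.
        unprocessed.add(message);
        return true;
    }

    long droppedCount() { return dropped; }
    int size() { return unprocessed.size(); }
    T poll() { return unprocessed.poll(); }

    public static void main(String[] args) {
        // Toy "serde": parses an integer from UTF-8 text and throws on bad input.
        Function<byte[], Integer> serde =
            bytes -> Integer.parseInt(new String(bytes, StandardCharsets.UTF_8));
        DroppingBuffer<Integer> buffer = new DroppingBuffer<>(serde, true);
        buffer.offer("42".getBytes(StandardCharsets.UTF_8));
        buffer.offer("not-a-number".getBytes(StandardCharsets.UTF_8));
        System.out.println("buffered=" + buffer.size() + " dropped=" + buffer.droppedCount());
    }
}
```

Deserializing before touching any buffer bookkeeping, as in the 4/6 note at the top of the thread, means a dropped message never needs a compensating counter update.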