Totally on board with everybody's comments above this point.

Jon

On Fri, Jan 18, 2019, 6:07 PM Michael Miklavcic <michael.miklav...@gmail.com> wrote:

> Thanks for the write-up, Ryan. I had to touch on some of this when
> refactoring the Kafka writer away from the async model so we could
> guarantee delivery. We had potential to drop messages before that change
> because of the async producer calls, which would ack the Storm tuple as
> soon as the writer returned.
>
> - https://github.com/apache/metron/pull/1045
>
> We'll want to talk about these fixes/updates in context of our message
> delivery semantics, both in Storm and Kafka. As it currently stands, we do
> NOT use Storm Trident, which means we have at-least-once message processing
> in Storm. There is an inherent possibility that we will publish duplicate
> messages in some instances. From a Kafka perspective, we have the same
> issue. As of Kafka 0.11.0, they provide a way to get exactly-once
> semantics, but I'm not sure we've done much to explicitly achieve that.
>
> - https://kafka.apache.org/10/documentation.html#semantics
>
> From a Kafka delivery guarantee perspective, it appears we're currently
> setting the number of required acks to 1 by default. This means we get
> commit confirmation as soon as the leader has written the message to its
> local log. In this case, should the leader fail immediately after
> acknowledging the record but before the followers have replicated it, the
> record will be lost. We could investigate setting acks=all (equivalently,
> acks=-1), but this would be a tradeoff in performance for us.
>
> - https://github.com/apache/metron/blob/341960b91f8fe742d5cf947633b7edd2275587d5/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java#L87
> - https://kafka.apache.org/10/documentation/#producerconfigs
>
> Per the KafkaProducer documentation, the flush() command will wait until
> all messages are batched and sent, and will return with either success
> (acked) or an error. "A request is considered completed when it is
> successfully acknowledged according to the acks configuration you have
> specified or else it results in an error."
>
> - https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
>
> With this combination of factors, I believe we can continue to guarantee
> at-least-once semantics in the writer, regardless of batch size. To your
> point about not passing 2 separate lists, I suggest that we modify the API
> by passing in something like Map<Tuple, List<JSONObject>> so that the
> tuples always get acked with respect to their messages. This way we can
> avoid the tuple-message batch boundary problem by ensuring we only ack a
> tuple when all associated messages are successfully written to Kafka.
>
> Best,
> Mike
>
>
> On Fri, Jan 18, 2019 at 1:31 PM Otto Fowler <ottobackwa...@gmail.com>
> wrote:
>
> > Agreed
> >
> >
> > On January 18, 2019 at 14:52:32, Ryan Merriman (merrim...@gmail.com)
> > wrote:
> >
> > I am on board with that. In that case, I think it's even more important
> > that we get the Writer interfaces right.
> >
> > On Fri, Jan 18, 2019 at 1:34 PM Otto Fowler <ottobackwa...@gmail.com>
> > wrote:
> >
> > > I think that the writers should be loaded as, and act as extension
> > > points, such that it is possible to have 3rd party writers, and would
> > > structure them as such.
> > >
> > >
> > >
> > > On January 18, 2019 at 13:55:00, Ryan Merriman (merrim...@gmail.com)
> > > wrote:
> > >
> > > Recently there was a bug reported by a user where a parser that emits
> > > multiple messages from a single tuple doesn't work correctly:
> > > https://issues.apache.org/jira/browse/METRON-1968. This has exposed a
> > > problem with how the writer classes work.
> > >
> > > The fundamental issue is this: the writer classes operate under the
> > > assumption that there is a 1 to 1 mapping between tuples and messages
> > > to be written.
> > > A couple of examples:
> > >
> > > KafkaWriter
> > > <https://github.com/apache/metron/blob/master/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java#L236>
> > > - This class writes messages by iterating through the list of tuples
> > > and fetching the message with the same index. This is the cause of the
> > > Jira above. We could iterate through the message list instead but then
> > > we don't know which tuples have been fully processed. It would be
> > > possible for a batch to be flushed before all messages from a tuple are
> > > passed to the writer.
> > >
> > > BulkWriterComponent
> > > <https://github.com/apache/metron/blob/master/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java#L250>
> > > - The tuple list size is used to determine when a batch should be
> > > flushed. While inherently incorrect in my opinion (should be message
> > > list size), this also causes an issue where only the first message from
> > > the last tuple in a batch is written.
> > >
> > > I do not believe there are easy fixes to these problems. There is no
> > > way to properly store the relationship between tuples and messages to
> > > be written with the current BulkMessageWriter interface and
> > > BulkWriterResponse class. If we did have a way, how should we handle
> > > partial failures? If multiple messages are parsed from a tuple but only
> > > half of them are written successfully, what should happen? Should we
> > > replay the tuple? Should we just report the failed messages and
> > > continue on? I think it may be a good time to review our writer classes
> > > and consider a refactor. Do others agree? Are there easy fixes I'm
> > > missing?
> > >
> > > Assuming there is interest in refactoring, I will throw out some ideas
> > > for consideration.
> > > For those not as familiar with the writer classes, they are
> > > organized as follows (in order from lowest to highest level):
> > >
> > > Writers - These classes do the actual writing and implement the
> > > BulkMessageWriter or MessageWriter interfaces. There are 6
> > > implementations I can see including KafkaWriter, SolrWriter,
> > > ElasticsearchWriter, HdfsWriter, etc. There is also an implementation
> > > that adapts a MessageWriter to a BulkMessageWriter
> > > (WriterToBulkWriter). The result of a writing operation is a
> > > BulkWriterResponse containing a list of either successful or failed
> > > tuples.
> > >
> > > Writer Containers - This includes the BulkWriterComponent and
> > > WriterHandler classes. These are responsible for batching and flushing
> > > messages, handling errors and acking tuples.
> > >
> > > Bolts - This includes ParserBolt, WriterBolt and BulkMessageWriterBolt.
> > > These classes implement the Storm Bolt interfaces, set up
> > > writers/components and execute tuples.
> > >
> > > I think the first step is to reevaluate the separation of concerns for
> > > these classes. Here is how I would change from what we currently have:
> > >
> > > Writers - These classes should only be concerned with writing messages
> > > and reporting what happened. They would also manage the lifecycle and
> > > configuration of the underlying client libraries as they do now.
> > > Instead of accepting 2 separate lists, they should accept a data
> > > structure that accurately represents the relationship between tuples
> > > and messages.
> > >
> > > Writer Containers - These classes would continue to handle batching and
> > > flushing but would only report the results of a flush rather than
> > > actually doing the acking or error handling.
> > >
> > > Bolts - These would now be responsible for acking and error reporting
> > > on tuples.
> > > They would transform a tuple into something the Writer Containers
> > > can accept as input.
> > >
> > > I think working through this and adjusting the contracts between the
> > > different layers will be necessary to fix the bugs described above.
> > > While we're at it I think there are other improvements we could also
> > > make:
> > >
> > > Decouple Storm - It would be beneficial to remove the dependency on
> > > tuples in our writers and writer containers. We could replace this with
> > > a simple abstraction (an id would probably work fine). This will allow
> > > us to more easily port Metron to other streaming platforms.
> > >
> > > Remove MessageWriter Interface - This is not being actively used as far
> > > as I can tell. Is that true? Removing this will make our code simpler
> > > and easier to follow (WriterHandler and WriterToBulkWriter classes can
> > > probably go away). I don't see any reason future writers, even those
> > > without bulk writing capabilities, could not fit into the
> > > BulkMessageWriter interface. A writer could either iterate through
> > > messages and write one at a time or throw an exception. As far as I
> > > know, the writer interfaces are not something we advertise as extension
> > > points. Is that true?
> > >
> > > Consolidate our BulkMessageWriterBolt and WriterBolt classes - Is there
> > > any reason we need both?
> > >
> > > I'll add another item to the list that I consider absolutely necessary:
> > > we need better tests. None of our integration tests or unit tests catch
> > > these bugs.
> > >
> > > This is a complex issue and there is a lot of information to process. I
> > > realize there are upgrade complications that may come with some of
> > > these and probably other things I haven't thought of. I will pause here
> > > and wait for feedback or provide more clarification if needed.
> > > In summary, here is the feedback I'm requesting:
> > >
> > > - We have a problem with our writers. Is there an easy fix or should we
> > > consider a broader refactor?
> > > - How should partial failures be handled when multiple messages are
> > > produced from a single tuple? This could be tricky because we might not
> > > know there were failures until after some messages have already been
> > > written.
> > > - If we do decide to reevaluate our writer classes, what should the
> > > separation of concerns be?
> > > - Do we want to include other changes that may be optional but could
> > > improve our code? Some of these may even make the refactor easier.
> > >
> > > If someone does have an easy fix, we can work through that next.
> > > Otherwise we can go further into details and work on designing how the
> > > interfaces should look after we make some high level decisions. From
> > > there I think we'll have a clear picture of how a refactor would look.
> > > Thanks in advance for your input.
> > >
> > > Ryan
> > >
> > >
> > >

--
Jon Zeolla
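
For reference, here is a minimal sketch of the "only ack a tuple when all of its messages are written" idea discussed in the thread. It is not Metron code and makes several assumptions: plain String ids stand in for Storm tuples (in the spirit of the "an id would probably work fine" suggestion), plain Strings stand in for JSONObject messages, and the names TupleBatchSketch, FlushResult, flush and sink are hypothetical. Failing the whole tuple on any partial failure is just one of the options raised above, not a decision.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical sketch: groups messages by the id of the tuple that produced them.
class TupleBatchSketch {

    /** Outcome of one flush: ids safe to ack, plus failed messages per id. */
    static final class FlushResult {
        final Set<String> ackable = new LinkedHashSet<>();
        final Map<String, List<String>> failedMessages = new LinkedHashMap<>();
    }

    /**
     * Attempts to write every message of every tuple id in the batch.
     * The sink is a stand-in for a real writer and returns true on success.
     * An id becomes ackable only if all of its messages were written.
     */
    static FlushResult flush(Map<String, List<String>> batch, Predicate<String> sink) {
        FlushResult result = new FlushResult();
        for (Map.Entry<String, List<String>> entry : batch.entrySet()) {
            List<String> failed = new ArrayList<>();
            for (String message : entry.getValue()) {
                if (!sink.test(message)) {
                    failed.add(message);
                }
            }
            if (failed.isEmpty()) {
                result.ackable.add(entry.getKey());
            } else {
                // Assumed policy: report the failures and leave the tuple unacked.
                result.failedMessages.put(entry.getKey(), failed);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<String>> batch = new LinkedHashMap<>();
        batch.put("tuple-1", Arrays.asList("a", "b"));
        batch.put("tuple-2", Arrays.asList("c", "BAD", "d"));

        // Pretend the sink rejects exactly one message.
        FlushResult result = flush(batch, message -> !message.equals("BAD"));

        System.out.println("ack: " + result.ackable);         // ack: [tuple-1]
        System.out.println("fail: " + result.failedMessages); // fail: {tuple-2=[BAD]}
    }
}
```

Because the batch is keyed by tuple id, a flush can never split a tuple's messages across a batch boundary, and the caller (a bolt, under the proposed separation of concerns) decides what to do with the ackable and failed ids.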