Recently a user reported a bug where a parser that emits multiple messages from a single tuple doesn't work correctly: https://issues.apache.org/jira/browse/METRON-1968. This has exposed a problem with how the writer classes work.
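To make the failure mode concrete, here is a minimal, hypothetical sketch (plain Strings standing in for Storm tuples and JSON messages, not the actual Metron code) of what goes wrong when the tuple list and the message list are paired by index:

import java.util.Arrays;
import java.util.List;

public class TupleMessageMismatch {
  public static void main(String[] args) {
    // One incoming tuple whose raw payload parses into three messages.
    List<String> tuples = Arrays.asList("tuple-1");
    List<String> messages = Arrays.asList("msg-1a", "msg-1b", "msg-1c");

    // A writer that pairs tuples.get(i) with messages.get(i) only ever sees
    // the first message here; "msg-1b" and "msg-1c" are never written, and
    // nothing in the two parallel lists says which tuple produced them.
    for (int i = 0; i < tuples.size(); i++) {
      System.out.println(tuples.get(i) + " -> " + messages.get(i));
    }
  }
}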
The fundamental issue is this: the writer classes operate under the assumption that there is a 1 to 1 mapping between tuples and messages to be written. A couple of examples:

- KafkaWriter <https://github.com/apache/metron/blob/master/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java#L236> - This class writes messages by iterating through the list of tuples and fetching the message at the same index. This is the cause of the Jira above. We could iterate through the message list instead, but then we wouldn't know which tuples have been fully processed; a batch could be flushed before all of a tuple's messages have been passed to the writer.

- BulkWriterComponent <https://github.com/apache/metron/blob/master/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java#L250> - The tuple list size is used to determine when a batch should be flushed. This is inherently incorrect in my opinion (it should be the message list size), and it also causes an issue where only the first message from the last tuple in a batch is written.

I do not believe there are easy fixes to these problems. There is no way to properly store the relationship between tuples and messages to be written with the current BulkMessageWriter interface and BulkWriterResponse class. And even if we had a way, how should we handle partial failures? If multiple messages are parsed from a tuple but only half of them are written successfully, what should happen? Should we replay the tuple? Should we just report the failed messages and continue on?

I think it may be a good time to review our writer classes and consider a refactor. Do others agree? Are there easy fixes I'm missing?

Assuming there is interest in refactoring, I will throw out some ideas for consideration. For those not as familiar with the writer classes, they are organized as follows (in order from lowest to highest level):

- Writers - These classes do the actual writing and implement the BulkMessageWriter or MessageWriter interfaces. There are 6 implementations I can see, including KafkaWriter, SolrWriter, ElasticsearchWriter, HdfsWriter, etc. There is also an implementation that adapts a MessageWriter to a BulkMessageWriter (WriterToBulkWriter). The result of a write operation is a BulkWriterResponse containing the successful and failed tuples.

- Writer Containers - This includes the BulkWriterComponent and WriterHandler classes. These are responsible for batching and flushing messages, handling errors, and acking tuples.

- Bolts - This includes ParserBolt, WriterBolt and BulkMessageWriterBolt. These classes implement the Storm Bolt interfaces, set up writers/components, and execute tuples.

I think the first step is to reevaluate the separation of concerns for these classes. Here is how I would change it from what we currently have:

- Writers - These classes should only be concerned with writing messages and reporting what happened. They would also manage the lifecycle and configuration of the underlying client libraries as they do now. Instead of accepting 2 separate lists, they should accept a data structure that accurately represents the relationship between tuples and messages (see the sketch after this list).

- Writer Containers - These classes would continue to handle batching and flushing but would only report the results of a flush rather than actually doing the acking or error handling.

- Bolts - These would now be responsible for acking and error reporting on tuples. They would transform a tuple into something the Writer Containers can accept as input.
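To make the Writers change concrete, here is a minimal sketch of what that data structure and the revised contract could look like. All of the names (BulkMessage, WriterResponse, RevisedBulkMessageWriter, sourceId) are hypothetical and only illustrate the shape; they are not existing Metron classes:

import java.util.ArrayList;
import java.util.List;

// Each message carries the id of the tuple (or other source record) it was
// parsed from, so writers no longer need parallel tuple/message lists that
// are assumed to line up 1 to 1.
class BulkMessage<M> {
  final String sourceId; // e.g. a tuple id; also keeps Storm types out of the writer
  final M message;
  BulkMessage(String sourceId, M message) {
    this.sourceId = sourceId;
    this.message = message;
  }
}

// Results are reported per message id rather than per tuple, so a partial
// failure among one tuple's messages is visible to the caller.
class WriterResponse {
  final List<String> successes = new ArrayList<>();
  final List<String> failures = new ArrayList<>();
}

// A revised writer contract: one list in, per-id results out. The bolt layer
// would map ids back to tuples and decide how to ack or report errors.
interface RevisedBulkMessageWriter<M> {
  WriterResponse write(String sensorType, List<BulkMessage<M>> messages);
}

With something like this, the writer containers could batch and flush based on message count, and a bolt would only ack a tuple once every message carrying that tuple's id has been reported back.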
I think working through this and adjusting the contracts between the different layers will be necessary to fix the bugs described above. While we're at it, I think there are other improvements we could also make:

- Decouple Storm - It would be beneficial to remove the dependency on Storm tuples in our writers and writer containers. We could replace it with a simple abstraction (an id would probably work fine, as in the sketch above). This would allow us to more easily port Metron to other streaming platforms.

- Remove the MessageWriter interface - This is not being actively used as far as I can tell. Is that true? Removing it would make our code simpler and easier to follow (the WriterHandler and WriterToBulkWriter classes could probably go away). I don't see any reason future writers, even those without bulk writing capabilities, could not fit into the BulkMessageWriter interface. A writer could either iterate through the messages and write them one at a time or throw an exception. As far as I know, the writer interfaces are not something we advertise as extension points. Is that true?

- Consolidate our BulkMessageWriterBolt and WriterBolt classes - Is there any reason we need both?

I'll add another item to the list that I consider absolutely necessary: we need better tests. None of our integration tests or unit tests caught these bugs.

This is a complex issue and there is a lot of information to process. I realize there may be upgrade complications that come with some of these changes, and probably other things I haven't thought of. I will pause here and wait for feedback, or provide more clarification if needed. In summary, here is the feedback I'm requesting:

- We have a problem with our writers. Is there an easy fix or should we consider a broader refactor?
- How should partial failures be handled when multiple messages are produced from a single tuple? This could be tricky because we might not know there were failures until after some messages have already been written.
- If we do decide to reevaluate our writer classes, what should the separation of concerns be?
- Do we want to include other changes that may be optional but could improve our code? Some of these may even make the refactor easier.

If someone does have an easy fix, we can work through that next. Otherwise we can go further into the details and work on designing how the interfaces should look after we make some high level decisions. From there I think we'll have a clear picture of how a refactor would look.

Thanks in advance for your input.

Ryan