Totally on board with everybody's comments above this point.

Jon

On Fri, Jan 18, 2019, 6:07 PM Michael Miklavcic <michael.miklav...@gmail.com>
wrote:

> Thanks for the write up, Ryan. I had to touch on some of this when
> refactoring the kafka writer away from the async model so we could
> guarantee delivery. There was potential to drop messages before that
> change because the async producer calls would ack the Storm tuple as
> soon as the writer returned.
>
>    - https://github.com/apache/metron/pull/1045
>
> We'll want to talk about these fixes/updates in context of our message
> delivery semantics, both in Storm and Kafka. As it currently stands, we do
> NOT use Storm Trident, which means we have at-least-once message processing
> in Storm. There is an inherent possibility that we will publish duplicate
> messages in some instances. From a Kafka perspective, we have the same
> issue. As of Kafka 0.11.0, they provide a way to get exactly-once
> semantics, but I'm not sure we've done much to explicitly achieve that.
>
>    - https://kafka.apache.org/10/documentation.html#semantics
>
> From a Kafka delivery guarantee perspective, it appears we're currently
> setting # required acks to 1 by default. This means we get commit
> confirmation as soon as the leader has written the message to its local
> log. In this case, should the leader fail immediately after acknowledging
> the record but before the followers have replicated it, the record will
> be lost. We could investigate setting acks=all (equivalently, acks=-1), but
> this would be a performance tradeoff for us.
>
>    - https://github.com/apache/metron/blob/341960b91f8fe742d5cf947633b7edd2275587d5/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java#L87
>    - https://kafka.apache.org/10/documentation/#producerconfigs
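To make the tradeoff concrete, here is a minimal sketch of the property in question (the broker address and helper method are hypothetical, not Metron code):

```java
import java.util.Properties;

public class ProducerAckConfig {
    // Hypothetical helper: the durability/latency tradeoff is this one property.
    static Properties withAcks(String acks) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        // "1"   -> ack once the leader has written to its local log (current default)
        // "all" -> ack only after the full in-sync replica set has the record
        props.put("acks", acks);
        return props;
    }

    public static void main(String[] args) {
        System.out.println(withAcks("all").getProperty("acks")); // prints "all"
    }
}
```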
>
> Per the KafkaProducer documentation, the flush() call blocks until all
> previously sent records have completed, each with either success (acked)
> or an error: "A request is considered completed when it is
> successfully acknowledged according to the acks configuration you have
> specified or else it results in an error."
>
>    - https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/KafkaProducer.html
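In other words, once flush() returns, every message in the batch has a definite outcome. A toy illustration of that contract, using an invented stand-in class rather than the real KafkaProducer:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BiConsumer;

public class FlushSemanticsSketch {
    // Hypothetical stand-in for KafkaProducer: send() is async, and flush()
    // blocks until every callback has fired with either success or an exception.
    static class FakeProducer {
        private final List<Runnable> pending = new ArrayList<>();
        void send(String msg, BiConsumer<String, Exception> callback) {
            // Simulate a failed ack for one message so both paths are exercised.
            Exception err = msg.contains("bad") ? new RuntimeException("not acked") : null;
            pending.add(() -> callback.accept(msg, err));
        }
        void flush() {
            pending.forEach(Runnable::run); // all callbacks complete before flush() returns
            pending.clear();
        }
    }

    static List<String> failedAfterFlush(List<String> batch) {
        List<String> failed = new ArrayList<>();
        FakeProducer producer = new FakeProducer();
        for (String msg : batch) {
            producer.send(msg, (m, e) -> { if (e != null) failed.add(m); });
        }
        producer.flush(); // by contract, every message is accounted for after this
        return failed;
    }

    public static void main(String[] args) {
        System.out.println(failedAfterFlush(Arrays.asList("ok-1", "bad-2", "ok-3")));
    }
}
```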
>
> With this combination of factors, I believe we can continue to guarantee
> at-least-once semantics in the writer, regardless of batch size. To your
> point about not passing 2 separate lists, I suggest that we modify the API
> by passing in something like Map<Tuple, List<JSONObject>> so that the
> tuples always get acked with respect to their messages. This way we can
> avoid the tuple-message batch boundary problem by ensuring we only ack a
> tuple when all associated messages are successfully written to Kafka.
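A rough sketch of what that acking logic could look like, with String standing in for both Tuple and JSONObject (names and structure are illustrative only):

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AckByTupleSketch {
    // Sketch of the proposed Map<Tuple, List<JSONObject>> contract: a tuple is
    // acked only when every message derived from it was written successfully.
    static Set<String> tuplesToAck(Map<String, List<String>> tupleToMessages,
                                   Set<String> failedMessages) {
        Set<String> ackable = new HashSet<>();
        for (Map.Entry<String, List<String>> e : tupleToMessages.entrySet()) {
            boolean allWritten = e.getValue().stream().noneMatch(failedMessages::contains);
            if (allWritten) ackable.add(e.getKey());
        }
        return ackable;
    }

    public static void main(String[] args) {
        Map<String, List<String>> batch = new HashMap<>();
        batch.put("tuple-1", Arrays.asList("msg-a", "msg-b"));
        batch.put("tuple-2", Collections.singletonList("msg-c"));
        // msg-b failed, so tuple-1 must not be acked; prints "[tuple-2]"
        System.out.println(tuplesToAck(batch, Collections.singleton("msg-b")));
    }
}
```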
>
> Best,
> Mike
>
>
> On Fri, Jan 18, 2019 at 1:31 PM Otto Fowler <ottobackwa...@gmail.com>
> wrote:
>
> > Agreed
> >
> >
> > On January 18, 2019 at 14:52:32, Ryan Merriman (merrim...@gmail.com)
> > wrote:
> >
> > I am on board with that. In that case, I think it's even more important
> > that we get the Writer interfaces right.
> >
> > On Fri, Jan 18, 2019 at 1:34 PM Otto Fowler <ottobackwa...@gmail.com>
> > wrote:
> >
> > > I think that the writers should be loaded as, and act as, extension
> > > points, so that third-party writers are possible, and I would
> > > structure them as such.
> > >
> > >
> > >
> > > On January 18, 2019 at 13:55:00, Ryan Merriman (merrim...@gmail.com)
> > > wrote:
> > >
> > > Recently there was a bug reported by a user where a parser that emits
> > > multiple messages from a single tuple doesn't work correctly:
> > > https://issues.apache.org/jira/browse/METRON-1968. This has exposed a
> > > problem with how the writer classes work.
> > >
> > > The fundamental issue is this: the writer classes operate under the
> > > assumption that there is a 1 to 1 mapping between tuples and messages
> > > to be written. A couple of examples:
> > >
> > > KafkaWriter
> > > <https://github.com/apache/metron/blob/master/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java#L236>
> > >
> > > - This class writes messages by iterating through the list of tuples and
> > > fetching the message with the same index. This is the cause of the Jira
> > > above. We could iterate through the message list instead, but then we
> > > don't know which tuples have been fully processed. It would be possible
> > > for a batch to be flushed before all messages from a tuple are passed
> > > to the writer.
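A small illustration of why index-based pairing drops messages (simplified with plain strings; not the actual KafkaWriter code):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class IndexPairingBug {
    // Pairs tuples.get(i) with messages.get(i), mirroring the 1:1 assumption
    // described above (hypothetical simplification of the writer loop).
    static List<String> pairByIndex(List<String> tuples, List<String> messages) {
        List<String> written = new ArrayList<>();
        for (int i = 0; i < tuples.size(); i++) {
            written.add(tuples.get(i) + " -> " + messages.get(i));
        }
        return written;
    }

    public static void main(String[] args) {
        List<String> tuples = Arrays.asList("t1", "t2");
        // t1 parsed into two messages, so the lists no longer line up:
        List<String> messages = Arrays.asList("t1-msg-a", "t1-msg-b", "t2-msg-a");
        // Writes t1-msg-a for t1 and t1-msg-b for t2; t2-msg-a is silently dropped.
        pairByIndex(tuples, messages).forEach(System.out::println);
    }
}
```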
> > >
> > > BulkWriterComponent
> > > <https://github.com/apache/metron/blob/master/metron-platform/metron-writer/src/main/java/org/apache/metron/writer/BulkWriterComponent.java#L250>
> > >
> > > - The tuple list size is used to determine when a batch should be
> > > flushed. While inherently incorrect in my opinion (it should be the
> > > message list size), this also causes an issue where only the first
> > > message from the last tuple in a batch is written.
> > >
> > > I do not believe there are easy fixes to these problems. There is no
> way
> > > to properly store the relationship between tuples and messages to be
> > > written with the current BulkMessageWriter interface and
> > > BulkWriterResponse
> > > class. If we did have a way, how should we handle partial failures? If
> > > multiple messages are parsed from a tuple but only half of them are
> > > written
> > > successfully, what should happen? Should we replay the tuple? Should we
> > > just report the failed messages and continue on? I think it may be a
> good
> > > time to review our writer classes and consider a refactor. Do others
> > > agree? Are there easy fixes I'm missing?
> > >
> > > Assuming there is interest in refactoring, I will throw out some ideas
> > for
> > > consideration. For those not as familiar with the writer classes, they
> > are
> > > organized as follows (in order from lowest to highest level):
> > >
> > > Writers - These classes do the actual writing and implement the
> > > BulkMessageWriter or MessageWriter interfaces. There are 6
> > implementations
> > > I can see including KafkaWriter, SolrWriter, ElasticsearchWriter,
> > > HdfsWriter, etc. There is also an implementation that adapts a
> > > MessageWriter to a BulkMessageWriter (WriterToBulkWriter). The result
> of
> > a
> > > writing operation is a BulkWriterResponse containing a list of either
> > > successful or failed tuples.
> > >
> > > Writer Containers - This includes the BulkWriterComponent and
> > > WriterHandler
> > > classes. These are responsible for batching and flushing messages,
> > > handling errors and acking tuples.
> > >
> > > Bolts - This includes ParserBolt, WriterBolt and BulkMessageWriterBolt.
> > > These classes implement the Storm Bolt interfaces, setup
> > > writers/components
> > > and execute tuples.
> > >
> > > I think the first step is to reevaluate the separation of concerns for
> > > these classes. Here is how I would change from what we currently have:
> > >
> > > Writers - These classes should only be concerned with writing messages
> > and
> > > reporting what happened. They would also manage the lifecycle and
> > > configuration of the underlying client libraries as they do now.
> Instead
> > > of accepting 2 separate lists, they should accept a data structure that
> > > accurately represents the relationship between tuples and messages.
> > >
> > > Writer Containers - These classes would continue to handle batching
> and
> > > flushing but would only report the results of a flush rather than
> > actually
> > > doing the acking or error handling.
> > >
> > > Bolts - These would now be responsible for acking and error reporting
> on
> > > tuples. They would transform a tuple into something the Writer
> Containers
> > > can accept as input.
> > >
> > > I think working through this and adjusting the contracts between the
> > > different layers will be necessary to fix the bugs described above.
> While
> > > we're at it I think there are other improvements we could also make:
> > >
> > > Decouple Storm - It would be beneficial to remove the dependency on
> > tuples
> > > in our writers and writer containers. We could replace this with a
> simple
> > > abstraction (an id would probably work fine). This will allow us to
> more
> > > easily port Metron to other streaming platforms.
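As a sketch of what that abstraction might look like, with an opaque String id in place of a Tuple (all names here are hypothetical, not a proposed final API):

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PlatformNeutralSketch {
    // The writer layer sees only opaque message ids, never a Storm Tuple;
    // the Storm bolt (or any other platform adapter) owns the id-to-tuple map.
    interface BulkMessageWriter<MSG> {
        // Returns the ids whose messages were all written successfully.
        Set<String> write(Map<String, List<MSG>> messagesById);
    }

    public static void main(String[] args) {
        // Trivial all-success writer for illustration.
        BulkMessageWriter<String> writer = batch -> batch.keySet();
        Map<String, List<String>> batch = new HashMap<>();
        batch.put("id-1", Arrays.asList("m1", "m2"));
        System.out.println(writer.write(batch)); // prints "[id-1]"
    }
}
```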
> > >
> > > Remove MessageWriter Interface - This is not being actively used as far
> > as
> > > I can tell. Is that true? Removing this will make our code simpler and
> > > easier to follow (WriterHandler and WriterToBulkWriter classes can
> > > probably
> > > go away). I don't see any reason future writers, even those without
> bulk
> > > writing capabilities, could not fit into the BulkMessageWriter
> interface.
> > > A writer could either iterate through messages and write one at a time
> or
> > > throw an exception. As far as I know, the writer interfaces are not
> > > something we advertise as extension points. Is that true?
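A quick sketch of how a single-message writer could satisfy a bulk contract just by iterating, which is roughly what WriterToBulkWriter adapts today (interfaces simplified and hypothetical):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SingleAsBulkSketch {
    // Hypothetical single-message writer: returns true on success.
    interface MessageWriter<MSG> {
        boolean write(MSG message);
    }

    // A non-bulk writer satisfies a bulk-style call by writing one message at
    // a time and reporting per-message failures, so no adapter class is needed.
    static <MSG> List<MSG> writeBulk(MessageWriter<MSG> writer, List<MSG> messages) {
        List<MSG> failed = new ArrayList<>();
        for (MSG m : messages) {
            if (!writer.write(m)) failed.add(m);
        }
        return failed;
    }

    public static void main(String[] args) {
        // Toy writer: messages not starting with "ok" fail.
        MessageWriter<String> writer = m -> m.startsWith("ok");
        System.out.println(writeBulk(writer, Arrays.asList("ok-1", "bad", "ok-2"))); // prints "[bad]"
    }
}
```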
> > >
> > > Consolidate our BulkMessageWriterBolt and WriterBolt classes - Is there
> > > any
> > > reason we need both?
> > >
> > > I'll add another item to the list that I consider absolutely necessary:
> > we
> > > need better tests. None of our integration tests or unit tests catch
> > these
> > > bugs.
> > >
> > > This is a complex issue and there is a lot of information to process. I
> > > realize there are upgrade complications that may come with some of
> these
> > > and probably other things I haven't thought of. I will pause here and
> > wait
> > > for feedback or provide more clarification if needed. In summary, here
> is
> > > the feedback I'm requesting:
> > >
> > > - We have a problem with our writers. Is there an easy fix or should we
> > > consider a broader refactor?
> > > - How should partial failures be handled when multiple messages are
> > > produced from a single tuple? This could be tricky because we might not
> > > know there were failures until after some messages have already been
> > > written.
> > > - If we do decide to reevaluate our writer classes, what should the
> > > separation of concerns be?
> > > - Do we want to include other changes that may be optional but could
> > > improve our code? Some of these may even make the refactor easier.
> > >
> > > If someone does have an easy fix, we can work through that next.
> > Otherwise
> > > we can go further into details and work on designing how the interfaces
> > > should look after we make some high level decisions. From there I think
> > > we'll have a clear picture of how a refactor would look. Thanks in
> > advance
> > > for your input.
> > >
> > > Ryan
> > >
> > >
> >
>
-- 

Jon Zeolla
