Design Questions for METRON-322
First, the following questions are open; if you're interested, please give me your input. You can respond here or in the Jira.

1. Motivation: All bolts that do internal queuing or batching should be able to flush on timeout, because they will all suffer from the problems cited.
   Question: Are there any queuing/batching bolts that do NOT use the BulkMessageWriterBolt class? (Reason for asking: I would like to use the BulkMessageWriterBolt class as my implementation bottleneck. The code implies that it is used by all the Indexing bolts and the HBase-like Enrichment bolts, but it's hard to be sure.) Note this does not refer to Storm's under-the-hood queuing between bolts, only to Metron-specific batching inside Metron-implemented bolts.

2. Motivation: All topologies that have a positive value of "topology.message.timeout.secs" should also have a positive value of "topology.tick.tuple.freq.secs" for all queuing/batching bolts, to avoid spurious message failures due to timeout.
   Question: Does any topology have TWO OR MORE queuing bolts in series, in a single topo? (Reason for asking: If not, the default value of "topology.tick.tuple.freq.secs" should just be 1/2 of "topology.message.timeout.secs"; but if there are two such queues in series, then the default should be 1/4 of "topology.message.timeout.secs".)

Design Proposal

Only Bolt objects can receive Tick Tuples. They receive them from the SYSTEM_COMPONENT source component, via the SYSTEM_TICK_STREAM stream. They are not structured like Metron message tuples.

"topology.tick.tuple.freq.secs" is a topology-level configuration parameter, but is specified by each bolt that wishes to receive periodic Tick Tuples. If multiple bolts in the same topology specify different intervals, it appears that Storm uses the shortest interval for all recipients.

The BulkMessageWriterBolt appears to be the root class of all queuing/batching Bolts in Metron.
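For reference, recognizing a Tick Tuple in a bolt is a standard Storm idiom: the bolt requests ticks by returning Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS from getComponentConfiguration(), and then checks each incoming tuple's source component and stream id. The sketch below is not Metron code; it inlines Storm's well-known constant values ("__system" for Constants.SYSTEM_COMPONENT_ID and "__tick" for Constants.SYSTEM_TICK_STREAM_ID) so it stands alone without a Storm dependency:

```java
// Sketch (not Metron code): how a Storm bolt recognizes a Tick Tuple.
// Storm's Constants.SYSTEM_COMPONENT_ID is the string "__system" and
// Constants.SYSTEM_TICK_STREAM_ID is "__tick"; a real bolt would call
// tuple.getSourceComponent() and tuple.getSourceStreamId() and compare
// against those constants.
public class TickTupleCheck {
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    static boolean isTickTuple(String sourceComponent, String sourceStreamId) {
        return SYSTEM_COMPONENT_ID.equals(sourceComponent)
            && SYSTEM_TICK_STREAM_ID.equals(sourceStreamId);
    }

    public static void main(String[] args) {
        // A tuple from the system tick stream is a Tick Tuple; anything
        // else is an ordinary message tuple.
        System.out.println(isTickTuple("__system", "__tick"));
        System.out.println(isTickTuple("parserBolt", "default"));
    }
}
```

In execute(), the bolt would branch on this check: tick tuples trigger a flush, all other tuples go through the normal write path.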
When BulkMessageWriterBolt receives a message tuple, it passes (sensorType, tuple, message, BulkMessageWriter, configuration) to the write() method of BulkWriterComponent, which owns the queue and decides whether to enqueue the message or, if the queue is full, to flush the queue by passing it to the BulkMessageWriter. BulkWriterComponent actually owns a set of named queues, one for each allowed value of sensorType. The underlying BulkMessageWriter may then write the different telemetries differently, per queue.

I propose to receive the Tick Tuple in BulkMessageWriterBolt, and not pass it to BulkWriterComponent::write(). Instead, I will add a new method BulkWriterComponent::flushOlderThanSec(int interval, BulkMessageWriter, configuration). This method will cause BulkWriterComponent to iterate over the set of queues and flush each if needed. As recommended here, I will not necessarily flush every queue, but only those that have not been flushed (due to batchSize or a previous timeout-based flush) for at least interval seconds.

I also propose to be aware of changes in the configuration of "topology.tick.tuple.freq.secs", and to incorporate the new value on the next received Tick Tuple.

That's about it. The open questions again are:
1. Are there any queuing/batching bolts that do NOT use the BulkMessageWriterBolt class?
2. Does any topology have TWO OR MORE queuing bolts in series, in a single topo?

Thanks,
--Matt

From: Matt Foley <ma...@apache.org>
Date: Friday, October 14, 2016 at 6:32 PM
To: <dev@metron.incubator.apache.org>
Subject: [DISCUSS] Time-base flushing of all Writer queues

Hi all,
METRON-227 "Add Time-Based Flushing to Writer Bolt" and METRON-322 "Global Batching & flushing" have been dormant since July, but contain some very valuable ideas. The basic idea is that Metron's Writer queues in general will flush on queue size, but not on time.
As a result, low-traffic or bursty channels can languish unprocessed, and therefore un-ack'ed, which results in Storm automatically recycling the messages after a certain timeout (topology.message.timeout.secs), or if too many total pending messages accumulate in a topology (topology.max.spout.pending). This results in duplicate messages and wasted computation, as well as unpredictable latency.

Storm now has a very nice, low-complexity solution for time-based flushing, using Tick Tuples. I propose to use Tick Tuples to implement time-based flushing for all Writer queues that currently flush only on queue size. I will do this work in the context of METRON-322, subsuming METRON-227 into it.

Per the recommendation of some members of the Storm implementation team, I will default the queue flush timeout (topology.tick.tuple.freq.secs) in each Writer to half the value of topology.message.timeout.secs (minus a delta). The default value of topology.message.timeout.secs is 30 seconds, so in many cases the queue flush timeout will be set to 14 seconds; but this will be configurable.

The reporter of METRON-322 was also concerned about "global" behavior of a topology, for instance the Enrichment topology with multiple telemetry-specific bolts in parallel. If each individual bolt accumulates a number of un-ack'ed messages, the total across the whole topology can become large, and if topology.max.spout.pending is set, it may trigger. However, the probability of this drops greatly if we implement a reasonable default for queue flush timeouts, and any remaining issue can be addressed by setting the bolt queue size limits, and the value of topology.max.spout.pending itself, appropriately. Therefore, I will not at this time worry much about this "global" behavior, other than making sure that all Writers in the topology have queue flush timeouts.

Your thoughts, suggestions, and concerns are invited.
Thanks,
--Matt
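To make the flushOlderThanSec proposal and the default-interval arithmetic concrete, here is a rough standalone sketch. This is NOT Metron code: the per-sensor queues, the lastFlushMillis bookkeeping, and the flush body are simplified stand-ins for BulkWriterComponent, BulkMessageWriter, and their configuration, and the one-second delta in defaultTickFreqSecs is an assumption chosen to match the 30s -> 14s example above.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the proposed timeout-based flushing: one queue per
// sensorType, flushed either when it reaches batchSize or when a Tick Tuple
// finds it unflushed for at least intervalSecs seconds.
public class FlushSketch {
    private final Map<String, Deque<String>> queues = new HashMap<>();
    private final Map<String, Long> lastFlushMillis = new HashMap<>();

    // Enqueue a message; flush immediately if the batch is full.
    public void write(String sensorType, String message, int batchSize, long nowMillis) {
        Deque<String> q = queues.computeIfAbsent(sensorType, k -> new ArrayDeque<>());
        q.add(message);
        if (q.size() >= batchSize) {
            flush(sensorType, nowMillis);
        }
    }

    // Called on a Tick Tuple: flush only the queues that have not been
    // flushed (by batch size or a previous timeout) for intervalSecs seconds.
    public void flushOlderThanSec(int intervalSecs, long nowMillis) {
        for (Map.Entry<String, Deque<String>> e : queues.entrySet()) {
            long last = lastFlushMillis.getOrDefault(e.getKey(), 0L);
            if (!e.getValue().isEmpty() && nowMillis - last >= intervalSecs * 1000L) {
                flush(e.getKey(), nowMillis);
            }
        }
    }

    private void flush(String sensorType, long nowMillis) {
        // In Metron this would hand the batch to the BulkMessageWriter and
        // ack the underlying tuples; here we just empty the queue.
        queues.get(sensorType).clear();
        lastFlushMillis.put(sensorType, nowMillis);
    }

    public int queuedCount(String sensorType) {
        Deque<String> q = queues.get(sensorType);
        return q == null ? 0 : q.size();
    }

    // Default tick frequency: half the message timeout minus a delta,
    // assumed here to be 1 second (30s timeout -> 14s tick interval).
    public static int defaultTickFreqSecs(int messageTimeoutSecs) {
        return messageTimeoutSecs / 2 - 1;
    }
}
```

For example, a message enqueued at t=0 with a half-empty batch survives a tick at t=5s but is flushed by the tick at t=20s when the 14-second default interval has elapsed.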