Hi all,

For those who are interested, answers to the questions below have been posted 
in the Jira, METRON-322.

 

Second, I am splitting the work into (1) implementing a batchTimeout parameter 
for all queues that have a batchSize parameter, and (2) modifying each Writer 
to make use of the batchTimeout parameter.  The design for the first step is in 
https://issues.apache.org/jira/browse/METRON-516 .  Please have a look at it 
and comment if you will.

 

Thanks,

--Matt

 

From: Matt Foley <ma...@apache.org>
Date: Thursday, October 20, 2016 at 3:06 PM
To: <dev@metron.incubator.apache.org>
Subject: Re: [DISCUSS] Time-based flushing of all Writer queues

 

Design Questions for METRON-322

 

First, the following questions are open; if you’re interested, please give me 
your input.

You can respond here or in the Jira.

 

1.      Motivation: All bolts that do internal queuing or batching should be 
able to flush on a timeout, because they all suffer from the problems cited. 
 
Question: Are there any queuing/batching bolts that do NOT use the 
BulkMessageWriterBolt class?  
(Reason for asking: I would like to use the BulkMessageWriterBolt class as the 
single point of implementation.  The code implies that it is used by all the 
Indexing bolts and the HBase-like Enrichment bolts, but it’s hard to be sure.)
Note that this does not refer to Storm’s under-the-hood queuing between bolts, 
only to Metron-specific batching inside Metron-implemented bolts.

 

2.      Motivation: All topologies that have a positive value of 
“topology.message.timeout.secs” should also have a positive value of 
“topology.tick.tuple.freq.secs” for all queuing/batching bolts, to avoid 
spurious message failures due to timeout.
Question: Does any topology have TWO OR MORE queuing bolts in series, in a 
single topo?  
(Reason for asking: If not, the default value of 
“topology.tick.tuple.freq.secs” should just be 1/2 of 
“topology.message.timeout.secs”; but if there are two such queues in series, 
then the default value of “topology.tick.tuple.freq.secs” should be 1/4 of 
“topology.message.timeout.secs”.) 

 

Design Proposal

 

Only Bolt objects can receive Tick Tuples.  They receive them from the 
SYSTEM_COMPONENT source component, via the SYSTEM_TICK_STREAM stream.  They are 
not structured like Metron message tuples.  “topology.tick.tuple.freq.secs” is 
a topology-level configuration parameter, but is specified by each bolt that 
wishes to receive periodic Tick Tuples.  If multiple bolts in the same topology 
specify different intervals, it appears that Storm uses the shortest interval 
for all recipients.
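
For reference, here is a minimal sketch of what receiving Tick Tuples looks 
like in a bolt (package names assume Storm 1.x; the class and helper method 
names are mine, purely for illustration):

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.Config;
import org.apache.storm.Constants;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

public class TickAwareBoltSketch extends BaseRichBolt {
  private OutputCollector collector;

  @Override
  public Map<String, Object> getComponentConfiguration() {
    // Ask Storm to deliver a Tick Tuple to this bolt every 14 seconds.
    Map<String, Object> conf = new HashMap<>();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 14);
    return conf;
  }

  @Override
  public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }

  @Override
  public void execute(Tuple tuple) {
    if (isTickTuple(tuple)) {
      flushStaleQueues();          // timer fired: flush batches that have waited too long
    } else {
      enqueueForBatching(tuple);   // normal message tuple: batch as usual
    }
    collector.ack(tuple);
  }

  private static boolean isTickTuple(Tuple tuple) {
    // Tick Tuples arrive from the system component on the system tick stream.
    return Constants.SYSTEM_COMPONENT_ID.equals(tuple.getSourceComponent())
        && Constants.SYSTEM_TICK_STREAM_ID.equals(tuple.getSourceStreamId());
  }

  private void flushStaleQueues() { /* illustration only */ }
  private void enqueueForBatching(Tuple tuple) { /* illustration only */ }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) { }
}

The 14-second interval here just echoes the default proposed in the original 
email quoted below.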

 

The BulkMessageWriterBolt appears to be the root class of all queuing/batching 
Bolts in Metron.  When BulkMessageWriterBolt receives a message tuple, it 
passes (sensorType, tuple, message, BulkMessageWriter, configuration) to the 
write() method of BulkWriterComponent, which owns the queue and decides whether 
to enqueue the message or, if the queue is full, to flush the queue by passing 
it to the BulkMessageWriter.  BulkWriterComponent actually owns a set of named 
queues, one for each allowed value of sensorType, and the underlying 
BulkMessageWriter may then write the different telemetries differently, per 
queue.
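
To make that flow concrete, here is a simplified sketch of the size-triggered 
batching path as I read it.  This is not the actual Metron source; the 
placeholder interfaces and signatures only approximate what the real classes do:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.storm.tuple.Tuple;

// Placeholder interfaces standing in for Metron's writer abstractions.
interface SketchBulkWriter<M> {
  void write(String sensorType, List<Tuple> tuples, List<M> messages) throws Exception;
}
interface SketchWriterConfig {
  int getBatchSize(String sensorType);
}

// Simplified sketch of the batching flow described above; not the real BulkWriterComponent.
public class BulkWriterComponentSketch<M> {
  // One named queue per sensorType.
  private final Map<String, List<Tuple>> tupleQueues = new HashMap<>();
  private final Map<String, List<M>> messageQueues = new HashMap<>();

  public void write(String sensorType, Tuple tuple, M message,
                    SketchBulkWriter<M> writer, SketchWriterConfig config) throws Exception {
    tupleQueues.computeIfAbsent(sensorType, k -> new ArrayList<>()).add(tuple);
    messageQueues.computeIfAbsent(sensorType, k -> new ArrayList<>()).add(message);
    // Flush only when the per-sensor batch reaches its configured size.
    if (messageQueues.get(sensorType).size() >= config.getBatchSize(sensorType)) {
      flush(sensorType, writer);
    }
  }

  public void flush(String sensorType, SketchBulkWriter<M> writer) throws Exception {
    writer.write(sensorType, tupleQueues.get(sensorType), messageQueues.get(sensorType));
    tupleQueues.get(sensorType).clear();
    messageQueues.get(sensorType).clear();
  }
}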

 

I propose to receive the Tick Tuple in BulkMessageWriterBolt and not pass it 
to BulkWriterComponent::write().  Instead, I will add a new method, 
BulkWriterComponent::flushOlderThanSec(int interval, BulkMessageWriter, 
configuration).  This method will cause BulkWriterComponent to iterate over the 
set of queues and flush each one if needed.  As recommended here, I will not 
necessarily flush every queue, but only those that have not been flushed (due 
to batchSize or a previous timeout-based flush) for at least interval seconds.
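
A rough sketch of what flushOlderThanSec() could look like, as a continuation 
of the sketch above (names and signature are tentative, not final code):

// Addition to the sketch above: remember when each sensorType's queue was last
// flushed, and on a Tick Tuple flush only the queues that have gone stale.
// (The size-based flush() above would update lastFlushMillis as well.)
private final Map<String, Long> lastFlushMillis = new HashMap<>();

public void flushOlderThanSec(int intervalSec, SketchBulkWriter<M> writer) throws Exception {
  long now = System.currentTimeMillis();
  for (String sensorType : new ArrayList<>(messageQueues.keySet())) {
    long last = lastFlushMillis.getOrDefault(sensorType, 0L);
    boolean stale = (now - last) >= intervalSec * 1000L;
    if (stale && !messageQueues.get(sensorType).isEmpty()) {
      flush(sensorType, writer);
      lastFlushMillis.put(sensorType, now);
    }
  }
}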

 

I also propose to watch for changes to the “topology.tick.tuple.freq.secs” 
configuration, and to incorporate the new value when the next Tick Tuple is 
received.

 

That’s about it.  The open questions again are:

1.      Are there any queuing/batching bolts that do NOT use 
BulkMessageWriterBolt class?

2.      Does any topology have TWO OR MORE queuing bolts in series, in a single 
topo?

 

Thanks,

--Matt

 

From: Matt Foley <ma...@apache.org>
Date: Friday, October 14, 2016 at 6:32 PM
To: <dev@metron.incubator.apache.org>
Subject: [DISCUSS] Time-based flushing of all Writer queues

 

Hi all,

 

METRON-227 “Add Time-Based Flushing to Writer Bolt”, and 

METRON-322 “Global Batching & flushing”

have been dormant since July, but contain some very valuable ideas.  The basic 
observation is that Metron’s Writer queues in general flush on queue size, but 
not on time.  As a result, low-traffic or bursty channels can languish 
unprocessed, and therefore un-ack’ed, which causes Storm to automatically 
recycle the messages after a certain timeout (topology.message.timeout.secs), 
or when too many total pending messages accumulate in a topology 
(topology.max.spout.pending).  This produces duplicate messages and wasted 
computation, as well as unpredictable latency.

 

Storm now has a very nice, low-complexity solution for time-based flushing, 
using Tick Tuples.

I propose to use Tick Tuples to implement time-based flushing for all Writer 
queues that currently flush only on queue size.

I will do this work in the context of METRON-322, subsuming METRON-227 into it.

 

Per the recommendation of some members of the Storm implementation team, I will 
default the queue flush timeout (topology.tick.tuple.freq.secs) in each Writer 
to half the value of topology.message.timeout.secs, minus a small delta.  The 
default value of topology.message.timeout.secs is 30 seconds, so in many cases 
the queue flush interval will be set to 14 seconds; but this will be 
configurable.
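
As a sketch of that arithmetic (the one-second delta is my assumption, inferred 
from the 14-second figure; class and method names are illustrative only):

import java.util.Map;

import org.apache.storm.Config;

public class TickFreqDefaults {
  private static final int DELTA_SECS = 1;  // assumed safety margin

  // Default tick frequency: half the message timeout, minus a small delta.
  // With Storm's default timeout of 30 seconds this yields 30 / 2 - 1 = 14.
  public static int defaultTickFreqSecs(Map<String, Object> stormConf) {
    Number timeout = (Number) stormConf.getOrDefault(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 30);
    return Math.max(1, timeout.intValue() / 2 - DELTA_SECS);
  }
}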

 

The reporter of METRON-322 was also concerned about “global” behavior of a 
topology, for instance the Enrichment topology with multiple telemetry-specific 
bolts in parallel.  If each individual bolt accumulates a number of un-ack’ed 
messages, the total across the whole topology can become large, and if 
topology.max.spout.pending is set, that limit may be hit.  However, the 
probability of this drops greatly if we implement a reasonable default for 
queue flush timeouts, and any remaining issue can be addressed by setting the 
bolt queue size limits, and the value of topology.max.spout.pending itself, 
appropriately.  Therefore, I will not at this time worry much about this 
“global” behavior, other than making sure that all Writers in the topology have 
queue flush timeouts.

 

Your thoughts, suggestions, and concerns are invited.

 

Thanks,

--Matt

 

 
