[ https://issues.apache.org/jira/browse/METRON-322?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16084834#comment-16084834 ]
ASF GitHub Bot commented on METRON-322:
---------------------------------------

Github user ottobackwards commented on a diff in the pull request:

    https://github.com/apache/metron/pull/481#discussion_r127087711

    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/bolt/BulkMessageWriterBolt.java ---
    @@ -92,24 +177,58 @@ public void prepare(Map stormConf, TopologyContext context, OutputCollector coll
           configurationTransformation = x -> x;
         }
         try {
    -      bulkMessageWriter.init(stormConf,
    -                             context,
    -                             configurationTransformation.apply(new IndexingWriterConfiguration(bulkMessageWriter.getName(),
    -                             getConfigurations()))
    -      );
    +      WriterConfiguration writerconf = configurationTransformation.apply(
    +              new IndexingWriterConfiguration(bulkMessageWriter.getName(), getConfigurations()));
    +      if (defaultBatchTimeout == 0) {
    +        //This means getComponentConfiguration was never called to initialize defaultBatchTimeout,
    +        //probably because we are in a unit test scenario. So calculate it here.
    +        BatchTimeoutHelper timeoutHelper = new BatchTimeoutHelper(writerconf::getAllConfiguredTimeouts, batchTimeoutDivisor);
    +        defaultBatchTimeout = timeoutHelper.getDefaultBatchTimeout();
    +      }
    +      writerComponent.setDefaultBatchTimeout(defaultBatchTimeout);
    +      bulkMessageWriter.init(stormConf, context, writerconf);
         } catch (Exception e) {
           throw new RuntimeException(e);
         }
       }
     
    +  /**
    +   * Used only for unit testing.
    +   */
    --- End diff --

    let the intellisense fall where it may then


> Global Batching and Flushing
> ----------------------------
>
> Key: METRON-322
> URL: https://issues.apache.org/jira/browse/METRON-322
> Project: Metron
> Issue Type: Improvement
> Reporter: Ajay Yadav
> Assignee: Matt Foley
>
> All Writers and other bolts that maintain an internal "batch" queue need to
> have a timeout flush, to prevent messages from low-volume telemetries from
> sitting in their queues indefinitely.
> Storm has a timeout value (topology.message.timeout.secs) that bounds how
> long it waits for a tuple to be fully acked. If the Writer does not process
> the queue before the timeout, Storm fails the tuples, and the spout replays
> them through the topology. This has multiple undesirable consequences,
> including data duplication and wasted compute resources. We would like to
> be able to specify an interval after which the queues flush, even if the
> batch size has not been reached.
>
> We will use the Storm Tick Tuple to trigger timeout flushing, following
> the recommendations of the article at
> http://hortonworks.com/blog/apache-storm-design-pattern-micro-batching/#CONCLUSION
>
> Since every Writer processes its queue somewhat differently, every bolt that
> has a "batchSize" parameter will be given a "batchTimeout" parameter too. It
> will default to 1/2 the value of "topology.message.timeout.secs", as
> recommended, and settings larger than the default will be ignored, because
> they could cause a failure to flush in time. In the Enrichment topology,
> where two Writers may be placed one after the other (enrichment and threat
> intel), the default timeout interval will be 1/4 the value of
> "topology.message.timeout.secs". The default value of
> "topology.message.timeout.secs" in Storm is 30 seconds.
>
> In addition, Storm limits the number of pending messages that have not been
> acked. Once "topology.max.spout.pending" messages are pending, the spout
> stops emitting new tuples, so a batch that cannot fill under that limit
> waits until its tuples time out and are replayed. However, the default value
> of "topology.max.spout.pending" is null (no limit), and if it is set to a
> non-null value, the user can manage the consequences by setting batchSize
> limits appropriately. Having the timeout flush will also ameliorate this
> issue, so we do not need to address "topology.max.spout.pending" directly
> in this task.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
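The default-timeout rule from the issue description (1/2 of "topology.message.timeout.secs", 1/4 when two Writers are chained, larger settings ignored) can be sketched as below. This is a minimal, hypothetical illustration, not Metron's `BatchTimeoutHelper`: the class and method names are invented here, treating a non-positive `batchTimeout` as "unset" is an assumption, and using the smallest effective timeout as a tick frequency is only a guess at why the diff hands `writerconf::getAllConfiguredTimeouts` to the helper.

```java
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical sketch of the METRON-322 batch-timeout rule.
 * Not Metron's BatchTimeoutHelper; names and signatures are illustrative only.
 */
public class BatchTimeoutSketch {

    /** Default timeout = message timeout / divisor (2 normally, 4 for two chained Writers). */
    public static int defaultBatchTimeout(int messageTimeoutSecs, int divisor) {
        if (messageTimeoutSecs <= 0 || divisor <= 0) {
            throw new IllegalArgumentException("timeout and divisor must be positive");
        }
        // Integer division, floored at 1 second so a tiny message timeout never yields 0.
        return Math.max(1, messageTimeoutSecs / divisor);
    }

    /**
     * A configured batchTimeout is honoured only if it is positive (assumption: <= 0 means unset)
     * and not larger than the default; otherwise the default is used.
     */
    public static int effectiveBatchTimeout(int configuredSecs, int defaultSecs) {
        if (configuredSecs <= 0 || configuredSecs > defaultSecs) {
            return defaultSecs;
        }
        return configuredSecs;
    }

    /** Assumption: tick often enough to serve the smallest effective timeout of any sensor. */
    public static int tickFrequencySecs(List<Integer> configured, int defaultSecs) {
        int min = defaultSecs;
        for (int c : configured) {
            min = Math.min(min, effectiveBatchTimeout(c, defaultSecs));
        }
        return min;
    }

    public static void main(String[] args) {
        int indexing = defaultBatchTimeout(30, 2);   // 30 s message timeout, one Writer
        int enrichment = defaultBatchTimeout(30, 4); // two Writers in sequence
        System.out.println(indexing + " " + enrichment);                              // 15 7
        System.out.println(effectiveBatchTimeout(60, indexing));                       // 15
        System.out.println(tickFrequencySecs(Arrays.asList(10, 0, 60), indexing));     // 10
    }
}
```

With Storm's default 30-second message timeout, this gives 15 seconds for a single Writer and 7 seconds (integer division) for the chained Enrichment case, and a configured 60-second `batchTimeout` falls back to the default.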
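A Writer-side queue that flushes on either condition (batch size reached, or batch timeout elapsed when a tick arrives) might look like the sketch below. It assumes, hypothetically, that each tick calls `onTick` with the current time in milliseconds. In a real Storm bolt the tick would arrive as a tick tuple enabled through `topology.tick.tuple.freq.secs`, and the bolt would ack the buffered tuples after the write. `TimedBatch` and its methods are illustrative names, not part of Metron or Storm.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch: a batch that flushes when batchSize is reached, or on a tick
 * once the oldest queued message has waited at least batchTimeoutMillis.
 */
public class TimedBatch<T> {
    private final int batchSize;
    private final long batchTimeoutMillis;
    private final Consumer<List<T>> writer;   // stands in for the bulk write call
    private final List<T> queue = new ArrayList<>();
    private long oldestEnqueuedAt = -1;       // -1 while the queue is empty

    public TimedBatch(int batchSize, long batchTimeoutMillis, Consumer<List<T>> writer) {
        this.batchSize = batchSize;
        this.batchTimeoutMillis = batchTimeoutMillis;
        this.writer = writer;
    }

    /** Called for every data message; flushes immediately when the batch is full. */
    public void add(T message, long nowMillis) {
        if (queue.isEmpty()) {
            oldestEnqueuedAt = nowMillis;
        }
        queue.add(message);
        if (queue.size() >= batchSize) {
            flush();
        }
    }

    /** Called on every tick; flushes a partial batch whose oldest message is too old. */
    public void onTick(long nowMillis) {
        if (!queue.isEmpty() && nowMillis - oldestEnqueuedAt >= batchTimeoutMillis) {
            flush();
        }
    }

    private void flush() {
        writer.accept(new ArrayList<>(queue)); // hand the writer a copy, then reset
        queue.clear();
        oldestEnqueuedAt = -1;
    }

    public static void main(String[] args) {
        List<List<String>> written = new ArrayList<>();
        TimedBatch<String> batch = new TimedBatch<>(3, 15_000, written::add);
        batch.add("a", 0);
        batch.add("b", 1_000);
        batch.onTick(10_000);   // oldest has waited 10 s: no flush yet
        batch.onTick(15_000);   // 15 s: flush the partial batch [a, b]
        batch.add("c", 16_000);
        batch.add("d", 16_500);
        batch.add("e", 17_000); // batch size reached: flush [c, d, e]
        System.out.println(written); // [[a, b], [c, d, e]]
    }
}
```

Measuring age from the oldest queued message, rather than from the last flush, means a low-volume message waits at most roughly `batchTimeout` plus one tick interval. That margin is why the issue keeps the default well below "topology.message.timeout.secs".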