Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2203#discussion_r155642325
--- Diff: storm-core/src/jvm/org/apache/storm/utils/DisruptorQueue.java ---
@@ -418,12 +430,19 @@ public DisruptorQueue(String queueName, ProducerType
type, int size, long readTi
_barrier = _buffer.newBarrier();
_buffer.addGatingSequences(_consumer);
_metrics = new QueueMetrics();
+ _disruptorMetrics =
StormMetricRegistry.disruptorMetrics(_queueName, topologyId, componentId, port);
//The batch size can be no larger than half the full queue size.
//This is mostly to avoid contention issues.
_inputBatchSize = Math.max(1, Math.min(inputBatchSize, size/2));
_flusher = new Flusher(Math.max(flushInterval, 1), _queueName);
_flusher.start();
+ METRICS_TIMER.schedule(new TimerTask(){
--- End diff --
Won't this leak the task if the queue is shut down?
---