Github user omkreddy commented on a diff in the pull request:
https://github.com/apache/storm/pull/2480#discussion_r158720967
--- Diff:
external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
---
@@ -125,12 +129,22 @@ public void open(Map conf, TopologyContext context,
SpoutOutputCollector collect
}
refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS,
kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
- offsetManagers = new HashMap<>();
+ offsetManagers = new ConcurrentHashMap<>();
emitted = new HashSet<>();
waitingToEmit = Collections.emptyListIterator();
tupleListener.open(conf, context);
+ context.registerMetric("kafkaOffset", new IMetric() {
--- End diff --
Removed the wrapper metric.
---