GitHub user omkreddy commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2480#discussion_r158720967
  
    --- Diff: external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java ---
    @@ -125,12 +129,22 @@ public void open(Map conf, TopologyContext context, SpoutOutputCollector collect
             }
             refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
     
    -        offsetManagers = new HashMap<>();
    +        offsetManagers = new ConcurrentHashMap<>();
             emitted = new HashSet<>();
             waitingToEmit = Collections.emptyListIterator();
     
             tupleListener.open(conf, context);
     
    +        context.registerMetric("kafkaOffset", new IMetric() {
    --- End diff --
    
    Removed the wrapper metric.

