Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2399#discussion_r148327705
  
    --- Diff: storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java ---
    @@ -17,44 +17,102 @@
      */
     package org.apache.storm.messaging;
     
    +import org.apache.storm.Config;
     import org.apache.storm.daemon.worker.WorkerState;
    +import org.apache.storm.metric.api.IMetric;
    +import org.apache.storm.serialization.KryoTupleDeserializer;
     import org.apache.storm.task.GeneralTopologyContext;
     import org.apache.storm.tuple.AddressedTuple;
    -import org.apache.storm.serialization.KryoTupleDeserializer;
    +import org.apache.storm.tuple.Tuple;
     
     import java.util.ArrayList;
    +import java.util.HashMap;
     import java.util.List;
     import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicLong;
    +
     
     /**
      * A class that is called when a TaskMessage arrives.
      */
    -public class DeserializingConnectionCallback implements IConnectionCallback {
    +public class DeserializingConnectionCallback implements IConnectionCallback, IMetric {
         private final WorkerState.ILocalTransferCallback _cb;
         private final Map _conf;
         private final GeneralTopologyContext _context;
    +
         private final ThreadLocal<KryoTupleDeserializer> _des =
    -         new ThreadLocal<KryoTupleDeserializer>() {
    -             @Override
    -             protected KryoTupleDeserializer initialValue() {
    -                 return new KryoTupleDeserializer(_conf, _context);
    -             }
    -         };
    -
    -    public DeserializingConnectionCallback(final Map<String, Object> conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
    +        new ThreadLocal<KryoTupleDeserializer>() {
    +            @Override
    +            protected KryoTupleDeserializer initialValue() {
    +                return new KryoTupleDeserializer(_conf, _context);
    +            }
    +        };
    +
    +    // Track serialized size of messages
    +    private final boolean _sizeMetricsEnabled;
    +    private final ConcurrentHashMap<String, AtomicLong> _byteCounts = new ConcurrentHashMap<>();
    --- End diff --
    
    I know this fits the convention we have been using for member variables in
    this file, but it would be good to follow the new style guide and drop the
    '_' prefix.  If you want to update the other member variables in the file
    too, that would be great.
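
To illustrate the rename, here is a minimal sketch of the two new fields without the '_' prefix. `ByteCountTracker`, `record`, and `total` are made-up names for this example only and are not part of the pull request or of Storm; the point is just the field naming and the use of `this.` in the constructor.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for the byte-count fields in the diff; not the Storm class.
class ByteCountTracker {
    // Same roles as _sizeMetricsEnabled and _byteCounts, minus the '_' prefix.
    private final boolean sizeMetricsEnabled;
    private final ConcurrentHashMap<String, AtomicLong> byteCounts = new ConcurrentHashMap<>();

    ByteCountTracker(boolean sizeMetricsEnabled) {
        // 'this.' distinguishes the field from the parameter once the prefix is gone.
        this.sizeMetricsEnabled = sizeMetricsEnabled;
    }

    // Adds 'bytes' to the running total for 'key' when size metrics are enabled.
    void record(String key, long bytes) {
        if (!sizeMetricsEnabled) {
            return;
        }
        byteCounts.computeIfAbsent(key, k -> new AtomicLong()).addAndGet(bytes);
    }

    // Returns the running total for 'key', or 0 if nothing was recorded.
    long total(String key) {
        AtomicLong count = byteCounts.get(key);
        return count == null ? 0L : count.get();
    }
}
```

Without the prefix, constructor parameters that share a field's name need the explicit `this.` qualifier, which is the main mechanical change the rename involves.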

