Github user revans2 commented on a diff in the pull request: https://github.com/apache/storm/pull/2399#discussion_r148328743 --- Diff: storm-client/src/jvm/org/apache/storm/messaging/DeserializingConnectionCallback.java --- @@ -17,44 +17,102 @@ */ package org.apache.storm.messaging; +import org.apache.storm.Config; import org.apache.storm.daemon.worker.WorkerState; +import org.apache.storm.metric.api.IMetric; +import org.apache.storm.serialization.KryoTupleDeserializer; import org.apache.storm.task.GeneralTopologyContext; import org.apache.storm.tuple.AddressedTuple; -import org.apache.storm.serialization.KryoTupleDeserializer; +import org.apache.storm.tuple.Tuple; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + /** * A class that is called when a TaskMessage arrives. */ -public class DeserializingConnectionCallback implements IConnectionCallback { +public class DeserializingConnectionCallback implements IConnectionCallback, IMetric { private final WorkerState.ILocalTransferCallback _cb; private final Map _conf; private final GeneralTopologyContext _context; + private final ThreadLocal<KryoTupleDeserializer> _des = - new ThreadLocal<KryoTupleDeserializer>() { - @Override - protected KryoTupleDeserializer initialValue() { - return new KryoTupleDeserializer(_conf, _context); - } - }; - - public DeserializingConnectionCallback(final Map<String, Object> conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) { + new ThreadLocal<KryoTupleDeserializer>() { + @Override + protected KryoTupleDeserializer initialValue() { + return new KryoTupleDeserializer(_conf, _context); + } + }; + + // Track serialized size of messages + private final boolean _sizeMetricsEnabled; + private final ConcurrentHashMap<String, AtomicLong> _byteCounts = new ConcurrentHashMap<>(); + + + public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) { _conf = conf; _context = context; _cb = callback; + _sizeMetricsEnabled = _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS) instanceof Boolean && + (Boolean) _conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS); + } @Override public void recv(List<TaskMessage> batch) { KryoTupleDeserializer des = _des.get(); ArrayList<AddressedTuple> ret = new ArrayList<>(batch.size()); for (TaskMessage message: batch) { - ret.add(new AddressedTuple(message.task(), des.deserialize(message.message()))); + Tuple tuple = des.deserialize(message.message()); + AddressedTuple addrTuple = new AddressedTuple(message.task(), tuple); + updateMetrics(tuple.getSourceTask(), message); + ret.add(addrTuple); } _cb.transfer(ret); } + @Override + public Object getValueAndReset() { + HashMap<String, Long> outMap = new HashMap<>(); + + if (_sizeMetricsEnabled) { // Possible race conditions --- End diff -- Where is the race and what are we doing to mitigate it? The comment is a bit confusing.
---