[ https://issues.apache.org/jira/browse/STORM-737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553139#comment-14553139 ]
ASF GitHub Bot commented on STORM-737: -------------------------------------- Github user HeartSaVioR commented on a diff in the pull request: https://github.com/apache/storm/pull/521#discussion_r30750199 --- Diff: storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java --- @@ -23,40 +23,62 @@ import backtype.storm.messaging.IConnection; import backtype.storm.messaging.TaskMessage; +import com.google.common.collect.Maps; public class TransferDrainer { - private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap(); + private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap(); - public void add(HashMap<String, ArrayList<TaskMessage>> workerTupleSetMap) { - for (String key : workerTupleSetMap.keySet()) { - - ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key); - if (null == bundle) { - bundle = new ArrayList<ArrayList<TaskMessage>>(); - bundles.put(key, bundle); - } - - ArrayList tupleSet = workerTupleSetMap.get(key); - if (null != tupleSet && tupleSet.size() > 0) { - bundle.add(tupleSet); - } - } + public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) { + for (Integer task : taskTupleSetMap.keySet()) { + addListRefToMap(this.bundles, task, taskTupleSetMap.get(task)); + } } - public void send(HashMap<String, IConnection> connections) { - for (String hostPort : bundles.keySet()) { - IConnection connection = connections.get(hostPort); - if (null != connection) { - ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort); - Iterator<TaskMessage> iter = getBundleIterator(bundle); - if (null != iter && iter.hasNext()) { - connection.send(iter); + public void send(HashMap<Integer, String> taskToNode, HashMap<String, IConnection> connections) { + HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode); + + for (String hostPort : bundleMapByDestination.keySet()) { + if (hostPort != null) { + IConnection connection = connections.get(hostPort); + if (null != connection) { + ArrayList<ArrayList<TaskMessage>> bundle = bundleMapByDestination.get(hostPort); + Iterator<TaskMessage> iter = getBundleIterator(bundle); + if (null != iter && iter.hasNext()) { + connection.send(iter); + } } } - } + } } - + + private HashMap<String, ArrayList<ArrayList<TaskMessage>>> groupBundleByDestination(HashMap<Integer, String> taskToNode) { + HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMap = Maps.newHashMap(); + for (Integer task : this.bundles.keySet()) { + String hostPort = taskToNode.get(task); + if (hostPort != null) { + for (ArrayList<TaskMessage> chunk : this.bundles.get(task)) { + addListRefToMap(bundleMap, hostPort, chunk); + } + } + } + return bundleMap; + } + + private <T> void addListRefToMap(HashMap<T, ArrayList<ArrayList<TaskMessage>>> bundles, --- End diff -- @d2r OK, it's better than shadowing fields. Thanks! > Workers may try to send to closed connections > --------------------------------------------- > > Key: STORM-737 > URL: https://issues.apache.org/jira/browse/STORM-737 > Project: Apache Storm > Issue Type: Bug > Affects Versions: 0.9.2-incubating > Reporter: Derek Dagit > > There is a race condition in the worker code that can allow for a send() to > be called on a closed connection. > [Discussion|https://github.com/apache/storm/pull/349#issuecomment-87778672] > The assignment mapping from task -> node+port needs to be read and used in > the read lock when sending, so that an accurate mapping is used that does not > include any connections that are closed. -- This message was sent by Atlassian JIRA (v6.3.4#6332)