[ 
https://issues.apache.org/jira/browse/STORM-737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14553139#comment-14553139
 ] 

ASF GitHub Bot commented on STORM-737:
--------------------------------------

Github user HeartSaVioR commented on a diff in the pull request:

    https://github.com/apache/storm/pull/521#discussion_r30750199
  
    --- Diff: storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java ---
    @@ -23,40 +23,62 @@
     
     import backtype.storm.messaging.IConnection;
     import backtype.storm.messaging.TaskMessage;
    +import com.google.common.collect.Maps;
     
     public class TransferDrainer {
     
    -  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new 
HashMap();
    +  private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = 
new HashMap();
       
    -  public void add(HashMap<String, ArrayList<TaskMessage>> 
workerTupleSetMap) {
    -    for (String key : workerTupleSetMap.keySet()) {
    -      
    -      ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
    -      if (null == bundle) {
    -        bundle = new ArrayList<ArrayList<TaskMessage>>();
    -        bundles.put(key, bundle);
    -      }
    -      
    -      ArrayList tupleSet = workerTupleSetMap.get(key);
    -      if (null != tupleSet && tupleSet.size() > 0) {
    -        bundle.add(tupleSet);
    -      }
    -    } 
    +  public void add(HashMap<Integer, ArrayList<TaskMessage>> 
taskTupleSetMap) {
    +    for (Integer task : taskTupleSetMap.keySet()) {
    +      addListRefToMap(this.bundles, task, taskTupleSetMap.get(task));
    +    }
       }
       
    -  public void send(HashMap<String, IConnection> connections) {
    -    for (String hostPort : bundles.keySet()) {
    -      IConnection connection = connections.get(hostPort);
    -      if (null != connection) { 
    -        ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort);
    -        Iterator<TaskMessage> iter = getBundleIterator(bundle);
    -        if (null != iter && iter.hasNext()) {
    -          connection.send(iter);
    +  public void send(HashMap<Integer, String> taskToNode, HashMap<String, 
IConnection> connections) {
    +    HashMap<String, ArrayList<ArrayList<TaskMessage>>> 
bundleMapByDestination = groupBundleByDestination(taskToNode);
    +
    +    for (String hostPort : bundleMapByDestination.keySet()) {
    +      if (hostPort != null) {
    +        IConnection connection = connections.get(hostPort);
    +        if (null != connection) {
    +          ArrayList<ArrayList<TaskMessage>> bundle = 
bundleMapByDestination.get(hostPort);
    +          Iterator<TaskMessage> iter = getBundleIterator(bundle);
    +          if (null != iter && iter.hasNext()) {
    +            connection.send(iter);
    +          }
             }
           }
    -    } 
    +    }
       }
    -  
    +
    +  private HashMap<String, ArrayList<ArrayList<TaskMessage>>> 
groupBundleByDestination(HashMap<Integer, String> taskToNode) {
    +    HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundleMap = 
Maps.newHashMap();
    +    for (Integer task : this.bundles.keySet()) {
    +      String hostPort = taskToNode.get(task);
    +      if (hostPort != null) {
    +        for (ArrayList<TaskMessage> chunk : this.bundles.get(task)) {
    +          addListRefToMap(bundleMap, hostPort, chunk);
    +        }
    +      }
    +    }
    +    return bundleMap;
    +  }
    +
    +  private <T> void addListRefToMap(HashMap<T, 
ArrayList<ArrayList<TaskMessage>>> bundles,
    --- End diff --
    
    @d2r OK, it's better than shadowing fields. Thanks!


> Workers may try to send to closed connections
> ---------------------------------------------
>
>                 Key: STORM-737
>                 URL: https://issues.apache.org/jira/browse/STORM-737
>             Project: Apache Storm
>          Issue Type: Bug
>    Affects Versions: 0.9.2-incubating
>            Reporter: Derek Dagit
>
> There is a race condition in the worker code that can allow for a send() to 
> be called on a closed connection.
> [Discussion|https://github.com/apache/storm/pull/349#issuecomment-87778672]
> The assignment mapping from task -> node+port needs to be read and used in 
> the read lock when sending, so that an accurate mapping is used that does not 
> include any connections that are closed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to