[
https://issues.apache.org/jira/browse/STORM-737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14548601#comment-14548601
]
ASF GitHub Bot commented on STORM-737:
--------------------------------------
Github user d2r commented on a diff in the pull request:
https://github.com/apache/storm/pull/521#discussion_r30539591
--- Diff: storm-core/src/jvm/backtype/storm/utils/TransferDrainer.java ---
@@ -23,40 +23,62 @@
import backtype.storm.messaging.IConnection;
import backtype.storm.messaging.TaskMessage;
+import com.google.common.collect.Maps;
public class TransferDrainer {
- private HashMap<String, ArrayList<ArrayList<TaskMessage>>> bundles = new
HashMap();
+ private HashMap<Integer, ArrayList<ArrayList<TaskMessage>>> bundles =
new HashMap();
- public void add(HashMap<String, ArrayList<TaskMessage>>
workerTupleSetMap) {
- for (String key : workerTupleSetMap.keySet()) {
-
- ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(key);
- if (null == bundle) {
- bundle = new ArrayList<ArrayList<TaskMessage>>();
- bundles.put(key, bundle);
- }
-
- ArrayList tupleSet = workerTupleSetMap.get(key);
- if (null != tupleSet && tupleSet.size() > 0) {
- bundle.add(tupleSet);
- }
- }
+ public void add(HashMap<Integer, ArrayList<TaskMessage>>
taskTupleSetMap) {
+ for (Integer task : taskTupleSetMap.keySet()) {
+ addListRefToMap(this.bundles, task, taskTupleSetMap.get(task));
+ }
}
- public void send(HashMap<String, IConnection> connections) {
- for (String hostPort : bundles.keySet()) {
- IConnection connection = connections.get(hostPort);
- if (null != connection) {
- ArrayList<ArrayList<TaskMessage>> bundle = bundles.get(hostPort);
- Iterator<TaskMessage> iter = getBundleIterator(bundle);
- if (null != iter && iter.hasNext()) {
- connection.send(iter);
+ public void send(HashMap<Integer, String> taskToNode, HashMap<String,
IConnection> connections) {
+ HashMap<String, ArrayList<ArrayList<TaskMessage>>>
bundleMapByDestination = groupBundleByDestination(taskToNode);
+
+ for (String hostPort : bundleMapByDestination.keySet()) {
+ if (hostPort != null) {
--- End diff --
Can `hostPort` (the key) be `null` in the map returned by
`groupBundleByDestination`?
> Workers may try to send to closed connections
> ---------------------------------------------
>
> Key: STORM-737
> URL: https://issues.apache.org/jira/browse/STORM-737
> Project: Apache Storm
> Issue Type: Bug
> Affects Versions: 0.9.2-incubating
> Reporter: Derek Dagit
>
> There is a race condition in the worker code that can allow for a send() to
> be called on a closed connection.
> [Discussion|https://github.com/apache/storm/pull/349#issuecomment-87778672]
> The assignment mapping from task -> node+port needs to be read and used in
> the read lock when sending, so that an accurate mapping is used that does not
> include any connections that are closed.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)