This is an automated email from the ASF dual-hosted git repository. srdo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push: new 60a93ad STORM-3422: Make the TupleCaptureBolt thread-safe new 727caf6 Merge pull request #3034 from JanecekPetr/3422-TupleCaptureBolt_is_not_thread-safe 60a93ad is described below commit 60a93add93bc4e1d63ad6d3356a81626ace00920 Author: Petr Janeček <petr.jane...@anritsu.com> AuthorDate: Mon Jul 8 19:06:32 2019 +0200 STORM-3422: Make the TupleCaptureBolt thread-safe --- .../org/apache/storm/testing/TupleCaptureBolt.java | 33 ++++++++++++++-------- 1 file changed, 22 insertions(+), 11 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java b/storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java index 19bdf86..6d07372 100644 --- a/storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java +++ b/storm-client/src/jvm/org/apache/storm/testing/TupleCaptureBolt.java @@ -17,22 +17,29 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import org.apache.storm.task.OutputCollector; import org.apache.storm.task.TopologyContext; import org.apache.storm.topology.IRichBolt; import org.apache.storm.topology.OutputFieldsDeclarer; import org.apache.storm.tuple.Tuple; - public class TupleCaptureBolt implements IRichBolt { - public static final transient Map<String, Map<String, List<FixedTuple>>> emitted_tuples = new HashMap<>(); - private String name; + /* + * Even though normally bolts do not need to care about thread safety, this particular bolt is different. + * It maintains a static field that is prepopulated before the topology starts, is written into by the topology, + * and is then read from after the topology is completed - all of this by potentially different threads. + */ + + private static final transient Map<String, Map<String, List<FixedTuple>>> emitted_tuples = new ConcurrentHashMap<>(); + + private final String name; private OutputCollector collector; public TupleCaptureBolt() { name = UUID.randomUUID().toString(); - emitted_tuples.put(name, new HashMap<String, List<FixedTuple>>()); + emitted_tuples.put(name, new ConcurrentHashMap<String, List<FixedTuple>>()); } @Override @@ -43,11 +50,14 @@ public class TupleCaptureBolt implements IRichBolt { @Override public void execute(Tuple input) { String component = input.getSourceComponent(); - Map<String, List<FixedTuple>> captured = emitted_tuples.get(name); - if (!captured.containsKey(component)) { - captured.put(component, new ArrayList<FixedTuple>()); - } - captured.get(component).add(new FixedTuple(input.getSourceStreamId(), input.getValues())); + emitted_tuples.get(name) + .compute(component, (String key, List<FixedTuple> tuples) -> { + if (tuples == null) { + tuples = new ArrayList<>(); + } + tuples.add(new FixedTuple(input.getSourceStreamId(), input.getValues())); + return tuples; + }); collector.ack(input); } @@ -64,8 +74,9 @@ public class TupleCaptureBolt implements IRichBolt { } public Map<String, List<FixedTuple>> getAndClearResults() { - Map<String, List<FixedTuple>> ret = new HashMap<>(emitted_tuples.get(name)); - emitted_tuples.get(name).clear(); + Map<String, List<FixedTuple>> results = emitted_tuples.get(name); + Map<String, List<FixedTuple>> ret = new HashMap<>(results); + results.clear(); return ret; }