GitHub user roshannaik commented on a diff in the pull request:
https://github.com/apache/storm/pull/2241#discussion_r158923315
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java ---
@@ -122,28 +130,33 @@ public Task(Executor executor, Integer taskId) throws IOException {
return new ArrayList<>(0);
}
+
public List<Integer> getOutgoingTasks(String stream, List<Object> values) {
if (debug) {
LOG.info("Emitting Tuple: taskId={} componentId={} stream={} values={}", taskId, componentId, stream, values);
}
- List<Integer> outTasks = new ArrayList<>();
- if (!streamComponentToGrouper.containsKey(stream)) {
- throw new IllegalArgumentException("Unknown stream ID: " + stream);
- }
- if (null != streamComponentToGrouper.get(stream)) {
- // null value for __system
- for (LoadAwareCustomStreamGrouping grouper : streamComponentToGrouper.get(stream).values()) {
+ ArrayList<Integer> outTasks = new ArrayList<>();
+
+ // TODO: PERF: expensive hashtable lookup in critical path
+ ArrayList<LoadAwareCustomStreamGrouping> groupers = streamToGroupers.get(stream);
+ if (null != groupers) {
+ for (int i=0; i<groupers.size(); ++i) {
+ LoadAwareCustomStreamGrouping grouper = groupers.get(i);
if (grouper == GrouperFactory.DIRECT) {
throw new IllegalArgumentException("Cannot do regular emit to direct stream");
}
List<Integer> compTasks = grouper.chooseTasks(taskId, values, loadMapping);
- outTasks.addAll(compTasks);
+ outTasks.addAll(compTasks); // TODO: PERF: this is a perf hit
--- End diff --
I understand your position, as I was surprised too. I have filed an
[issue](https://issues.apache.org/jira/browse/STORM-2871) and explained in it
how I originally discovered this. I didn't pursue it much further, so I don't
know why it is an issue (or whether it still is), but I felt it was a good idea
to record that observation for later.
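
For anyone who wants to revisit the observation later, the two ways of appending the chosen tasks could be compared with something like the sketch below. It is only an illustration: the class `AddAllSketch` and the methods `viaAddAll` and `viaLoop` are hypothetical names, not code from this PR, and the sketch makes no claim about which variant is faster. One possible factor, unverified here, is that OpenJDK's `ArrayList.addAll` first copies its argument through `toArray()`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class for comparing the two append strategies.
class AddAllSketch {
    // Variant used in the diff: bulk append via addAll.
    static ArrayList<Integer> viaAddAll(List<List<Integer>> chosen) {
        ArrayList<Integer> out = new ArrayList<>();
        for (int i = 0; i < chosen.size(); ++i) {
            out.addAll(chosen.get(i));
        }
        return out;
    }

    // Assumed alternative: element-wise append with an indexed loop.
    static ArrayList<Integer> viaLoop(List<List<Integer>> chosen) {
        ArrayList<Integer> out = new ArrayList<>();
        for (int i = 0; i < chosen.size(); ++i) {
            List<Integer> tasks = chosen.get(i);
            for (int j = 0; j < tasks.size(); ++j) {
                out.add(tasks.get(j));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Stand-in for the per-grouper results of chooseTasks.
        List<List<Integer>> chosen =
            Arrays.asList(Arrays.asList(1, 2), Arrays.asList(3));
        System.out.println(viaAddAll(chosen));
        System.out.println(viaLoop(chosen));
    }
}
```

Both methods build the same list, so either could be dropped into a benchmark harness such as JMH to check whether the difference still shows up.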
---