Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2241#discussion_r158034510
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/Task.java ---
@@ -122,28 +130,33 @@ public Task(Executor executor, Integer taskId) throws IOException {
return new ArrayList<>(0);
}
+
public List<Integer> getOutgoingTasks(String stream, List<Object> values) {
if (debug) {
LOG.info("Emitting Tuple: taskId={} componentId={} stream={} values={}", taskId, componentId, stream, values);
}
- List<Integer> outTasks = new ArrayList<>();
- if (!streamComponentToGrouper.containsKey(stream)) {
- throw new IllegalArgumentException("Unknown stream ID: " + stream);
- }
- if (null != streamComponentToGrouper.get(stream)) {
- // null value for __system
- for (LoadAwareCustomStreamGrouping grouper : streamComponentToGrouper.get(stream).values()) {
+ ArrayList<Integer> outTasks = new ArrayList<>();
+
+ // TODO: PERF: expensive hashtable lookup in critical path
+ ArrayList<LoadAwareCustomStreamGrouping> groupers = streamToGroupers.get(stream);
+ if (null != groupers) {
+ for (int i=0; i<groupers.size(); ++i) {
+ LoadAwareCustomStreamGrouping grouper = groupers.get(i);
if (grouper == GrouperFactory.DIRECT) {
throw new IllegalArgumentException("Cannot do regular emit to direct stream");
}
List<Integer> compTasks = grouper.chooseTasks(taskId, values, loadMapping);
- outTasks.addAll(compTasks);
+ outTasks.addAll(compTasks); // TODO: PERF: this is a perf hit
--- End diff --
Same here: it may be better to decide on this.
IMHO, I'm still not convinced that this introduces a performance hit.
The only costs I can imagine are allocating the backing array (only once per
method call) and expanding it, but unless we have a large fan-out in a huge
topology, outTasks is expected to be small.
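
To make that cost model concrete, here is a minimal Java sketch, not the Storm
code. `OutTasksSketch` and `collect` are hypothetical names, and the nested
lists stand in for whatever each grouper's chooseTasks call would return. The
capacity figures in the comments describe OpenJDK's ArrayList as I understand
it, so treat them as an assumption about the runtime rather than a guarantee.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the accumulation loop in getOutgoingTasks.
final class OutTasksSketch {

    // perGrouperTasks plays the role of the results of grouper.chooseTasks(...).
    static List<Integer> collect(List<List<Integer>> perGrouperTasks) {
        // Assumption (OpenJDK): the no-arg constructor defers allocating the
        // backing array until the first element is added.
        ArrayList<Integer> outTasks = new ArrayList<>();
        for (int i = 0; i < perGrouperTasks.size(); ++i) {
            // Assumption (OpenJDK): the first addAll allocates room for at
            // least 10 elements; later growth is roughly 1.5x and only
            // happens once the current capacity is exceeded.
            outTasks.addAll(perGrouperTasks.get(i));
        }
        return outTasks;
    }

    public static void main(String[] args) {
        // Three groupers with a small fan-out: six target tasks in total,
        // which fits in the initial backing array without any expansion
        // under the assumptions above.
        List<List<Integer>> perGrouper = Arrays.asList(
                Arrays.asList(1, 2),
                Arrays.asList(3),
                Arrays.asList(4, 5, 6));
        System.out.println(collect(perGrouper)); // prints [1, 2, 3, 4, 5, 6]
    }
}
```

Under those assumptions, a call that yields ten or fewer target tasks pays for
one backing-array allocation and no expansion. Only a much larger fan-out would
trigger repeated resizing.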
---