Github user HeartSaVioR commented on a diff in the pull request:
https://github.com/apache/storm/pull/2241#discussion_r158196161
--- Diff:
storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java ---
@@ -439,29 +436,40 @@ public void refreshStormActive(Runnable callback) {
}
}
- public void refreshThrottle() {
- boolean backpressure =
stormClusterState.topologyBackpressure(topologyId, this::refreshThrottle);
- this.throttleOn.set(backpressure);
- }
-
public void refreshLoad() {
- Set<Integer> remoteTasks = Sets.difference(new
HashSet<Integer>(outboundTasks), new HashSet<>(taskIds));
+ Set<Integer> remoteTasks = Sets.difference(new
HashSet<Integer>(outboundTasks), new HashSet<>(localTaskIds));
Long now = System.currentTimeMillis();
Map<Integer, Double> localLoad =
shortExecutorReceiveQueueMap.entrySet().stream().collect(Collectors.toMap(
- (Function<Map.Entry<Integer, DisruptorQueue>, Integer>)
Map.Entry::getKey,
- (Function<Map.Entry<Integer, DisruptorQueue>, Double>) entry
-> {
- DisruptorQueue.QueueMetrics qMetrics =
entry.getValue().getMetrics();
- return ( (double) qMetrics.population()) /
qMetrics.capacity();
+ (Function<Map.Entry<Integer, JCQueue>, Integer>)
Map.Entry::getKey,
+ (Function<Map.Entry<Integer, JCQueue>, Double>) entry -> {
+ JCQueue.QueueMetrics qMetrics =
entry.getValue().getMetrics();
+ return ((double) qMetrics.population()) /
qMetrics.capacity();
}));
Map<Integer, Load> remoteLoad = new HashMap<>();
cachedNodeToPortSocket.get().values().stream().forEach(conn ->
remoteLoad.putAll(conn.getLoad(remoteTasks)));
loadMapping.setLocal(localLoad);
loadMapping.setRemote(remoteLoad);
- if (now > nextUpdate.get()) {
+ if (now > nextLoadUpdate.get()) {
receiver.sendLoadMetrics(localLoad);
- nextUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
+ nextLoadUpdate.set(now + LOAD_REFRESH_INTERVAL_MS);
+ }
+ }
+
+ // checks if the tasks which had back pressure are now free again. if
so, sends an update to other workers
+ public void refreshBackPressureStatus() {
+ LOG.debug("Checking for change in Backpressure status on worker's
tasks");
+ boolean bpSituationChanged = bpTracker.refreshBpTaskList();
+ if (bpSituationChanged) {
+ BackPressureStatus bpStatus = bpTracker.getCurrStatus();
+ receiver.sendBackPressureStatus(bpStatus);
+ }
+ for (Entry<Integer, JCQueue> entry :
localReceiveQueues.entrySet()) {
--- End diff --
It looks like this loop effectively does nothing. Is anything missing here, or
can this `for` statement be removed? If there is a reason to keep it, please
leave a comment describing that reason.
---