Github user srdo commented on a diff in the pull request:
https://github.com/apache/storm/pull/2750#discussion_r200445007
--- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java ---
@@ -22,56 +22,53 @@
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.concurrent.ConcurrentHashMap;
-import org.apache.storm.Constants;
import org.apache.storm.messaging.netty.BackPressureStatus;
import org.apache.storm.utils.JCQueue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import static org.apache.storm.Constants.SYSTEM_TASK_ID;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import java.util.stream.Collectors;
+import org.apache.storm.shade.org.apache.commons.lang.builder.ToStringBuilder;
+import org.apache.storm.shade.org.apache.commons.lang.builder.ToStringStyle;
/***
- * Tracks the BackPressure status using a Map<TaskId, JCQueue>.
- * Special value NONE, is used to indicate that the task is not under BackPressure
- * ConcurrentHashMap does not allow storing null values, so we use the special value NONE instead.
+ * Tracks the BackPressure status.
*/
public class BackPressureTracker {
static final Logger LOG = LoggerFactory.getLogger(BackPressureTracker.class);
- private static final JCQueue NONE = new JCQueue("NoneQ", 2, 0, 1, null,
- "none", Constants.SYSTEM_COMPONENT_ID, -1, 0) {
- };
- private final Map<Integer, JCQueue> tasks = new ConcurrentHashMap<>(); // updates are more frequent than iteration
+ private final Map<Integer, BackpressureState> tasks;
private final String workerId;
- public BackPressureTracker(String workerId, List<Integer> allLocalTasks) {
+ public BackPressureTracker(String workerId, Map<Integer, JCQueue> localTasksToQueues) {
this.workerId = workerId;
- for (Integer taskId : allLocalTasks) {
- if (taskId != SYSTEM_TASK_ID) {
- tasks.put(taskId, NONE); // all tasks are considered to be not under BP initially
- }
- }
+ this.tasks = localTasksToQueues.entrySet().stream()
+ .collect(Collectors.toMap(
+ entry -> entry.getKey(),
+ entry -> new BackpressureState(entry.getValue())));
}
private void recordNoBackPressure(Integer taskId) {
- tasks.put(taskId, NONE);
+ tasks.get(taskId).backpressure.set(false);
}
/***
* Record BP for a task.
* This is called by transferLocalBatch() on NettyWorker thread
* @return true if an update was recorded, false if taskId is already under BP
*/
- public boolean recordBackPressure(Integer taskId, JCQueue recvQ) {
- return tasks.put(taskId, recvQ) == NONE;
+ public boolean recordBackPressure(Integer taskId) {
+ return tasks.get(taskId).backpressure.getAndSet(true);
--- End diff --
Nice catch. Fixed and added some tests.
---