Github user srdo commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2750#discussion_r200445007
  
    --- Diff: 
storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java ---
    @@ -22,56 +22,53 @@
     import java.util.List;
     import java.util.Map;
     import java.util.Map.Entry;
    -import java.util.concurrent.ConcurrentHashMap;
    -import org.apache.storm.Constants;
     import org.apache.storm.messaging.netty.BackPressureStatus;
     import org.apache.storm.utils.JCQueue;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import static org.apache.storm.Constants.SYSTEM_TASK_ID;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import java.util.stream.Collectors;
    +import 
org.apache.storm.shade.org.apache.commons.lang.builder.ToStringBuilder;
    +import 
org.apache.storm.shade.org.apache.commons.lang.builder.ToStringStyle;
     
     /***
    - *   Tracks the BackPressure status using a Map<TaskId, JCQueue>.
    - *   Special value NONE, is used to indicate that the task is not under 
BackPressure
    - *   ConcurrentHashMap does not allow storing null values, so we use the 
special value NONE instead.
    + *   Tracks the BackPressure status.
      */
     public class BackPressureTracker {
         static final Logger LOG = 
LoggerFactory.getLogger(BackPressureTracker.class);
    -    private static final JCQueue NONE = new JCQueue("NoneQ", 2, 0, 1, null,
    -                                                    "none", 
Constants.SYSTEM_COMPONENT_ID, -1, 0) {
    -    };
    -    private final Map<Integer, JCQueue> tasks = new ConcurrentHashMap<>(); 
// updates are more frequent than iteration
    +    private final Map<Integer, BackpressureState> tasks;
         private final String workerId;
     
    -    public BackPressureTracker(String workerId, List<Integer> 
allLocalTasks) {
    +    public BackPressureTracker(String workerId, Map<Integer, JCQueue> 
localTasksToQueues) {
             this.workerId = workerId;
    -        for (Integer taskId : allLocalTasks) {
    -            if (taskId != SYSTEM_TASK_ID) {
    -                tasks.put(taskId, NONE);  // all tasks are considered to 
be not under BP initially
    -            }
    -        }
    +        this.tasks = localTasksToQueues.entrySet().stream()
    +            .collect(Collectors.toMap(
    +                entry -> entry.getKey(),
    +                entry -> new BackpressureState(entry.getValue())));
         }
     
         private void recordNoBackPressure(Integer taskId) {
    -        tasks.put(taskId, NONE);
    +        tasks.get(taskId).backpressure.set(false);
         }
     
         /***
          * Record BP for a task.
          * This is called by transferLocalBatch() on NettyWorker thread
          * @return true if an update was recorded, false if taskId is already 
under BP
          */
    -    public boolean recordBackPressure(Integer taskId, JCQueue recvQ) {
    -        return tasks.put(taskId, recvQ) == NONE;
    +    public boolean recordBackPressure(Integer taskId) {
    +        return tasks.get(taskId).backpressure.getAndSet(true);
    --- End diff --
    
    Nice catch. Fixed and added some tests.


---

Reply via email to