Github user revans2 commented on a diff in the pull request:

    https://github.com/apache/storm/pull/2750#discussion_r200393309
  
    --- Diff: storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java ---
    @@ -22,56 +22,53 @@
     import java.util.List;
     import java.util.Map;
     import java.util.Map.Entry;
    -import java.util.concurrent.ConcurrentHashMap;
    -import org.apache.storm.Constants;
     import org.apache.storm.messaging.netty.BackPressureStatus;
     import org.apache.storm.utils.JCQueue;
     import org.slf4j.Logger;
     import org.slf4j.LoggerFactory;
     
    -import static org.apache.storm.Constants.SYSTEM_TASK_ID;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import java.util.stream.Collectors;
    +import org.apache.storm.shade.org.apache.commons.lang.builder.ToStringBuilder;
    +import org.apache.storm.shade.org.apache.commons.lang.builder.ToStringStyle;
     
     /***
    - *   Tracks the BackPressure status using a Map<TaskId, JCQueue>.
    - *   Special value NONE, is used to indicate that the task is not under BackPressure
    - *   ConcurrentHashMap does not allow storing null values, so we use the special value NONE instead.
    + *   Tracks the BackPressure status.
      */
     public class BackPressureTracker {
         static final Logger LOG = LoggerFactory.getLogger(BackPressureTracker.class);
    -    private static final JCQueue NONE = new JCQueue("NoneQ", 2, 0, 1, null,
    -                                                    "none", Constants.SYSTEM_COMPONENT_ID, -1, 0) {
    -    };
    -    private final Map<Integer, JCQueue> tasks = new ConcurrentHashMap<>(); // updates are more frequent than iteration
    +    private final Map<Integer, BackpressureState> tasks;
         private final String workerId;
     
    -    public BackPressureTracker(String workerId, List<Integer> allLocalTasks) {
    +    public BackPressureTracker(String workerId, Map<Integer, JCQueue> localTasksToQueues) {
             this.workerId = workerId;
    -        for (Integer taskId : allLocalTasks) {
    -            if (taskId != SYSTEM_TASK_ID) {
    -                tasks.put(taskId, NONE);  // all tasks are considered to be not under BP initially
    -            }
    -        }
    +        this.tasks = localTasksToQueues.entrySet().stream()
    +            .collect(Collectors.toMap(
    +                entry -> entry.getKey(),
    +                entry -> new BackpressureState(entry.getValue())));
         }
     
         private void recordNoBackPressure(Integer taskId) {
    -        tasks.put(taskId, NONE);
    +        tasks.get(taskId).backpressure.set(false);
         }
     
         /***
          * Record BP for a task.
          * This is called by transferLocalBatch() on NettyWorker thread
          * @return true if an update was recorded, false if taskId is already under BP
          */
    -    public boolean recordBackPressure(Integer taskId, JCQueue recvQ) {
    -        return tasks.put(taskId, recvQ) == NONE;
    +    public boolean recordBackPressure(Integer taskId) {
    +        return tasks.get(taskId).backpressure.getAndSet(true);
    --- End diff --
    
    The return value of this method is now inverted. getAndSet returns the
    previous value, which is true if the task was already under backpressure and
    false if it was not. This is the opposite of what the javadoc states:
    
    ```
    * @return true if an update was recorded, false if taskId is already under BP
    ```
    
    This return value is used to control when the server sends messages to the
    client about backpressure, so the inversion could make those messages too
    infrequent at the beginning and then far too frequent after that.
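
To illustrate the point about the return value, here is a minimal, self-contained sketch. It is not code from the PR: the class name BackpressureFlagDemo and its three helper methods are hypothetical, and a bare AtomicBoolean stands in for the per-task backpressure flag. It compares the return value as written in the diff with two possible ways to restore the documented contract, either negating getAndSet or using compareAndSet.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; a bare AtomicBoolean stands in for the
// per-task backpressure flag used in the diff.
final class BackpressureFlagDemo {

    // As written in the diff: returns the PREVIOUS flag value, so it is
    // false on the first (state-changing) call and true afterwards.
    static boolean recordAsInDiff(AtomicBoolean flag) {
        return flag.getAndSet(true);
    }

    // One possible fix (an assumption, not the PR's final code): negate the
    // previous value so that true means "an update was recorded".
    static boolean recordNegated(AtomicBoolean flag) {
        return !flag.getAndSet(true);
    }

    // Another possible fix: compareAndSet succeeds only on the
    // false -> true transition, which is exactly "an update was recorded".
    static boolean recordWithCas(AtomicBoolean flag) {
        return flag.compareAndSet(false, true);
    }

    public static void main(String[] args) {
        AtomicBoolean a = new AtomicBoolean(false);
        System.out.println("as in diff: " + recordAsInDiff(a) + ", " + recordAsInDiff(a));

        AtomicBoolean b = new AtomicBoolean(false);
        System.out.println("negated:    " + recordNegated(b) + ", " + recordNegated(b));

        AtomicBoolean c = new AtomicBoolean(false);
        System.out.println("CAS:        " + recordWithCas(c) + ", " + recordWithCas(c));
    }
}
```

On a fresh flag, the version from the diff returns false on the first call and true on the second, while both alternatives return true and then false, which matches the javadoc.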

