This is an automated email from the ASF dual-hosted git repository. srdo pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/storm.git
The following commit(s) were added to refs/heads/master by this push: new c78f1aa STORM-3510: Track overflow count per taskId for resending backpressur… (#3131) c78f1aa is described below commit c78f1aaa8fcd17eb5c8f45f5dbb90e30f99e19bb Author: cjljohnson <cjljohn...@gmail.com> AuthorDate: Tue Oct 1 15:38:28 2019 +0100 STORM-3510: Track overflow count per taskId for resending backpressur… (#3131) * STORM-3510: Track overflow count per taskId for resending backpressure status --- .../storm/daemon/worker/BackPressureTracker.java | 29 ++++++++++++++---- .../apache/storm/daemon/worker/WorkerState.java | 14 +++++---- .../daemon/worker/BackPressureTrackerTest.java | 35 ++++++++++++++++++---- 3 files changed, 60 insertions(+), 18 deletions(-) diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java index dae5cca..3c590e5 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/BackPressureTracker.java @@ -49,8 +49,12 @@ public class BackPressureTracker { entry -> new BackpressureState(entry.getValue()))); } - private void recordNoBackPressure(Integer taskId) { - tasks.get(taskId).backpressure.set(false); + public BackpressureState getBackpressureState(Integer taskId) { + return tasks.get(taskId); + } + + private void recordNoBackPressure(BackpressureState state) { + state.backpressure.set(false); } /** @@ -60,8 +64,8 @@ public class BackPressureTracker { * * @return true if an update was recorded, false if taskId is already under BP */ - public boolean recordBackPressure(Integer taskId) { - return tasks.get(taskId).backpressure.getAndSet(true) == false; + public boolean recordBackPressure(BackpressureState state) { + return state.backpressure.getAndSet(true) == false; } // returns true if there was a change in the BP situation @@ -71,7 +75,7 @@ public class BackPressureTracker { for (Entry<Integer, BackpressureState> entry : tasks.entrySet()) { BackpressureState state = entry.getValue(); if (state.backpressure.get() && state.queue.isEmptyOverflow()) { - recordNoBackPressure(entry.getKey()); + recordNoBackPressure(state); changed = true; } } @@ -95,11 +99,24 @@ public class BackPressureTracker { } return new BackPressureStatus(workerId, bpTasks, nonBpTasks); } + + public int getLastOverflowCount(BackpressureState state) { + return state.lastOverflowCount; + } + + public void setLastOverflowCount(BackpressureState state, int value) { + state.lastOverflowCount = value; + } + + - private static class BackpressureState { + public static class BackpressureState { private final JCQueue queue; //No task is under backpressure initially private final AtomicBoolean backpressure = new AtomicBoolean(false); + //The overflow count last time BP status was sent + private int lastOverflowCount = 0; + BackpressureState(JCQueue queue) { this.queue = queue; diff --git a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java index f380769..eaab4e9 100644 --- a/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java +++ b/storm-client/src/jvm/org/apache/storm/daemon/worker/WorkerState.java @@ -42,6 +42,7 @@ import org.apache.storm.cluster.IStormClusterState; import org.apache.storm.cluster.VersionedData; import org.apache.storm.daemon.StormCommon; import org.apache.storm.daemon.supervisor.AdvancedFSOps; +import org.apache.storm.daemon.worker.BackPressureTracker.BackpressureState; import org.apache.storm.executor.IRunningExecutor; import org.apache.storm.generated.Assignment; import org.apache.storm.generated.DebugOptions; @@ -88,6 +89,7 @@ public class WorkerState { private static final Logger LOG = LoggerFactory.getLogger(WorkerState.class); private static final long LOAD_REFRESH_INTERVAL_MS = 5000L; + private static final int RESEND_BACKPRESSURE_SIZE = 10000; private static long dropCount = 0; final Map<String, Object> conf; final IContext mqContext; @@ -533,8 +535,6 @@ public class WorkerState { // Receives msgs from remote workers and feeds them to local executors. If any receiving local executor is under Back Pressure, // informs other workers about back pressure situation. Runs in the NettyWorker thread. private void transferLocalBatch(ArrayList<AddressedTuple> tupleBatch) { - int lastOverflowCount = 0; // overflowQ size at the time the last BPStatus was sent - for (int i = 0; i < tupleBatch.size(); i++) { AddressedTuple tuple = tupleBatch.get(i); JCQueue queue = taskToExecutorQueue.get(tuple.dest); @@ -548,16 +548,18 @@ public class WorkerState { // 2- BP detected (i.e MainQ is full). So try adding to overflow int currOverflowCount = queue.getOverflowCount(); - if (bpTracker.recordBackPressure(tuple.dest)) { + // get BP state object so only have to lookup once + BackpressureState bpState = bpTracker.getBackpressureState(tuple.dest); + if (bpTracker.recordBackPressure(bpState)) { receiver.sendBackPressureStatus(bpTracker.getCurrStatus()); - lastOverflowCount = currOverflowCount; + bpTracker.setLastOverflowCount(bpState, currOverflowCount); } else { - if (currOverflowCount - lastOverflowCount > 10000) { + if (currOverflowCount - bpTracker.getLastOverflowCount(bpState) > RESEND_BACKPRESSURE_SIZE) { // resend BP status, in case prev notification was missed or reordered BackPressureStatus bpStatus = bpTracker.getCurrStatus(); receiver.sendBackPressureStatus(bpStatus); - lastOverflowCount = currOverflowCount; + bpTracker.setLastOverflowCount(bpState, currOverflowCount); LOG.debug("Re-sent BackPressure Status. OverflowCount = {}, BP Status ID = {}. ", currOverflowCount, bpStatus.id); } } diff --git a/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java b/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java index 7e891b5..f642c54 100644 --- a/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java +++ b/storm-client/test/jvm/org/apache/storm/daemon/worker/BackPressureTrackerTest.java @@ -24,6 +24,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import java.util.Collections; + +import org.apache.storm.daemon.worker.BackPressureTracker.BackpressureState; import org.apache.storm.messaging.netty.BackPressureStatus; import org.apache.storm.shade.org.apache.curator.shaded.com.google.common.collect.ImmutableMap; import org.apache.storm.utils.JCQueue; @@ -38,7 +40,7 @@ public class BackPressureTrackerTest { int taskIdNoBackPressure = 1; JCQueue noBackPressureQueue = mock(JCQueue.class); BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, - Collections.singletonMap(taskIdNoBackPressure, noBackPressureQueue)); + Collections.singletonMap(taskIdNoBackPressure, noBackPressureQueue)); BackPressureStatus status = tracker.getCurrStatus(); @@ -57,7 +59,8 @@ public class BackPressureTrackerTest { taskIdNoBackPressure, noBackPressureQueue, taskIdBackPressure, backPressureQueue)); - boolean backpressureChanged = tracker.recordBackPressure(taskIdBackPressure); + BackpressureState state = tracker.getBackpressureState(taskIdBackPressure); + boolean backpressureChanged = tracker.recordBackPressure(state); BackPressureStatus status = tracker.getCurrStatus(); assertThat(backpressureChanged, is(true)); @@ -72,9 +75,10 @@ public class BackPressureTrackerTest { JCQueue queue = mock(JCQueue.class); BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of( taskId, queue)); - tracker.recordBackPressure(taskId); + BackpressureState state = tracker.getBackpressureState(taskId); + tracker.recordBackPressure(state); - boolean backpressureChanged = tracker.recordBackPressure(taskId); + boolean backpressureChanged = tracker.recordBackPressure(state); BackPressureStatus status = tracker.getCurrStatus(); assertThat(backpressureChanged, is(false)); @@ -89,7 +93,8 @@ public class BackPressureTrackerTest { when(queue.isEmptyOverflow()).thenReturn(true); BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of( taskId, queue)); - tracker.recordBackPressure(taskId); + BackpressureState state = tracker.getBackpressureState(taskId); + tracker.recordBackPressure(state); boolean backpressureChanged = tracker.refreshBpTaskList(); BackPressureStatus status = tracker.getCurrStatus(); @@ -106,7 +111,8 @@ public class BackPressureTrackerTest { when(queue.isEmptyOverflow()).thenReturn(false); BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of( taskId, queue)); - tracker.recordBackPressure(taskId); + BackpressureState state = tracker.getBackpressureState(taskId); + tracker.recordBackPressure(state); boolean backpressureChanged = tracker.refreshBpTaskList(); BackPressureStatus status = tracker.getCurrStatus(); @@ -116,4 +122,21 @@ public class BackPressureTrackerTest { assertThat(status.bpTasks, contains(taskId)); } + @Test + public void testSetLastOverflowCount() { + int taskId = 1; + int overflow = 5; + JCQueue queue = mock(JCQueue.class); + BackPressureTracker tracker = new BackPressureTracker(WORKER_ID, ImmutableMap.of( + taskId, queue)); + BackpressureState state = tracker.getBackpressureState(taskId); + tracker.recordBackPressure(state); + tracker.setLastOverflowCount(state, overflow); + + BackpressureState retrievedState = tracker.getBackpressureState(taskId); + int lastOverflowCount = tracker.getLastOverflowCount(retrievedState); + + assertThat(lastOverflowCount, is(overflow)); + } + }