zhijiangW commented on a change in pull request #10083: [FLINK-14472][runtime]Implement back-pressure monitor with non-blocking outputs. URL: https://github.com/apache/flink/pull/10083#discussion_r344581857
########## File path: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/backpressure/BackPressureStatsTrackerImplTest.java ########## @@ -18,166 +18,218 @@ package org.apache.flink.runtime.rest.handler.legacy.backpressure; -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.TestLogger; -import org.junit.Assert; import org.junit.Test; -import org.mockito.Matchers; -import org.mockito.Mockito; -import java.util.ArrayList; import java.util.HashMap; -import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + /** - * Tests for the BackPressureStatsTrackerImpl. + * Tests for the {@link BackPressureStatsTrackerImpl}. */ public class BackPressureStatsTrackerImplTest extends TestLogger { - /** Tests simple statistics with fake stack traces. */ @Test - @SuppressWarnings("unchecked") - public void testTriggerStackTraceSample() throws Exception { - CompletableFuture<StackTraceSample> sampleFuture = new CompletableFuture<>(); - - StackTraceSampleCoordinator sampleCoordinator = Mockito.mock(StackTraceSampleCoordinator.class); - Mockito.when(sampleCoordinator.triggerStackTraceSample( - Matchers.any(ExecutionVertex[].class), - Matchers.anyInt(), - Matchers.any(Time.class), - Matchers.anyInt())).thenReturn(sampleFuture); - - ExecutionGraph graph = Mockito.mock(ExecutionGraph.class); - Mockito.when(graph.getState()).thenReturn(JobStatus.RUNNING); - - // Same Thread execution context - Mockito.when(graph.getFutureExecutor()).thenReturn(new Executor() { - - @Override - public void execute(Runnable runnable) { - runnable.run(); - } - }); - - ExecutionVertex[] taskVertices = new ExecutionVertex[4]; - - ExecutionJobVertex jobVertex = Mockito.mock(ExecutionJobVertex.class); - Mockito.when(jobVertex.getJobId()).thenReturn(new JobID()); - Mockito.when(jobVertex.getJobVertexId()).thenReturn(new JobVertexID()); - Mockito.when(jobVertex.getGraph()).thenReturn(graph); - Mockito.when(jobVertex.getTaskVertices()).thenReturn(taskVertices); - - taskVertices[0] = mockExecutionVertex(jobVertex, 0); - taskVertices[1] = mockExecutionVertex(jobVertex, 1); - taskVertices[2] = mockExecutionVertex(jobVertex, 2); - taskVertices[3] = mockExecutionVertex(jobVertex, 3); - - int numSamples = 100; - Time delayBetweenSamples = Time.milliseconds(100L); - - BackPressureStatsTrackerImpl tracker = new BackPressureStatsTrackerImpl( - sampleCoordinator, 9999, numSamples, Integer.MAX_VALUE, delayBetweenSamples); - - // getOperatorBackPressureStats triggers stack trace sampling - Assert.assertFalse(tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), - Matchers.eq(delayBetweenSamples), - Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH)); - - // Request back pressure stats again. This should not trigger another sample request - Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - Mockito.verify(sampleCoordinator, Mockito.times(1)).triggerStackTraceSample( - Matchers.eq(taskVertices), - Matchers.eq(numSamples), - Matchers.eq(delayBetweenSamples), - Matchers.eq(BackPressureStatsTrackerImpl.MAX_STACK_TRACE_DEPTH)); - - Assert.assertTrue(!tracker.getOperatorBackPressureStats(jobVertex).isPresent()); - - // Complete the future - Map<ExecutionAttemptID, List<StackTraceElement[]>> traces = new HashMap<>(); - for (ExecutionVertex vertex : taskVertices) { - List<StackTraceElement[]> taskTraces = new ArrayList<>(); + public void testGetOperatorBackPressureStats() throws Exception { + final ExecutionJobVertex executionJobVertex = BackPressureTrackerTestUtils.createExecutionJobVertex(); + final ExecutionVertex[] taskVertices = executionJobVertex.getTaskVertices(); - for (int i = 0; i < taskVertices.length; i++) { - // Traces until sub task index are back pressured - taskTraces.add(createStackTrace(i <= vertex.getParallelSubtaskIndex())); - } + final int requestId = 0; + final long startTime = System.currentTimeMillis(); + final long endTime = startTime + 1; + final double backPressureRatio = 0.1; - traces.put(vertex.getCurrentExecutionAttempt().getAttemptId(), taskTraces); - } + final BackPressureStats backPressureStats = createBackPressureStats( + taskVertices, requestId, startTime, endTime, backPressureRatio); + final BackPressureStatsTracker tracker = createBackPressureTracker(600000, 10000, backPressureStats); - int sampleId = 1231; - int endTime = 841; + // trigger back pressure stats request + tracker.getOperatorBackPressureStats(executionJobVertex); Review comment: It seems strange to not verify the results of this `getOperatorBackPressureStats`. Actually this check should be done here instead of in `testOperatorBackPressureStatsUpdate` did. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services