This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d145febe96feb131996bc579d25f4dea0a93c4f0 Author: Weijie Guo <res...@163.com> AuthorDate: Thu Dec 1 01:54:06 2022 +0800 [hotfix] Migrate NettyShuffleEnvironmentTest to Junit5 and AssertJ. --- .../io/network/NettyShuffleEnvironmentTest.java | 87 ++++++++++------------ 1 file changed, 41 insertions(+), 46 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java index d1ec985bc24..18c34e0f569 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentTest.java @@ -52,13 +52,10 @@ import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.throughput.BufferDebloatConfiguration; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder; -import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; import java.io.IOException; import java.util.Arrays; @@ -70,27 +67,26 @@ import java.util.concurrent.Executors; import static org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils.createExecutionAttemptId; import static org.apache.flink.runtime.io.network.partition.InputChannelTestUtils.createDummyConnectionManager; import static org.apache.flink.runtime.io.network.partition.PartitionTestUtils.createPartition; -import static org.junit.Assert.assertEquals; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.mockito.Mockito.times; 
import static org.mockito.Mockito.verify; import static org.powermock.api.mockito.PowerMockito.spy; /** Various tests for the {@link NettyShuffleEnvironment} class. */ -public class NettyShuffleEnvironmentTest extends TestLogger { +class NettyShuffleEnvironmentTest { private static final String tempDir = EnvironmentInformation.getTemporaryFileDirectory(); private static FileChannelManager fileChannelManager; - @Rule public ExpectedException expectedException = ExpectedException.none(); - - @BeforeClass - public static void setUp() { + @BeforeAll + static void setUp() { fileChannelManager = new FileChannelManagerImpl(new String[] {tempDir}, "testing"); } - @AfterClass - public static void shutdown() throws Exception { + @AfterAll + static void shutdown() throws Exception { fileChannelManager.close(); } @@ -100,7 +96,7 @@ public class NettyShuffleEnvironmentTest extends TestLogger { * working with the bare minimum of required buffers. */ @Test - public void testRegisterTaskWithLimitedBuffers() throws Exception { + void testRegisterTaskWithLimitedBuffers() throws Exception { // outgoing: 1 buffer per channel + 1 extra buffer per ResultPartition // incoming: 2 exclusive buffers per channel + 1 floating buffer per single gate final int bufferCount = @@ -114,7 +110,7 @@ public class NettyShuffleEnvironmentTest extends TestLogger { * fails if the bare minimum of required buffers is not available (we are one buffer short). 
*/ @Test - public void testRegisterTaskWithInsufficientBuffers() throws Exception { + void testRegisterTaskWithInsufficientBuffers() throws Exception { // outgoing: 1 buffer per channel + 1 extra buffer per ResultPartition // incoming: 2 exclusive buffers per channel + 1 floating buffer per single gate final int bufferCount = @@ -124,13 +120,13 @@ public class NettyShuffleEnvironmentTest extends TestLogger { .defaultValue() - 1; - expectedException.expect(IOException.class); - expectedException.expectMessage("Insufficient number of network buffers"); - testRegisterTaskWithLimitedBuffers(bufferCount); + assertThatThrownBy(() -> testRegisterTaskWithLimitedBuffers(bufferCount)) + .isInstanceOf(IOException.class) + .hasMessageContaining("Insufficient number of network buffers"); } @Test - public void testSlowIODoesNotBlockRelease() throws Exception { + void testSlowIODoesNotBlockRelease() throws Exception { BlockerSync sync = new BlockerSync(); ResultPartitionManager blockingResultPartitionManager = new ResultPartitionManager() { @@ -154,7 +150,7 @@ public class NettyShuffleEnvironmentTest extends TestLogger { @Test @SuppressWarnings("unchecked") - public void testRegisteringDebloatingMetrics() throws IOException { + void testRegisteringDebloatingMetrics() throws IOException { Map<String, Metric> metrics = new ConcurrentHashMap<>(); final TaskMetricGroup taskMetricGroup = createTaskMetricGroup(metrics); final Configuration config = new Configuration(); @@ -183,22 +179,20 @@ public class NettyShuffleEnvironmentTest extends TestLogger { new NettyShuffleDescriptorBuilder().buildRemote() }))); for (int i = 0; i < 2; i++) { - assertEquals( - TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().getBytes(), - (long) + assertThat( ((Gauge<Integer>) getDebloatingMetric( metrics, i, MetricNames.DEBLOATED_BUFFER_SIZE)) - .getValue()); - assertEquals( - 0L, - (long) + .getValue()) + .isEqualTo(TaskManagerOptions.MEMORY_SEGMENT_SIZE.defaultValue().getBytes()); + assertThat( 
((Gauge<Long>) getDebloatingMetric( metrics, i, MetricNames.ESTIMATED_TIME_TO_CONSUME_BUFFERS)) - .getValue()); + .getValue()) + .isZero(); } } @@ -269,29 +263,30 @@ public class NettyShuffleEnvironmentTest extends TestLogger { Task.setupPartitionsAndGates(resultPartitions, inputGates); // verify buffer pools for the result partitions - assertEquals(Integer.MAX_VALUE, rp1.getBufferPool().getMaxNumberOfMemorySegments()); - assertEquals(Integer.MAX_VALUE, rp2.getBufferPool().getMaxNumberOfMemorySegments()); - assertEquals(expectedBuffers, rp3.getBufferPool().getMaxNumberOfMemorySegments()); - assertEquals(expectedRp4Buffers, rp4.getBufferPool().getMaxNumberOfMemorySegments()); + assertThat(rp1.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(Integer.MAX_VALUE); + assertThat(rp2.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(Integer.MAX_VALUE); + assertThat(rp3.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(expectedBuffers); + assertThat(rp4.getBufferPool().getMaxNumberOfMemorySegments()) + .isEqualTo(expectedRp4Buffers); for (ResultPartition rp : resultPartitions) { - assertEquals( - rp.getNumberOfSubpartitions() + 1, - rp.getBufferPool().getNumberOfRequiredMemorySegments()); - assertEquals(rp.getNumberOfSubpartitions() + 1, rp.getBufferPool().getNumBuffers()); + assertThat(rp.getBufferPool().getNumberOfRequiredMemorySegments()) + .isEqualTo(rp.getNumberOfSubpartitions() + 1); + assertThat(rp.getBufferPool().getNumBuffers()) + .isEqualTo(rp.getNumberOfSubpartitions() + 1); } // verify buffer pools for the input gates (NOTE: credit-based uses minimum required buffers // for exclusive buffers not managed by the buffer pool) - assertEquals(1, ig1.getBufferPool().getNumberOfRequiredMemorySegments()); - assertEquals(1, ig2.getBufferPool().getNumberOfRequiredMemorySegments()); - assertEquals(1, ig3.getBufferPool().getNumberOfRequiredMemorySegments()); - assertEquals(1, ig4.getBufferPool().getNumberOfRequiredMemorySegments()); - - 
assertEquals(floatingBuffers, ig1.getBufferPool().getMaxNumberOfMemorySegments()); - assertEquals(floatingBuffers, ig2.getBufferPool().getMaxNumberOfMemorySegments()); - assertEquals(floatingBuffers, ig3.getBufferPool().getMaxNumberOfMemorySegments()); - assertEquals(floatingBuffers, ig4.getBufferPool().getMaxNumberOfMemorySegments()); + assertThat(ig1.getBufferPool().getNumberOfRequiredMemorySegments()).isOne(); + assertThat(ig2.getBufferPool().getNumberOfRequiredMemorySegments()).isOne(); + assertThat(ig3.getBufferPool().getNumberOfRequiredMemorySegments()).isOne(); + assertThat(ig4.getBufferPool().getNumberOfRequiredMemorySegments()).isOne(); + + assertThat(ig1.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(floatingBuffers); + assertThat(ig2.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(floatingBuffers); + assertThat(ig3.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(floatingBuffers); + assertThat(ig4.getBufferPool().getMaxNumberOfMemorySegments()).isEqualTo(floatingBuffers); verify(ig1, times(1)).setupChannels(); verify(ig2, times(1)).setupChannels();