This is an automated email from the ASF dual-hosted git repository. jiabaosun pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d1954b580020f62e5fdaff6830bccc3e569ce78d Author: Jiabao Sun <jiabao...@apache.org> AuthorDate: Mon Mar 4 14:04:02 2024 +0800 [FLINK-25544][streaming][JUnit5 Migration] The experimental package of module flink-stream-java --- .../experimental/SocketStreamIteratorTest.java | 69 +++++++++------------- 1 file changed, 29 insertions(+), 40 deletions(-) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java index aa335de358c..63a03218517 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java @@ -21,16 +21,15 @@ package org.apache.flink.streaming.experimental; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; +import org.apache.flink.core.testutils.CheckedThread; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.net.Socket; import java.util.Random; -import java.util.concurrent.atomic.AtomicReference; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; /** * Tests for the SocketStreamIterator. @@ -38,12 +37,10 @@ import static org.junit.Assert.assertTrue; * <p>This experimental class is relocated from flink-streaming-contrib. Please see * package-info.java for more information. */ -public class SocketStreamIteratorTest { +class SocketStreamIteratorTest { @Test - public void testIterator() throws Exception { - - final AtomicReference<Throwable> error = new AtomicReference<>(); + void testIterator() throws Exception { final long seed = new Random().nextLong(); final int numElements = 1000; @@ -51,28 +48,22 @@ public class SocketStreamIteratorTest { final SocketStreamIterator<Long> iterator = new SocketStreamIterator<>(LongSerializer.INSTANCE); - Thread writer = - new Thread() { + CheckedThread writer = + new CheckedThread() { @Override - public void run() { - try { - try (Socket sock = - new Socket( - iterator.getBindAddress(), iterator.getPort()); - DataOutputViewStreamWrapper out = - new DataOutputViewStreamWrapper( - sock.getOutputStream())) { - - final TypeSerializer<Long> serializer = LongSerializer.INSTANCE; - final Random rnd = new Random(seed); - - for (int i = 0; i < numElements; i++) { - serializer.serialize(rnd.nextLong(), out); - } + public void go() throws Exception { + try (Socket sock = + new Socket(iterator.getBindAddress(), iterator.getPort()); + DataOutputViewStreamWrapper out = + new DataOutputViewStreamWrapper(sock.getOutputStream())) { + + final TypeSerializer<Long> serializer = LongSerializer.INSTANCE; + final Random rnd = new Random(seed); + + for (int i = 0; i < numElements; i++) { + serializer.serialize(rnd.nextLong(), out); } - } catch (Throwable t) { - error.set(t); } } }; @@ -81,18 +72,18 @@ public class SocketStreamIteratorTest { final Random validator = new Random(seed); for (int i = 0; i < numElements; i++) { - assertTrue(iterator.hasNext()); - assertTrue(iterator.hasNext()); - assertEquals(validator.nextLong(), iterator.next().longValue()); + assertThat(iterator).hasNext(); + assertThat(iterator).hasNext(); + assertThat(iterator.next()).isEqualTo(validator.nextLong()); } - assertFalse(iterator.hasNext()); - writer.join(); - assertFalse(iterator.hasNext()); + assertThat(iterator).isExhausted(); + writer.sync(); + assertThat(iterator).isExhausted(); } @Test - public void testIteratorWithException() throws Exception { + void testIteratorWithException() throws Exception { final SocketStreamIterator<Long> iterator = new SocketStreamIterator<>(LongSerializer.INSTANCE); @@ -109,10 +100,8 @@ public class SocketStreamIteratorTest { } }.start(); - try { - iterator.hasNext(); - } catch (Exception e) { - assertTrue(e.getCause().getMessage().contains("test")); - } + assertThatThrownBy(iterator::hasNext) + .isInstanceOf(RuntimeException.class) + .hasMessageContaining("test"); } }