This is an automated email from the ASF dual-hosted git repository.

jiabaosun pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d1954b580020f62e5fdaff6830bccc3e569ce78d
Author: Jiabao Sun <jiabao...@apache.org>
AuthorDate: Mon Mar 4 14:04:02 2024 +0800

    [FLINK-25544][streaming][JUnit5 Migration] The experimental package of module flink-streaming-java
---
 .../experimental/SocketStreamIteratorTest.java     | 69 +++++++++-------------
 1 file changed, 29 insertions(+), 40 deletions(-)

diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java
index aa335de358c..63a03218517 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/experimental/SocketStreamIteratorTest.java
@@ -21,16 +21,15 @@ package org.apache.flink.streaming.experimental;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
+import org.apache.flink.core.testutils.CheckedThread;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.net.Socket;
 import java.util.Random;
-import java.util.concurrent.atomic.AtomicReference;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /**
  * Tests for the SocketStreamIterator.
@@ -38,12 +37,10 @@ import static org.junit.Assert.assertTrue;
  * <p>This experimental class is relocated from flink-streaming-contrib. Please see
  * package-info.java for more information.
  */
-public class SocketStreamIteratorTest {
+class SocketStreamIteratorTest {
 
     @Test
-    public void testIterator() throws Exception {
-
-        final AtomicReference<Throwable> error = new AtomicReference<>();
+    void testIterator() throws Exception {
 
         final long seed = new Random().nextLong();
         final int numElements = 1000;
@@ -51,28 +48,22 @@ public class SocketStreamIteratorTest {
         final SocketStreamIterator<Long> iterator =
                 new SocketStreamIterator<>(LongSerializer.INSTANCE);
 
-        Thread writer =
-                new Thread() {
+        CheckedThread writer =
+                new CheckedThread() {
 
                     @Override
-                    public void run() {
-                        try {
-                            try (Socket sock =
-                                            new Socket(
-                                                    iterator.getBindAddress(), iterator.getPort());
-                                    DataOutputViewStreamWrapper out =
-                                            new DataOutputViewStreamWrapper(
-                                                    sock.getOutputStream())) {
-
-                                final TypeSerializer<Long> serializer = LongSerializer.INSTANCE;
-                                final Random rnd = new Random(seed);
-
-                                for (int i = 0; i < numElements; i++) {
-                                    serializer.serialize(rnd.nextLong(), out);
-                                }
+                    public void go() throws Exception {
+                        try (Socket sock =
+                                        new Socket(iterator.getBindAddress(), iterator.getPort());
+                                DataOutputViewStreamWrapper out =
+                                        new DataOutputViewStreamWrapper(sock.getOutputStream())) {
+
+                            final TypeSerializer<Long> serializer = LongSerializer.INSTANCE;
+                            final Random rnd = new Random(seed);
+
+                            for (int i = 0; i < numElements; i++) {
+                                serializer.serialize(rnd.nextLong(), out);
                             }
-                        } catch (Throwable t) {
-                            error.set(t);
                         }
                     }
                 };
@@ -81,18 +72,18 @@ public class SocketStreamIteratorTest {
 
         final Random validator = new Random(seed);
         for (int i = 0; i < numElements; i++) {
-            assertTrue(iterator.hasNext());
-            assertTrue(iterator.hasNext());
-            assertEquals(validator.nextLong(), iterator.next().longValue());
+            assertThat(iterator).hasNext();
+            assertThat(iterator).hasNext();
+            assertThat(iterator.next()).isEqualTo(validator.nextLong());
         }
 
-        assertFalse(iterator.hasNext());
-        writer.join();
-        assertFalse(iterator.hasNext());
+        assertThat(iterator).isExhausted();
+        writer.sync();
+        assertThat(iterator).isExhausted();
     }
 
     @Test
-    public void testIteratorWithException() throws Exception {
+    void testIteratorWithException() throws Exception {
 
         final SocketStreamIterator<Long> iterator =
                 new SocketStreamIterator<>(LongSerializer.INSTANCE);
@@ -109,10 +100,8 @@ public class SocketStreamIteratorTest {
             }
         }.start();
 
-        try {
-            iterator.hasNext();
-        } catch (Exception e) {
-            assertTrue(e.getCause().getMessage().contains("test"));
-        }
+        assertThatThrownBy(iterator::hasNext)
+                .isInstanceOf(RuntimeException.class)
+                .hasMessageContaining("test");
     }
 }

Reply via email to