1996fanrui commented on code in PR #24881:
URL: https://github.com/apache/flink/pull/24881#discussion_r1636152962


##########
flink-core/src/test/java/org/apache/flink/core/fs/InitOutputPathTest.java:
##########
@@ -23,59 +23,52 @@
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.runner.RunWith;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
+import lombok.SneakyThrows;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.nio.file.FileAlreadyExistsException;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static org.junit.Assert.fail;
-import static org.powermock.api.mockito.PowerMockito.whenNew;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.junit.Assert.assertThrows;
 
 /** A test validating that the initialization of local output paths is 
properly synchronized. */
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(LocalFileSystem.class)
-public class InitOutputPathTest {
+class InitOutputPathTest {
 
-    @Rule public final TemporaryFolder tempDir = new TemporaryFolder();
+    @TempDir private static java.nio.file.Path tempFolder;
 
     /**
      * This test validates that this test case makes sense - that the error 
can be produced in the
      * absence of synchronization, if the threads make progress in a certain 
way, here enforced by
      * latches.
      */
     @Test
-    public void testErrorOccursUnSynchronized() throws Exception {
+    void testErrorOccursUnSynchronized() throws Exception {
         // deactivate the lock to produce the original un-synchronized state
         Field lock = 
FileSystem.class.getDeclaredField("OUTPUT_DIRECTORY_INIT_LOCK");
         lock.setAccessible(true);
-        lock.set(null, new NoOpLock());
 
-        try {
-            // in the original un-synchronized state, we can force the race to 
occur by using
-            // the proper latch order to control the process of the concurrent 
threads
-            runTest(true);
-            fail("should fail with an exception");
-        } catch (FileNotFoundException e) {
-            // expected
-        } finally {
-            // reset the proper value
-            lock.set(null, new ReentrantLock(true));
-        }
+        Field modifiers = Field.class.getDeclaredField("modifiers");
+        modifiers.setAccessible(true);
+        modifiers.setInt(lock, lock.getModifiers() & ~Modifier.FINAL);
+
+        lock.set(null, new NoOpLock());
+        // in the original un-synchronized state, we can force the race to 
occur by using
+        // the proper latch order to control the process of the concurrent 
threads
+        assertThrows(FileNotFoundException.class, () -> runTest(true));

Review Comment:
   using assertThatThrownBy instead.



##########
flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java:
##########
@@ -20,26 +20,27 @@
 
 import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.util.AbstractAutoCloseableRegistry;
-import org.apache.flink.util.ExceptionUtils;
 
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.Closeable;
+import java.io.File;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
 /** Tests for the {@link SafetyNetCloseableRegistry}. */
 public class SafetyNetCloseableRegistryTest

Review Comment:
   ```suggestion
   class SafetyNetCloseableRegistryTest
   ```



##########
flink-core/src/test/java/org/apache/flink/core/fs/local/LocalFileSystemRecoverableWriterTest.java:
##########
@@ -21,18 +21,18 @@
 import org.apache.flink.core.fs.AbstractRecoverableWriterTest;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 
-import org.junit.ClassRule;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.io.TempDir;
 
 /** Tests for the {@link LocalRecoverableWriter}. */
 public class LocalFileSystemRecoverableWriterTest extends 
AbstractRecoverableWriterTest {

Review Comment:
   ```suggestion
   class LocalFileSystemRecoverableWriterTest extends 
AbstractRecoverableWriterTest {
   ```



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/security/token/HiveServer2DelegationTokenProviderITCase.java:
##########
@@ -49,15 +49,15 @@
 public class HiveServer2DelegationTokenProviderITCase {

Review Comment:
   ```suggestion
   class HiveServer2DelegationTokenProviderITCase {
   ```
   
   Note: the public can be removed for all test methods.



##########
flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java:
##########
@@ -20,483 +20,420 @@
 
 import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import java.util.Arrays;
 import java.util.Calendar;
 import java.util.HashSet;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.junit.Assert.*;
+import static org.assertj.core.api.Assertions.assertThat;
 
-public class LocatableSplitAssignerTest {
+class LocatableSplitAssignerTest {
 
     @Test
-    public void testSerialSplitAssignmentWithNullHost() {
-        try {
-            final int NUM_SPLITS = 50;
-            final String[][] hosts =
-                    new String[][] {new String[] {"localhost"}, new String[0], 
null};
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, hosts[i % 3]));
-            }
+    void testSerialSplitAssignmentWithNullHost() {
+        final int NUM_SPLITS = 50;
+        final String[][] hosts = new String[][] {new String[] {"localhost"}, 
new String[0], null};
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            InputSplit is = null;
-            while ((is = ia.getNextInputSplit(null, 0)) != null) {
-                assertTrue(splits.remove(is));
-            }
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, hosts[i % 3]));
+        }
 
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("", 0));
-            assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
-            assertEquals(0, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        InputSplit is = null;
+        while ((is = ia.getNextInputSplit(null, 0)) != null) {
+            assertThat(splits.remove(is)).isTrue();
         }
+
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("", 0)).isNull();
+        assertThat(ia.getNumberOfRemoteAssignments()).isEqualTo(NUM_SPLITS);
+        assertThat(ia.getNumberOfLocalAssignments()).isZero();
     }
 
     @Test
-    public void testSerialSplitAssignmentAllForSameHost() {
-        try {
-            final int NUM_SPLITS = 50;
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, "testhost"));
-            }
+    void testSerialSplitAssignmentAllForSameHost() {
+        final int NUM_SPLITS = 50;
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            InputSplit is = null;
-            while ((is = ia.getNextInputSplit("testhost", 0)) != null) {
-                assertTrue(splits.remove(is));
-            }
-
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("", 0));
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, "testhost"));
+        }
 
-            assertEquals(0, ia.getNumberOfRemoteAssignments());
-            assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        InputSplit is = null;
+        while ((is = ia.getNextInputSplit("testhost", 0)) != null) {
+            assertThat(splits.remove(is)).isTrue();
         }
+
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("", 0)).isNull();
+
+        assertThat(ia.getNumberOfRemoteAssignments()).isZero();
+        assertThat(ia.getNumberOfLocalAssignments()).isEqualTo(NUM_SPLITS);
     }
 
     @Test
-    public void testSerialSplitAssignmentAllForRemoteHost() {
-        try {
-            final String[] hosts = {"host1", "host1", "host1", "host2", 
"host2", "host3"};
-            final int NUM_SPLITS = 10 * hosts.length;
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, hosts[i % 
hosts.length]));
-            }
+    void testSerialSplitAssignmentAllForRemoteHost() {
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            InputSplit is = null;
-            while ((is = ia.getNextInputSplit("testhost", 0)) != null) {
-                assertTrue(splits.remove(is));
-            }
+        final String[] hosts = {"host1", "host1", "host1", "host2", "host2", 
"host3"};
+        final int NUM_SPLITS = 10 * hosts.length;
 
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("anotherHost", 0));
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
+        }
 
-            assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
-            assertEquals(0, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        InputSplit is = null;
+        while ((is = ia.getNextInputSplit("testhost", 0)) != null) {
+            assertThat(splits.remove(is)).isTrue();
         }
+
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("anotherHost", 0)).isNull();
+
+        assertThat(ia.getNumberOfRemoteAssignments()).isEqualTo(NUM_SPLITS);
+        assertThat(ia.getNumberOfLocalAssignments()).isZero();
     }
 
     @Test
-    public void testSerialSplitAssignmentSomeForRemoteHost() {
-        try {
-
-            // host1 reads all local
-            // host2 reads 10 local and 10 remote
-            // host3 reads all remote
-            final String[] hosts = {"host1", "host2", "host3"};
-            final int NUM_LOCAL_HOST1_SPLITS = 20;
-            final int NUM_LOCAL_HOST2_SPLITS = 10;
-            final int NUM_REMOTE_SPLITS = 30;
-            final int NUM_LOCAL_SPLITS = NUM_LOCAL_HOST1_SPLITS + 
NUM_LOCAL_HOST2_SPLITS;
-
-            // load local splits
-            int splitCnt = 0;
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            // host1 splits
-            for (int i = 0; i < NUM_LOCAL_HOST1_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, "host1"));
-            }
-            // host2 splits
-            for (int i = 0; i < NUM_LOCAL_HOST2_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, "host2"));
-            }
-            // load remote splits
-            for (int i = 0; i < NUM_REMOTE_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, "remoteHost"));
-            }
+    void testSerialSplitAssignmentSomeForRemoteHost() {
+
+        // host1 reads all local
+        // host2 reads 10 local and 10 remote
+        // host3 reads all remote
+        final String[] hosts = {"host1", "host2", "host3"};
+        final int NUM_LOCAL_HOST1_SPLITS = 20;
+        final int NUM_LOCAL_HOST2_SPLITS = 10;
+        final int NUM_REMOTE_SPLITS = 30;
+        final int NUM_LOCAL_SPLITS = NUM_LOCAL_HOST1_SPLITS + 
NUM_LOCAL_HOST2_SPLITS;
+
+        // load local splits
+        int splitCnt = 0;
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        // host1 splits
+        for (int i = 0; i < NUM_LOCAL_HOST1_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, "host1"));
+        }
+        // host2 splits
+        for (int i = 0; i < NUM_LOCAL_HOST2_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, "host2"));
+        }
+        // load remote splits
+        for (int i = 0; i < NUM_REMOTE_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, "remoteHost"));
+        }
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            InputSplit is = null;
-            int i = 0;
-            while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length], 0)) 
!= null) {
-                assertTrue(splits.remove(is));
-            }
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        InputSplit is = null;
+        int i = 0;
+        while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length], 0)) != 
null) {
+            assertThat(splits.remove(is)).isTrue();
+        }
 
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("anotherHost", 0));
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("anotherHost", 0)).isNull();
 
-            assertEquals(NUM_REMOTE_SPLITS, ia.getNumberOfRemoteAssignments());
-            assertEquals(NUM_LOCAL_SPLITS, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        
assertThat(ia.getNumberOfRemoteAssignments()).isEqualTo(NUM_REMOTE_SPLITS);
+        
assertThat(ia.getNumberOfLocalAssignments()).isEqualTo(NUM_LOCAL_SPLITS);
     }
 
     @Test
-    public void testSerialSplitAssignmentMultiLocalHost() {
-        try {
-
-            final String[] localHosts = {"local1", "local2", "local3"};
-            final String[] remoteHosts = {"remote1", "remote2", "remote3"};
-            final String[] requestingHosts = {"local3", "local2", "local1", 
"other"};
-
-            final int NUM_THREE_LOCAL_SPLITS = 10;
-            final int NUM_TWO_LOCAL_SPLITS = 10;
-            final int NUM_ONE_LOCAL_SPLITS = 10;
-            final int NUM_LOCAL_SPLITS = 30;
-            final int NUM_REMOTE_SPLITS = 10;
-            final int NUM_SPLITS = 40;
-
-            String[] threeLocalHosts = localHosts;
-            String[] twoLocalHosts = {localHosts[0], localHosts[1], 
remoteHosts[0]};
-            String[] oneLocalHost = {localHosts[0], remoteHosts[0], 
remoteHosts[1]};
-            String[] noLocalHost = remoteHosts;
-
-            int splitCnt = 0;
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            // add splits with three local hosts
-            for (int i = 0; i < NUM_THREE_LOCAL_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, 
threeLocalHosts));
-            }
-            // add splits with two local hosts
-            for (int i = 0; i < NUM_TWO_LOCAL_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, twoLocalHosts));
-            }
-            // add splits with two local hosts
-            for (int i = 0; i < NUM_ONE_LOCAL_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, oneLocalHost));
-            }
-            // add splits with two local hosts
-            for (int i = 0; i < NUM_REMOTE_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, noLocalHost));
-            }
+    void testSerialSplitAssignmentMultiLocalHost() {
+
+        final String[] localHosts = {"local1", "local2", "local3"};
+        final String[] remoteHosts = {"remote1", "remote2", "remote3"};
+        final String[] requestingHosts = {"local3", "local2", "local1", 
"other"};
+
+        final int NUM_THREE_LOCAL_SPLITS = 10;
+        final int NUM_TWO_LOCAL_SPLITS = 10;
+        final int NUM_ONE_LOCAL_SPLITS = 10;
+        final int NUM_LOCAL_SPLITS = 30;
+        final int NUM_REMOTE_SPLITS = 10;
+        final int NUM_SPLITS = 40;
+
+        String[] threeLocalHosts = localHosts;
+        String[] twoLocalHosts = {localHosts[0], localHosts[1], 
remoteHosts[0]};
+        String[] oneLocalHost = {localHosts[0], remoteHosts[0], 
remoteHosts[1]};
+        String[] noLocalHost = remoteHosts;
+
+        int splitCnt = 0;
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        // add splits with three local hosts
+        for (int i = 0; i < NUM_THREE_LOCAL_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, threeLocalHosts));
+        }
+        // add splits with two local hosts
+        for (int i = 0; i < NUM_TWO_LOCAL_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, twoLocalHosts));
+        }
+        // add splits with two local hosts
+        for (int i = 0; i < NUM_ONE_LOCAL_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, oneLocalHost));
+        }
+        // add splits with two local hosts
+        for (int i = 0; i < NUM_REMOTE_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, noLocalHost));
+        }
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            LocatableInputSplit is = null;
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                String host = requestingHosts[i % requestingHosts.length];
-                is = ia.getNextInputSplit(host, 0);
-                // check valid split
-                assertTrue(is != null);
-                // check unassigned split
-                assertTrue(splits.remove(is));
-                // check priority of split
-                if (host.equals(localHosts[0])) {
-                    assertTrue(Arrays.equals(is.getHostnames(), oneLocalHost));
-                } else if (host.equals(localHosts[1])) {
-                    assertTrue(Arrays.equals(is.getHostnames(), 
twoLocalHosts));
-                } else if (host.equals(localHosts[2])) {
-                    assertTrue(Arrays.equals(is.getHostnames(), 
threeLocalHosts));
-                } else {
-                    assertTrue(Arrays.equals(is.getHostnames(), noLocalHost));
-                }
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        LocatableInputSplit is = null;
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            String host = requestingHosts[i % requestingHosts.length];
+            is = ia.getNextInputSplit(host, 0);
+            // check valid split
+            assertThat(is).isNotNull();
+            // check unassigned split
+            assertThat(splits.remove(is)).isTrue();
+            // check priority of split
+            if (host.equals(localHosts[0])) {
+                assertThat(is.getHostnames()).isEqualTo(oneLocalHost);
+            } else if (host.equals(localHosts[1])) {
+                assertThat(is.getHostnames()).isEqualTo(twoLocalHosts);
+            } else if (host.equals(localHosts[2])) {
+                assertThat(is.getHostnames()).isEqualTo(threeLocalHosts);
+            } else {
+                assertThat(is.getHostnames()).isEqualTo(noLocalHost);
             }
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("anotherHost", 0));
-
-            assertEquals(NUM_REMOTE_SPLITS, ia.getNumberOfRemoteAssignments());
-            assertEquals(NUM_LOCAL_SPLITS, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
         }
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("anotherHost", 0)).isNull();
+
+        
assertThat(ia.getNumberOfRemoteAssignments()).isEqualTo(NUM_REMOTE_SPLITS);
+        
assertThat(ia.getNumberOfLocalAssignments()).isEqualTo(NUM_LOCAL_SPLITS);
     }
 
     @Test
-    public void testSerialSplitAssignmentMixedLocalHost() {
-        try {
-            final String[] hosts = {"host1", "host1", "host1", "host2", 
"host2", "host3"};
-            final int NUM_SPLITS = 10 * hosts.length;
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, hosts[i % 
hosts.length]));
-            }
+    void testSerialSplitAssignmentMixedLocalHost() {
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            InputSplit is = null;
-            int i = 0;
-            while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length], 0)) 
!= null) {
-                assertTrue(splits.remove(is));
-            }
+        final String[] hosts = {"host1", "host1", "host1", "host2", "host2", 
"host3"};
+        final int NUM_SPLITS = 10 * hosts.length;
 
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("anotherHost", 0));
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
+        }
 
-            assertEquals(0, ia.getNumberOfRemoteAssignments());
-            assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        InputSplit is = null;
+        int i = 0;
+        while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length], 0)) != 
null) {
+            assertThat(splits.remove(is)).isTrue();
         }
+
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("anotherHost", 0)).isNull();
+
+        assertThat(ia.getNumberOfRemoteAssignments()).isZero();
+        assertThat(ia.getNumberOfLocalAssignments()).isEqualTo(NUM_SPLITS);
     }
 
     @Test
-    public void testConcurrentSplitAssignmentNullHost() {
-        try {
-            final int NUM_THREADS = 10;
-            final int NUM_SPLITS = 500;
-            final int SUM_OF_IDS = (NUM_SPLITS - 1) * (NUM_SPLITS) / 2;
-
-            final String[][] hosts =
-                    new String[][] {new String[] {"localhost"}, new String[0], 
null};
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, hosts[i % 3]));
-            }
+    void testConcurrentSplitAssignmentNullHost() throws InterruptedException {
 
-            final LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-
-            final AtomicInteger splitsRetrieved = new AtomicInteger(0);
-            final AtomicInteger sumOfIds = new AtomicInteger(0);
-
-            Runnable retriever =
-                    new Runnable() {
-
-                        @Override
-                        public void run() {
-                            LocatableInputSplit split;
-                            while ((split = ia.getNextInputSplit(null, 0)) != 
null) {
-                                splitsRetrieved.incrementAndGet();
-                                sumOfIds.addAndGet(split.getSplitNumber());
-                            }
-                        }
-                    };
-
-            // create the threads
-            Thread[] threads = new Thread[NUM_THREADS];
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i] = new Thread(retriever);
-                threads[i].setDaemon(true);
-            }
+        final int NUM_THREADS = 10;
+        final int NUM_SPLITS = 500;
+        final int SUM_OF_IDS = (NUM_SPLITS - 1) * (NUM_SPLITS) / 2;
 
-            // launch concurrently
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i].start();
-            }
+        final String[][] hosts = new String[][] {new String[] {"localhost"}, 
new String[0], null};
 
-            // sync
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i].join(5000);
-            }
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, hosts[i % 3]));
+        }
 
-            // verify
-            for (int i = 0; i < NUM_THREADS; i++) {
-                if (threads[i].isAlive()) {
-                    fail(
-                            "The concurrency test case is erroneous, the 
thread did not respond in time.");
-                }
-            }
+        final LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+
+        final AtomicInteger splitsRetrieved = new AtomicInteger(0);
+        final AtomicInteger sumOfIds = new AtomicInteger(0);
+
+        Runnable retriever =
+                () -> {
+                    LocatableInputSplit split;
+                    while ((split = ia.getNextInputSplit(null, 0)) != null) {
+                        splitsRetrieved.incrementAndGet();
+                        sumOfIds.addAndGet(split.getSplitNumber());
+                    }
+                };
+
+        // create the threads
+        Thread[] threads = new Thread[NUM_THREADS];
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i] = new Thread(retriever);
+            threads[i].setDaemon(true);
+        }
 
-            assertEquals(NUM_SPLITS, splitsRetrieved.get());
-            assertEquals(SUM_OF_IDS, sumOfIds.get());
+        // launch concurrently
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i].start();
+        }
 
-            // nothing left
-            assertNull(ia.getNextInputSplit("", 0));
+        // sync
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i].join(5000);
+        }
 
-            assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
-            assertEquals(0, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        // verify
+        for (int i = 0; i < NUM_THREADS; i++) {
+            assertThat(threads[i].isAlive()).isFalse();
         }
+
+        assertThat(splitsRetrieved.get()).isEqualTo(NUM_SPLITS);
+        assertThat(sumOfIds.get()).isEqualTo(SUM_OF_IDS);
+
+        // nothing left
+        assertThat(ia.getNextInputSplit("", 0)).isNull();
+
+        assertThat(ia.getNumberOfRemoteAssignments()).isEqualTo(NUM_SPLITS);
+        assertThat(ia.getNumberOfLocalAssignments()).isZero();
     }
 
     @Test
-    public void testConcurrentSplitAssignmentForSingleHost() {
-        try {
-            final int NUM_THREADS = 10;
-            final int NUM_SPLITS = 500;
-            final int SUM_OF_IDS = (NUM_SPLITS - 1) * (NUM_SPLITS) / 2;
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, "testhost"));
-            }
+    void testConcurrentSplitAssignmentForSingleHost() throws 
InterruptedException {
 
-            final LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-
-            final AtomicInteger splitsRetrieved = new AtomicInteger(0);
-            final AtomicInteger sumOfIds = new AtomicInteger(0);
-
-            Runnable retriever =
-                    new Runnable() {
-
-                        @Override
-                        public void run() {
-                            LocatableInputSplit split;
-                            while ((split = ia.getNextInputSplit("testhost", 
0)) != null) {
-                                splitsRetrieved.incrementAndGet();
-                                sumOfIds.addAndGet(split.getSplitNumber());
-                            }
-                        }
-                    };
-
-            // create the threads
-            Thread[] threads = new Thread[NUM_THREADS];
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i] = new Thread(retriever);
-                threads[i].setDaemon(true);
-            }
+        final int NUM_THREADS = 10;
+        final int NUM_SPLITS = 500;
+        final int SUM_OF_IDS = (NUM_SPLITS - 1) * (NUM_SPLITS) / 2;
 
-            // launch concurrently
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i].start();
-            }
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, "testhost"));
+        }
 
-            // sync
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i].join(5000);
-            }
+        final LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
 
-            // verify
-            for (int i = 0; i < NUM_THREADS; i++) {
-                if (threads[i].isAlive()) {
-                    fail(
-                            "The concurrency test case is erroneous, the 
thread did not respond in time.");
-                }
-            }
+        final AtomicInteger splitsRetrieved = new AtomicInteger(0);
+        final AtomicInteger sumOfIds = new AtomicInteger(0);
+
+        Runnable retriever =
+                () -> {
+                    LocatableInputSplit split;
+                    while ((split = ia.getNextInputSplit("testhost", 0)) != 
null) {
+                        splitsRetrieved.incrementAndGet();
+                        sumOfIds.addAndGet(split.getSplitNumber());
+                    }
+                };
+
+        // create the threads
+        Thread[] threads = new Thread[NUM_THREADS];
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i] = new Thread(retriever);
+            threads[i].setDaemon(true);
+        }
 
-            assertEquals(NUM_SPLITS, splitsRetrieved.get());
-            assertEquals(SUM_OF_IDS, sumOfIds.get());
+        // launch concurrently
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i].start();
+        }
 
-            // nothing left
-            assertNull(ia.getNextInputSplit("testhost", 0));
+        // sync
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i].join(5000);
+        }
 
-            assertEquals(0, ia.getNumberOfRemoteAssignments());
-            assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        // verify
+        for (int i = 0; i < NUM_THREADS; i++) {
+            assertThat(threads[i].isAlive()).isFalse();
         }
+
+        assertThat(splitsRetrieved.get()).isEqualTo(NUM_SPLITS);
+        assertThat(sumOfIds.get()).isEqualTo(SUM_OF_IDS);

Review Comment:
   These 2 can be updated to hasValue as well.



##########
flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java:
##########
@@ -20,483 +20,420 @@
 
 import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import java.util.Arrays;
 import java.util.Calendar;
 import java.util.HashSet;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.junit.Assert.*;
+import static org.assertj.core.api.Assertions.assertThat;
 
-public class LocatableSplitAssignerTest {
+class LocatableSplitAssignerTest {
 
     @Test
-    public void testSerialSplitAssignmentWithNullHost() {
-        try {
-            final int NUM_SPLITS = 50;
-            final String[][] hosts =
-                    new String[][] {new String[] {"localhost"}, new String[0], 
null};
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, hosts[i % 3]));
-            }
+    void testSerialSplitAssignmentWithNullHost() {
+        final int NUM_SPLITS = 50;
+        final String[][] hosts = new String[][] {new String[] {"localhost"}, 
new String[0], null};
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            InputSplit is = null;
-            while ((is = ia.getNextInputSplit(null, 0)) != null) {
-                assertTrue(splits.remove(is));
-            }
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, hosts[i % 3]));
+        }
 
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("", 0));
-            assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
-            assertEquals(0, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        InputSplit is = null;
+        while ((is = ia.getNextInputSplit(null, 0)) != null) {
+            assertThat(splits.remove(is)).isTrue();
         }
+
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("", 0)).isNull();
+        assertThat(ia.getNumberOfRemoteAssignments()).isEqualTo(NUM_SPLITS);
+        assertThat(ia.getNumberOfLocalAssignments()).isZero();
     }
 
     @Test
-    public void testSerialSplitAssignmentAllForSameHost() {
-        try {
-            final int NUM_SPLITS = 50;
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, "testhost"));
-            }
+    void testSerialSplitAssignmentAllForSameHost() {
+        final int NUM_SPLITS = 50;
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            InputSplit is = null;
-            while ((is = ia.getNextInputSplit("testhost", 0)) != null) {
-                assertTrue(splits.remove(is));
-            }
-
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("", 0));
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, "testhost"));
+        }
 
-            assertEquals(0, ia.getNumberOfRemoteAssignments());
-            assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        InputSplit is = null;
+        while ((is = ia.getNextInputSplit("testhost", 0)) != null) {
+            assertThat(splits.remove(is)).isTrue();
         }
+
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("", 0)).isNull();
+
+        assertThat(ia.getNumberOfRemoteAssignments()).isZero();
+        assertThat(ia.getNumberOfLocalAssignments()).isEqualTo(NUM_SPLITS);
     }
 
     @Test
-    public void testSerialSplitAssignmentAllForRemoteHost() {
-        try {
-            final String[] hosts = {"host1", "host1", "host1", "host2", 
"host2", "host3"};
-            final int NUM_SPLITS = 10 * hosts.length;
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, hosts[i % 
hosts.length]));
-            }
+    void testSerialSplitAssignmentAllForRemoteHost() {
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            InputSplit is = null;
-            while ((is = ia.getNextInputSplit("testhost", 0)) != null) {
-                assertTrue(splits.remove(is));
-            }
+        final String[] hosts = {"host1", "host1", "host1", "host2", "host2", 
"host3"};
+        final int NUM_SPLITS = 10 * hosts.length;
 
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("anotherHost", 0));
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
+        }
 
-            assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
-            assertEquals(0, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        InputSplit is = null;
+        while ((is = ia.getNextInputSplit("testhost", 0)) != null) {
+            assertThat(splits.remove(is)).isTrue();
         }
+
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("anotherHost", 0)).isNull();
+
+        assertThat(ia.getNumberOfRemoteAssignments()).isEqualTo(NUM_SPLITS);
+        assertThat(ia.getNumberOfLocalAssignments()).isZero();
     }
 
     @Test
-    public void testSerialSplitAssignmentSomeForRemoteHost() {
-        try {
-
-            // host1 reads all local
-            // host2 reads 10 local and 10 remote
-            // host3 reads all remote
-            final String[] hosts = {"host1", "host2", "host3"};
-            final int NUM_LOCAL_HOST1_SPLITS = 20;
-            final int NUM_LOCAL_HOST2_SPLITS = 10;
-            final int NUM_REMOTE_SPLITS = 30;
-            final int NUM_LOCAL_SPLITS = NUM_LOCAL_HOST1_SPLITS + 
NUM_LOCAL_HOST2_SPLITS;
-
-            // load local splits
-            int splitCnt = 0;
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            // host1 splits
-            for (int i = 0; i < NUM_LOCAL_HOST1_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, "host1"));
-            }
-            // host2 splits
-            for (int i = 0; i < NUM_LOCAL_HOST2_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, "host2"));
-            }
-            // load remote splits
-            for (int i = 0; i < NUM_REMOTE_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, "remoteHost"));
-            }
+    void testSerialSplitAssignmentSomeForRemoteHost() {
+
+        // host1 reads all local
+        // host2 reads 10 local and 10 remote
+        // host3 reads all remote
+        final String[] hosts = {"host1", "host2", "host3"};
+        final int NUM_LOCAL_HOST1_SPLITS = 20;
+        final int NUM_LOCAL_HOST2_SPLITS = 10;
+        final int NUM_REMOTE_SPLITS = 30;
+        final int NUM_LOCAL_SPLITS = NUM_LOCAL_HOST1_SPLITS + 
NUM_LOCAL_HOST2_SPLITS;
+
+        // load local splits
+        int splitCnt = 0;
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        // host1 splits
+        for (int i = 0; i < NUM_LOCAL_HOST1_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, "host1"));
+        }
+        // host2 splits
+        for (int i = 0; i < NUM_LOCAL_HOST2_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, "host2"));
+        }
+        // load remote splits
+        for (int i = 0; i < NUM_REMOTE_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, "remoteHost"));
+        }
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            InputSplit is = null;
-            int i = 0;
-            while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length], 0)) 
!= null) {
-                assertTrue(splits.remove(is));
-            }
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        InputSplit is = null;
+        int i = 0;
+        while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length], 0)) != 
null) {
+            assertThat(splits.remove(is)).isTrue();
+        }
 
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("anotherHost", 0));
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("anotherHost", 0)).isNull();
 
-            assertEquals(NUM_REMOTE_SPLITS, ia.getNumberOfRemoteAssignments());
-            assertEquals(NUM_LOCAL_SPLITS, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        
assertThat(ia.getNumberOfRemoteAssignments()).isEqualTo(NUM_REMOTE_SPLITS);
+        
assertThat(ia.getNumberOfLocalAssignments()).isEqualTo(NUM_LOCAL_SPLITS);
     }
 
     @Test
-    public void testSerialSplitAssignmentMultiLocalHost() {
-        try {
-
-            final String[] localHosts = {"local1", "local2", "local3"};
-            final String[] remoteHosts = {"remote1", "remote2", "remote3"};
-            final String[] requestingHosts = {"local3", "local2", "local1", 
"other"};
-
-            final int NUM_THREE_LOCAL_SPLITS = 10;
-            final int NUM_TWO_LOCAL_SPLITS = 10;
-            final int NUM_ONE_LOCAL_SPLITS = 10;
-            final int NUM_LOCAL_SPLITS = 30;
-            final int NUM_REMOTE_SPLITS = 10;
-            final int NUM_SPLITS = 40;
-
-            String[] threeLocalHosts = localHosts;
-            String[] twoLocalHosts = {localHosts[0], localHosts[1], 
remoteHosts[0]};
-            String[] oneLocalHost = {localHosts[0], remoteHosts[0], 
remoteHosts[1]};
-            String[] noLocalHost = remoteHosts;
-
-            int splitCnt = 0;
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            // add splits with three local hosts
-            for (int i = 0; i < NUM_THREE_LOCAL_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, 
threeLocalHosts));
-            }
-            // add splits with two local hosts
-            for (int i = 0; i < NUM_TWO_LOCAL_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, twoLocalHosts));
-            }
-            // add splits with two local hosts
-            for (int i = 0; i < NUM_ONE_LOCAL_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, oneLocalHost));
-            }
-            // add splits with two local hosts
-            for (int i = 0; i < NUM_REMOTE_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, noLocalHost));
-            }
+    void testSerialSplitAssignmentMultiLocalHost() {
+
+        final String[] localHosts = {"local1", "local2", "local3"};
+        final String[] remoteHosts = {"remote1", "remote2", "remote3"};
+        final String[] requestingHosts = {"local3", "local2", "local1", 
"other"};
+
+        final int NUM_THREE_LOCAL_SPLITS = 10;
+        final int NUM_TWO_LOCAL_SPLITS = 10;
+        final int NUM_ONE_LOCAL_SPLITS = 10;
+        final int NUM_LOCAL_SPLITS = 30;
+        final int NUM_REMOTE_SPLITS = 10;
+        final int NUM_SPLITS = 40;
+
+        String[] threeLocalHosts = localHosts;
+        String[] twoLocalHosts = {localHosts[0], localHosts[1], 
remoteHosts[0]};
+        String[] oneLocalHost = {localHosts[0], remoteHosts[0], 
remoteHosts[1]};
+        String[] noLocalHost = remoteHosts;
+
+        int splitCnt = 0;
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        // add splits with three local hosts
+        for (int i = 0; i < NUM_THREE_LOCAL_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, threeLocalHosts));
+        }
+        // add splits with two local hosts
+        for (int i = 0; i < NUM_TWO_LOCAL_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, twoLocalHosts));
+        }
+        // add splits with two local hosts
+        for (int i = 0; i < NUM_ONE_LOCAL_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, oneLocalHost));
+        }
+        // add splits with two local hosts
+        for (int i = 0; i < NUM_REMOTE_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, noLocalHost));
+        }
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            LocatableInputSplit is = null;
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                String host = requestingHosts[i % requestingHosts.length];
-                is = ia.getNextInputSplit(host, 0);
-                // check valid split
-                assertTrue(is != null);
-                // check unassigned split
-                assertTrue(splits.remove(is));
-                // check priority of split
-                if (host.equals(localHosts[0])) {
-                    assertTrue(Arrays.equals(is.getHostnames(), oneLocalHost));
-                } else if (host.equals(localHosts[1])) {
-                    assertTrue(Arrays.equals(is.getHostnames(), 
twoLocalHosts));
-                } else if (host.equals(localHosts[2])) {
-                    assertTrue(Arrays.equals(is.getHostnames(), 
threeLocalHosts));
-                } else {
-                    assertTrue(Arrays.equals(is.getHostnames(), noLocalHost));
-                }
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        LocatableInputSplit is = null;
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            String host = requestingHosts[i % requestingHosts.length];
+            is = ia.getNextInputSplit(host, 0);
+            // check valid split
+            assertThat(is).isNotNull();
+            // check unassigned split
+            assertThat(splits.remove(is)).isTrue();
+            // check priority of split
+            if (host.equals(localHosts[0])) {
+                assertThat(is.getHostnames()).isEqualTo(oneLocalHost);
+            } else if (host.equals(localHosts[1])) {
+                assertThat(is.getHostnames()).isEqualTo(twoLocalHosts);
+            } else if (host.equals(localHosts[2])) {
+                assertThat(is.getHostnames()).isEqualTo(threeLocalHosts);
+            } else {
+                assertThat(is.getHostnames()).isEqualTo(noLocalHost);
             }
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("anotherHost", 0));
-
-            assertEquals(NUM_REMOTE_SPLITS, ia.getNumberOfRemoteAssignments());
-            assertEquals(NUM_LOCAL_SPLITS, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
         }
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("anotherHost", 0)).isNull();
+
+        
assertThat(ia.getNumberOfRemoteAssignments()).isEqualTo(NUM_REMOTE_SPLITS);
+        
assertThat(ia.getNumberOfLocalAssignments()).isEqualTo(NUM_LOCAL_SPLITS);
     }
 
     @Test
-    public void testSerialSplitAssignmentMixedLocalHost() {
-        try {
-            final String[] hosts = {"host1", "host1", "host1", "host2", 
"host2", "host3"};
-            final int NUM_SPLITS = 10 * hosts.length;
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, hosts[i % 
hosts.length]));
-            }
+    void testSerialSplitAssignmentMixedLocalHost() {
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            InputSplit is = null;
-            int i = 0;
-            while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length], 0)) 
!= null) {
-                assertTrue(splits.remove(is));
-            }
+        final String[] hosts = {"host1", "host1", "host1", "host2", "host2", 
"host3"};
+        final int NUM_SPLITS = 10 * hosts.length;
 
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("anotherHost", 0));
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
+        }
 
-            assertEquals(0, ia.getNumberOfRemoteAssignments());
-            assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        InputSplit is = null;
+        int i = 0;
+        while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length], 0)) != 
null) {
+            assertThat(splits.remove(is)).isTrue();
         }
+
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("anotherHost", 0)).isNull();
+
+        assertThat(ia.getNumberOfRemoteAssignments()).isZero();
+        assertThat(ia.getNumberOfLocalAssignments()).isEqualTo(NUM_SPLITS);
     }
 
     @Test
-    public void testConcurrentSplitAssignmentNullHost() {
-        try {
-            final int NUM_THREADS = 10;
-            final int NUM_SPLITS = 500;
-            final int SUM_OF_IDS = (NUM_SPLITS - 1) * (NUM_SPLITS) / 2;
-
-            final String[][] hosts =
-                    new String[][] {new String[] {"localhost"}, new String[0], 
null};
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, hosts[i % 3]));
-            }
+    void testConcurrentSplitAssignmentNullHost() throws InterruptedException {
 
-            final LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-
-            final AtomicInteger splitsRetrieved = new AtomicInteger(0);
-            final AtomicInteger sumOfIds = new AtomicInteger(0);
-
-            Runnable retriever =
-                    new Runnable() {
-
-                        @Override
-                        public void run() {
-                            LocatableInputSplit split;
-                            while ((split = ia.getNextInputSplit(null, 0)) != 
null) {
-                                splitsRetrieved.incrementAndGet();
-                                sumOfIds.addAndGet(split.getSplitNumber());
-                            }
-                        }
-                    };
-
-            // create the threads
-            Thread[] threads = new Thread[NUM_THREADS];
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i] = new Thread(retriever);
-                threads[i].setDaemon(true);
-            }
+        final int NUM_THREADS = 10;
+        final int NUM_SPLITS = 500;
+        final int SUM_OF_IDS = (NUM_SPLITS - 1) * (NUM_SPLITS) / 2;
 
-            // launch concurrently
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i].start();
-            }
+        final String[][] hosts = new String[][] {new String[] {"localhost"}, 
new String[0], null};
 
-            // sync
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i].join(5000);
-            }
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, hosts[i % 3]));
+        }
 
-            // verify
-            for (int i = 0; i < NUM_THREADS; i++) {
-                if (threads[i].isAlive()) {
-                    fail(
-                            "The concurrency test case is erroneous, the 
thread did not respond in time.");
-                }
-            }
+        final LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+
+        final AtomicInteger splitsRetrieved = new AtomicInteger(0);
+        final AtomicInteger sumOfIds = new AtomicInteger(0);
+
+        Runnable retriever =
+                () -> {
+                    LocatableInputSplit split;
+                    while ((split = ia.getNextInputSplit(null, 0)) != null) {
+                        splitsRetrieved.incrementAndGet();
+                        sumOfIds.addAndGet(split.getSplitNumber());
+                    }
+                };
+
+        // create the threads
+        Thread[] threads = new Thread[NUM_THREADS];
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i] = new Thread(retriever);
+            threads[i].setDaemon(true);
+        }
 
-            assertEquals(NUM_SPLITS, splitsRetrieved.get());
-            assertEquals(SUM_OF_IDS, sumOfIds.get());
+        // launch concurrently
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i].start();
+        }
 
-            // nothing left
-            assertNull(ia.getNextInputSplit("", 0));
+        // sync
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i].join(5000);
+        }
 
-            assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
-            assertEquals(0, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        // verify
+        for (int i = 0; i < NUM_THREADS; i++) {
+            assertThat(threads[i].isAlive()).isFalse();
         }
+
+        assertThat(splitsRetrieved.get()).isEqualTo(NUM_SPLITS);
+        assertThat(sumOfIds.get()).isEqualTo(SUM_OF_IDS);
+
+        // nothing left
+        assertThat(ia.getNextInputSplit("", 0)).isNull();
+
+        assertThat(ia.getNumberOfRemoteAssignments()).isEqualTo(NUM_SPLITS);
+        assertThat(ia.getNumberOfLocalAssignments()).isZero();
     }
 
     @Test
-    public void testConcurrentSplitAssignmentForSingleHost() {
-        try {
-            final int NUM_THREADS = 10;
-            final int NUM_SPLITS = 500;
-            final int SUM_OF_IDS = (NUM_SPLITS - 1) * (NUM_SPLITS) / 2;
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, "testhost"));
-            }
+    void testConcurrentSplitAssignmentForSingleHost() throws 
InterruptedException {
 
-            final LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-
-            final AtomicInteger splitsRetrieved = new AtomicInteger(0);
-            final AtomicInteger sumOfIds = new AtomicInteger(0);
-
-            Runnable retriever =
-                    new Runnable() {
-
-                        @Override
-                        public void run() {
-                            LocatableInputSplit split;
-                            while ((split = ia.getNextInputSplit("testhost", 
0)) != null) {
-                                splitsRetrieved.incrementAndGet();
-                                sumOfIds.addAndGet(split.getSplitNumber());
-                            }
-                        }
-                    };
-
-            // create the threads
-            Thread[] threads = new Thread[NUM_THREADS];
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i] = new Thread(retriever);
-                threads[i].setDaemon(true);
-            }
+        final int NUM_THREADS = 10;
+        final int NUM_SPLITS = 500;
+        final int SUM_OF_IDS = (NUM_SPLITS - 1) * (NUM_SPLITS) / 2;
 
-            // launch concurrently
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i].start();
-            }
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, "testhost"));
+        }
 
-            // sync
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i].join(5000);
-            }
+        final LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
 
-            // verify
-            for (int i = 0; i < NUM_THREADS; i++) {
-                if (threads[i].isAlive()) {
-                    fail(
-                            "The concurrency test case is erroneous, the 
thread did not respond in time.");
-                }
-            }
+        final AtomicInteger splitsRetrieved = new AtomicInteger(0);
+        final AtomicInteger sumOfIds = new AtomicInteger(0);
+
+        Runnable retriever =
+                () -> {
+                    LocatableInputSplit split;
+                    while ((split = ia.getNextInputSplit("testhost", 0)) != 
null) {
+                        splitsRetrieved.incrementAndGet();
+                        sumOfIds.addAndGet(split.getSplitNumber());
+                    }
+                };
+
+        // create the threads
+        Thread[] threads = new Thread[NUM_THREADS];
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i] = new Thread(retriever);
+            threads[i].setDaemon(true);
+        }
 
-            assertEquals(NUM_SPLITS, splitsRetrieved.get());
-            assertEquals(SUM_OF_IDS, sumOfIds.get());
+        // launch concurrently
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i].start();
+        }
 
-            // nothing left
-            assertNull(ia.getNextInputSplit("testhost", 0));
+        // sync
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i].join(5000);
+        }
 
-            assertEquals(0, ia.getNumberOfRemoteAssignments());
-            assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        // verify
+        for (int i = 0; i < NUM_THREADS; i++) {
+            assertThat(threads[i].isAlive()).isFalse();
         }
+
+        assertThat(splitsRetrieved.get()).isEqualTo(NUM_SPLITS);
+        assertThat(sumOfIds.get()).isEqualTo(SUM_OF_IDS);
+
+        // nothing left
+        assertThat(ia.getNextInputSplit("testhost", 0)).isNull();
+
+        assertThat(ia.getNumberOfRemoteAssignments()).isZero();
+        assertThat(ia.getNumberOfLocalAssignments()).isEqualTo(NUM_SPLITS);
     }
 
     @Test
-    public void testConcurrentSplitAssignmentForMultipleHosts() {
-        try {
-            final int NUM_THREADS = 10;
-            final int NUM_SPLITS = 500;
-            final int SUM_OF_IDS = (NUM_SPLITS - 1) * (NUM_SPLITS) / 2;
-
-            final String[] hosts = {"host1", "host1", "host1", "host2", 
"host2", "host3"};
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, hosts[i % 
hosts.length]));
-            }
+    void testConcurrentSplitAssignmentForMultipleHosts() throws 
InterruptedException {
 
-            final LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        final int NUM_THREADS = 10;
+        final int NUM_SPLITS = 500;
+        final int SUM_OF_IDS = (NUM_SPLITS - 1) * (NUM_SPLITS) / 2;
 
-            final AtomicInteger splitsRetrieved = new AtomicInteger(0);
-            final AtomicInteger sumOfIds = new AtomicInteger(0);
+        final String[] hosts = {"host1", "host1", "host1", "host2", "host2", 
"host3"};
 
-            Runnable retriever =
-                    new Runnable() {
-
-                        @Override
-                        public void run() {
-                            final String threadHost = hosts[(int) 
(Math.random() * hosts.length)];
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
+        }
 
-                            LocatableInputSplit split;
-                            while ((split = ia.getNextInputSplit(threadHost, 
0)) != null) {
-                                splitsRetrieved.incrementAndGet();
-                                sumOfIds.addAndGet(split.getSplitNumber());
-                            }
-                        }
-                    };
+        final LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
 
-            // create the threads
-            Thread[] threads = new Thread[NUM_THREADS];
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i] = new Thread(retriever);
-                threads[i].setDaemon(true);
-            }
+        final AtomicInteger splitsRetrieved = new AtomicInteger(0);
+        final AtomicInteger sumOfIds = new AtomicInteger(0);
+
+        Runnable retriever =
+                () -> {
+                    final String threadHost = hosts[(int) (Math.random() * 
hosts.length)];
+
+                    LocatableInputSplit split;
+                    while ((split = ia.getNextInputSplit(threadHost, 0)) != 
null) {
+                        splitsRetrieved.incrementAndGet();
+                        sumOfIds.addAndGet(split.getSplitNumber());
+                    }
+                };
+
+        // create the threads
+        Thread[] threads = new Thread[NUM_THREADS];
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i] = new Thread(retriever);
+            threads[i].setDaemon(true);
+        }
 
-            // launch concurrently
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i].start();
-            }
+        // launch concurrently
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i].start();
+        }
 
-            // sync
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i].join(5000);
-            }
+        // sync
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i].join(5000);
+        }
 
-            // verify
-            for (int i = 0; i < NUM_THREADS; i++) {
-                if (threads[i].isAlive()) {
-                    fail(
-                            "The concurrency test case is erroneous, the 
thread did not respond in time.");
-                }
-            }
+        // verify
+        for (int i = 0; i < NUM_THREADS; i++) {
+            assertThat(threads[i].isAlive()).isFalse();
+        }
 
-            assertEquals(NUM_SPLITS, splitsRetrieved.get());
-            assertEquals(SUM_OF_IDS, sumOfIds.get());
+        assertThat(splitsRetrieved.get()).isEqualTo(NUM_SPLITS);
+        assertThat(sumOfIds.get()).isEqualTo(SUM_OF_IDS);

Review Comment:
   These 2 can be updated to hasValue as well.



##########
flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java:
##########
@@ -137,7 +137,7 @@ public class HiveServer2EndpointITCase extends TestLogger {
             new 
HiveServer2EndpointExtension(SQL_GATEWAY_SERVICE_EXTENSION::getService);
 
     @BeforeAll
-    public static void setup() throws Exception {
+    static void setup() throws Exception {

Review Comment:
   This class shouldn't extends `TestLogger`, and the public can be removed at 
class level.
   
   Note: the public can be removed for all test methods.
   
   TBH, these changes should be finished in the previous PR, but they are 
forgot. Thank you for the updating!



##########
flink-core/src/test/java/org/apache/flink/core/io/DefaultSplitAssignerTest.java:
##########
@@ -20,105 +20,87 @@
 
 import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
 import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.junit.Assert.*;
+import static org.assertj.core.api.Assertions.assertThat;
 
-public class DefaultSplitAssignerTest {
+class DefaultSplitAssignerTest {
 
     @Test
-    public void testSerialSplitAssignment() {
-        try {
-            final int NUM_SPLITS = 50;
-
-            Set<InputSplit> splits = new HashSet<InputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new GenericInputSplit(i, NUM_SPLITS));
-            }
-
-            DefaultInputSplitAssigner ia = new 
DefaultInputSplitAssigner(splits);
-            InputSplit is = null;
-            while ((is = ia.getNextInputSplit("", 0)) != null) {
-                assertTrue(splits.remove(is));
-            }
-
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("", 0));
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+    void testSerialSplitAssignment() {
+        final int NUM_SPLITS = 50;
+
+        Set<InputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new GenericInputSplit(i, NUM_SPLITS));
+        }
+
+        DefaultInputSplitAssigner ia = new DefaultInputSplitAssigner(splits);
+        InputSplit is = null;
+        while ((is = ia.getNextInputSplit("", 0)) != null) {
+            assertThat(splits.remove(is)).isTrue();
         }
+
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("", 0)).isNull();
     }
 
     @Test
-    public void testConcurrentSplitAssignment() {
-        try {
-            final int NUM_THREADS = 10;
-            final int NUM_SPLITS = 500;
-            final int SUM_OF_IDS = (NUM_SPLITS - 1) * (NUM_SPLITS) / 2;
-
-            Set<InputSplit> splits = new HashSet<InputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new GenericInputSplit(i, NUM_SPLITS));
-            }
-
-            final DefaultInputSplitAssigner ia = new 
DefaultInputSplitAssigner(splits);
-
-            final AtomicInteger splitsRetrieved = new AtomicInteger(0);
-            final AtomicInteger sumOfIds = new AtomicInteger(0);
-
-            Runnable retriever =
-                    new Runnable() {
-
-                        @Override
-                        public void run() {
-                            String host = "";
-                            GenericInputSplit split;
-                            while ((split = (GenericInputSplit) 
ia.getNextInputSplit(host, 0))
-                                    != null) {
-                                splitsRetrieved.incrementAndGet();
-                                sumOfIds.addAndGet(split.getSplitNumber());
-                            }
-                        }
-                    };
-
-            // create the threads
-            Thread[] threads = new Thread[NUM_THREADS];
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i] = new Thread(retriever);
-                threads[i].setDaemon(true);
-            }
-
-            // launch concurrently
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i].start();
-            }
-
-            // sync
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i].join(5000);
-            }
-
-            // verify
-            for (int i = 0; i < NUM_THREADS; i++) {
-                if (threads[i].isAlive()) {
-                    fail(
-                            "The concurrency test case is erroneous, the 
thread did not respond in time.");
-                }
-            }
-
-            assertEquals(NUM_SPLITS, splitsRetrieved.get());
-            assertEquals(SUM_OF_IDS, sumOfIds.get());
-
-            // nothing left
-            assertNull(ia.getNextInputSplit("", 0));
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+    void testConcurrentSplitAssignment() throws InterruptedException {
+        final int NUM_THREADS = 10;
+        final int NUM_SPLITS = 500;
+        final int SUM_OF_IDS = (NUM_SPLITS - 1) * (NUM_SPLITS) / 2;
+
+        Set<InputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new GenericInputSplit(i, NUM_SPLITS));
+        }
+
+        final DefaultInputSplitAssigner ia = new 
DefaultInputSplitAssigner(splits);
+
+        final AtomicInteger splitsRetrieved = new AtomicInteger(0);
+        final AtomicInteger sumOfIds = new AtomicInteger(0);
+
+        Runnable retriever =
+                () -> {
+                    String host = "";
+                    GenericInputSplit split;
+                    while ((split = (GenericInputSplit) 
ia.getNextInputSplit(host, 0)) != null) {
+                        splitsRetrieved.incrementAndGet();
+                        sumOfIds.addAndGet(split.getSplitNumber());
+                    }
+                };
+
+        // create the threads
+        Thread[] threads = new Thread[NUM_THREADS];
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i] = new Thread(retriever);
+            threads[i].setDaemon(true);
+        }
+
+        // launch concurrently
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i].start();
+        }
+
+        // sync
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i].join(5000);
         }
+
+        // verify
+        for (int i = 0; i < NUM_THREADS; i++) {
+            assertThat(threads[i].isAlive()).isFalse();
+        }
+
+        assertThat(splitsRetrieved.get()).isEqualTo(NUM_SPLITS);
+        assertThat(sumOfIds.get()).isEqualTo(SUM_OF_IDS);

Review Comment:
   ```suggestion
           assertThat(splitsRetrieved).hasValue(NUM_SPLITS);
           assertThat(sumOfIds).hasValue(SUM_OF_IDS);
   ```



##########
flink-core/src/test/java/org/apache/flink/core/state/StateFutureTest.java:
##########
@@ -45,13 +44,13 @@ public class StateFutureTest {
             };
 
     @Test
-    public void basicSyncComplete() {
+    void basicSyncComplete() {
         StateFutureImpl.CallbackRunner runner = new TestCallbackRunner(null);
         final AtomicInteger counter = new AtomicInteger(0);

Review Comment:
   All assertThat can be updated to hasValue as well.



##########
flink-core/src/test/java/org/apache/flink/core/state/StateFutureTest.java:
##########
@@ -35,7 +35,6 @@
 import java.util.concurrent.atomic.AtomicReference;
 
 import static org.assertj.core.api.Assertions.assertThat;
-import static org.assertj.core.api.Assertions.fail;
 
 /** Tests for {@link StateFuture} related implementations. */
 public class StateFutureTest {

Review Comment:
   ```suggestion
   class StateFutureTest {
   ```



##########
flink-core/src/test/java/org/apache/flink/core/io/LocatableSplitAssignerTest.java:
##########
@@ -20,483 +20,420 @@
 
 import org.apache.flink.api.common.io.LocatableInputSplitAssigner;
 
-import org.junit.Test;
+import org.junit.jupiter.api.Test;
 
-import java.util.Arrays;
 import java.util.Calendar;
 import java.util.HashSet;
 import java.util.Random;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.junit.Assert.*;
+import static org.assertj.core.api.Assertions.assertThat;
 
-public class LocatableSplitAssignerTest {
+class LocatableSplitAssignerTest {
 
     @Test
-    public void testSerialSplitAssignmentWithNullHost() {
-        try {
-            final int NUM_SPLITS = 50;
-            final String[][] hosts =
-                    new String[][] {new String[] {"localhost"}, new String[0], 
null};
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, hosts[i % 3]));
-            }
+    void testSerialSplitAssignmentWithNullHost() {
+        final int NUM_SPLITS = 50;
+        final String[][] hosts = new String[][] {new String[] {"localhost"}, 
new String[0], null};
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            InputSplit is = null;
-            while ((is = ia.getNextInputSplit(null, 0)) != null) {
-                assertTrue(splits.remove(is));
-            }
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, hosts[i % 3]));
+        }
 
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("", 0));
-            assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
-            assertEquals(0, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        InputSplit is = null;
+        while ((is = ia.getNextInputSplit(null, 0)) != null) {
+            assertThat(splits.remove(is)).isTrue();
         }
+
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("", 0)).isNull();
+        assertThat(ia.getNumberOfRemoteAssignments()).isEqualTo(NUM_SPLITS);
+        assertThat(ia.getNumberOfLocalAssignments()).isZero();
     }
 
     @Test
-    public void testSerialSplitAssignmentAllForSameHost() {
-        try {
-            final int NUM_SPLITS = 50;
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, "testhost"));
-            }
+    void testSerialSplitAssignmentAllForSameHost() {
+        final int NUM_SPLITS = 50;
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            InputSplit is = null;
-            while ((is = ia.getNextInputSplit("testhost", 0)) != null) {
-                assertTrue(splits.remove(is));
-            }
-
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("", 0));
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, "testhost"));
+        }
 
-            assertEquals(0, ia.getNumberOfRemoteAssignments());
-            assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        InputSplit is = null;
+        while ((is = ia.getNextInputSplit("testhost", 0)) != null) {
+            assertThat(splits.remove(is)).isTrue();
         }
+
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("", 0)).isNull();
+
+        assertThat(ia.getNumberOfRemoteAssignments()).isZero();
+        assertThat(ia.getNumberOfLocalAssignments()).isEqualTo(NUM_SPLITS);
     }
 
     @Test
-    public void testSerialSplitAssignmentAllForRemoteHost() {
-        try {
-            final String[] hosts = {"host1", "host1", "host1", "host2", 
"host2", "host3"};
-            final int NUM_SPLITS = 10 * hosts.length;
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, hosts[i % 
hosts.length]));
-            }
+    void testSerialSplitAssignmentAllForRemoteHost() {
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            InputSplit is = null;
-            while ((is = ia.getNextInputSplit("testhost", 0)) != null) {
-                assertTrue(splits.remove(is));
-            }
+        final String[] hosts = {"host1", "host1", "host1", "host2", "host2", 
"host3"};
+        final int NUM_SPLITS = 10 * hosts.length;
 
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("anotherHost", 0));
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
+        }
 
-            assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
-            assertEquals(0, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        InputSplit is = null;
+        while ((is = ia.getNextInputSplit("testhost", 0)) != null) {
+            assertThat(splits.remove(is)).isTrue();
         }
+
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("anotherHost", 0)).isNull();
+
+        assertThat(ia.getNumberOfRemoteAssignments()).isEqualTo(NUM_SPLITS);
+        assertThat(ia.getNumberOfLocalAssignments()).isZero();
     }
 
     @Test
-    public void testSerialSplitAssignmentSomeForRemoteHost() {
-        try {
-
-            // host1 reads all local
-            // host2 reads 10 local and 10 remote
-            // host3 reads all remote
-            final String[] hosts = {"host1", "host2", "host3"};
-            final int NUM_LOCAL_HOST1_SPLITS = 20;
-            final int NUM_LOCAL_HOST2_SPLITS = 10;
-            final int NUM_REMOTE_SPLITS = 30;
-            final int NUM_LOCAL_SPLITS = NUM_LOCAL_HOST1_SPLITS + 
NUM_LOCAL_HOST2_SPLITS;
-
-            // load local splits
-            int splitCnt = 0;
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            // host1 splits
-            for (int i = 0; i < NUM_LOCAL_HOST1_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, "host1"));
-            }
-            // host2 splits
-            for (int i = 0; i < NUM_LOCAL_HOST2_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, "host2"));
-            }
-            // load remote splits
-            for (int i = 0; i < NUM_REMOTE_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, "remoteHost"));
-            }
+    void testSerialSplitAssignmentSomeForRemoteHost() {
+
+        // host1 reads all local
+        // host2 reads 10 local and 10 remote
+        // host3 reads all remote
+        final String[] hosts = {"host1", "host2", "host3"};
+        final int NUM_LOCAL_HOST1_SPLITS = 20;
+        final int NUM_LOCAL_HOST2_SPLITS = 10;
+        final int NUM_REMOTE_SPLITS = 30;
+        final int NUM_LOCAL_SPLITS = NUM_LOCAL_HOST1_SPLITS + 
NUM_LOCAL_HOST2_SPLITS;
+
+        // load local splits
+        int splitCnt = 0;
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        // host1 splits
+        for (int i = 0; i < NUM_LOCAL_HOST1_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, "host1"));
+        }
+        // host2 splits
+        for (int i = 0; i < NUM_LOCAL_HOST2_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, "host2"));
+        }
+        // load remote splits
+        for (int i = 0; i < NUM_REMOTE_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, "remoteHost"));
+        }
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            InputSplit is = null;
-            int i = 0;
-            while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length], 0)) 
!= null) {
-                assertTrue(splits.remove(is));
-            }
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        InputSplit is = null;
+        int i = 0;
+        while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length], 0)) != 
null) {
+            assertThat(splits.remove(is)).isTrue();
+        }
 
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("anotherHost", 0));
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("anotherHost", 0)).isNull();
 
-            assertEquals(NUM_REMOTE_SPLITS, ia.getNumberOfRemoteAssignments());
-            assertEquals(NUM_LOCAL_SPLITS, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
-        }
+        
assertThat(ia.getNumberOfRemoteAssignments()).isEqualTo(NUM_REMOTE_SPLITS);
+        
assertThat(ia.getNumberOfLocalAssignments()).isEqualTo(NUM_LOCAL_SPLITS);
     }
 
     @Test
-    public void testSerialSplitAssignmentMultiLocalHost() {
-        try {
-
-            final String[] localHosts = {"local1", "local2", "local3"};
-            final String[] remoteHosts = {"remote1", "remote2", "remote3"};
-            final String[] requestingHosts = {"local3", "local2", "local1", 
"other"};
-
-            final int NUM_THREE_LOCAL_SPLITS = 10;
-            final int NUM_TWO_LOCAL_SPLITS = 10;
-            final int NUM_ONE_LOCAL_SPLITS = 10;
-            final int NUM_LOCAL_SPLITS = 30;
-            final int NUM_REMOTE_SPLITS = 10;
-            final int NUM_SPLITS = 40;
-
-            String[] threeLocalHosts = localHosts;
-            String[] twoLocalHosts = {localHosts[0], localHosts[1], 
remoteHosts[0]};
-            String[] oneLocalHost = {localHosts[0], remoteHosts[0], 
remoteHosts[1]};
-            String[] noLocalHost = remoteHosts;
-
-            int splitCnt = 0;
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            // add splits with three local hosts
-            for (int i = 0; i < NUM_THREE_LOCAL_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, 
threeLocalHosts));
-            }
-            // add splits with two local hosts
-            for (int i = 0; i < NUM_TWO_LOCAL_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, twoLocalHosts));
-            }
-            // add splits with two local hosts
-            for (int i = 0; i < NUM_ONE_LOCAL_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, oneLocalHost));
-            }
-            // add splits with two local hosts
-            for (int i = 0; i < NUM_REMOTE_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(splitCnt++, noLocalHost));
-            }
+    void testSerialSplitAssignmentMultiLocalHost() {
+
+        final String[] localHosts = {"local1", "local2", "local3"};
+        final String[] remoteHosts = {"remote1", "remote2", "remote3"};
+        final String[] requestingHosts = {"local3", "local2", "local1", 
"other"};
+
+        final int NUM_THREE_LOCAL_SPLITS = 10;
+        final int NUM_TWO_LOCAL_SPLITS = 10;
+        final int NUM_ONE_LOCAL_SPLITS = 10;
+        final int NUM_LOCAL_SPLITS = 30;
+        final int NUM_REMOTE_SPLITS = 10;
+        final int NUM_SPLITS = 40;
+
+        String[] threeLocalHosts = localHosts;
+        String[] twoLocalHosts = {localHosts[0], localHosts[1], 
remoteHosts[0]};
+        String[] oneLocalHost = {localHosts[0], remoteHosts[0], 
remoteHosts[1]};
+        String[] noLocalHost = remoteHosts;
+
+        int splitCnt = 0;
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        // add splits with three local hosts
+        for (int i = 0; i < NUM_THREE_LOCAL_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, threeLocalHosts));
+        }
+        // add splits with two local hosts
+        for (int i = 0; i < NUM_TWO_LOCAL_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, twoLocalHosts));
+        }
+        // add splits with two local hosts
+        for (int i = 0; i < NUM_ONE_LOCAL_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, oneLocalHost));
+        }
+        // add splits with two local hosts
+        for (int i = 0; i < NUM_REMOTE_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(splitCnt++, noLocalHost));
+        }
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            LocatableInputSplit is = null;
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                String host = requestingHosts[i % requestingHosts.length];
-                is = ia.getNextInputSplit(host, 0);
-                // check valid split
-                assertTrue(is != null);
-                // check unassigned split
-                assertTrue(splits.remove(is));
-                // check priority of split
-                if (host.equals(localHosts[0])) {
-                    assertTrue(Arrays.equals(is.getHostnames(), oneLocalHost));
-                } else if (host.equals(localHosts[1])) {
-                    assertTrue(Arrays.equals(is.getHostnames(), 
twoLocalHosts));
-                } else if (host.equals(localHosts[2])) {
-                    assertTrue(Arrays.equals(is.getHostnames(), 
threeLocalHosts));
-                } else {
-                    assertTrue(Arrays.equals(is.getHostnames(), noLocalHost));
-                }
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        LocatableInputSplit is = null;
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            String host = requestingHosts[i % requestingHosts.length];
+            is = ia.getNextInputSplit(host, 0);
+            // check valid split
+            assertThat(is).isNotNull();
+            // check unassigned split
+            assertThat(splits.remove(is)).isTrue();
+            // check priority of split
+            if (host.equals(localHosts[0])) {
+                assertThat(is.getHostnames()).isEqualTo(oneLocalHost);
+            } else if (host.equals(localHosts[1])) {
+                assertThat(is.getHostnames()).isEqualTo(twoLocalHosts);
+            } else if (host.equals(localHosts[2])) {
+                assertThat(is.getHostnames()).isEqualTo(threeLocalHosts);
+            } else {
+                assertThat(is.getHostnames()).isEqualTo(noLocalHost);
             }
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("anotherHost", 0));
-
-            assertEquals(NUM_REMOTE_SPLITS, ia.getNumberOfRemoteAssignments());
-            assertEquals(NUM_LOCAL_SPLITS, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
         }
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("anotherHost", 0)).isNull();
+
+        
assertThat(ia.getNumberOfRemoteAssignments()).isEqualTo(NUM_REMOTE_SPLITS);
+        
assertThat(ia.getNumberOfLocalAssignments()).isEqualTo(NUM_LOCAL_SPLITS);
     }
 
     @Test
-    public void testSerialSplitAssignmentMixedLocalHost() {
-        try {
-            final String[] hosts = {"host1", "host1", "host1", "host2", 
"host2", "host3"};
-            final int NUM_SPLITS = 10 * hosts.length;
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, hosts[i % 
hosts.length]));
-            }
+    void testSerialSplitAssignmentMixedLocalHost() {
 
-            // get all available splits
-            LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-            InputSplit is = null;
-            int i = 0;
-            while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length], 0)) 
!= null) {
-                assertTrue(splits.remove(is));
-            }
+        final String[] hosts = {"host1", "host1", "host1", "host2", "host2", 
"host3"};
+        final int NUM_SPLITS = 10 * hosts.length;
 
-            // check we had all
-            assertTrue(splits.isEmpty());
-            assertNull(ia.getNextInputSplit("anotherHost", 0));
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, hosts[i % hosts.length]));
+        }
 
-            assertEquals(0, ia.getNumberOfRemoteAssignments());
-            assertEquals(NUM_SPLITS, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        // get all available splits
+        LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+        InputSplit is = null;
+        int i = 0;
+        while ((is = ia.getNextInputSplit(hosts[i++ % hosts.length], 0)) != 
null) {
+            assertThat(splits.remove(is)).isTrue();
         }
+
+        // check we had all
+        assertThat(splits).isEmpty();
+        assertThat(ia.getNextInputSplit("anotherHost", 0)).isNull();
+
+        assertThat(ia.getNumberOfRemoteAssignments()).isZero();
+        assertThat(ia.getNumberOfLocalAssignments()).isEqualTo(NUM_SPLITS);
     }
 
     @Test
-    public void testConcurrentSplitAssignmentNullHost() {
-        try {
-            final int NUM_THREADS = 10;
-            final int NUM_SPLITS = 500;
-            final int SUM_OF_IDS = (NUM_SPLITS - 1) * (NUM_SPLITS) / 2;
-
-            final String[][] hosts =
-                    new String[][] {new String[] {"localhost"}, new String[0], 
null};
-
-            // load some splits
-            Set<LocatableInputSplit> splits = new 
HashSet<LocatableInputSplit>();
-            for (int i = 0; i < NUM_SPLITS; i++) {
-                splits.add(new LocatableInputSplit(i, hosts[i % 3]));
-            }
+    void testConcurrentSplitAssignmentNullHost() throws InterruptedException {
 
-            final LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
-
-            final AtomicInteger splitsRetrieved = new AtomicInteger(0);
-            final AtomicInteger sumOfIds = new AtomicInteger(0);
-
-            Runnable retriever =
-                    new Runnable() {
-
-                        @Override
-                        public void run() {
-                            LocatableInputSplit split;
-                            while ((split = ia.getNextInputSplit(null, 0)) != 
null) {
-                                splitsRetrieved.incrementAndGet();
-                                sumOfIds.addAndGet(split.getSplitNumber());
-                            }
-                        }
-                    };
-
-            // create the threads
-            Thread[] threads = new Thread[NUM_THREADS];
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i] = new Thread(retriever);
-                threads[i].setDaemon(true);
-            }
+        final int NUM_THREADS = 10;
+        final int NUM_SPLITS = 500;
+        final int SUM_OF_IDS = (NUM_SPLITS - 1) * (NUM_SPLITS) / 2;
 
-            // launch concurrently
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i].start();
-            }
+        final String[][] hosts = new String[][] {new String[] {"localhost"}, 
new String[0], null};
 
-            // sync
-            for (int i = 0; i < NUM_THREADS; i++) {
-                threads[i].join(5000);
-            }
+        // load some splits
+        Set<LocatableInputSplit> splits = new HashSet<>();
+        for (int i = 0; i < NUM_SPLITS; i++) {
+            splits.add(new LocatableInputSplit(i, hosts[i % 3]));
+        }
 
-            // verify
-            for (int i = 0; i < NUM_THREADS; i++) {
-                if (threads[i].isAlive()) {
-                    fail(
-                            "The concurrency test case is erroneous, the 
thread did not respond in time.");
-                }
-            }
+        final LocatableInputSplitAssigner ia = new 
LocatableInputSplitAssigner(splits);
+
+        final AtomicInteger splitsRetrieved = new AtomicInteger(0);
+        final AtomicInteger sumOfIds = new AtomicInteger(0);
+
+        Runnable retriever =
+                () -> {
+                    LocatableInputSplit split;
+                    while ((split = ia.getNextInputSplit(null, 0)) != null) {
+                        splitsRetrieved.incrementAndGet();
+                        sumOfIds.addAndGet(split.getSplitNumber());
+                    }
+                };
+
+        // create the threads
+        Thread[] threads = new Thread[NUM_THREADS];
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i] = new Thread(retriever);
+            threads[i].setDaemon(true);
+        }
 
-            assertEquals(NUM_SPLITS, splitsRetrieved.get());
-            assertEquals(SUM_OF_IDS, sumOfIds.get());
+        // launch concurrently
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i].start();
+        }
 
-            // nothing left
-            assertNull(ia.getNextInputSplit("", 0));
+        // sync
+        for (int i = 0; i < NUM_THREADS; i++) {
+            threads[i].join(5000);
+        }
 
-            assertEquals(NUM_SPLITS, ia.getNumberOfRemoteAssignments());
-            assertEquals(0, ia.getNumberOfLocalAssignments());
-        } catch (Exception e) {
-            e.printStackTrace();
-            fail(e.getMessage());
+        // verify
+        for (int i = 0; i < NUM_THREADS; i++) {
+            assertThat(threads[i].isAlive()).isFalse();
         }
+
+        assertThat(splitsRetrieved.get()).isEqualTo(NUM_SPLITS);
+        assertThat(sumOfIds.get()).isEqualTo(SUM_OF_IDS);

Review Comment:
   ```suggestion
           assertThat(splitsRetrieved).hasValue(NUM_SPLITS);
           assertThat(sumOfIds).hasValue(SUM_OF_IDS);
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to