This is an automated email from the ASF dual-hosted git repository.

guoyangze pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c418b57870aaf5b4f4f9d0f69a70366b583ae166
Author: Weihua Hu <huweihua....@gmail.com>
AuthorDate: Thu Jun 8 17:30:18 2023 +0800

    [hotfix] migrate some taskmanager rest handler tests to junit5 and remove 
Mockito usage.
---
 .../AbstractTaskManagerFileHandlerTest.java        | 395 ++-------------------
 .../taskmanager/TaskManagerLogListHandlerTest.java |  36 +-
 .../taskmanager/TestingChannelHandlerContext.java  | 277 +++++++++++++++
 .../taskmanager/TestingChannelPipeline.java        | 380 ++++++++++++++++++++
 .../taskmanager/TestingTaskManagerFileHandler.java |  90 +++++
 tools/maven/suppressions-runtime.xml               |   2 +-
 6 files changed, 801 insertions(+), 379 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.java
index 1ec46093a91..db08a298346 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.java
@@ -19,11 +19,9 @@
 package org.apache.flink.runtime.rest.handler.taskmanager;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobServer;
 import org.apache.flink.runtime.blob.TransientBlobKey;
-import org.apache.flink.runtime.blob.TransientBlobService;
 import org.apache.flink.runtime.blob.VoidBlobStore;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
@@ -37,64 +35,35 @@ import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerFileMessage
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
 import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion;
-import org.apache.flink.runtime.webmonitor.RestfulGateway;
-import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import org.apache.flink.testutils.TestingUtils;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.FileUtils;
-import org.apache.flink.util.FlinkException;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
-import org.apache.flink.util.concurrent.FutureUtils;
-
-import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
-import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelProgressivePromise;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
-import org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise;
-import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
+
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod;
 import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion;
-import org.apache.flink.shaded.netty4.io.netty.util.Attribute;
-import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
-import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
-import 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.ImmediateEventExecutor;
-
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
 
-import javax.annotation.Nonnull;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.net.SocketAddress;
 import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.Map;
 import java.util.Queue;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.greaterThan;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for the {@link AbstractTaskManagerFileHandler}. */
-public class AbstractTaskManagerFileHandlerTest extends TestLogger {
+class AbstractTaskManagerFileHandlerTest {
 
     private static final ResourceID EXPECTED_TASK_MANAGER_ID = 
ResourceID.generate();
 
@@ -102,7 +71,7 @@ public class AbstractTaskManagerFileHandlerTest extends 
TestLogger {
             new DefaultFullHttpRequest(
                     HttpVersion.HTTP_1_1, HttpMethod.GET, 
TestUntypedMessageHeaders.URL);
 
-    @ClassRule public static TemporaryFolder temporaryFolder = new 
TemporaryFolder();
+    @TempDir private static File temporaryFolder;
 
     private static BlobServer blobServer;
 
@@ -116,12 +85,11 @@ public class AbstractTaskManagerFileHandlerTest extends 
TestLogger {
 
     private TransientBlobKey transientBlobKey2;
 
-    @BeforeClass
-    public static void setup() throws IOException, HandlerRequestException {
+    @BeforeAll
+    static void setup() throws IOException, HandlerRequestException {
         final Configuration configuration = new Configuration();
 
-        blobServer =
-                new BlobServer(configuration, temporaryFolder.newFolder(), new 
VoidBlobStore());
+        blobServer = new BlobServer(configuration, temporaryFolder, new 
VoidBlobStore());
 
         handlerRequest =
                 HandlerRequest.resolveParametersAndCreate(
@@ -134,8 +102,8 @@ public class AbstractTaskManagerFileHandlerTest extends 
TestLogger {
                         Collections.emptyList());
     }
 
-    @Before
-    public void setupTest() throws IOException {
+    @BeforeEach
+    void setupTest() throws IOException {
         fileContent1 = UUID.randomUUID().toString();
         final File file1 = createFileWithContent(fileContent1);
         transientBlobKey1 = storeFileInBlobServer(file1);
@@ -145,8 +113,8 @@ public class AbstractTaskManagerFileHandlerTest extends 
TestLogger {
         transientBlobKey2 = storeFileInBlobServer(file2);
     }
 
-    @AfterClass
-    public static void teardown() throws IOException {
+    @AfterAll
+    static void teardown() throws IOException {
         if (blobServer != null) {
             blobServer.close();
             blobServer = null;
@@ -155,46 +123,46 @@ public class AbstractTaskManagerFileHandlerTest extends 
TestLogger {
 
     /** Tests that the {@link AbstractTaskManagerFileHandler} serves the 
requested file. */
     @Test
-    public void testFileServing() throws Exception {
+    void testFileServing() throws Exception {
         final Time cacheEntryDuration = Time.milliseconds(1000L);
 
         final Queue<CompletableFuture<TransientBlobKey>> requestFileUploads = 
new ArrayDeque<>(1);
 
         
requestFileUploads.add(CompletableFuture.completedFuture(transientBlobKey1));
 
-        final TestTaskManagerFileHandler testTaskManagerFileHandler =
+        final TestingTaskManagerFileHandler testingTaskManagerFileHandler =
                 createTestTaskManagerFileHandler(
                         cacheEntryDuration, requestFileUploads, 
EXPECTED_TASK_MANAGER_ID);
 
-        final File outputFile = temporaryFolder.newFile();
+        final File outputFile = TempDirUtils.newFile(temporaryFolder.toPath());
         final TestingChannelHandlerContext testingContext =
                 new TestingChannelHandlerContext(outputFile);
 
-        testTaskManagerFileHandler.respondToRequest(
+        testingTaskManagerFileHandler.respondToRequest(
                 testingContext, HTTP_REQUEST, handlerRequest, null);
 
-        assertThat(outputFile.length(), is(greaterThan(0L)));
-        assertThat(FileUtils.readFileUtf8(outputFile), 
is(equalTo(fileContent1)));
+        assertThat(outputFile).isNotEmpty();
+        assertThat(FileUtils.readFileUtf8(outputFile)).isEqualTo(fileContent1);
     }
 
     /** Tests that files are cached. */
     @Test
-    public void testFileCaching() throws Exception {
+    void testFileCaching() throws Exception {
         final File outputFile = runFileCachingTest(Time.milliseconds(5000L), 
Time.milliseconds(0L));
 
-        assertThat(outputFile.length(), is(greaterThan(0L)));
-        assertThat(FileUtils.readFileUtf8(outputFile), 
is(equalTo(fileContent1)));
+        assertThat(outputFile).isNotEmpty();
+        assertThat(FileUtils.readFileUtf8(outputFile)).isEqualTo(fileContent1);
     }
 
     /** Tests that file cache entries expire. */
     @Test
-    public void testFileCacheExpiration() throws Exception {
+    void testFileCacheExpiration() throws Exception {
         final Time cacheEntryDuration = Time.milliseconds(5L);
 
         final File outputFile = runFileCachingTest(cacheEntryDuration, 
cacheEntryDuration);
 
-        assertThat(outputFile.length(), is(greaterThan(0L)));
-        assertThat(FileUtils.readFileUtf8(outputFile), 
is(equalTo(fileContent2)));
+        assertThat(outputFile).isNotEmpty();
+        assertThat(FileUtils.readFileUtf8(outputFile)).isEqualTo(fileContent2);
     }
 
     private File runFileCachingTest(Time cacheEntryDuration, Time 
delayBetweenRequests)
@@ -203,33 +171,32 @@ public class AbstractTaskManagerFileHandlerTest extends 
TestLogger {
         
requestFileUploads.add(CompletableFuture.completedFuture(transientBlobKey1));
         
requestFileUploads.add(CompletableFuture.completedFuture(transientBlobKey2));
 
-        final TestTaskManagerFileHandler testTaskManagerFileHandler =
+        final TestingTaskManagerFileHandler testingTaskManagerFileHandler =
                 createTestTaskManagerFileHandler(
                         cacheEntryDuration, requestFileUploads, 
EXPECTED_TASK_MANAGER_ID);
 
-        final File outputFile = temporaryFolder.newFile();
+        final File outputFile = TempDirUtils.newFile(temporaryFolder.toPath());
         final TestingChannelHandlerContext testingContext =
                 new TestingChannelHandlerContext(outputFile);
 
-        testTaskManagerFileHandler.respondToRequest(
+        testingTaskManagerFileHandler.respondToRequest(
                 testingContext, HTTP_REQUEST, handlerRequest, null);
 
         Thread.sleep(delayBetweenRequests.toMilliseconds());
 
         // the handler should not trigger the file upload again because it is 
still cached
-        testTaskManagerFileHandler.respondToRequest(
+        testingTaskManagerFileHandler.respondToRequest(
                 testingContext, HTTP_REQUEST, handlerRequest, null);
         return outputFile;
     }
 
-    private AbstractTaskManagerFileHandlerTest.TestTaskManagerFileHandler
-            createTestTaskManagerFileHandler(
-                    Time cacheEntryDuration,
-                    Queue<CompletableFuture<TransientBlobKey>> 
requestFileUploads,
-                    ResourceID expectedTaskManagerId) {
+    private TestingTaskManagerFileHandler createTestTaskManagerFileHandler(
+            Time cacheEntryDuration,
+            Queue<CompletableFuture<TransientBlobKey>> requestFileUploads,
+            ResourceID expectedTaskManagerId) {
         final ResourceManagerGateway resourceManagerGateway = new 
TestingResourceManagerGateway();
 
-        return new TestTaskManagerFileHandler(
+        return new TestingTaskManagerFileHandler(
                 () -> CompletableFuture.completedFuture(null),
                 TestingUtils.infiniteTime(),
                 Collections.emptyMap(),
@@ -242,7 +209,7 @@ public class AbstractTaskManagerFileHandlerTest extends 
TestLogger {
     }
 
     private static File createFileWithContent(String fileContent) throws 
IOException {
-        final File file = temporaryFolder.newFile();
+        final File file = TempDirUtils.newFile(temporaryFolder.toPath());
 
         // write random content into the file
         try (FileOutputStream fileOutputStream = new FileOutputStream(file)) {
@@ -259,290 +226,6 @@ public class AbstractTaskManagerFileHandlerTest extends 
TestLogger {
         }
     }
 
-    /** Class under test. */
-    private static final class TestTaskManagerFileHandler
-            extends 
AbstractTaskManagerFileHandler<TaskManagerMessageParameters> {
-
-        private final Queue<CompletableFuture<TransientBlobKey>> 
requestFileUploads;
-
-        private final ResourceID expectedTaskManagerId;
-
-        protected TestTaskManagerFileHandler(
-                @Nonnull GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
-                @Nonnull Time timeout,
-                @Nonnull Map<String, String> responseHeaders,
-                @Nonnull
-                        UntypedResponseMessageHeaders<
-                                        EmptyRequestBody, 
TaskManagerMessageParameters>
-                                untypedResponseMessageHeaders,
-                @Nonnull GatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
-                @Nonnull TransientBlobService transientBlobService,
-                @Nonnull Time cacheEntryDuration,
-                Queue<CompletableFuture<TransientBlobKey>> requestFileUploads,
-                ResourceID expectedTaskManagerId) {
-            super(
-                    leaderRetriever,
-                    timeout,
-                    responseHeaders,
-                    untypedResponseMessageHeaders,
-                    resourceManagerGatewayRetriever,
-                    transientBlobService,
-                    cacheEntryDuration);
-            this.requestFileUploads = 
Preconditions.checkNotNull(requestFileUploads);
-            this.expectedTaskManagerId = 
Preconditions.checkNotNull(expectedTaskManagerId);
-        }
-
-        @Override
-        protected CompletableFuture<TransientBlobKey> requestFileUpload(
-                ResourceManagerGateway resourceManagerGateway,
-                Tuple2<ResourceID, String> taskManagerIdAndFileName) {
-            assertThat(taskManagerIdAndFileName.f0, 
is(equalTo(expectedTaskManagerId)));
-            final CompletableFuture<TransientBlobKey> transientBlobKeyFuture =
-                    requestFileUploads.poll();
-
-            if (transientBlobKeyFuture != null) {
-                return transientBlobKeyFuture;
-            } else {
-                return FutureUtils.completedExceptionally(
-                        new FlinkException("Could not upload file."));
-            }
-        }
-    }
-
-    /** Testing implementation of {@link ChannelHandlerContext}. */
-    private static final class TestingChannelHandlerContext implements 
ChannelHandlerContext {
-
-        final File outputFile;
-
-        private TestingChannelHandlerContext(File outputFile) {
-            this.outputFile = Preconditions.checkNotNull(outputFile);
-        }
-
-        @Override
-        public ChannelFuture write(Object msg, ChannelPromise promise) {
-            if (msg instanceof DefaultFileRegion) {
-                final DefaultFileRegion defaultFileRegion = 
(DefaultFileRegion) msg;
-
-                try (final FileOutputStream fileOutputStream = new 
FileOutputStream(outputFile)) {
-                    fileOutputStream.getChannel();
-
-                    
defaultFileRegion.transferTo(fileOutputStream.getChannel(), 0L);
-                } catch (IOException ioe) {
-                    throw new RuntimeException(ioe);
-                }
-            }
-
-            return new DefaultChannelPromise(new EmbeddedChannel());
-        }
-
-        @Override
-        public EventExecutor executor() {
-            return ImmediateEventExecutor.INSTANCE;
-        }
-
-        @Override
-        public ChannelFuture write(Object msg) {
-            return write(msg, null);
-        }
-
-        @Override
-        public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) 
{
-            final ChannelFuture channelFuture = write(msg, promise);
-            flush();
-            return channelFuture;
-        }
-
-        @Override
-        public ChannelFuture writeAndFlush(Object msg) {
-            return writeAndFlush(msg, null);
-        }
-
-        @Override
-        public ChannelPipeline pipeline() {
-            return mock(ChannelPipeline.class);
-        }
-
-        // -----------------------------------------------------
-        // Automatically generated implementation
-        // -----------------------------------------------------
-
-        @Override
-        public Channel channel() {
-            return null;
-        }
-
-        @Override
-        public String name() {
-            return null;
-        }
-
-        @Override
-        public ChannelHandler handler() {
-            return null;
-        }
-
-        @Override
-        public boolean isRemoved() {
-            return false;
-        }
-
-        @Override
-        public ChannelHandlerContext fireChannelRegistered() {
-            return null;
-        }
-
-        @Override
-        public ChannelHandlerContext fireChannelUnregistered() {
-            return null;
-        }
-
-        @Override
-        public ChannelHandlerContext fireChannelActive() {
-            return null;
-        }
-
-        @Override
-        public ChannelHandlerContext fireChannelInactive() {
-            return null;
-        }
-
-        @Override
-        public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
-            return null;
-        }
-
-        @Override
-        public ChannelHandlerContext fireUserEventTriggered(Object event) {
-            return null;
-        }
-
-        @Override
-        public ChannelHandlerContext fireChannelRead(Object msg) {
-            return null;
-        }
-
-        @Override
-        public ChannelHandlerContext fireChannelReadComplete() {
-            return null;
-        }
-
-        @Override
-        public ChannelHandlerContext fireChannelWritabilityChanged() {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture bind(SocketAddress localAddress) {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture connect(SocketAddress remoteAddress) {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture connect(SocketAddress remoteAddress, 
SocketAddress localAddress) {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture disconnect() {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture close() {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture deregister() {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture bind(SocketAddress localAddress, ChannelPromise 
promise) {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture connect(SocketAddress remoteAddress, 
ChannelPromise promise) {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture connect(
-                SocketAddress remoteAddress, SocketAddress localAddress, 
ChannelPromise promise) {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture disconnect(ChannelPromise promise) {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture close(ChannelPromise promise) {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture deregister(ChannelPromise promise) {
-            return null;
-        }
-
-        @Override
-        public ChannelHandlerContext read() {
-            return null;
-        }
-
-        @Override
-        public ChannelHandlerContext flush() {
-            return null;
-        }
-
-        @Override
-        public ByteBufAllocator alloc() {
-            return null;
-        }
-
-        @Override
-        public ChannelPromise newPromise() {
-            return null;
-        }
-
-        @Override
-        public ChannelProgressivePromise newProgressivePromise() {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture newSucceededFuture() {
-            return null;
-        }
-
-        @Override
-        public ChannelFuture newFailedFuture(Throwable cause) {
-            return null;
-        }
-
-        @Override
-        public ChannelPromise voidPromise() {
-            return null;
-        }
-
-        @Override
-        public <T> Attribute<T> attr(AttributeKey<T> key) {
-            return null;
-        }
-
-        @Override
-        public <T> boolean hasAttr(AttributeKey<T> attributeKey) {
-            return false;
-        }
-    }
-
     /** Testing {@link UntypedResponseMessageHeaders}. */
     private static final class TestUntypedMessageHeaders
             implements UntypedResponseMessageHeaders<
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogListHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogListHandlerTest.java
index 8fdc9c840f7..35831a70b82 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogListHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogListHandlerTest.java
@@ -31,13 +31,12 @@ import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParam
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogsHeaders;
 import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
 import org.apache.flink.testutils.TestingUtils;
-import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.concurrent.FutureUtils;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
-import org.junit.Before;
-import org.junit.Test;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -47,23 +46,18 @@ import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
-import static org.hamcrest.Matchers.containsString;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.hasSize;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.is;
-import static org.junit.Assert.assertThat;
+import static org.assertj.core.api.Assertions.assertThat;
 
 /** Test for the {@link TaskManagerLogListHandler}. */
-public class TaskManagerLogListHandlerTest extends TestLogger {
+class TaskManagerLogListHandlerTest {
 
     private static final ResourceID EXPECTED_TASK_MANAGER_ID = 
ResourceID.generate();
     private TestingResourceManagerGateway resourceManagerGateway;
     private TaskManagerLogListHandler taskManagerLogListHandler;
     private HandlerRequest<EmptyRequestBody> handlerRequest;
 
-    @Before
-    public void setUp() throws HandlerRequestException {
+    @BeforeEach
+    void setUp() throws HandlerRequestException {
         resourceManagerGateway = new TestingResourceManagerGateway();
         taskManagerLogListHandler =
                 new TaskManagerLogListHandler(
@@ -76,7 +70,7 @@ public class TaskManagerLogListHandlerTest extends TestLogger 
{
     }
 
     @Test
-    public void testGetTaskManagerLogsList() throws Exception {
+    void testGetTaskManagerLogsList() throws Exception {
         List<LogInfo> logsList =
                 Arrays.asList(
                         new LogInfo("taskmanager.log", 1024L, 1632844800000L),
@@ -88,11 +82,11 @@ public class TaskManagerLogListHandlerTest extends 
TestLogger {
                 taskManagerLogListHandler
                         .handleRequest(handlerRequest, resourceManagerGateway)
                         .get();
-        assertThat(logListInfo.getLogInfos(), hasSize(logsList.size()));
+        
assertThat(logListInfo.getLogInfos()).containsExactlyInAnyOrderElementsOf(logsList);
     }
 
     @Test
-    public void testGetTaskManagerLogsListForUnknownTaskExecutorException() 
throws Exception {
+    void testGetTaskManagerLogsListForUnknownTaskExecutorException() throws 
Exception {
         resourceManagerGateway.setRequestTaskManagerLogListFunction(
                 EXPECTED_TASK_MANAGER_ID ->
                         FutureUtils.completedExceptionally(
@@ -101,15 +95,13 @@ public class TaskManagerLogListHandlerTest extends 
TestLogger {
             taskManagerLogListHandler.handleRequest(handlerRequest, 
resourceManagerGateway).get();
         } catch (ExecutionException e) {
             final Throwable cause = e.getCause();
-            assertThat(cause, is(instanceOf(RestHandlerException.class)));
+            assertThat(cause).isInstanceOf(RestHandlerException.class);
 
             final RestHandlerException restHandlerException = 
(RestHandlerException) cause;
-            assertThat(
-                    restHandlerException.getHttpResponseStatus(),
-                    is(equalTo(HttpResponseStatus.NOT_FOUND)));
-            assertThat(
-                    restHandlerException.getMessage(),
-                    containsString("Could not find TaskExecutor " + 
EXPECTED_TASK_MANAGER_ID));
+            assertThat(restHandlerException.getHttpResponseStatus())
+                    .isEqualTo(HttpResponseStatus.NOT_FOUND);
+            assertThat(restHandlerException.getMessage())
+                    .contains("Could not find TaskExecutor " + 
EXPECTED_TASK_MANAGER_ID);
         }
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TestingChannelHandlerContext.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TestingChannelHandlerContext.java
new file mode 100644
index 00000000000..1c9a69a6723
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TestingChannelHandlerContext.java
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator;
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelProgressivePromise;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
+import org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise;
+import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.flink.shaded.netty4.io.netty.util.Attribute;
+import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey;
+import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor;
+import 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.ImmediateEventExecutor;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.net.SocketAddress;
+
+/** Testing implementation of {@link ChannelHandlerContext}. */
+class TestingChannelHandlerContext implements ChannelHandlerContext {
+
+    private final File outputFile;
+    private final ChannelPipeline pipeline = new TestingChannelPipeline();
+
+    TestingChannelHandlerContext(File outputFile) {
+        this.outputFile = Preconditions.checkNotNull(outputFile);
+    }
+
+    @Override
+    public ChannelFuture write(Object msg, ChannelPromise promise) {
+        if (msg instanceof DefaultFileRegion) {
+            final DefaultFileRegion defaultFileRegion = (DefaultFileRegion) 
msg;
+
+            try (final FileOutputStream fileOutputStream = new 
FileOutputStream(outputFile)) {
+                fileOutputStream.getChannel();
+
+                defaultFileRegion.transferTo(fileOutputStream.getChannel(), 
0L);
+            } catch (IOException ioe) {
+                throw new RuntimeException(ioe);
+            }
+        }
+
+        return new DefaultChannelPromise(new EmbeddedChannel());
+    }
+
+    @Override
+    public EventExecutor executor() {
+        return ImmediateEventExecutor.INSTANCE;
+    }
+
+    @Override
+    public ChannelFuture write(Object msg) {
+        return write(msg, null);
+    }
+
+    @Override
+    public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
+        final ChannelFuture channelFuture = write(msg, promise);
+        flush();
+        return channelFuture;
+    }
+
+    @Override
+    public ChannelFuture writeAndFlush(Object msg) {
+        return writeAndFlush(msg, null);
+    }
+
+    @Override
+    public ChannelPipeline pipeline() {
+        return pipeline;
+    }
+
+    // -----------------------------------------------------
+    // Automatically generated implementation
+    // -----------------------------------------------------
+
+    @Override
+    public Channel channel() {
+        return null;
+    }
+
+    @Override
+    public String name() {
+        return null;
+    }
+
+    @Override
+    public ChannelHandler handler() {
+        return null;
+    }
+
+    @Override
+    public boolean isRemoved() {
+        return false;
+    }
+
+    @Override
+    public ChannelHandlerContext fireChannelRegistered() {
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireChannelUnregistered() {
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireChannelActive() {
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireChannelInactive() {
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireExceptionCaught(Throwable cause) {
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireUserEventTriggered(Object event) {
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireChannelRead(Object msg) {
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireChannelReadComplete() {
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext fireChannelWritabilityChanged() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture bind(SocketAddress localAddress) {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress) {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress 
localAddress) {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture disconnect() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture close() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture deregister() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise 
promise) {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise 
promise) {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(
+            SocketAddress remoteAddress, SocketAddress localAddress, 
ChannelPromise promise) {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture disconnect(ChannelPromise promise) {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture close(ChannelPromise promise) {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture deregister(ChannelPromise promise) {
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext read() {
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext flush() {
+        return null;
+    }
+
+    @Override
+    public ByteBufAllocator alloc() {
+        return null;
+    }
+
+    @Override
+    public ChannelPromise newPromise() {
+        return null;
+    }
+
+    @Override
+    public ChannelProgressivePromise newProgressivePromise() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture newSucceededFuture() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture newFailedFuture(Throwable cause) {
+        return null;
+    }
+
+    @Override
+    public ChannelPromise voidPromise() {
+        return null;
+    }
+
+    @Override
+    public <T> Attribute<T> attr(AttributeKey<T> key) {
+        return null;
+    }
+
+    @Override
+    public <T> boolean hasAttr(AttributeKey<T> attributeKey) {
+        return false;
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TestingChannelPipeline.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TestingChannelPipeline.java
new file mode 100644
index 00000000000..437c835db35
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TestingChannelPipeline.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundInvoker;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelProgressivePromise;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
+import 
org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutorGroup;
+
+import java.net.SocketAddress;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/** Testing implementation of {@link ChannelPipeline}. */
+class TestingChannelPipeline implements ChannelPipeline {
+
+    @Override
+    public ChannelPipeline addFirst(String s, ChannelHandler channelHandler) {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline addFirst(
+            EventExecutorGroup eventExecutorGroup, String s, ChannelHandler 
channelHandler) {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline addLast(String s, ChannelHandler channelHandler) {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline addLast(
+            EventExecutorGroup eventExecutorGroup, String s, ChannelHandler 
channelHandler) {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline addBefore(String s, String s1, ChannelHandler 
channelHandler) {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline addBefore(
+            EventExecutorGroup eventExecutorGroup,
+            String s,
+            String s1,
+            ChannelHandler channelHandler) {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline addAfter(String s, String s1, ChannelHandler 
channelHandler) {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline addAfter(
+            EventExecutorGroup eventExecutorGroup,
+            String s,
+            String s1,
+            ChannelHandler channelHandler) {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline addFirst(ChannelHandler... channelHandlers) {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline addFirst(
+            EventExecutorGroup eventExecutorGroup, ChannelHandler... 
channelHandlers) {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline addLast(ChannelHandler... channelHandlers) {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline addLast(
+            EventExecutorGroup eventExecutorGroup, ChannelHandler... 
channelHandlers) {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline remove(ChannelHandler channelHandler) {
+        return null;
+    }
+
+    @Override
+    public ChannelHandler remove(String s) {
+        return null;
+    }
+
+    @Override
+    public <T extends ChannelHandler> T remove(Class<T> aClass) {
+        return null;
+    }
+
+    @Override
+    public ChannelHandler removeFirst() {
+        return null;
+    }
+
+    @Override
+    public ChannelHandler removeLast() {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline replace(
+            ChannelHandler channelHandler, String s, ChannelHandler 
channelHandler1) {
+        return null;
+    }
+
+    @Override
+    public ChannelHandler replace(String s, String s1, ChannelHandler 
channelHandler) {
+        return null;
+    }
+
+    @Override
+    public <T extends ChannelHandler> T replace(
+            Class<T> aClass, String s, ChannelHandler channelHandler) {
+        return null;
+    }
+
+    @Override
+    public ChannelHandler first() {
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext firstContext() {
+        return null;
+    }
+
+    @Override
+    public ChannelHandler last() {
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext lastContext() {
+        return null;
+    }
+
+    @Override
+    public ChannelHandler get(String s) {
+        return null;
+    }
+
+    @Override
+    public <T extends ChannelHandler> T get(Class<T> aClass) {
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext context(ChannelHandler channelHandler) {
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext context(String s) {
+        return null;
+    }
+
+    @Override
+    public ChannelHandlerContext context(Class<? extends ChannelHandler> 
aClass) {
+        return null;
+    }
+
+    @Override
+    public Channel channel() {
+        return null;
+    }
+
+    @Override
+    public List<String> names() {
+        return null;
+    }
+
+    @Override
+    public Map<String, ChannelHandler> toMap() {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline fireChannelRegistered() {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline fireChannelUnregistered() {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline fireChannelActive() {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline fireChannelInactive() {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline fireExceptionCaught(Throwable throwable) {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline fireUserEventTriggered(Object o) {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline fireChannelRead(Object o) {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline fireChannelReadComplete() {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline fireChannelWritabilityChanged() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture bind(SocketAddress socketAddress) {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress socketAddress) {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress socketAddress, SocketAddress 
socketAddress1) {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture disconnect() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture close() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture deregister() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture bind(SocketAddress socketAddress, ChannelPromise 
channelPromise) {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(SocketAddress socketAddress, ChannelPromise 
channelPromise) {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture connect(
+            SocketAddress socketAddress,
+            SocketAddress socketAddress1,
+            ChannelPromise channelPromise) {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture disconnect(ChannelPromise channelPromise) {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture close(ChannelPromise channelPromise) {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture deregister(ChannelPromise channelPromise) {
+        return null;
+    }
+
+    @Override
+    public ChannelOutboundInvoker read() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture write(Object o) {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture write(Object o, ChannelPromise channelPromise) {
+        return null;
+    }
+
+    @Override
+    public ChannelPipeline flush() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture writeAndFlush(Object o, ChannelPromise 
channelPromise) {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture writeAndFlush(Object o) {
+        return null;
+    }
+
+    @Override
+    public ChannelPromise newPromise() {
+        return null;
+    }
+
+    @Override
+    public ChannelProgressivePromise newProgressivePromise() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture newSucceededFuture() {
+        return null;
+    }
+
+    @Override
+    public ChannelFuture newFailedFuture(Throwable throwable) {
+        return null;
+    }
+
+    @Override
+    public ChannelPromise voidPromise() {
+        return null;
+    }
+
+    @Override
+    public Iterator<Map.Entry<String, ChannelHandler>> iterator() {
+        return null;
+    }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TestingTaskManagerFileHandler.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TestingTaskManagerFileHandler.java
new file mode 100644
index 00000000000..8590f944cfe
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TestingTaskManagerFileHandler.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.taskmanager;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.blob.TransientBlobKey;
+import org.apache.flink.runtime.blob.TransientBlobService;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders;
+import 
org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.concurrent.FutureUtils;
+
+import javax.annotation.Nonnull;
+
+import java.util.Map;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Testing implementation of {@link AbstractTaskManagerFileHandler}. */
+public class TestingTaskManagerFileHandler
+        extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> {
+
+    private final Queue<CompletableFuture<TransientBlobKey>> 
requestFileUploads;
+
+    private final ResourceID expectedTaskManagerId;
+
+    public TestingTaskManagerFileHandler(
+            @Nonnull GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
+            @Nonnull Time timeout,
+            @Nonnull Map<String, String> responseHeaders,
+            @Nonnull
+                    UntypedResponseMessageHeaders<EmptyRequestBody, 
TaskManagerMessageParameters>
+                            untypedResponseMessageHeaders,
+            @Nonnull GatewayRetriever<ResourceManagerGateway> 
resourceManagerGatewayRetriever,
+            @Nonnull TransientBlobService transientBlobService,
+            @Nonnull Time cacheEntryDuration,
+            Queue<CompletableFuture<TransientBlobKey>> requestFileUploads,
+            ResourceID expectedTaskManagerId) {
+        super(
+                leaderRetriever,
+                timeout,
+                responseHeaders,
+                untypedResponseMessageHeaders,
+                resourceManagerGatewayRetriever,
+                transientBlobService,
+                cacheEntryDuration);
+        this.requestFileUploads = 
Preconditions.checkNotNull(requestFileUploads);
+        this.expectedTaskManagerId = 
Preconditions.checkNotNull(expectedTaskManagerId);
+    }
+
+    @Override
+    protected CompletableFuture<TransientBlobKey> requestFileUpload(
+            ResourceManagerGateway resourceManagerGateway,
+            Tuple2<ResourceID, String> taskManagerIdAndFileName) {
+        
assertThat(taskManagerIdAndFileName.f0).isEqualTo(expectedTaskManagerId);
+        final CompletableFuture<TransientBlobKey> transientBlobKeyFuture =
+                requestFileUploads.poll();
+
+        if (transientBlobKeyFuture != null) {
+            return transientBlobKeyFuture;
+        } else {
+            return FutureUtils.completedExceptionally(new 
FlinkException("Could not upload file."));
+        }
+    }
+}
diff --git a/tools/maven/suppressions-runtime.xml 
b/tools/maven/suppressions-runtime.xml
index 19b43a423fd..90a2746f16f 100644
--- a/tools/maven/suppressions-runtime.xml
+++ b/tools/maven/suppressions-runtime.xml
@@ -33,7 +33,7 @@ under the License.
        <suppress files="CheckpointCoordinatorTest.java" checks="FileLength"/>
 
        <!-- Legacy mockito usages -->
-       <suppress 
files="BlobCachePutTest.java|CheckpointCoordinatorFailureTest.java|CheckpointCoordinatorMasterHooksTest.java|CheckpointCoordinatorRestoringTest.java|CheckpointCoordinatorTestingUtils.java|CheckpointCoordinatorTest.java|CheckpointMetadataLoadingTest.java|CheckpointSettingsSerializableTest.java|CheckpointStateRestoreTest.java|CheckpointStatsHistoryTest.java|CheckpointStatsSnapshotTest.java|CompletedCheckpointStatsSummaryTest.java|CompletedCheckpointTest.java|FailoverStrategyChec
 [...]
+       <suppress 
files="BlobCachePutTest.java|CheckpointCoordinatorFailureTest.java|CheckpointCoordinatorMasterHooksTest.java|CheckpointCoordinatorRestoringTest.java|CheckpointCoordinatorTestingUtils.java|CheckpointCoordinatorTest.java|CheckpointMetadataLoadingTest.java|CheckpointSettingsSerializableTest.java|CheckpointStateRestoreTest.java|CheckpointStatsHistoryTest.java|CheckpointStatsSnapshotTest.java|CompletedCheckpointStatsSummaryTest.java|CompletedCheckpointTest.java|FailoverStrategyChec
 [...]
                          checks="IllegalImport"/>
 
        <!-- Legacy powermock usages -->

Reply via email to