This is an automated email from the ASF dual-hosted git repository. guoyangze pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c418b57870aaf5b4f4f9d0f69a70366b583ae166 Author: Weihua Hu <huweihua....@gmail.com> AuthorDate: Thu Jun 8 17:30:18 2023 +0800 [hotfix] migrate some taskmanager rest handler tests to junit5 and remove Mockito usage. --- .../AbstractTaskManagerFileHandlerTest.java | 395 ++------------------- .../taskmanager/TaskManagerLogListHandlerTest.java | 36 +- .../taskmanager/TestingChannelHandlerContext.java | 277 +++++++++++++++ .../taskmanager/TestingChannelPipeline.java | 380 ++++++++++++++++++++ .../taskmanager/TestingTaskManagerFileHandler.java | 90 +++++ tools/maven/suppressions-runtime.xml | 2 +- 6 files changed, 801 insertions(+), 379 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.java index 1ec46093a91..db08a298346 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/AbstractTaskManagerFileHandlerTest.java @@ -19,11 +19,9 @@ package org.apache.flink.runtime.rest.handler.taskmanager; import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobServer; import org.apache.flink.runtime.blob.TransientBlobKey; -import org.apache.flink.runtime.blob.TransientBlobService; import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; @@ -37,64 +35,35 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerFileMessage import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParameter; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters; import org.apache.flink.runtime.rest.versioning.RuntimeRestAPIVersion; -import org.apache.flink.runtime.webmonitor.RestfulGateway; -import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.testutils.TestingUtils; +import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.util.FileUtils; -import org.apache.flink.util.FlinkException; -import org.apache.flink.util.Preconditions; -import org.apache.flink.util.TestLogger; -import org.apache.flink.util.concurrent.FutureUtils; - -import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; -import org.apache.flink.shaded.netty4.io.netty.channel.Channel; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelProgressivePromise; -import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise; -import org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise; -import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion; -import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; + import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; -import org.apache.flink.shaded.netty4.io.netty.util.Attribute; -import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey; -import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor; -import org.apache.flink.shaded.netty4.io.netty.util.concurrent.ImmediateEventExecutor; - -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import javax.annotation.Nonnull; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; -import java.net.SocketAddress; import java.util.ArrayDeque; import java.util.Collection; import java.util.Collections; -import java.util.Map; import java.util.Queue; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.greaterThan; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.mock; +import static org.assertj.core.api.Assertions.assertThat; /** Tests for the {@link AbstractTaskManagerFileHandler}. */ -public class AbstractTaskManagerFileHandlerTest extends TestLogger { +class AbstractTaskManagerFileHandlerTest { private static final ResourceID EXPECTED_TASK_MANAGER_ID = ResourceID.generate(); @@ -102,7 +71,7 @@ public class AbstractTaskManagerFileHandlerTest extends TestLogger { new DefaultFullHttpRequest( HttpVersion.HTTP_1_1, HttpMethod.GET, TestUntypedMessageHeaders.URL); - @ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder(); + @TempDir private static File temporaryFolder; private static BlobServer blobServer; @@ -116,12 +85,11 @@ public class AbstractTaskManagerFileHandlerTest extends TestLogger { private TransientBlobKey transientBlobKey2; - @BeforeClass - public static void setup() throws IOException, HandlerRequestException { + @BeforeAll + static void setup() throws IOException, HandlerRequestException { final Configuration configuration = new Configuration(); - blobServer = - new BlobServer(configuration, temporaryFolder.newFolder(), new VoidBlobStore()); + blobServer = new BlobServer(configuration, temporaryFolder, new VoidBlobStore()); handlerRequest = HandlerRequest.resolveParametersAndCreate( @@ -134,8 +102,8 @@ public class AbstractTaskManagerFileHandlerTest extends TestLogger { Collections.emptyList()); } - @Before - public void setupTest() throws IOException { + @BeforeEach + void setupTest() throws IOException { fileContent1 = UUID.randomUUID().toString(); final File file1 = createFileWithContent(fileContent1); transientBlobKey1 = storeFileInBlobServer(file1); @@ -145,8 +113,8 @@ public class AbstractTaskManagerFileHandlerTest extends TestLogger { transientBlobKey2 = storeFileInBlobServer(file2); } - @AfterClass - public static void teardown() throws IOException { + @AfterAll + static void teardown() throws IOException { if (blobServer != null) { blobServer.close(); blobServer = null; @@ -155,46 +123,46 @@ public class AbstractTaskManagerFileHandlerTest extends TestLogger { /** Tests that the {@link AbstractTaskManagerFileHandler} serves the requested file. */ @Test - public void testFileServing() throws Exception { + void testFileServing() throws Exception { final Time cacheEntryDuration = Time.milliseconds(1000L); final Queue<CompletableFuture<TransientBlobKey>> requestFileUploads = new ArrayDeque<>(1); requestFileUploads.add(CompletableFuture.completedFuture(transientBlobKey1)); - final TestTaskManagerFileHandler testTaskManagerFileHandler = + final TestingTaskManagerFileHandler testingTaskManagerFileHandler = createTestTaskManagerFileHandler( cacheEntryDuration, requestFileUploads, EXPECTED_TASK_MANAGER_ID); - final File outputFile = temporaryFolder.newFile(); + final File outputFile = TempDirUtils.newFile(temporaryFolder.toPath()); final TestingChannelHandlerContext testingContext = new TestingChannelHandlerContext(outputFile); - testTaskManagerFileHandler.respondToRequest( + testingTaskManagerFileHandler.respondToRequest( testingContext, HTTP_REQUEST, handlerRequest, null); - assertThat(outputFile.length(), is(greaterThan(0L))); - assertThat(FileUtils.readFileUtf8(outputFile), is(equalTo(fileContent1))); + assertThat(outputFile).isNotEmpty(); + assertThat(FileUtils.readFileUtf8(outputFile)).isEqualTo(fileContent1); } /** Tests that files are cached. */ @Test - public void testFileCaching() throws Exception { + void testFileCaching() throws Exception { final File outputFile = runFileCachingTest(Time.milliseconds(5000L), Time.milliseconds(0L)); - assertThat(outputFile.length(), is(greaterThan(0L))); - assertThat(FileUtils.readFileUtf8(outputFile), is(equalTo(fileContent1))); + assertThat(outputFile).isNotEmpty(); + assertThat(FileUtils.readFileUtf8(outputFile)).isEqualTo(fileContent1); } /** Tests that file cache entries expire. */ @Test - public void testFileCacheExpiration() throws Exception { + void testFileCacheExpiration() throws Exception { final Time cacheEntryDuration = Time.milliseconds(5L); final File outputFile = runFileCachingTest(cacheEntryDuration, cacheEntryDuration); - assertThat(outputFile.length(), is(greaterThan(0L))); - assertThat(FileUtils.readFileUtf8(outputFile), is(equalTo(fileContent2))); + assertThat(outputFile).isNotEmpty(); + assertThat(FileUtils.readFileUtf8(outputFile)).isEqualTo(fileContent2); } private File runFileCachingTest(Time cacheEntryDuration, Time delayBetweenRequests) @@ -203,33 +171,32 @@ public class AbstractTaskManagerFileHandlerTest extends TestLogger { requestFileUploads.add(CompletableFuture.completedFuture(transientBlobKey1)); requestFileUploads.add(CompletableFuture.completedFuture(transientBlobKey2)); - final TestTaskManagerFileHandler testTaskManagerFileHandler = + final TestingTaskManagerFileHandler testingTaskManagerFileHandler = createTestTaskManagerFileHandler( cacheEntryDuration, requestFileUploads, EXPECTED_TASK_MANAGER_ID); - final File outputFile = temporaryFolder.newFile(); + final File outputFile = TempDirUtils.newFile(temporaryFolder.toPath()); final TestingChannelHandlerContext testingContext = new TestingChannelHandlerContext(outputFile); - testTaskManagerFileHandler.respondToRequest( + testingTaskManagerFileHandler.respondToRequest( testingContext, HTTP_REQUEST, handlerRequest, null); Thread.sleep(delayBetweenRequests.toMilliseconds()); // the handler should not trigger the file upload again because it is still cached - testTaskManagerFileHandler.respondToRequest( + testingTaskManagerFileHandler.respondToRequest( testingContext, HTTP_REQUEST, handlerRequest, null); return outputFile; } - private AbstractTaskManagerFileHandlerTest.TestTaskManagerFileHandler - createTestTaskManagerFileHandler( - Time cacheEntryDuration, - Queue<CompletableFuture<TransientBlobKey>> requestFileUploads, - ResourceID expectedTaskManagerId) { + private TestingTaskManagerFileHandler createTestTaskManagerFileHandler( + Time cacheEntryDuration, + Queue<CompletableFuture<TransientBlobKey>> requestFileUploads, + ResourceID expectedTaskManagerId) { final ResourceManagerGateway resourceManagerGateway = new TestingResourceManagerGateway(); - return new TestTaskManagerFileHandler( + return new TestingTaskManagerFileHandler( () -> CompletableFuture.completedFuture(null), TestingUtils.infiniteTime(), Collections.emptyMap(), @@ -242,7 +209,7 @@ public class AbstractTaskManagerFileHandlerTest extends TestLogger { } private static File createFileWithContent(String fileContent) throws IOException { - final File file = temporaryFolder.newFile(); + final File file = TempDirUtils.newFile(temporaryFolder.toPath()); // write random content into the file try (FileOutputStream fileOutputStream = new FileOutputStream(file)) { @@ -259,290 +226,6 @@ public class AbstractTaskManagerFileHandlerTest extends TestLogger { } } - /** Class under test. */ - private static final class TestTaskManagerFileHandler - extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> { - - private final Queue<CompletableFuture<TransientBlobKey>> requestFileUploads; - - private final ResourceID expectedTaskManagerId; - - protected TestTaskManagerFileHandler( - @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever, - @Nonnull Time timeout, - @Nonnull Map<String, String> responseHeaders, - @Nonnull - UntypedResponseMessageHeaders< - EmptyRequestBody, TaskManagerMessageParameters> - untypedResponseMessageHeaders, - @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever, - @Nonnull TransientBlobService transientBlobService, - @Nonnull Time cacheEntryDuration, - Queue<CompletableFuture<TransientBlobKey>> requestFileUploads, - ResourceID expectedTaskManagerId) { - super( - leaderRetriever, - timeout, - responseHeaders, - untypedResponseMessageHeaders, - resourceManagerGatewayRetriever, - transientBlobService, - cacheEntryDuration); - this.requestFileUploads = Preconditions.checkNotNull(requestFileUploads); - this.expectedTaskManagerId = Preconditions.checkNotNull(expectedTaskManagerId); - } - - @Override - protected CompletableFuture<TransientBlobKey> requestFileUpload( - ResourceManagerGateway resourceManagerGateway, - Tuple2<ResourceID, String> taskManagerIdAndFileName) { - assertThat(taskManagerIdAndFileName.f0, is(equalTo(expectedTaskManagerId))); - final CompletableFuture<TransientBlobKey> transientBlobKeyFuture = - requestFileUploads.poll(); - - if (transientBlobKeyFuture != null) { - return transientBlobKeyFuture; - } else { - return FutureUtils.completedExceptionally( - new FlinkException("Could not upload file.")); - } - } - } - - /** Testing implementation of {@link ChannelHandlerContext}. */ - private static final class TestingChannelHandlerContext implements ChannelHandlerContext { - - final File outputFile; - - private TestingChannelHandlerContext(File outputFile) { - this.outputFile = Preconditions.checkNotNull(outputFile); - } - - @Override - public ChannelFuture write(Object msg, ChannelPromise promise) { - if (msg instanceof DefaultFileRegion) { - final DefaultFileRegion defaultFileRegion = (DefaultFileRegion) msg; - - try (final FileOutputStream fileOutputStream = new FileOutputStream(outputFile)) { - fileOutputStream.getChannel(); - - defaultFileRegion.transferTo(fileOutputStream.getChannel(), 0L); - } catch (IOException ioe) { - throw new RuntimeException(ioe); - } - } - - return new DefaultChannelPromise(new EmbeddedChannel()); - } - - @Override - public EventExecutor executor() { - return ImmediateEventExecutor.INSTANCE; - } - - @Override - public ChannelFuture write(Object msg) { - return write(msg, null); - } - - @Override - public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { - final ChannelFuture channelFuture = write(msg, promise); - flush(); - return channelFuture; - } - - @Override - public ChannelFuture writeAndFlush(Object msg) { - return writeAndFlush(msg, null); - } - - @Override - public ChannelPipeline pipeline() { - return mock(ChannelPipeline.class); - } - - // ----------------------------------------------------- - // Automatically generated implementation - // ----------------------------------------------------- - - @Override - public Channel channel() { - return null; - } - - @Override - public String name() { - return null; - } - - @Override - public ChannelHandler handler() { - return null; - } - - @Override - public boolean isRemoved() { - return false; - } - - @Override - public ChannelHandlerContext fireChannelRegistered() { - return null; - } - - @Override - public ChannelHandlerContext fireChannelUnregistered() { - return null; - } - - @Override - public ChannelHandlerContext fireChannelActive() { - return null; - } - - @Override - public ChannelHandlerContext fireChannelInactive() { - return null; - } - - @Override - public ChannelHandlerContext fireExceptionCaught(Throwable cause) { - return null; - } - - @Override - public ChannelHandlerContext fireUserEventTriggered(Object event) { - return null; - } - - @Override - public ChannelHandlerContext fireChannelRead(Object msg) { - return null; - } - - @Override - public ChannelHandlerContext fireChannelReadComplete() { - return null; - } - - @Override - public ChannelHandlerContext fireChannelWritabilityChanged() { - return null; - } - - @Override - public ChannelFuture bind(SocketAddress localAddress) { - return null; - } - - @Override - public ChannelFuture connect(SocketAddress remoteAddress) { - return null; - } - - @Override - public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { - return null; - } - - @Override - public ChannelFuture disconnect() { - return null; - } - - @Override - public ChannelFuture close() { - return null; - } - - @Override - public ChannelFuture deregister() { - return null; - } - - @Override - public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { - return null; - } - - @Override - public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { - return null; - } - - @Override - public ChannelFuture connect( - SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { - return null; - } - - @Override - public ChannelFuture disconnect(ChannelPromise promise) { - return null; - } - - @Override - public ChannelFuture close(ChannelPromise promise) { - return null; - } - - @Override - public ChannelFuture deregister(ChannelPromise promise) { - return null; - } - - @Override - public ChannelHandlerContext read() { - return null; - } - - @Override - public ChannelHandlerContext flush() { - return null; - } - - @Override - public ByteBufAllocator alloc() { - return null; - } - - @Override - public ChannelPromise newPromise() { - return null; - } - - @Override - public ChannelProgressivePromise newProgressivePromise() { - return null; - } - - @Override - public ChannelFuture newSucceededFuture() { - return null; - } - - @Override - public ChannelFuture newFailedFuture(Throwable cause) { - return null; - } - - @Override - public ChannelPromise voidPromise() { - return null; - } - - @Override - public <T> Attribute<T> attr(AttributeKey<T> key) { - return null; - } - - @Override - public <T> boolean hasAttr(AttributeKey<T> attributeKey) { - return false; - } - } - /** Testing {@link UntypedResponseMessageHeaders}. */ private static final class TestUntypedMessageHeaders implements UntypedResponseMessageHeaders< diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogListHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogListHandlerTest.java index 8fdc9c840f7..35831a70b82 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogListHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TaskManagerLogListHandlerTest.java @@ -31,13 +31,12 @@ import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerIdPathParam import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerLogsHeaders; import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters; import org.apache.flink.testutils.TestingUtils; -import org.apache.flink.util.TestLogger; import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; -import org.junit.Before; -import org.junit.Test; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; import java.util.Arrays; import java.util.Collections; @@ -47,23 +46,18 @@ import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import static org.hamcrest.Matchers.containsString; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; -import static org.hamcrest.Matchers.instanceOf; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; +import static org.assertj.core.api.Assertions.assertThat; /** Test for the {@link TaskManagerLogListHandler}. */ -public class TaskManagerLogListHandlerTest extends TestLogger { +class TaskManagerLogListHandlerTest { private static final ResourceID EXPECTED_TASK_MANAGER_ID = ResourceID.generate(); private TestingResourceManagerGateway resourceManagerGateway; private TaskManagerLogListHandler taskManagerLogListHandler; private HandlerRequest<EmptyRequestBody> handlerRequest; - @Before - public void setUp() throws HandlerRequestException { + @BeforeEach + void setUp() throws HandlerRequestException { resourceManagerGateway = new TestingResourceManagerGateway(); taskManagerLogListHandler = new TaskManagerLogListHandler( @@ -76,7 +70,7 @@ public class TaskManagerLogListHandlerTest extends TestLogger { } @Test - public void testGetTaskManagerLogsList() throws Exception { + void testGetTaskManagerLogsList() throws Exception { List<LogInfo> logsList = Arrays.asList( new LogInfo("taskmanager.log", 1024L, 1632844800000L), @@ -88,11 +82,11 @@ public class TaskManagerLogListHandlerTest extends TestLogger { taskManagerLogListHandler .handleRequest(handlerRequest, resourceManagerGateway) .get(); - assertThat(logListInfo.getLogInfos(), hasSize(logsList.size())); + assertThat(logListInfo.getLogInfos()).containsExactlyInAnyOrderElementsOf(logsList); } @Test - public void testGetTaskManagerLogsListForUnknownTaskExecutorException() throws Exception { + void testGetTaskManagerLogsListForUnknownTaskExecutorException() throws Exception { resourceManagerGateway.setRequestTaskManagerLogListFunction( EXPECTED_TASK_MANAGER_ID -> FutureUtils.completedExceptionally( @@ -101,15 +95,13 @@ public class TaskManagerLogListHandlerTest extends TestLogger { taskManagerLogListHandler.handleRequest(handlerRequest, resourceManagerGateway).get(); } catch (ExecutionException e) { final Throwable cause = e.getCause(); - assertThat(cause, is(instanceOf(RestHandlerException.class))); + assertThat(cause).isInstanceOf(RestHandlerException.class); final RestHandlerException restHandlerException = (RestHandlerException) cause; - assertThat( - restHandlerException.getHttpResponseStatus(), - is(equalTo(HttpResponseStatus.NOT_FOUND))); - assertThat( - restHandlerException.getMessage(), - containsString("Could not find TaskExecutor " + EXPECTED_TASK_MANAGER_ID)); + assertThat(restHandlerException.getHttpResponseStatus()) + .isEqualTo(HttpResponseStatus.NOT_FOUND); + assertThat(restHandlerException.getMessage()) + .contains("Could not find TaskExecutor " + EXPECTED_TASK_MANAGER_ID); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TestingChannelHandlerContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TestingChannelHandlerContext.java new file mode 100644 index 00000000000..1c9a69a6723 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TestingChannelHandlerContext.java @@ -0,0 +1,277 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.taskmanager; + +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBufAllocator; +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelProgressivePromise; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise; +import org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPromise; +import org.apache.flink.shaded.netty4.io.netty.channel.DefaultFileRegion; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.apache.flink.shaded.netty4.io.netty.util.Attribute; +import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey; +import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutor; +import org.apache.flink.shaded.netty4.io.netty.util.concurrent.ImmediateEventExecutor; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.net.SocketAddress; + +/** Testing implementation of {@link ChannelHandlerContext}. */ +class TestingChannelHandlerContext implements ChannelHandlerContext { + + private final File outputFile; + private final ChannelPipeline pipeline = new TestingChannelPipeline(); + + TestingChannelHandlerContext(File outputFile) { + this.outputFile = Preconditions.checkNotNull(outputFile); + } + + @Override + public ChannelFuture write(Object msg, ChannelPromise promise) { + if (msg instanceof DefaultFileRegion) { + final DefaultFileRegion defaultFileRegion = (DefaultFileRegion) msg; + + try (final FileOutputStream fileOutputStream = new FileOutputStream(outputFile)) { + fileOutputStream.getChannel(); + + defaultFileRegion.transferTo(fileOutputStream.getChannel(), 0L); + } catch (IOException ioe) { + throw new RuntimeException(ioe); + } + } + + return new DefaultChannelPromise(new EmbeddedChannel()); + } + + @Override + public EventExecutor executor() { + return ImmediateEventExecutor.INSTANCE; + } + + @Override + public ChannelFuture write(Object msg) { + return write(msg, null); + } + + @Override + public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) { + final ChannelFuture channelFuture = write(msg, promise); + flush(); + return channelFuture; + } + + @Override + public ChannelFuture writeAndFlush(Object msg) { + return writeAndFlush(msg, null); + } + + @Override + public ChannelPipeline pipeline() { + return pipeline; + } + + // ----------------------------------------------------- + // Automatically generated implementation + // ----------------------------------------------------- + + @Override + public Channel channel() { + return null; + } + + @Override + public String name() { + return null; + } + + @Override + public ChannelHandler handler() { + return null; + } + + @Override + public boolean isRemoved() { + return false; + } + + @Override + public ChannelHandlerContext fireChannelRegistered() { + return null; + } + + @Override + public ChannelHandlerContext fireChannelUnregistered() { + return null; + } + + @Override + public ChannelHandlerContext fireChannelActive() { + return null; + } + + @Override + public ChannelHandlerContext fireChannelInactive() { + return null; + } + + @Override + public ChannelHandlerContext fireExceptionCaught(Throwable cause) { + return null; + } + + @Override + public ChannelHandlerContext fireUserEventTriggered(Object event) { + return null; + } + + @Override + public ChannelHandlerContext fireChannelRead(Object msg) { + return null; + } + + @Override + public ChannelHandlerContext fireChannelReadComplete() { + return null; + } + + @Override + public ChannelHandlerContext fireChannelWritabilityChanged() { + return null; + } + + @Override + public ChannelFuture bind(SocketAddress localAddress) { + return null; + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress) { + return null; + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) { + return null; + } + + @Override + public ChannelFuture disconnect() { + return null; + } + + @Override + public ChannelFuture close() { + return null; + } + + @Override + public ChannelFuture deregister() { + return null; + } + + @Override + public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) { + return null; + } + + @Override + public ChannelFuture connect(SocketAddress remoteAddress, ChannelPromise promise) { + return null; + } + + @Override + public ChannelFuture connect( + SocketAddress remoteAddress, SocketAddress localAddress, ChannelPromise promise) { + return null; + } + + @Override + public ChannelFuture disconnect(ChannelPromise promise) { + return null; + } + + @Override + public ChannelFuture close(ChannelPromise promise) { + return null; + } + + @Override + public ChannelFuture deregister(ChannelPromise promise) { + return null; + } + + @Override + public ChannelHandlerContext read() { + return null; + } + + @Override + public ChannelHandlerContext flush() { + return null; + } + + @Override + public ByteBufAllocator alloc() { + return null; + } + + @Override + public ChannelPromise newPromise() { + return null; + } + + @Override + public ChannelProgressivePromise newProgressivePromise() { + return null; + } + + @Override + public ChannelFuture newSucceededFuture() { + return null; + } + + @Override + public ChannelFuture newFailedFuture(Throwable cause) { + return null; + } + + @Override + public ChannelPromise voidPromise() { + return null; + } + + @Override + public <T> Attribute<T> attr(AttributeKey<T> key) { + return null; + } + + @Override + public <T> boolean hasAttr(AttributeKey<T> attributeKey) { + return false; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TestingChannelPipeline.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TestingChannelPipeline.java new file mode 100644 index 00000000000..437c835db35 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TestingChannelPipeline.java @@ -0,0 +1,380 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.taskmanager; + +import org.apache.flink.shaded.netty4.io.netty.channel.Channel; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundInvoker; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPipeline; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelProgressivePromise; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise; +import org.apache.flink.shaded.netty4.io.netty.util.concurrent.EventExecutorGroup; + +import java.net.SocketAddress; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +/** Testing implementation of {@link ChannelPipeline}. */ +class TestingChannelPipeline implements ChannelPipeline { + + @Override + public ChannelPipeline addFirst(String s, ChannelHandler channelHandler) { + return null; + } + + @Override + public ChannelPipeline addFirst( + EventExecutorGroup eventExecutorGroup, String s, ChannelHandler channelHandler) { + return null; + } + + @Override + public ChannelPipeline addLast(String s, ChannelHandler channelHandler) { + return null; + } + + @Override + public ChannelPipeline addLast( + EventExecutorGroup eventExecutorGroup, String s, ChannelHandler channelHandler) { + return null; + } + + @Override + public ChannelPipeline addBefore(String s, String s1, ChannelHandler channelHandler) { + return null; + } + + @Override + public ChannelPipeline addBefore( + EventExecutorGroup eventExecutorGroup, + String s, + String s1, + ChannelHandler channelHandler) { + return null; + } + + @Override + public ChannelPipeline addAfter(String s, String s1, ChannelHandler channelHandler) { + return null; + } + + @Override + public ChannelPipeline addAfter( + EventExecutorGroup eventExecutorGroup, + String s, + String s1, + ChannelHandler channelHandler) { + return null; + } + + @Override + public ChannelPipeline addFirst(ChannelHandler... channelHandlers) { + return null; + } + + @Override + public ChannelPipeline addFirst( + EventExecutorGroup eventExecutorGroup, ChannelHandler... channelHandlers) { + return null; + } + + @Override + public ChannelPipeline addLast(ChannelHandler... channelHandlers) { + return null; + } + + @Override + public ChannelPipeline addLast( + EventExecutorGroup eventExecutorGroup, ChannelHandler... channelHandlers) { + return null; + } + + @Override + public ChannelPipeline remove(ChannelHandler channelHandler) { + return null; + } + + @Override + public ChannelHandler remove(String s) { + return null; + } + + @Override + public <T extends ChannelHandler> T remove(Class<T> aClass) { + return null; + } + + @Override + public ChannelHandler removeFirst() { + return null; + } + + @Override + public ChannelHandler removeLast() { + return null; + } + + @Override + public ChannelPipeline replace( + ChannelHandler channelHandler, String s, ChannelHandler channelHandler1) { + return null; + } + + @Override + public ChannelHandler replace(String s, String s1, ChannelHandler channelHandler) { + return null; + } + + @Override + public <T extends ChannelHandler> T replace( + Class<T> aClass, String s, ChannelHandler channelHandler) { + return null; + } + + @Override + public ChannelHandler first() { + return null; + } + + @Override + public ChannelHandlerContext firstContext() { + return null; + } + + @Override + public ChannelHandler last() { + return null; + } + + @Override + public ChannelHandlerContext lastContext() { + return null; + } + + @Override + public ChannelHandler get(String s) { + return null; + } + + @Override + public <T extends ChannelHandler> T get(Class<T> aClass) { + return null; + } + + @Override + public ChannelHandlerContext context(ChannelHandler channelHandler) { + return null; + } + + @Override + public ChannelHandlerContext context(String s) { + return null; + } + + @Override + public ChannelHandlerContext context(Class<? extends ChannelHandler> aClass) { + return null; + } + + @Override + public Channel channel() { + return null; + } + + @Override + public List<String> names() { + return null; + } + + @Override + public Map<String, ChannelHandler> toMap() { + return null; + } + + @Override + public ChannelPipeline fireChannelRegistered() { + return null; + } + + @Override + public ChannelPipeline fireChannelUnregistered() { + return null; + } + + @Override + public ChannelPipeline fireChannelActive() { + return null; + } + + @Override + public ChannelPipeline fireChannelInactive() { + return null; + } + + @Override + public ChannelPipeline fireExceptionCaught(Throwable throwable) { + return null; + } + + @Override + public ChannelPipeline fireUserEventTriggered(Object o) { + return null; + } + + @Override + public ChannelPipeline fireChannelRead(Object o) { + return null; + } + + @Override + public ChannelPipeline fireChannelReadComplete() { + return null; + } + + @Override + public ChannelPipeline fireChannelWritabilityChanged() { + return null; + } + + @Override + public ChannelFuture bind(SocketAddress socketAddress) { + return null; + } + + @Override + public ChannelFuture connect(SocketAddress socketAddress) { + return null; + } + + @Override + public ChannelFuture connect(SocketAddress socketAddress, SocketAddress socketAddress1) { + return null; + } + + @Override + public ChannelFuture disconnect() { + return null; + } + + @Override + public ChannelFuture close() { + return null; + } + + @Override + public ChannelFuture deregister() { + return null; + } + + @Override + public ChannelFuture bind(SocketAddress socketAddress, ChannelPromise channelPromise) { + return null; + } + + @Override + public ChannelFuture connect(SocketAddress socketAddress, ChannelPromise channelPromise) { + return null; + } + + @Override + public ChannelFuture connect( + SocketAddress socketAddress, + SocketAddress socketAddress1, + ChannelPromise channelPromise) { + return null; + } + + @Override + public ChannelFuture disconnect(ChannelPromise channelPromise) { + return null; + } + + @Override + public ChannelFuture close(ChannelPromise channelPromise) { + return null; + } + + @Override + public ChannelFuture deregister(ChannelPromise channelPromise) { + return null; + } + + @Override + public ChannelOutboundInvoker read() { + return null; + } + + @Override + public ChannelFuture write(Object o) { + return null; + } + + @Override + public ChannelFuture write(Object o, ChannelPromise channelPromise) { + return null; + } + + @Override + public ChannelPipeline flush() { + return null; + } + + @Override + public ChannelFuture writeAndFlush(Object o, ChannelPromise channelPromise) { + return null; + } + + @Override + public ChannelFuture writeAndFlush(Object o) { + return null; + } + + @Override + public ChannelPromise newPromise() { + return null; + } + + @Override + public ChannelProgressivePromise newProgressivePromise() { + return null; + } + + @Override + public ChannelFuture newSucceededFuture() { + return null; + } + + @Override + public ChannelFuture newFailedFuture(Throwable throwable) { + return null; + } + + @Override + public ChannelPromise voidPromise() { + return null; + } + + @Override + public Iterator<Map.Entry<String, ChannelHandler>> iterator() { + return null; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TestingTaskManagerFileHandler.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TestingTaskManagerFileHandler.java new file mode 100644 index 00000000000..8590f944cfe --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/taskmanager/TestingTaskManagerFileHandler.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.taskmanager; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.blob.TransientBlobKey; +import org.apache.flink.runtime.blob.TransientBlobService; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.UntypedResponseMessageHeaders; +import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerMessageParameters; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.concurrent.FutureUtils; + +import javax.annotation.Nonnull; + +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.CompletableFuture; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Testing implementation of {@link AbstractTaskManagerFileHandler}. */ +public class TestingTaskManagerFileHandler + extends AbstractTaskManagerFileHandler<TaskManagerMessageParameters> { + + private final Queue<CompletableFuture<TransientBlobKey>> requestFileUploads; + + private final ResourceID expectedTaskManagerId; + + public TestingTaskManagerFileHandler( + @Nonnull GatewayRetriever<? extends RestfulGateway> leaderRetriever, + @Nonnull Time timeout, + @Nonnull Map<String, String> responseHeaders, + @Nonnull + UntypedResponseMessageHeaders<EmptyRequestBody, TaskManagerMessageParameters> + untypedResponseMessageHeaders, + @Nonnull GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever, + @Nonnull TransientBlobService transientBlobService, + @Nonnull Time cacheEntryDuration, + Queue<CompletableFuture<TransientBlobKey>> requestFileUploads, + ResourceID expectedTaskManagerId) { + super( + leaderRetriever, + timeout, + responseHeaders, + untypedResponseMessageHeaders, + resourceManagerGatewayRetriever, + transientBlobService, + cacheEntryDuration); + this.requestFileUploads = Preconditions.checkNotNull(requestFileUploads); + this.expectedTaskManagerId = Preconditions.checkNotNull(expectedTaskManagerId); + } + + @Override + protected CompletableFuture<TransientBlobKey> requestFileUpload( + ResourceManagerGateway resourceManagerGateway, + Tuple2<ResourceID, String> taskManagerIdAndFileName) { + assertThat(taskManagerIdAndFileName.f0).isEqualTo(expectedTaskManagerId); + final CompletableFuture<TransientBlobKey> transientBlobKeyFuture = + requestFileUploads.poll(); + + if (transientBlobKeyFuture != null) { + return transientBlobKeyFuture; + } else { + return FutureUtils.completedExceptionally(new FlinkException("Could not upload file.")); + } + } +} diff --git a/tools/maven/suppressions-runtime.xml b/tools/maven/suppressions-runtime.xml index 19b43a423fd..90a2746f16f 100644 --- a/tools/maven/suppressions-runtime.xml +++ b/tools/maven/suppressions-runtime.xml @@ -33,7 +33,7 @@ under the License. <suppress files="CheckpointCoordinatorTest.java" checks="FileLength"/> <!-- Legacy mockito usages --> - <suppress files="BlobCachePutTest.java|CheckpointCoordinatorFailureTest.java|CheckpointCoordinatorMasterHooksTest.java|CheckpointCoordinatorRestoringTest.java|CheckpointCoordinatorTestingUtils.java|CheckpointCoordinatorTest.java|CheckpointMetadataLoadingTest.java|CheckpointSettingsSerializableTest.java|CheckpointStateRestoreTest.java|CheckpointStatsHistoryTest.java|CheckpointStatsSnapshotTest.java|CompletedCheckpointStatsSummaryTest.java|CompletedCheckpointTest.java|FailoverStrategyChec [...] + <suppress files="BlobCachePutTest.java|CheckpointCoordinatorFailureTest.java|CheckpointCoordinatorMasterHooksTest.java|CheckpointCoordinatorRestoringTest.java|CheckpointCoordinatorTestingUtils.java|CheckpointCoordinatorTest.java|CheckpointMetadataLoadingTest.java|CheckpointSettingsSerializableTest.java|CheckpointStateRestoreTest.java|CheckpointStatsHistoryTest.java|CheckpointStatsSnapshotTest.java|CompletedCheckpointStatsSummaryTest.java|CompletedCheckpointTest.java|FailoverStrategyChec [...] checks="IllegalImport"/> <!-- Legacy powermock usages -->