[ https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517493#comment-16517493 ]
ASF GitHub Bot commented on FLINK-9599: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/6178#discussion_r196560163 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -0,0 +1,471 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.FileUploads; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification; +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.EmptyResponseBody; +import org.apache.flink.runtime.rest.messages.MessageHeaders; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.util.RestMapperUtils; +import org.apache.flink.runtime.rpc.RpcUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import okhttp3.MediaType; +import okhttp3.MultipartBody; +import okhttp3.OkHttpClient; +import okhttp3.Request; +import okhttp3.Response; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import javax.annotation.Nonnull; + +import java.io.File; +import java.io.IOException; +import java.io.StringWriter; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Random; +import java.util.concurrent.CompletableFuture; + +import static java.util.Objects.requireNonNull; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link FileUploadHandler}. Ensures that multipart http messages containing files and/or json are properly + * handled. + */ +public class FileUploadHandlerTest { + + private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper(); + private static final Random RANDOM = new Random(); + + @ClassRule + public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + private static RestServerEndpoint serverEndpoint; + private static String serverAddress; + + private static MultipartMixedHandler mixedHandler; + private static MultipartJsonHandler jsonHandler; + private static MultipartFileHandler fileHandler; + private static File file1; + private static File file2; + + @BeforeClass + public static void setup() throws Exception { + Configuration config = new Configuration(); + config.setInteger(RestOptions.PORT, 0); + config.setString(RestOptions.ADDRESS, "localhost"); + config.setString(WebOptions.UPLOAD_DIR, TEMPORARY_FOLDER.newFolder().getCanonicalPath()); + + RestServerEndpointConfiguration serverConfig = RestServerEndpointConfiguration.fromConfiguration(config); + + final String restAddress = "http://localhost:1234"; + RestfulGateway mockRestfulGateway = mock(RestfulGateway.class); + when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress)); + + final GatewayRetriever<RestfulGateway> mockGatewayRetriever = () -> + CompletableFuture.completedFuture(mockRestfulGateway); + + file1 = TEMPORARY_FOLDER.newFile(); + Files.write(file1.toPath(), "hello".getBytes(ConfigConstants.DEFAULT_CHARSET)); + file2 = TEMPORARY_FOLDER.newFile(); + Files.write(file2.toPath(), "world".getBytes(ConfigConstants.DEFAULT_CHARSET)); + + mixedHandler = new MultipartMixedHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever); + jsonHandler = new MultipartJsonHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever); + fileHandler = new MultipartFileHandler(CompletableFuture.completedFuture(restAddress), mockGatewayRetriever); + + final List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = Arrays.asList( + Tuple2.of(mixedHandler.getMessageHeaders(), mixedHandler), + Tuple2.of(jsonHandler.getMessageHeaders(), jsonHandler), + Tuple2.of(fileHandler.getMessageHeaders(), fileHandler)); + + serverEndpoint = new TestRestServerEndpoint(serverConfig, handlers); + + serverEndpoint.start(); + serverAddress = serverEndpoint.getRestBaseUrl(); + } + + @AfterClass + public static void teardown() throws Exception { + if (serverEndpoint != null) { + serverEndpoint.close(); + serverEndpoint = null; + } + } + + private static Request buildRequest(String headerUrl, int index, boolean includeFile, boolean includeJson) throws IOException { + Preconditions.checkArgument(includeFile || includeJson, "You have to either include JSON or a file."); + MultipartBody.Builder builder = new MultipartBody.Builder(); + + if (includeFile) { + okhttp3.RequestBody filePayload1 = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file1); + builder = builder.addFormDataPart("file1", file1.getName(), filePayload1); + + okhttp3.RequestBody filePayload2 = okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file2); + builder = builder.addFormDataPart("file2", file2.getName(), filePayload2); + } + + if (includeJson) { + TestRequestBody jsonRequestBody = new TestRequestBody(index); + + StringWriter sw = new StringWriter(); + OBJECT_MAPPER.writeValue(sw, jsonRequestBody); + + String jsonPayload = sw.toString(); + + builder = builder.addFormDataPart(org.apache.flink.runtime.rest.FileUploadHandler.HTTP_ATTRIBUTE_REQUEST, jsonPayload); + } + + MultipartBody multipartBody = builder + .setType(MultipartBody.FORM) + .build(); + + return new Request.Builder() + .url(serverAddress + headerUrl) + .post(multipartBody) + .build(); + } + + @Test + public void testMixedMultipart() throws Exception { + OkHttpClient client = new OkHttpClient(); + + Request jsonRequest = buildRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), RANDOM.nextInt(), false, true); --- End diff -- Would be good to have methods `buildFileRequest` and `buildJsonRequest` and `buildJsonFileRequest` instead of the version with boolean. Booleans make it harder to understand which request is generated. > Implement generic mechanism to receive files via rest > ----------------------------------------------------- > > Key: FLINK-9599 > URL: https://issues.apache.org/jira/browse/FLINK-9599 > Project: Flink > Issue Type: New Feature > Components: REST > Reporter: Chesnay Schepler > Assignee: Chesnay Schepler > Priority: Major > Fix For: 1.6.0 > > > As a prerequisite for a cleaner implementation of FLINK-9280 we should > * extend the RestClient to allow the upload of Files > * extend FileUploadHandler to accept mixed multi-part requests (json + files) > * generalize mechanism for accessing uploaded files in {{AbstractHandler}} > Uploaded files can be forwarded to subsequent handlers as an attribute, > similar to the existing special case for the {{JarUploadHandler}}. The JSON > body can be forwarded by replacing the incoming http requests with a simple > {{DefaultFullHttpRequest}}. > Uploaded files will be retrievable through the {{HandlerRequest}}. > I'm not certain if/how we can document that a handler accepts files. -- This message was sent by Atlassian JIRA (v7.6.3#76005)