[ 
https://issues.apache.org/jira/browse/FLINK-9599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16517493#comment-16517493
 ] 

ASF GitHub Bot commented on FLINK-9599:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6178#discussion_r196560163
  
    --- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
    @@ -0,0 +1,471 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.runtime.rest;
    +
    +import org.apache.flink.api.common.time.Time;
    +import org.apache.flink.api.java.tuple.Tuple2;
    +import org.apache.flink.configuration.ConfigConstants;
    +import org.apache.flink.configuration.Configuration;
    +import org.apache.flink.configuration.RestOptions;
    +import org.apache.flink.configuration.WebOptions;
    +import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
    +import org.apache.flink.runtime.rest.handler.FileUploads;
    +import org.apache.flink.runtime.rest.handler.HandlerRequest;
    +import org.apache.flink.runtime.rest.handler.RestHandlerException;
    +import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
    +import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
    +import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
    +import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
    +import org.apache.flink.runtime.rest.messages.MessageHeaders;
    +import org.apache.flink.runtime.rest.messages.RequestBody;
    +import org.apache.flink.runtime.rest.util.RestMapperUtils;
    +import org.apache.flink.runtime.rpc.RpcUtils;
    +import org.apache.flink.runtime.webmonitor.RestfulGateway;
    +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
    +import org.apache.flink.util.Preconditions;
    +
    +import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
    +import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
    +import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
    +import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
    +import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
    +
    +import okhttp3.MediaType;
    +import okhttp3.MultipartBody;
    +import okhttp3.OkHttpClient;
    +import okhttp3.Request;
    +import okhttp3.Response;
    +import org.junit.AfterClass;
    +import org.junit.BeforeClass;
    +import org.junit.ClassRule;
    +import org.junit.Test;
    +import org.junit.rules.TemporaryFolder;
    +
    +import javax.annotation.Nonnull;
    +
    +import java.io.File;
    +import java.io.IOException;
    +import java.io.StringWriter;
    +import java.nio.file.Files;
    +import java.nio.file.Path;
    +import java.util.Arrays;
    +import java.util.Collection;
    +import java.util.Collections;
    +import java.util.List;
    +import java.util.Random;
    +import java.util.concurrent.CompletableFuture;
    +
    +import static java.util.Objects.requireNonNull;
    +import static org.junit.Assert.assertArrayEquals;
    +import static org.junit.Assert.assertEquals;
    +import static org.mockito.Matchers.any;
    +import static org.mockito.Mockito.mock;
    +import static org.mockito.Mockito.when;
    +
    +/**
    + * Tests for the {@link FileUploadHandler}. Ensures that multipart http 
messages containing files and/or json are properly
    + * handled.
    + */
    +public class FileUploadHandlerTest {
    +
    +   private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
    +   private static final Random RANDOM = new Random();
    +
    +   @ClassRule
    +   public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
    +
    +   private static RestServerEndpoint serverEndpoint;
    +   private static String serverAddress;
    +
    +   private static MultipartMixedHandler mixedHandler;
    +   private static MultipartJsonHandler jsonHandler;
    +   private static MultipartFileHandler fileHandler;
    +   private static File file1;
    +   private static File file2;
    +
    +   @BeforeClass
    +   public static void setup() throws Exception {
    +           Configuration config = new Configuration();
    +           config.setInteger(RestOptions.PORT, 0);
    +           config.setString(RestOptions.ADDRESS, "localhost");
    +           config.setString(WebOptions.UPLOAD_DIR, 
TEMPORARY_FOLDER.newFolder().getCanonicalPath());
    +
    +           RestServerEndpointConfiguration serverConfig = 
RestServerEndpointConfiguration.fromConfiguration(config);
    +
    +           final String restAddress = "http://localhost:1234";;
    +           RestfulGateway mockRestfulGateway = mock(RestfulGateway.class);
    +           
when(mockRestfulGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(restAddress));
    +
    +           final GatewayRetriever<RestfulGateway> mockGatewayRetriever = 
() ->
    +                   CompletableFuture.completedFuture(mockRestfulGateway);
    +
    +           file1 = TEMPORARY_FOLDER.newFile();
    +           Files.write(file1.toPath(), 
"hello".getBytes(ConfigConstants.DEFAULT_CHARSET));
    +           file2 = TEMPORARY_FOLDER.newFile();
    +           Files.write(file2.toPath(), 
"world".getBytes(ConfigConstants.DEFAULT_CHARSET));
    +
    +           mixedHandler = new 
MultipartMixedHandler(CompletableFuture.completedFuture(restAddress), 
mockGatewayRetriever);
    +           jsonHandler = new 
MultipartJsonHandler(CompletableFuture.completedFuture(restAddress), 
mockGatewayRetriever);
    +           fileHandler = new 
MultipartFileHandler(CompletableFuture.completedFuture(restAddress), 
mockGatewayRetriever);
    +
    +           final List<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> handlers = Arrays.asList(
    +                   Tuple2.of(mixedHandler.getMessageHeaders(), 
mixedHandler),
    +                   Tuple2.of(jsonHandler.getMessageHeaders(), jsonHandler),
    +                   Tuple2.of(fileHandler.getMessageHeaders(), 
fileHandler));
    +
    +           serverEndpoint = new TestRestServerEndpoint(serverConfig, 
handlers);
    +
    +           serverEndpoint.start();
    +           serverAddress = serverEndpoint.getRestBaseUrl();
    +   }
    +
    +   @AfterClass
    +   public static void teardown() throws Exception {
    +           if (serverEndpoint != null) {
    +                   serverEndpoint.close();
    +                   serverEndpoint = null;
    +           }
    +   }
    +
    +   private static Request buildRequest(String headerUrl, int index, 
boolean includeFile, boolean includeJson) throws IOException {
    +           Preconditions.checkArgument(includeFile || includeJson, "You 
have to either include JSON or a file.");
    +           MultipartBody.Builder builder = new MultipartBody.Builder();
    +
    +           if (includeFile) {
    +                   okhttp3.RequestBody filePayload1 = 
okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file1);
    +                   builder = builder.addFormDataPart("file1", 
file1.getName(), filePayload1);
    +
    +                   okhttp3.RequestBody filePayload2 = 
okhttp3.RequestBody.create(MediaType.parse("application/octet-stream"), file2);
    +                   builder = builder.addFormDataPart("file2", 
file2.getName(), filePayload2);
    +           }
    +
    +           if (includeJson) {
    +                   TestRequestBody jsonRequestBody = new 
TestRequestBody(index);
    +
    +                   StringWriter sw = new StringWriter();
    +                   OBJECT_MAPPER.writeValue(sw, jsonRequestBody);
    +
    +                   String jsonPayload = sw.toString();
    +
    +                   builder = 
builder.addFormDataPart(org.apache.flink.runtime.rest.FileUploadHandler.HTTP_ATTRIBUTE_REQUEST,
 jsonPayload);
    +           }
    +
    +           MultipartBody multipartBody = builder
    +                   .setType(MultipartBody.FORM)
    +                   .build();
    +
    +           return new Request.Builder()
    +                   .url(serverAddress + headerUrl)
    +                   .post(multipartBody)
    +                   .build();
    +   }
    +
    +   @Test
    +   public void testMixedMultipart() throws Exception {
    +           OkHttpClient client = new OkHttpClient();
    +
    +           Request jsonRequest = 
buildRequest(mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), 
RANDOM.nextInt(), false, true);
    --- End diff --
    
    Would be good to have methods `buildFileRequest` and `buildJsonRequest` and 
`buildJsonFileRequest` instead of the version with boolean. Booleans make it 
harder to understand which request is generated.


> Implement generic mechanism to receive files via rest
> -----------------------------------------------------
>
>                 Key: FLINK-9599
>                 URL: https://issues.apache.org/jira/browse/FLINK-9599
>             Project: Flink
>          Issue Type: New Feature
>          Components: REST
>            Reporter: Chesnay Schepler
>            Assignee: Chesnay Schepler
>            Priority: Major
>             Fix For: 1.6.0
>
>
> As a prerequisite for a cleaner implementation of FLINK-9280 we should
>  * extend the RestClient to allow the upload of Files
>  * extend FileUploadHandler to accept mixed multi-part requests (json + files)
>  * generalize mechanism for accessing uploaded files in {{AbstractHandler}}
> Uploaded files can be forwarded to subsequent handlers as an attribute, 
> similar to the existing special case for the {{JarUploadHandler}}. The JSON 
> body can be forwarded by replacing the incoming http requests with a simple 
> {{DefaultFullHttpRequest}}.
> Uploaded files will be retrievable through the {{HandlerRequest}}.
> I'm not certain if/how we can document that a handler accepts files.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to