This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3e1e32bd89a2ed871e79cd16634c6f66d5ff3db8
Author: Chesnay Schepler <ches...@apache.org>
AuthorDate: Fri Aug 18 14:13:28 2023 +0200

    [FLINK-32888] Handle hasNext() throwing EndOfDataDecoderException
---
 .../flink/runtime/rest/FileUploadHandler.java      | 12 ++++-
 .../runtime/rest/FileUploadHandlerITCase.java      | 58 ++++++++++++++++++++++
 2 files changed, 69 insertions(+), 1 deletion(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
index c3b797bcf72..c9e1fd78d74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java
@@ -148,7 +148,7 @@ public class FileUploadHandler extends 
SimpleChannelInboundHandler<HttpObject> {
                 currentHttpPostRequestDecoder.offer(httpContent);
 
                 while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT
-                        && currentHttpPostRequestDecoder.hasNext()) {
+                        && hasNext(currentHttpPostRequestDecoder)) {
                     final InterfaceHttpData data = 
currentHttpPostRequestDecoder.next();
                     if (data.getHttpDataType() == 
InterfaceHttpData.HttpDataType.FileUpload) {
                         final DiskFileUpload fileUpload = (DiskFileUpload) 
data;
@@ -212,6 +212,16 @@ public class FileUploadHandler extends 
SimpleChannelInboundHandler<HttpObject> {
         }
     }
 
+    private static boolean hasNext(HttpPostRequestDecoder decoder) {
+        try {
+            return decoder.hasNext();
+        } catch (HttpPostRequestDecoder.EndOfDataDecoderException e) {
+            // this can occur if the final chunk wasn't empty, but didn't contain any attribute data
+            // unfortunately the Netty APIs don't give us any way to check this
+            return false;
+        }
+    }
+
     private void handleError(
             ChannelHandlerContext ctx,
             String errorMessage,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java
index 34d17955922..480b58da26e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.rest.messages.RequestBody;
 import org.apache.flink.runtime.rest.util.RestMapperUtils;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
 import org.apache.flink.util.FileUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.util.function.BiConsumerWithException;
@@ -39,13 +40,16 @@ import okhttp3.Response;
 import org.junit.ClassRule;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.RandomAccessFile;
 import java.io.StringWriter;
 import java.lang.reflect.Field;
 import java.nio.file.Path;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -71,6 +75,8 @@ public class FileUploadHandlerITCase extends TestLogger {
     @Rule
     public final MultipartUploadResource multipartUpdateResource = new 
MultipartUploadResource();
 
+    @Rule public final TemporaryFolder tmp = new TemporaryFolder();
+
     private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
 
     @ClassRule
@@ -115,6 +121,15 @@ public class FileUploadHandlerITCase extends TestLogger {
         return finalizeRequest(builder, headerUrl);
     }
 
+    private Request buildMixedRequest(
+            String headerUrl, MultipartUploadResource.TestRequestBody json, 
File file)
+            throws IOException {
+        MultipartBody.Builder builder = new MultipartBody.Builder();
+        builder = addJsonPart(builder, json, 
FileUploadHandler.HTTP_ATTRIBUTE_REQUEST);
+        builder = addFilePart(builder, file, file.getName());
+        return finalizeRequest(builder, headerUrl);
+    }
+
     private Request buildMixedRequest(
             String headerUrl, MultipartUploadResource.TestRequestBody json) 
throws IOException {
         MultipartBody.Builder builder = new MultipartBody.Builder();
@@ -219,6 +234,49 @@ public class FileUploadHandlerITCase extends TestLogger {
         verifyNoFileIsRegisteredToDeleteOnExitHook();
     }
 
+    /**
+     * This test checks for a specific multipart request chunk layout using a magic number.
+     *
+     * <p>These things are very susceptible to interference from other requests or parts of the
+     * payload; for example if the JSON payload increases by a single byte it can already break the
+     * number. Do not reuse the client.
+     *
+     * <p>To find the magic number you can define a static counter, and loop the test in the IDE
+     * (without forking!) while incrementing the counter on each run.
+     */
+    @Test
+    public void testMixedMultipartEndOfDataDecoderExceptionHandling() throws 
Exception {
+        OkHttpClient client = createOkHttpClientWithNoTimeouts();
+
+        MultipartUploadResource.MultipartMixedHandler mixedHandler =
+                multipartUpdateResource.getMixedHandler();
+
+        MultipartUploadResource.TestRequestBody json =
+                new MultipartUploadResource.TestRequestBody();
+
+        File file = TempDirUtils.newFile(tmp.newFolder().toPath());
+        try (RandomAccessFile rw = new RandomAccessFile(file, "rw")) {
+            // magic value that reliably reproduced EndOfDataDecoderException in hasNext()
+            rw.setLength(1424);
+        }
+        multipartUpdateResource.setFileUploadVerifier(
+                (handlerRequest, restfulGateway) ->
+                        MultipartUploadResource.assertUploadedFilesEqual(
+                                handlerRequest, Collections.singleton(file)));
+
+        Request singleFileMixedRequest =
+                buildMixedRequest(
+                        
mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), json, file);
+        try (Response response = 
client.newCall(singleFileMixedRequest).execute()) {
+            assertEquals(
+                    
mixedHandler.getMessageHeaders().getResponseStatusCode().code(),
+                    response.code());
+            assertEquals(json, mixedHandler.lastReceivedRequest);
+        }
+
+        verifyNoFileIsRegisteredToDeleteOnExitHook();
+    }
+
     @Test
     public void testJsonMultipart() throws Exception {
         OkHttpClient client = createOkHttpClientWithNoTimeouts();

Reply via email to