This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9546f8243a24e7b45582b6de6702f819f1d73f97 Author: Chesnay Schepler <ches...@apache.org> AuthorDate: Thu Aug 17 10:46:54 2023 +0200 [FLINK-32888] Handle hasNext() throwing EndOfDataDecoderException This _probably_ happens when a non-empty http content is received that does not contain any attribute data. --- .../flink/runtime/rest/FileUploadHandler.java | 12 ++++- .../runtime/rest/FileUploadHandlerITCase.java | 56 ++++++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java index c3b797bcf72..c9e1fd78d74 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/FileUploadHandler.java @@ -148,7 +148,7 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> { currentHttpPostRequestDecoder.offer(httpContent); while (httpContent != LastHttpContent.EMPTY_LAST_CONTENT - && currentHttpPostRequestDecoder.hasNext()) { + && hasNext(currentHttpPostRequestDecoder)) { final InterfaceHttpData data = currentHttpPostRequestDecoder.next(); if (data.getHttpDataType() == InterfaceHttpData.HttpDataType.FileUpload) { final DiskFileUpload fileUpload = (DiskFileUpload) data; @@ -212,6 +212,16 @@ public class FileUploadHandler extends SimpleChannelInboundHandler<HttpObject> { } } + private static boolean hasNext(HttpPostRequestDecoder decoder) { + try { + return decoder.hasNext(); + } catch (HttpPostRequestDecoder.EndOfDataDecoderException e) { + // this can occur if the final chunk wasn't empty, but didn't contain any attribute data + // unfortunately the Netty APIs don't give us any way to check this + return false; + } + } + private void handleError( ChannelHandlerContext ctx, String errorMessage, diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java index 75879a9cea7..45d14f1a444 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerITCase.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.rest.messages.RequestBody; import org.apache.flink.runtime.rest.util.RestMapperUtils; import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.util.FileUtils; import org.apache.flink.util.function.BiConsumerWithException; @@ -42,10 +43,12 @@ import org.junit.jupiter.api.io.TempDir; import java.io.File; import java.io.IOException; +import java.io.RandomAccessFile; import java.io.StringWriter; import java.lang.reflect.Field; import java.nio.file.Path; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.LinkedHashSet; @@ -123,6 +126,15 @@ class FileUploadHandlerITCase { return finalizeRequest(builder, headerUrl); } + private Request buildMixedRequest( + String headerUrl, MultipartUploadExtension.TestRequestBody json, File file) + throws IOException { + MultipartBody.Builder builder = new MultipartBody.Builder(); + builder = addJsonPart(builder, json, FileUploadHandler.HTTP_ATTRIBUTE_REQUEST); + builder = addFilePart(builder, file, file.getName()); + return finalizeRequest(builder, headerUrl); + } + private Request buildMixedRequest( String headerUrl, MultipartUploadExtension.TestRequestBody json) throws IOException { MultipartBody.Builder builder = new MultipartBody.Builder(); @@ -227,6 +239,50 @@ class FileUploadHandlerITCase { verifyNoFileIsRegisteredToDeleteOnExitHook(); } + /** + * This test 
checks for a specific multipart request chunk layout using a magic number. + * + * <p>These things are very susceptible to interference from other requests or parts of the + * payload; for example if the JSON payload increases by a single byte it can already break the + * number. Do not reuse the client. + * + * <p>To find the magic number you can define a static counter, and loop the test in the IDE + * (without forking!) while incrementing the counter on each run. + */ + @Test + void testMixedMultipartEndOfDataDecoderExceptionHandling(@TempDir Path tmp) throws Exception { + OkHttpClient client = createOkHttpClientWithNoTimeouts(); + + MultipartUploadExtension.MultipartMixedHandler mixedHandler = + multipartUpdateExtensionWrapper.getCustomExtension().getMixedHandler(); + + MultipartUploadExtension.TestRequestBody json = + new MultipartUploadExtension.TestRequestBody(); + + File file = TempDirUtils.newFile(tmp); + try (RandomAccessFile rw = new RandomAccessFile(file, "rw")) { + // magic value that reliably reproduced EndOfDataDecoderException in hasNext() + rw.setLength(1424); + } + multipartUpdateExtensionWrapper + .getCustomExtension() + .setFileUploadVerifier( + (handlerRequest, restfulGateway) -> + MultipartUploadExtension.assertUploadedFilesEqual( + handlerRequest, Collections.singleton(file))); + + Request singleFileMixedRequest = + buildMixedRequest( + mixedHandler.getMessageHeaders().getTargetRestEndpointURL(), json, file); + try (Response response = client.newCall(singleFileMixedRequest).execute()) { + assertThat(response.code()) + .isEqualTo(mixedHandler.getMessageHeaders().getResponseStatusCode().code()); + assertThat(mixedHandler.lastReceivedRequest).isEqualTo(json); + } + + verifyNoFileIsRegisteredToDeleteOnExitHook(); + } + @Test void testJsonMultipart() throws Exception { OkHttpClient client = createOkHttpClientWithNoTimeouts();