This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new c5516533a57 [FLINK-32884] [flink-clients] Sending messageHeaders with decorated customHeaders for PyFlink client (#23452) c5516533a57 is described below commit c5516533a5705297fe3cfd33860e59da30750f69 Author: Elkhan Dadash <elkh...@users.noreply.github.com> AuthorDate: Tue Sep 26 22:30:12 2023 -0700 [FLINK-32884] [flink-clients] Sending messageHeaders with decorated customHeaders for PyFlink client (#23452) --- .../org/apache/flink/client/program/rest/RestClusterClient.java | 2 +- .../org/apache/flink/client/program/rest/UrlPrefixDecorator.java | 9 +++++++++ .../apache/flink/client/program/rest/RestClusterClientTest.java | 6 ++++++ .../flink/runtime/rest/messages/CustomHeadersDecorator.java | 9 +++++++++ 4 files changed, 25 insertions(+), 1 deletion(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index b05dc331853..cc5a6d2d833 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -1021,7 +1021,7 @@ public class RestClusterClient<T> implements ClusterClient<T> { restClient.sendRequest( webMonitorBaseUrl.getHost(), webMonitorBaseUrl.getPort(), - messageHeaders, + headers, messageParameters, request, filesToUpload); diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java index bd6398176c5..52f1b8cf56e 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java @@ -107,4 +107,13 @@ public class UrlPrefixDecorator< public Collection<? extends RestAPIVersion<?>> getSupportedAPIVersions() { return decorated.getSupportedAPIVersions(); } + + @Override + public Collection<Class<?>> getResponseTypeParameters() { + return decorated.getResponseTypeParameters(); + } + + public MessageHeaders<R, P, M> getDecorated() { + return decorated; + } } diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index 4f7e2f0b08d..e1db5c6d1b0 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -61,6 +61,7 @@ import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo; import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult; import org.apache.flink.runtime.rest.handler.async.TriggerResponse; import org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter; +import org.apache.flink.runtime.rest.messages.CustomHeadersDecorator; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.EmptyResponseBody; @@ -909,6 +910,11 @@ class RestClusterClientTest { final AtomicBoolean firstSubmitRequestFailed = new AtomicBoolean(false); failHttpRequest = (messageHeaders, messageParameters, requestBody) -> { + messageHeaders = + ((UrlPrefixDecorator) + ((CustomHeadersDecorator) messageHeaders) + .getDecorated()) + .getDecorated(); if (messageHeaders instanceof JobExecutionResultHeaders) { return !firstExecutionResultPollFailed.getAndSet(true); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java index 979c849166c..05fa8289dec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java @@ -96,6 +96,11 @@ public class CustomHeadersDecorator< return customHeaders; } + @Override + public Collection<Class<?>> getResponseTypeParameters() { + return decorated.getResponseTypeParameters(); + } + /** * Sets the custom headers for the message. * @@ -117,4 +122,8 @@ public class CustomHeadersDecorator< } customHeaders.add(httpHeader); } + + public MessageHeaders<R, P, M> getDecorated() { + return decorated; + } }