This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c5516533a57 [FLINK-32884] [flink-clients] Sending messageHeaders with 
decorated customHeaders for PyFlink client (#23452)
c5516533a57 is described below

commit c5516533a5705297fe3cfd33860e59da30750f69
Author: Elkhan Dadash <elkh...@users.noreply.github.com>
AuthorDate: Tue Sep 26 22:30:12 2023 -0700

    [FLINK-32884] [flink-clients] Sending messageHeaders with decorated 
customHeaders for PyFlink client (#23452)
---
 .../org/apache/flink/client/program/rest/RestClusterClient.java  | 2 +-
 .../org/apache/flink/client/program/rest/UrlPrefixDecorator.java | 9 +++++++++
 .../apache/flink/client/program/rest/RestClusterClientTest.java  | 6 ++++++
 .../flink/runtime/rest/messages/CustomHeadersDecorator.java      | 9 +++++++++
 4 files changed, 25 insertions(+), 1 deletion(-)

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index b05dc331853..cc5a6d2d833 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -1021,7 +1021,7 @@ public class RestClusterClient<T> implements 
ClusterClient<T> {
                                                         restClient.sendRequest(
                                                                 
webMonitorBaseUrl.getHost(),
                                                                 
webMonitorBaseUrl.getPort(),
-                                                                messageHeaders,
+                                                                headers,
                                                                 
messageParameters,
                                                                 request,
                                                                 filesToUpload);
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java
index bd6398176c5..52f1b8cf56e 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/UrlPrefixDecorator.java
@@ -107,4 +107,13 @@ public class UrlPrefixDecorator<
     public Collection<? extends RestAPIVersion<?>> getSupportedAPIVersions() {
         return decorated.getSupportedAPIVersions();
     }
+
+    @Override
+    public Collection<Class<?>> getResponseTypeParameters() {
+        return decorated.getResponseTypeParameters();
+    }
+
+    public MessageHeaders<R, P, M> getDecorated() {
+        return decorated;
+    }
 }
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index 4f7e2f0b08d..e1db5c6d1b0 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -61,6 +61,7 @@ import 
org.apache.flink.runtime.rest.handler.async.AsynchronousOperationInfo;
 import org.apache.flink.runtime.rest.handler.async.AsynchronousOperationResult;
 import org.apache.flink.runtime.rest.handler.async.TriggerResponse;
 import 
org.apache.flink.runtime.rest.messages.AccumulatorsIncludeSerializedValueQueryParameter;
+import org.apache.flink.runtime.rest.messages.CustomHeadersDecorator;
 import org.apache.flink.runtime.rest.messages.EmptyMessageParameters;
 import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
 import org.apache.flink.runtime.rest.messages.EmptyResponseBody;
@@ -909,6 +910,11 @@ class RestClusterClientTest {
         final AtomicBoolean firstSubmitRequestFailed = new 
AtomicBoolean(false);
         failHttpRequest =
                 (messageHeaders, messageParameters, requestBody) -> {
+                    messageHeaders =
+                            ((UrlPrefixDecorator)
+                                            ((CustomHeadersDecorator) 
messageHeaders)
+                                                    .getDecorated())
+                                    .getDecorated();
                     if (messageHeaders instanceof JobExecutionResultHeaders) {
                         return !firstExecutionResultPollFailed.getAndSet(true);
                     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java
index 979c849166c..05fa8289dec 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java
@@ -96,6 +96,11 @@ public class CustomHeadersDecorator<
         return customHeaders;
     }
 
+    @Override
+    public Collection<Class<?>> getResponseTypeParameters() {
+        return decorated.getResponseTypeParameters();
+    }
+
     /**
      * Sets the custom headers for the message.
      *
@@ -117,4 +122,8 @@ public class CustomHeadersDecorator<
         }
         customHeaders.add(httpHeader);
     }
+
+    public MessageHeaders<R, P, M> getDecorated() {
+        return decorated;
+    }
 }

Reply via email to