This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 1354d2fae3f [FLINK-32373][sql-client] Support passing headers with SQL Client gateway requests (#22816) 1354d2fae3f is described below commit 1354d2fae3fde2a448ce1fac5dee7859973a93e1 Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com> AuthorDate: Thu Jul 6 21:34:36 2023 +0200 [FLINK-32373][sql-client] Support passing headers with SQL Client gateway requests (#22816) --- .../flink/configuration/ConfigConstants.java | 6 ++ .../org/apache/flink/runtime/rest/HttpHeader.java | 83 ++++++++++++++ .../org/apache/flink/runtime/rest/RestClient.java | 19 ++-- .../rest/messages/CustomHeadersDecorator.java | 120 +++++++++++++++++++++ .../runtime/rest/messages/MessageHeaders.java | 14 +++ .../flink/table/client/gateway/ExecutorImpl.java | 57 ++++++++-- .../table/client/gateway/ExecutorImplITCase.java | 19 ++++ 7 files changed, 302 insertions(+), 16 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index 3c7a89c9e00..a4968db4550 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -1756,6 +1756,12 @@ public final class ConfigConstants { /** The user lib directory name. */ public static final String DEFAULT_FLINK_USR_LIB_DIR = "usrlib"; + /** + * The environment variable name which contains a list of newline-separated HTTP headers for + * Flink's REST client. + */ + public static final String FLINK_REST_CLIENT_HEADERS = "FLINK_REST_CLIENT_HEADERS"; + // ---------------------------- Encoding ------------------------------ public static final Charset DEFAULT_CHARSET = StandardCharsets.UTF_8; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpHeader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpHeader.java new file mode 100644 index 00000000000..06ee95bd451 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/HttpHeader.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest; + +import java.util.Objects; + +/** Represents an HTTP header with a name and a value. */ +public class HttpHeader { + + /** The name of the HTTP header. */ + private final String name; + + /** The value of the HTTP header. */ + private final String value; + + /** + * Constructs an {@code HttpHeader} object with the specified name and value. + * + * @param name the name of the HTTP header + * @param value the value of the HTTP header + */ + public HttpHeader(String name, String value) { + this.name = name; + this.value = value; + } + + /** + * Returns the name of this HTTP header. + * + * @return the name of this HTTP header + */ + public String getName() { + return name; + } + + /** + * Returns the value of this HTTP header. + * + * @return the value of this HTTP header + */ + public String getValue() { + return value; + } + + @Override + public String toString() { + return "HttpHeader{" + "name='" + name + '\'' + ", value='" + value + '\'' + '}'; + } + + @Override + public boolean equals(Object other) { + if (this == other) { + return true; + } + if (other == null || getClass() != other.getClass()) { + return false; + } + HttpHeader that = (HttpHeader) other; + return Objects.equals(getName(), that.getName()) + && Objects.equals(getValue(), that.getValue()); + } + + @Override + public int hashCode() { + return Objects.hash(getName(), getValue()); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index 2055a2227cf..ed399188db5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -69,6 +69,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.FullHttpRespon import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpClientCodec; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderNames; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaderValues; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpObjectAggregator; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; @@ -384,7 +385,8 @@ public class RestClient implements AutoCloseableAsync { targetUrl, messageHeaders.getHttpMethod().getNettyHttpMethod(), payload, - fileUploads); + fileUploads, + messageHeaders.getCustomHeaders()); final JavaType responseType; @@ -419,7 +421,8 @@ public class RestClient implements AutoCloseableAsync { String targetUrl, HttpMethod httpMethod, ByteBuf jsonPayload, - Collection<FileUpload> fileUploads) + Collection<FileUpload> fileUploads, + Collection<HttpHeader> customHeaders) throws IOException { if (fileUploads.isEmpty()) { @@ -427,22 +430,22 @@ public class RestClient implements AutoCloseableAsync { new DefaultFullHttpRequest( HttpVersion.HTTP_1_1, httpMethod, targetUrl, jsonPayload); - httpRequest - .headers() - .set(HttpHeaderNames.HOST, targetAddress) + HttpHeaders headers = httpRequest.headers(); + headers.set(HttpHeaderNames.HOST, targetAddress) .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE) .add(HttpHeaderNames.CONTENT_LENGTH, jsonPayload.capacity()) .add(HttpHeaderNames.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE); + customHeaders.forEach(ch -> headers.set(ch.getName(), ch.getValue())); return new SimpleRequest(httpRequest); } else { HttpRequest httpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, httpMethod, targetUrl); - httpRequest - .headers() - .set(HttpHeaderNames.HOST, targetAddress) + HttpHeaders headers = httpRequest.headers(); + headers.set(HttpHeaderNames.HOST, targetAddress) .set(HttpHeaderNames.CONNECTION, HttpHeaderValues.CLOSE); + customHeaders.forEach(ch -> headers.set(ch.getName(), ch.getValue())); // takes care of splitting the request into multiple parts HttpPostRequestEncoder bodyRequestEncoder; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java new file mode 100644 index 00000000000..979c849166c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/CustomHeadersDecorator.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.messages; + +import org.apache.flink.runtime.rest.HttpHeader; +import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.runtime.rest.versioning.RestAPIVersion; + +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; + +import java.util.ArrayList; +import java.util.Collection; + +/** + * Decorator class for {@link MessageHeaders} that adds the ability to include custom HTTP headers. + */ +public class CustomHeadersDecorator< + R extends RequestBody, P extends ResponseBody, M extends MessageParameters> + implements MessageHeaders<R, P, M> { + + private final MessageHeaders<R, P, M> decorated; + private Collection<HttpHeader> customHeaders; + + /** + * Creates a new {@code CustomHeadersDecorator} for a given {@link MessageHeaders} object. + * + * @param decorated The MessageHeaders to decorate. + */ + public CustomHeadersDecorator(MessageHeaders<R, P, M> decorated) { + this.decorated = decorated; + } + + @Override + public HttpMethodWrapper getHttpMethod() { + return decorated.getHttpMethod(); + } + + @Override + public String getTargetRestEndpointURL() { + return decorated.getTargetRestEndpointURL(); + } + + @Override + public Collection<? extends RestAPIVersion<?>> getSupportedAPIVersions() { + return decorated.getSupportedAPIVersions(); + } + + @Override + public Class<P> getResponseClass() { + return decorated.getResponseClass(); + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return decorated.getResponseStatusCode(); + } + + @Override + public String getDescription() { + return decorated.getDescription(); + } + + @Override + public Class<R> getRequestClass() { + return decorated.getRequestClass(); + } + + @Override + public M getUnresolvedMessageParameters() { + return decorated.getUnresolvedMessageParameters(); + } + + /** + * Returns the custom headers added to the message. + * + * @return The custom headers as a collection of {@link HttpHeader}. + */ + @Override + public Collection<HttpHeader> getCustomHeaders() { + return customHeaders; + } + + /** + * Sets the custom headers for the message. + * + * @param customHeaders A collection of custom headers. + */ + public void setCustomHeaders(Collection<HttpHeader> customHeaders) { + this.customHeaders = customHeaders; + } + + /** + * Adds a custom header to the message. Initializes the custom headers collection if it hasn't + * been initialized yet. + * + * @param httpHeader The header to add. + */ + public void addCustomHeader(HttpHeader httpHeader) { + if (customHeaders == null) { + customHeaders = new ArrayList<>(); + } + customHeaders.add(httpHeader); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java index a6c0d11d69b..63c54083493 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageHeaders.java @@ -18,8 +18,10 @@ package org.apache.flink.runtime.rest.messages; +import org.apache.flink.runtime.rest.HttpHeader; import org.apache.flink.runtime.rest.HttpMethodWrapper; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import java.util.Collection; @@ -93,4 +95,16 @@ public interface MessageHeaders< return getHttpMethod().name().toLowerCase(Locale.ROOT) + className.substring(0, headersSuffixStart); } + + /** + * Returns a collection of custom HTTP headers. + * + * <p>This default implementation returns an empty list. Override this method to provide custom + * headers if needed. + * + * @return a collection of custom {@link HttpHeaders}, empty by default. + */ + default Collection<HttpHeader> getCustomHeaders() { + return Collections.emptyList(); + } } diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java index f4f6b40ed56..d755aa510ba 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/ExecutorImpl.java @@ -19,10 +19,13 @@ package org.apache.flink.table.client.gateway; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.fs.AutoCloseableRegistry; +import org.apache.flink.runtime.rest.HttpHeader; import org.apache.flink.runtime.rest.RestClient; +import org.apache.flink.runtime.rest.messages.CustomHeadersDecorator; import org.apache.flink.runtime.rest.messages.EmptyMessageParameters; import org.apache.flink.runtime.rest.messages.EmptyRequestBody; import org.apache.flink.runtime.rest.messages.MessageHeaders; @@ -63,6 +66,7 @@ import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementReq import org.apache.flink.table.gateway.rest.message.statement.ExecuteStatementResponseBody; import org.apache.flink.table.gateway.rest.message.statement.FetchResultsMessageParameters; import org.apache.flink.table.gateway.rest.message.statement.FetchResultsResponseBody; +import org.apache.flink.table.gateway.rest.message.util.GetApiVersionResponseBody; import org.apache.flink.table.gateway.rest.serde.ResultInfo; import org.apache.flink.table.gateway.rest.util.RowFormat; import org.apache.flink.table.gateway.rest.util.SqlGatewayRestAPIVersion; @@ -80,6 +84,8 @@ import javax.annotation.Nullable; import java.io.IOException; import java.net.InetSocketAddress; import java.net.URL; +import java.util.ArrayList; +import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; @@ -113,6 +119,7 @@ public class ExecutorImpl implements Executor { private final SqlGatewayRestAPIVersion connectionVersion; private final SessionHandle sessionHandle; private final RowFormat rowFormat; + private final Collection<HttpHeader> customHttpHeaders; public ExecutorImpl( DefaultContext defaultContext, InetSocketAddress gatewayAddress, String sessionId) { @@ -170,6 +177,8 @@ public class ExecutorImpl implements Executor { this.registry = new AutoCloseableRegistry(); this.gatewayUrl = gatewayUrl; this.rowFormat = rowFormat; + this.customHttpHeaders = + readHeadersFromEnvironmentVariable(ConfigConstants.FLINK_REST_CLIENT_HEADERS); try { // register required resource this.executorService = Executors.newCachedThreadPool(); @@ -399,11 +408,12 @@ public class ExecutorImpl implements Executor { P extends ResponseBody> CompletableFuture<P> sendRequest(M messageHeaders, U messageParameters, R request) { Preconditions.checkNotNull(connectionVersion, "The connection version should not be null."); - return sendRequest( - new UrlPrefixDecorator<>(messageHeaders, gatewayUrl.getPath()), - messageParameters, - request, - connectionVersion); + CustomHeadersDecorator<R, P, U> headers = + new CustomHeadersDecorator<>( + new UrlPrefixDecorator<>(messageHeaders, gatewayUrl.getPath())); + headers.setCustomHeaders(customHttpHeaders); + + return sendRequest(headers, messageParameters, request, connectionVersion); } private < @@ -512,14 +522,20 @@ public class ExecutorImpl implements Executor { } private SqlGatewayRestAPIVersion negotiateVersion() throws Exception { + + CustomHeadersDecorator<EmptyRequestBody, GetApiVersionResponseBody, EmptyMessageParameters> + headers = + new CustomHeadersDecorator<>( + new UrlPrefixDecorator<>( + GetApiVersionHeaders.getInstance(), gatewayUrl.getPath())); + headers.setCustomHeaders(customHttpHeaders); + List<SqlGatewayRestAPIVersion> gatewayVersions = getResponse( restClient.sendRequest( gatewayUrl.getHost(), gatewayUrl.getPort(), - new UrlPrefixDecorator<>( - GetApiVersionHeaders.getInstance(), - gatewayUrl.getPath()), + headers, EmptyMessageParameters.getInstance(), EmptyRequestBody.getInstance(), Collections.emptyList(), @@ -577,4 +593,29 @@ public class ExecutorImpl implements Executor { // ignore any throwable to keep the cleanup running } } + + private static Collection<HttpHeader> readHeadersFromEnvironmentVariable(String envVarName) { + List<HttpHeader> headers = new ArrayList<>(); + String rawHeaders = System.getenv(envVarName); + + if (rawHeaders != null) { + String[] lines = rawHeaders.split("\n"); + for (String line : lines) { + String[] keyValue = line.split(":", 2); + if (keyValue.length == 2) { + headers.add(new HttpHeader(keyValue[0], keyValue[1])); + } else { + LOG.info( + "Skipped a malformed header {} from FLINK_REST_CLIENT_HEADERS env variable. Expecting newline-separated headers in format header_name:header_value.", + line); + } + } + } + return headers; + } + + @VisibleForTesting + Collection<HttpHeader> getCustomHttpHeaders() { + return customHttpHeaders; + } } diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java index 8b4c7d24be9..14d4272fdf6 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/ExecutorImplITCase.java @@ -29,6 +29,7 @@ import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.StateBackendOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.configuration.WebOptions; +import org.apache.flink.runtime.rest.HttpHeader; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; import org.apache.flink.table.api.DataTypes; @@ -576,6 +577,24 @@ class ExecutorImplITCase { .isFalse(); } + @Test + void testCustomHeadersSupport() { + final Map<String, String> envMap = + Collections.singletonMap( + ConfigConstants.FLINK_REST_CLIENT_HEADERS, + "Cookie:authCookie=12:345\nCustomHeader:value1,value2\nMalformedHeaderSkipped"); + org.apache.flink.core.testutils.CommonTestUtils.setEnv(envMap); + try (final ExecutorImpl executor = (ExecutorImpl) createTestServiceExecutor()) { + final List<HttpHeader> customHttpHeaders = + new ArrayList<>(executor.getCustomHttpHeaders()); + final HttpHeader expectedHeader1 = new HttpHeader("Cookie", "authCookie=12:345"); + final HttpHeader expectedHeader2 = new HttpHeader("CustomHeader", "value1,value2"); + assertThat(customHttpHeaders).hasSize(2); + assertThat(customHttpHeaders.get(0)).isEqualTo(expectedHeader1); + assertThat(customHttpHeaders.get(1)).isEqualTo(expectedHeader2); + } + } + // -------------------------------------------------------------------------------------------- // Helper method // --------------------------------------------------------------------------------------------