Repository: flink Updated Branches: refs/heads/master a19d8bfa9 -> 755ae5192
[FLINK-7872] Allow passing in additional HTTP headers. HandlerUtils#sendResponse now accepts a map of additional HTTP response headers and their values. This allows setting additional headers such as the ACCESS_CONTROL_ALLOW_ORIGIN header and its value. This closes #4859. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eddb5b0a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eddb5b0a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eddb5b0a Branch: refs/heads/master Commit: eddb5b0a4c44443acc5ec2a07686a50b088303f8 Parents: 865ce91 Author: Till Rohrmann <[email protected]> Authored: Thu Oct 19 11:06:41 2017 +0200 Committer: Till Rohrmann <[email protected]> Committed: Tue Nov 7 15:07:41 2017 +0100 ---------------------------------------------------------------------- .../program/rest/RestClusterClientTest.java | 1 + .../webmonitor/RuntimeMonitorHandler.java | 7 ++- .../runtime/webmonitor/RedirectHandlerTest.java | 3 +- .../dispatcher/DispatcherRestEndpoint.java | 31 +++++++++-- .../rest/handler/AbstractRestHandler.java | 56 ++++++++++++++++---- .../rest/handler/LegacyRestHandlerAdapter.java | 4 +- .../rest/handler/PipelineErrorHandler.java | 5 +- .../runtime/rest/handler/RedirectHandler.java | 25 ++++++--- .../rest/handler/RestHandlerConfiguration.java | 26 ++++++++- .../runtime/rest/handler/RouterHandler.java | 9 +++- .../job/AbstractExecutionGraphHandler.java | 4 +- .../rest/handler/job/BlobServerPortHandler.java | 9 +++- .../rest/handler/job/JobConfigHandler.java | 3 ++ .../rest/handler/job/JobExceptionsHandler.java | 3 ++ .../rest/handler/job/JobPlanHandler.java | 3 ++ .../rest/handler/job/JobSubmitHandler.java | 9 +++- .../rest/handler/job/JobTerminationHandler.java | 4 +- .../job/JobVertexAccumulatorsHandler.java | 11 +++- .../checkpoints/AbstractCheckpointHandler.java | 4 +- .../checkpoints/CheckpointConfigHandler.java | 3 ++ .../CheckpointStatisticDetailsHandler.java | 12 
++++- .../CheckpointingStatisticsHandler.java | 4 +- .../TaskCheckpointStatisticDetailsHandler.java | 12 ++++- .../handler/legacy/TaskManagerLogHandler.java | 3 +- .../legacy/files/StaticFileServerHandler.java | 3 +- .../runtime/rest/handler/util/HandlerUtils.java | 45 +++++++++++++--- .../flink/runtime/rest/RestEndpointITCase.java | 1 + .../handler/job/BlobServerPortHandlerTest.java | 8 ++- .../rest/handler/job/JobSubmitHandlerTest.java | 7 ++- 29 files changed, 264 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java ---------------------------------------------------------------------- diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index cd6fc0c..3b6ddaf 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -325,6 +325,7 @@ public class RestClusterClientTest extends TestLogger { CompletableFuture.completedFuture(restAddress), mockGatewayRetriever, RpcUtils.INF_TIMEOUT, + Collections.emptyMap(), headers); } } http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java index 993a225..fd6b2ca 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java +++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java @@ -42,6 +42,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.net.URLDecoder; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -74,7 +75,11 @@ public class RuntimeMonitorHandler extends RedirectHandler<JobManagerGateway> im CompletableFuture<String> localJobManagerAddressFuture, Time timeout) { - super(localJobManagerAddressFuture, retriever, timeout); + super( + localJobManagerAddressFuture, + retriever, + timeout, + Collections.singletonMap(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, cfg.getAllowOrigin())); this.handler = checkNotNull(handler); this.allowOrigin = cfg.getAllowOrigin(); } http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java index 4808781..3a976e4 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java @@ -40,6 +40,7 @@ import org.junit.Test; import javax.annotation.Nonnull; +import java.util.Collections; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -143,7 +144,7 @@ public class RedirectHandlerTest extends TestLogger { @Nonnull CompletableFuture<String> localAddressFuture, @Nonnull GatewayRetriever<RestfulGateway> leaderRetriever, @Nonnull Time timeout) { - super(localAddressFuture, leaderRetriever, timeout); + super(localAddressFuture, leaderRetriever, timeout, Collections.emptyMap()); } @Override 
http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java index 447cc0e..7f9c148 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java @@ -74,6 +74,7 @@ import java.io.File; import java.io.IOException; import java.util.ArrayList; import java.util.Collection; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -116,11 +117,13 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { ArrayList<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = new ArrayList<>(3); final Time timeout = restConfiguration.getTimeout(); + final Map<String, String> responseHeaders = restConfiguration.getResponseHeaders(); LegacyRestHandlerAdapter<DispatcherGateway, ClusterOverviewWithVersion, EmptyMessageParameters> clusterOverviewHandler = new LegacyRestHandlerAdapter<>( restAddressFuture, leaderRetriever, timeout, + responseHeaders, ClusterOverviewHeaders.getInstance(), new ClusterOverviewHandler( executor, @@ -130,6 +133,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { restAddressFuture, leaderRetriever, timeout, + responseHeaders, DashboardConfigurationHeaders.getInstance(), new DashboardConfigHandler( executor, @@ -139,6 +143,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { restAddressFuture, leaderRetriever, timeout, + responseHeaders, CurrentJobsOverviewHandlerHeaders.getInstance(), new CurrentJobsOverviewHandler( executor, @@ 
-150,6 +155,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { restAddressFuture, leaderRetriever, timeout, + responseHeaders, ClusterConfigurationInfoHeaders.getInstance(), new ClusterConfigHandler( executor, @@ -159,12 +165,14 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { restAddressFuture, leaderRetriever, timeout, + responseHeaders, JobTerminationHeaders.getInstance()); JobConfigHandler jobConfigHandler = new JobConfigHandler( restAddressFuture, leaderRetriever, timeout, + responseHeaders, JobConfigHeaders.getInstance(), executionGraphCache, executor); @@ -173,6 +181,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { restAddressFuture, leaderRetriever, timeout, + responseHeaders, CheckpointConfigHeaders.getInstance(), executionGraphCache, executor); @@ -181,6 +190,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { restAddressFuture, leaderRetriever, timeout, + responseHeaders, CheckpointingStatisticsHeaders.getInstance(), executionGraphCache, executor); @@ -189,6 +199,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { restAddressFuture, leaderRetriever, timeout, + responseHeaders, CheckpointStatisticDetailsHeaders.getInstance(), executionGraphCache, executor, @@ -198,6 +209,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { restAddressFuture, leaderRetriever, timeout, + responseHeaders, JobPlanHeaders.getInstance(), executionGraphCache, executor); @@ -206,6 +218,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { restAddressFuture, leaderRetriever, timeout, + responseHeaders, TaskCheckpointStatisticsHeaders.getInstance(), executionGraphCache, executor, @@ -215,6 +228,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { restAddressFuture, leaderRetriever, timeout, + responseHeaders, JobExceptionsHeaders.getInstance(), executionGraphCache, executor); @@ -223,10 +237,23 @@ public class DispatcherRestEndpoint 
extends RestServerEndpoint { restAddressFuture, leaderRetriever, timeout, + responseHeaders, JobVertexAccumulatorsHeaders.getInstance(), executionGraphCache, executor); + BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler( + restAddressFuture, + leaderRetriever, + timeout, + responseHeaders); + + JobSubmitHandler jobSubmitHandler = new JobSubmitHandler( + restAddressFuture, + leaderRetriever, + timeout, + responseHeaders); + final File tmpDir = restConfiguration.getTmpDir(); Optional<StaticFileServerHandler<DispatcherGateway>> optWebContent; @@ -255,11 +282,7 @@ public class DispatcherRestEndpoint extends RestServerEndpoint { handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), taskCheckpointStatisticDetailsHandler)); handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), jobExceptionsHandler)); handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), jobVertexAccumulatorsHandler)); - - BlobServerPortHandler blobServerPortHandler = new BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout); handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), blobServerPortHandler)); - - JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(restAddressFuture, leaderRetriever, timeout); handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler)); // This handler MUST be added last, as it otherwise masks all subsequent GET handlers http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java index 01a8c17..abde346 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java @@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** @@ -70,8 +71,9 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re CompletableFuture<String> localRestAddress, GatewayRetriever<? extends T> leaderRetriever, Time timeout, + Map<String, String> responseHeaders, MessageHeaders<R, P, M> messageHeaders) { - super(localRestAddress, leaderRetriever, timeout); + super(localRestAddress, leaderRetriever, timeout, responseHeaders); this.messageHeaders = messageHeaders; } @@ -92,7 +94,12 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re // The RestServerEndpoint defines a HttpObjectAggregator in the pipeline that always returns // FullHttpRequests. log.error("Implementation error: Received a request that wasn't a FullHttpRequest."); - HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Bad request received."), HttpResponseStatus.BAD_REQUEST); + HandlerUtils.sendErrorResponse( + ctx, + httpRequest, + new ErrorResponseBody("Bad request received."), + HttpResponseStatus.BAD_REQUEST, + responseHeaders); return; } @@ -104,7 +111,12 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re request = mapper.readValue("{}", messageHeaders.getRequestClass()); } catch (JsonParseException | JsonMappingException je) { log.error("Request did not conform to expected format.", je); - HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Bad request received."), HttpResponseStatus.BAD_REQUEST); + HandlerUtils.sendErrorResponse( + ctx, + httpRequest, + new ErrorResponseBody("Bad request received."), + HttpResponseStatus.BAD_REQUEST, + responseHeaders); return; } } else { @@ -113,7 +125,12 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends 
Re request = mapper.readValue(in, messageHeaders.getRequestClass()); } catch (JsonParseException | JsonMappingException je) { log.error("Failed to read request.", je); - HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody(String.format("Request did not match expected format %s.", messageHeaders.getRequestClass().getSimpleName())), HttpResponseStatus.BAD_REQUEST); + HandlerUtils.sendErrorResponse( + ctx, + httpRequest, + new ErrorResponseBody(String.format("Request did not match expected format %s.", messageHeaders.getRequestClass().getSimpleName())), + HttpResponseStatus.BAD_REQUEST, + responseHeaders); return; } } @@ -129,7 +146,8 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re ctx, httpRequest, new ErrorResponseBody(String.format("Bad request, could not parse parameters: %s", hre.getMessage())), - HttpResponseStatus.BAD_REQUEST); + HttpResponseStatus.BAD_REQUEST, + responseHeaders); return; } @@ -151,18 +169,38 @@ public abstract class AbstractRestHandler<T extends RestfulGateway, R extends Re log.error("Exception occurred in REST handler.", error); - HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus()); + HandlerUtils.sendErrorResponse( + ctx, + httpRequest, + new ErrorResponseBody(rhe.getMessage()), + rhe.getHttpResponseStatus(), + responseHeaders); } else { log.error("Implementation error: Unhandled exception.", error); - HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR); + HandlerUtils.sendErrorResponse( + ctx, + httpRequest, + new ErrorResponseBody("Internal server error."), + HttpResponseStatus.INTERNAL_SERVER_ERROR, + responseHeaders); } } else { - HandlerUtils.sendResponse(ctx, httpRequest, resp, messageHeaders.getResponseStatusCode()); + HandlerUtils.sendResponse( + ctx, + httpRequest, + resp, + messageHeaders.getResponseStatusCode(), + 
responseHeaders); } }); } catch (Throwable e) { log.error("Request processing failed.", e); - HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR); + HandlerUtils.sendErrorResponse( + ctx, + httpRequest, + new ErrorResponseBody("Internal server error."), + HttpResponseStatus.INTERNAL_SERVER_ERROR, + responseHeaders); } } http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandlerAdapter.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandlerAdapter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandlerAdapter.java index e9eaff7..7ff9d3a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandlerAdapter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandlerAdapter.java @@ -29,6 +29,7 @@ import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** @@ -46,9 +47,10 @@ public class LegacyRestHandlerAdapter<T extends RestfulGateway, R extends Respon CompletableFuture<String> localRestAddress, GatewayRetriever<T> leaderRetriever, Time timeout, + Map<String, String> headers, MessageHeaders<EmptyRequestBody, R, M> messageHeaders, LegacyRestHandler<T, R, M> legacyRestHandler) { - super(localRestAddress, leaderRetriever, timeout, messageHeaders); + super(localRestAddress, leaderRetriever, timeout, headers, messageHeaders); this.legacyRestHandler = Preconditions.checkNotNull(legacyRestHandler); } http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java index b43afdc..046118a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java @@ -29,6 +29,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt import org.slf4j.Logger; +import java.util.Collections; + /** * This is the last handler in the pipeline. It logs all error messages. */ @@ -50,7 +52,8 @@ public class PipelineErrorHandler extends SimpleChannelInboundHandler<HttpReques ctx, message, new ErrorResponseBody("Bad request received."), - HttpResponseStatus.BAD_REQUEST); + HttpResponseStatus.BAD_REQUEST, + Collections.emptyMap()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java index 40b6776..6a3fb7d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java @@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; +import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -62,15 +63,19 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh protected final Time timeout; + protected final 
Map<String, String> responseHeaders; + private String localAddress; protected RedirectHandler( @Nonnull CompletableFuture<String> localAddressFuture, @Nonnull GatewayRetriever<? extends T> leaderRetriever, - @Nonnull Time timeout) { + @Nonnull Time timeout, + @Nonnull Map<String, String> responseHeaders) { this.localAddressFuture = Preconditions.checkNotNull(localAddressFuture); this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever); this.timeout = Preconditions.checkNotNull(timeout); + this.responseHeaders = Preconditions.checkNotNull(responseHeaders); localAddress = null; } @@ -90,7 +95,8 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh channelHandlerContext, routed.request(), new ErrorResponseBody("Fatal error. Could not obtain local address. Please try to refresh."), - HttpResponseStatus.INTERNAL_SERVER_ERROR); + HttpResponseStatus.INTERNAL_SERVER_ERROR, + responseHeaders); return; } @@ -120,7 +126,8 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh channelHandlerContext, routed.request(), new ErrorResponseBody("Could not retrieve the redirect address of the current leader. Please try to refresh."), - HttpResponseStatus.INTERNAL_SERVER_ERROR); + HttpResponseStatus.INTERNAL_SERVER_ERROR, + responseHeaders); } else if (optRedirectAddress.isPresent()) { response = HandlerRedirectUtils.getRedirectResponse( optRedirectAddress.get(), @@ -136,7 +143,8 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh channelHandlerContext, routed.request(), new ErrorResponseBody("Error while responding to the request."), - HttpResponseStatus.INTERNAL_SERVER_ERROR); + HttpResponseStatus.INTERNAL_SERVER_ERROR, + responseHeaders); } } } finally { @@ -152,7 +160,8 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh channelHandlerContext, routed.request(), new ErrorResponseBody("Service temporarily unavailable due to an ongoing leader election. 
Please refresh."), - HttpResponseStatus.SERVICE_UNAVAILABLE)); + HttpResponseStatus.SERVICE_UNAVAILABLE, + responseHeaders)); } catch (Throwable throwable) { logger.warn("Error occurred while processing web request.", throwable); @@ -161,14 +170,16 @@ public abstract class RedirectHandler<T extends RestfulGateway> extends SimpleCh channelHandlerContext, routed.request(), new ErrorResponseBody("Error occurred in RedirectHandler: " + throwable.getMessage() + '.'), - HttpResponseStatus.INTERNAL_SERVER_ERROR); + HttpResponseStatus.INTERNAL_SERVER_ERROR, + responseHeaders); } } else { HandlerUtils.sendErrorResponse( channelHandlerContext, routed.request(), new ErrorResponseBody("Local address has not been resolved. This indicates an internal error."), - HttpResponseStatus.INTERNAL_SERVER_ERROR); + HttpResponseStatus.INTERNAL_SERVER_ERROR, + responseHeaders); } } http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java index 06d71f8..acdd63c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java @@ -23,7 +23,11 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.WebOptions; import org.apache.flink.util.Preconditions; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders; + import java.io.File; +import java.util.Collections; +import java.util.Map; import java.util.UUID; /** @@ -39,11 +43,14 @@ public class RestHandlerConfiguration { private final File tmpDir; + private final 
Map<String, String> responseHeaders; + public RestHandlerConfiguration( long refreshInterval, int maxCheckpointStatisticCacheEntries, Time timeout, - File tmpDir) { + File tmpDir, + Map<String, String> responseHeaders) { Preconditions.checkArgument(refreshInterval > 0L, "The refresh interval (ms) should be larger than 0."); this.refreshInterval = refreshInterval; @@ -51,6 +58,8 @@ public class RestHandlerConfiguration { this.timeout = Preconditions.checkNotNull(timeout); this.tmpDir = Preconditions.checkNotNull(tmpDir); + + this.responseHeaders = Preconditions.checkNotNull(responseHeaders); } public long getRefreshInterval() { @@ -69,6 +78,10 @@ public class RestHandlerConfiguration { return tmpDir; } + public Map<String, String> getResponseHeaders() { + return Collections.unmodifiableMap(responseHeaders); + } + public static RestHandlerConfiguration fromConfiguration(Configuration configuration) { final long refreshInterval = configuration.getLong(WebOptions.REFRESH_INTERVAL); @@ -79,6 +92,15 @@ public class RestHandlerConfiguration { final String rootDir = "flink-web-" + UUID.randomUUID(); final File tmpDir = new File(configuration.getString(WebOptions.TMP_DIR), rootDir); - return new RestHandlerConfiguration(refreshInterval, maxCheckpointStatisticCacheEntries, timeout, tmpDir); + final Map<String, String> responseHeaders = Collections.singletonMap( + HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, + configuration.getString(WebOptions.ACCESS_CONTROL_ALLOW_ORIGIN)); + + return new RestHandlerConfiguration( + refreshInterval, + maxCheckpointStatisticCacheEntries, + timeout, + tmpDir, + responseHeaders); } } http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java index cfc456f..d1d0837 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java @@ -30,6 +30,8 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.Collections; + /** * This class is an extension of {@link Handler} that replaces the standard error response to be identical with those * sent by the {@link AbstractRestHandler}. @@ -43,6 +45,11 @@ public class RouterHandler extends Handler { @Override protected void respondNotFound(ChannelHandlerContext ctx, HttpRequest request) { - HandlerUtils.sendErrorResponse(ctx, request, new ErrorResponseBody("Not found."), HttpResponseStatus.NOT_FOUND); + HandlerUtils.sendErrorResponse( + ctx, + request, + new ErrorResponseBody("Not found."), + HttpResponseStatus.NOT_FOUND, + Collections.emptyMap()); } } http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java index 63c3e35..7192832 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java @@ -36,6 +36,7 @@ import org.apache.flink.util.Preconditions; import javax.annotation.Nonnull; +import java.util.Map; import java.util.concurrent.CompletableFuture; import 
java.util.concurrent.CompletionException; import java.util.concurrent.Executor; @@ -55,10 +56,11 @@ public abstract class AbstractExecutionGraphHandler<R extends ResponseBody, M ex CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, + Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, R, M> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor) { - super(localRestAddress, leaderRetriever, timeout, messageHeaders); + super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders); this.executionGraphCache = Preconditions.checkNotNull(executionGraphCache); this.executor = Preconditions.checkNotNull(executor); http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java index cdf562f..1d14563 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java @@ -34,6 +34,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt import javax.annotation.Nonnull; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; @@ -42,8 +43,12 @@ import java.util.concurrent.CompletionException; */ public final class BlobServerPortHandler extends AbstractRestHandler<DispatcherGateway, EmptyRequestBody, BlobServerPortResponseBody, EmptyMessageParameters> { - public BlobServerPortHandler(CompletableFuture<String> localRestAddress, 
GatewayRetriever<DispatcherGateway> leaderRetriever, Time timeout) { - super(localRestAddress, leaderRetriever, timeout, BlobServerPortHeaders.getInstance()); + public BlobServerPortHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<DispatcherGateway> leaderRetriever, + Time timeout, + Map<String, String> headers) { + super(localRestAddress, leaderRetriever, timeout, headers, BlobServerPortHeaders.getInstance()); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java index f27d84f..7154246 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -42,6 +43,7 @@ public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInf CompletableFuture<String> localRestAddress, GatewayRetriever<? 
extends RestfulGateway> leaderRetriever, Time timeout, + Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, JobConfigInfo, JobMessageParameters> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor) { @@ -50,6 +52,7 @@ public class JobConfigHandler extends AbstractExecutionGraphHandler<JobConfigInf localRestAddress, leaderRetriever, timeout, + responseHeaders, messageHeaders, executionGraphCache, executor); http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java index feabbea..62f3e5e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java @@ -36,6 +36,7 @@ import org.apache.flink.util.ExceptionUtils; import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -50,6 +51,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphHandler<JobExcep CompletableFuture<String> localRestAddress, GatewayRetriever<? 
extends RestfulGateway> leaderRetriever, Time timeout, + Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, JobExceptionsInfo, JobMessageParameters> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor) { @@ -58,6 +60,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphHandler<JobExcep localRestAddress, leaderRetriever, timeout, + responseHeaders, messageHeaders, executionGraphCache, executor); http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java index c8e6f8b..e7a30fb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -41,6 +42,7 @@ public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, J CompletableFuture<String> localRestAddress, GatewayRetriever<? 
extends RestfulGateway> leaderRetriever, Time timeout, + Map<String, String> headers, MessageHeaders<EmptyRequestBody, JobPlanInfo, JobMessageParameters> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor) { @@ -49,6 +51,7 @@ public class JobPlanHandler extends AbstractExecutionGraphHandler<JobPlanInfo, J localRestAddress, leaderRetriever, timeout, + headers, messageHeaders, executionGraphCache, executor); http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java index f810b5a..efb162e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java @@ -36,6 +36,7 @@ import javax.annotation.Nonnull; import java.io.ByteArrayInputStream; import java.io.ObjectInputStream; +import java.util.Map; import java.util.concurrent.CompletableFuture; /** @@ -43,8 +44,12 @@ import java.util.concurrent.CompletableFuture; */ public final class JobSubmitHandler extends AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, JobSubmitResponseBody, EmptyMessageParameters> { - public JobSubmitHandler(CompletableFuture<String> localRestAddress, GatewayRetriever<DispatcherGateway> leaderRetriever, Time timeout) { - super(localRestAddress, leaderRetriever, timeout, JobSubmitHeaders.getInstance()); + public JobSubmitHandler( + CompletableFuture<String> localRestAddress, + GatewayRetriever<DispatcherGateway> leaderRetriever, + Time timeout, + Map<String, String> headers) { + super(localRestAddress, leaderRetriever, timeout, headers, JobSubmitHeaders.getInstance()); } 
@Override http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobTerminationHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobTerminationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobTerminationHandler.java index ffd5b4c..0998177 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobTerminationHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobTerminationHandler.java @@ -39,6 +39,7 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.TimeoutException; @@ -52,8 +53,9 @@ public class JobTerminationHandler extends AbstractRestHandler<DispatcherGateway CompletableFuture<String> localRestAddress, GatewayRetriever<DispatcherGateway> leaderRetriever, Time timeout, + Map<String, String> headers, MessageHeaders<EmptyRequestBody, EmptyResponseBody, JobTerminationMessageParameters> messageHeaders) { - super(localRestAddress, leaderRetriever, timeout, messageHeaders); + super(localRestAddress, leaderRetriever, timeout, headers, messageHeaders); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java index 
f7c9b49..55c465c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; import java.util.ArrayList; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -49,10 +50,18 @@ public class JobVertexAccumulatorsHandler extends AbstractExecutionGraphHandler< CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, + Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, JobVertexAccumulatorsInfo, JobVertexMessageParameters> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor) { - super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor); + super( + localRestAddress, + leaderRetriever, + timeout, + responseHeaders, + messageHeaders, + executionGraphCache, + executor); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java index 23d09f6..dcc16fd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java @@ -37,6 +37,7 @@ import 
org.apache.flink.util.Preconditions; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -53,11 +54,12 @@ public abstract class AbstractCheckpointHandler<R extends ResponseBody, M extend CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, + Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, R, M> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor, CheckpointStatsCache checkpointStatsCache) { - super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor); + super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor); this.checkpointStatsCache = Preconditions.checkNotNull(checkpointStatsCache); } http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java index 1efa7af..91c9ae1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java @@ -35,6 +35,7 @@ import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -47,6 +48,7 @@ public 
class CheckpointConfigHandler extends AbstractExecutionGraphHandler<Check CompletableFuture<String> localRestAddress, GatewayRetriever<? extends RestfulGateway> leaderRetriever, Time timeout, + Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, CheckpointConfigInfo, JobMessageParameters> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor) { @@ -54,6 +56,7 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphHandler<Check localRestAddress, leaderRetriever, timeout, + responseHeaders, messageHeaders, executionGraphCache, executor); http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java index e6b9f6b..2816336 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java @@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics; import org.apache.flink.runtime.webmonitor.RestfulGateway; import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -41,11 +42,20 @@ public class CheckpointStatisticDetailsHandler extends AbstractCheckpointHandler CompletableFuture<String> localRestAddress, GatewayRetriever<? 
extends RestfulGateway> leaderRetriever, Time timeout, + Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, CheckpointStatistics, CheckpointMessageParameters> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor, CheckpointStatsCache checkpointStatsCache) { - super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor, checkpointStatsCache); + super( + localRestAddress, + leaderRetriever, + timeout, + responseHeaders, + messageHeaders, + executionGraphCache, + executor, + checkpointStatsCache); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java index a8514d5..b9db367 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java @@ -44,6 +44,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -56,10 +57,11 @@ public class CheckpointingStatisticsHandler extends AbstractExecutionGraphHandle CompletableFuture<String> localRestAddress, GatewayRetriever<? 
extends RestfulGateway> leaderRetriever, Time timeout, + Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, CheckpointingStatistics, JobMessageParameters> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor) { - super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor); + super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders, executionGraphCache, executor); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java index 5479159..cff3bf0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java @@ -41,6 +41,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt import java.util.ArrayList; import java.util.List; +import java.util.Map; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -53,11 +54,20 @@ public class TaskCheckpointStatisticDetailsHandler extends AbstractCheckpointHan CompletableFuture<String> localRestAddress, GatewayRetriever<? 
extends RestfulGateway> leaderRetriever, Time timeout, + Map<String, String> responseHeaders, MessageHeaders<EmptyRequestBody, TaskCheckpointStatisticsWithSubtaskDetails, TaskCheckpointMessageParameters> messageHeaders, ExecutionGraphCache executionGraphCache, Executor executor, CheckpointStatsCache checkpointStatsCache) { - super(localRestAddress, leaderRetriever, timeout, messageHeaders, executionGraphCache, executor, checkpointStatsCache); + super( + localRestAddress, + leaderRetriever, + timeout, + responseHeaders, + messageHeaders, + executionGraphCache, + executor, + checkpointStatsCache); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java index 118e356..e4ad461 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java @@ -69,6 +69,7 @@ import java.io.UnsupportedEncodingException; import java.net.InetSocketAddress; import java.net.URLDecoder; import java.nio.channels.FileChannel; +import java.util.Collections; import java.util.HashMap; import java.util.Objects; import java.util.Optional; @@ -125,7 +126,7 @@ public class TaskManagerLogHandler extends RedirectHandler<JobManagerGateway> im Time timeout, FileMode fileMode, Configuration config) { - super(localJobManagerAddressPromise, retriever, timeout); + super(localJobManagerAddressPromise, retriever, timeout, Collections.emptyMap()); this.executor = checkNotNull(executor); this.config = config; 
http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java index da115ee..650647b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java @@ -64,6 +64,7 @@ import java.nio.file.Files; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; +import java.util.Collections; import java.util.Date; import java.util.GregorianCalendar; import java.util.Locale; @@ -114,7 +115,7 @@ public class StaticFileServerHandler<T extends RestfulGateway> extends RedirectH Time timeout, File rootPath) throws IOException { - super(localJobManagerAddressFuture, retriever, timeout); + super(localJobManagerAddressFuture, retriever, timeout, Collections.emptyMap()); this.rootPath = checkNotNull(rootPath).getCanonicalFile(); } http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java index ea0b934..604c0b8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java @@ -43,6 +43,7 @@ import 
javax.annotation.Nonnull; import java.io.IOException; import java.io.StringWriter; +import java.util.Map; import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION; import static org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE; @@ -64,22 +65,34 @@ public class HandlerUtils { * @param httpRequest originating http request * @param response which should be sent * @param statusCode of the message to send + * @param headers additional header values * @param <P> type of the response */ public static <P extends ResponseBody> void sendResponse( ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, P response, - HttpResponseStatus statusCode) { + HttpResponseStatus statusCode, + Map<String, String> headers) { StringWriter sw = new StringWriter(); try { mapper.writeValue(sw, response); } catch (IOException ioe) { LOG.error("Internal server error. Could not map response to JSON.", ioe); - sendErrorResponse(channelHandlerContext, httpRequest, new ErrorResponseBody("Internal server error. Could not map response to JSON."), HttpResponseStatus.INTERNAL_SERVER_ERROR); + sendErrorResponse( + channelHandlerContext, + httpRequest, + new ErrorResponseBody("Internal server error. 
Could not map response to JSON."), + HttpResponseStatus.INTERNAL_SERVER_ERROR, + headers); return; } - sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode); + sendResponse( + channelHandlerContext, + httpRequest, + sw.toString(), + statusCode, + headers); } /** @@ -89,12 +102,14 @@ public class HandlerUtils { * @param httpRequest originating http request * @param errorMessage which should be sent * @param statusCode of the message to send + * @param headers additional header values */ public static void sendErrorResponse( ChannelHandlerContext channelHandlerContext, HttpRequest httpRequest, ErrorResponseBody errorMessage, - HttpResponseStatus statusCode) { + HttpResponseStatus statusCode, + Map<String, String> headers) { StringWriter sw = new StringWriter(); try { @@ -102,9 +117,19 @@ public class HandlerUtils { } catch (IOException e) { // this should never happen LOG.error("Internal server error. Could not map error response to JSON.", e); - sendResponse(channelHandlerContext, httpRequest, "Internal server error. Could not map error response to JSON.", HttpResponseStatus.INTERNAL_SERVER_ERROR); + sendResponse( + channelHandlerContext, + httpRequest, + "Internal server error. 
Could not map error response to JSON.", + HttpResponseStatus.INTERNAL_SERVER_ERROR, + headers); } - sendResponse(channelHandlerContext, httpRequest, sw.toString(), statusCode); + sendResponse( + channelHandlerContext, + httpRequest, + sw.toString(), + statusCode, + headers); } /** @@ -114,16 +139,22 @@ public class HandlerUtils { * @param httpRequest originating http request * @param message which should be sent * @param statusCode of the message to send + * @param headers additional header values */ public static void sendResponse( @Nonnull ChannelHandlerContext channelHandlerContext, @Nonnull HttpRequest httpRequest, @Nonnull String message, - @Nonnull HttpResponseStatus statusCode) { + @Nonnull HttpResponseStatus statusCode, + @Nonnull Map<String, String> headers) { HttpResponse response = new DefaultHttpResponse(HTTP_1_1, statusCode); response.headers().set(CONTENT_TYPE, "application/json"); + for (Map.Entry<String, String> headerEntry : headers.entrySet()) { + response.headers().set(headerEntry.getKey(), headerEntry.getValue()); + } + if (HttpHeaders.isKeepAlive(httpRequest)) { response.headers().set(CONNECTION, HttpHeaders.Values.KEEP_ALIVE); } http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java index 00fda57..4d3c6b5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java @@ -222,6 +222,7 @@ public class RestEndpointITCase extends TestLogger { localAddressFuture, leaderRetriever, timeout, + Collections.emptyMap(), new TestHeaders()); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java index 0ea18db..15c2eb4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java @@ -35,6 +35,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt import org.junit.Assert; import org.junit.Test; +import java.util.Collections; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -58,7 +59,8 @@ public class BlobServerPortHandlerTest extends TestLogger { BlobServerPortHandler handler = new BlobServerPortHandler( CompletableFuture.completedFuture("http://localhost:1234"), mockGatewayRetriever, - RpcUtils.INF_TIMEOUT); + RpcUtils.INF_TIMEOUT, + Collections.emptyMap()); BlobServerPortResponseBody portResponse = handler .handleRequest(new HandlerRequest<>(EmptyRequestBody.getInstance(), EmptyMessageParameters.getInstance()), mockGateway) @@ -77,7 +79,8 @@ public class BlobServerPortHandlerTest extends TestLogger { BlobServerPortHandler handler = new BlobServerPortHandler( CompletableFuture.completedFuture("http://localhost:1234"), mockGatewayRetriever, - RpcUtils.INF_TIMEOUT); + RpcUtils.INF_TIMEOUT, + Collections.emptyMap()); try { handler @@ -93,5 +96,6 @@ public class BlobServerPortHandlerTest extends TestLogger { } private static class TestException extends Exception { + private static final long serialVersionUID = -7064446788277853899L; } } 
http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java index 1196d40..212af5f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java @@ -35,6 +35,7 @@ import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt import org.junit.Assert; import org.junit.Test; +import java.util.Collections; import java.util.concurrent.CompletableFuture; import static org.mockito.Matchers.any; @@ -55,7 +56,8 @@ public class JobSubmitHandlerTest extends TestLogger { JobSubmitHandler handler = new JobSubmitHandler( CompletableFuture.completedFuture("http://localhost:1234"), mockGatewayRetriever, - RpcUtils.INF_TIMEOUT); + RpcUtils.INF_TIMEOUT, + Collections.emptyMap()); JobSubmitRequestBody request = new JobSubmitRequestBody(new byte[0]); @@ -76,7 +78,8 @@ public class JobSubmitHandlerTest extends TestLogger { JobSubmitHandler handler = new JobSubmitHandler( CompletableFuture.completedFuture("http://localhost:1234"), mockGatewayRetriever, - RpcUtils.INF_TIMEOUT); + RpcUtils.INF_TIMEOUT, + Collections.emptyMap()); JobGraph job = new JobGraph("testjob"); JobSubmitRequestBody request = new JobSubmitRequestBody(job);
