Repository: flink
Updated Branches:
  refs/heads/master a19d8bfa9 -> 755ae5192


[FLINK-7872] Allow to pass in additional HTTP headers

HandlerUtils#sendResponse now accepts a map of additional HTTP response headers
and their values. This allows setting additional headers such as the
ACCESS_CONTROL_ALLOW_ORIGIN header and its value.

This closes #4859.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/eddb5b0a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/eddb5b0a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/eddb5b0a

Branch: refs/heads/master
Commit: eddb5b0a4c44443acc5ec2a07686a50b088303f8
Parents: 865ce91
Author: Till Rohrmann <[email protected]>
Authored: Thu Oct 19 11:06:41 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Tue Nov 7 15:07:41 2017 +0100

----------------------------------------------------------------------
 .../program/rest/RestClusterClientTest.java     |  1 +
 .../webmonitor/RuntimeMonitorHandler.java       |  7 ++-
 .../runtime/webmonitor/RedirectHandlerTest.java |  3 +-
 .../dispatcher/DispatcherRestEndpoint.java      | 31 +++++++++--
 .../rest/handler/AbstractRestHandler.java       | 56 ++++++++++++++++----
 .../rest/handler/LegacyRestHandlerAdapter.java  |  4 +-
 .../rest/handler/PipelineErrorHandler.java      |  5 +-
 .../runtime/rest/handler/RedirectHandler.java   | 25 ++++++---
 .../rest/handler/RestHandlerConfiguration.java  | 26 ++++++++-
 .../runtime/rest/handler/RouterHandler.java     |  9 +++-
 .../job/AbstractExecutionGraphHandler.java      |  4 +-
 .../rest/handler/job/BlobServerPortHandler.java |  9 +++-
 .../rest/handler/job/JobConfigHandler.java      |  3 ++
 .../rest/handler/job/JobExceptionsHandler.java  |  3 ++
 .../rest/handler/job/JobPlanHandler.java        |  3 ++
 .../rest/handler/job/JobSubmitHandler.java      |  9 +++-
 .../rest/handler/job/JobTerminationHandler.java |  4 +-
 .../job/JobVertexAccumulatorsHandler.java       | 11 +++-
 .../checkpoints/AbstractCheckpointHandler.java  |  4 +-
 .../checkpoints/CheckpointConfigHandler.java    |  3 ++
 .../CheckpointStatisticDetailsHandler.java      | 12 ++++-
 .../CheckpointingStatisticsHandler.java         |  4 +-
 .../TaskCheckpointStatisticDetailsHandler.java  | 12 ++++-
 .../handler/legacy/TaskManagerLogHandler.java   |  3 +-
 .../legacy/files/StaticFileServerHandler.java   |  3 +-
 .../runtime/rest/handler/util/HandlerUtils.java | 45 +++++++++++++---
 .../flink/runtime/rest/RestEndpointITCase.java  |  1 +
 .../handler/job/BlobServerPortHandlerTest.java  |  8 ++-
 .../rest/handler/job/JobSubmitHandlerTest.java  |  7 ++-
 29 files changed, 264 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
----------------------------------------------------------------------
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index cd6fc0c..3b6ddaf 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -325,6 +325,7 @@ public class RestClusterClientTest extends TestLogger {
                                CompletableFuture.completedFuture(restAddress),
                                mockGatewayRetriever,
                                RpcUtils.INF_TIMEOUT,
+                               Collections.emptyMap(),
                                headers);
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
index 993a225..fd6b2ca 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/RuntimeMonitorHandler.java
@@ -42,6 +42,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URLDecoder;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Optional;
@@ -74,7 +75,11 @@ public class RuntimeMonitorHandler extends 
RedirectHandler<JobManagerGateway> im
                        CompletableFuture<String> localJobManagerAddressFuture,
                        Time timeout) {
 
-               super(localJobManagerAddressFuture, retriever, timeout);
+               super(
+                       localJobManagerAddressFuture,
+                       retriever,
+                       timeout,
+                       
Collections.singletonMap(HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN, 
cfg.getAllowOrigin()));
                this.handler = checkNotNull(handler);
                this.allowOrigin = cfg.getAllowOrigin();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
index 4808781..3a976e4 100644
--- 
a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
+++ 
b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java
@@ -40,6 +40,7 @@ import org.junit.Test;
 
 import javax.annotation.Nonnull;
 
+import java.util.Collections;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 
@@ -143,7 +144,7 @@ public class RedirectHandlerTest extends TestLogger {
                                @Nonnull CompletableFuture<String> 
localAddressFuture,
                                @Nonnull GatewayRetriever<RestfulGateway> 
leaderRetriever,
                                @Nonnull Time timeout) {
-                       super(localAddressFuture, leaderRetriever, timeout);
+                       super(localAddressFuture, leaderRetriever, timeout, 
Collections.emptyMap());
                }
 
                @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
index 447cc0e..7f9c148 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherRestEndpoint.java
@@ -74,6 +74,7 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
@@ -116,11 +117,13 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                ArrayList<Tuple2<RestHandlerSpecification, 
ChannelInboundHandler>> handlers = new ArrayList<>(3);
 
                final Time timeout = restConfiguration.getTimeout();
+               final Map<String, String> responseHeaders = 
restConfiguration.getResponseHeaders();
 
                LegacyRestHandlerAdapter<DispatcherGateway, 
ClusterOverviewWithVersion, EmptyMessageParameters> clusterOverviewHandler = 
new LegacyRestHandlerAdapter<>(
                        restAddressFuture,
                        leaderRetriever,
                        timeout,
+                       responseHeaders,
                        ClusterOverviewHeaders.getInstance(),
                        new ClusterOverviewHandler(
                                executor,
@@ -130,6 +133,7 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                        restAddressFuture,
                        leaderRetriever,
                        timeout,
+                       responseHeaders,
                        DashboardConfigurationHeaders.getInstance(),
                        new DashboardConfigHandler(
                                executor,
@@ -139,6 +143,7 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                        restAddressFuture,
                        leaderRetriever,
                        timeout,
+                       responseHeaders,
                        CurrentJobsOverviewHandlerHeaders.getInstance(),
                        new CurrentJobsOverviewHandler(
                                executor,
@@ -150,6 +155,7 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                        restAddressFuture,
                        leaderRetriever,
                        timeout,
+                       responseHeaders,
                        ClusterConfigurationInfoHeaders.getInstance(),
                        new ClusterConfigHandler(
                                executor,
@@ -159,12 +165,14 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                        restAddressFuture,
                        leaderRetriever,
                        timeout,
+                       responseHeaders,
                        JobTerminationHeaders.getInstance());
 
                JobConfigHandler jobConfigHandler = new JobConfigHandler(
                        restAddressFuture,
                        leaderRetriever,
                        timeout,
+                       responseHeaders,
                        JobConfigHeaders.getInstance(),
                        executionGraphCache,
                        executor);
@@ -173,6 +181,7 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                        restAddressFuture,
                        leaderRetriever,
                        timeout,
+                       responseHeaders,
                        CheckpointConfigHeaders.getInstance(),
                        executionGraphCache,
                        executor);
@@ -181,6 +190,7 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                        restAddressFuture,
                        leaderRetriever,
                        timeout,
+                       responseHeaders,
                        CheckpointingStatisticsHeaders.getInstance(),
                        executionGraphCache,
                        executor);
@@ -189,6 +199,7 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                        restAddressFuture,
                        leaderRetriever,
                        timeout,
+                       responseHeaders,
                        CheckpointStatisticDetailsHeaders.getInstance(),
                        executionGraphCache,
                        executor,
@@ -198,6 +209,7 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                        restAddressFuture,
                        leaderRetriever,
                        timeout,
+                       responseHeaders,
                        JobPlanHeaders.getInstance(),
                        executionGraphCache,
                        executor);
@@ -206,6 +218,7 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                        restAddressFuture,
                        leaderRetriever,
                        timeout,
+                       responseHeaders,
                        TaskCheckpointStatisticsHeaders.getInstance(),
                        executionGraphCache,
                        executor,
@@ -215,6 +228,7 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                        restAddressFuture,
                        leaderRetriever,
                        timeout,
+                       responseHeaders,
                        JobExceptionsHeaders.getInstance(),
                        executionGraphCache,
                        executor);
@@ -223,10 +237,23 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                        restAddressFuture,
                        leaderRetriever,
                        timeout,
+                       responseHeaders,
                        JobVertexAccumulatorsHeaders.getInstance(),
                        executionGraphCache,
                        executor);
 
+               BlobServerPortHandler blobServerPortHandler = new 
BlobServerPortHandler(
+                       restAddressFuture,
+                       leaderRetriever,
+                       timeout,
+                       responseHeaders);
+
+               JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(
+                       restAddressFuture,
+                       leaderRetriever,
+                       timeout,
+                       responseHeaders);
+
                final File tmpDir = restConfiguration.getTmpDir();
 
                Optional<StaticFileServerHandler<DispatcherGateway>> 
optWebContent;
@@ -255,11 +282,7 @@ public class DispatcherRestEndpoint extends 
RestServerEndpoint {
                
handlers.add(Tuple2.of(TaskCheckpointStatisticsHeaders.getInstance(), 
taskCheckpointStatisticDetailsHandler));
                handlers.add(Tuple2.of(JobExceptionsHeaders.getInstance(), 
jobExceptionsHandler));
                
handlers.add(Tuple2.of(JobVertexAccumulatorsHeaders.getInstance(), 
jobVertexAccumulatorsHandler));
-
-               BlobServerPortHandler blobServerPortHandler = new 
BlobServerPortHandler(restAddressFuture, leaderRetriever, timeout);
                
handlers.add(Tuple2.of(blobServerPortHandler.getMessageHeaders(), 
blobServerPortHandler));
-
-               JobSubmitHandler jobSubmitHandler = new 
JobSubmitHandler(restAddressFuture, leaderRetriever, timeout);
                handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), 
jobSubmitHandler));
 
                // This handler MUST be added last, as it otherwise masks all 
subsequent GET handlers

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
index 01a8c17..abde346 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java
@@ -48,6 +48,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -70,8 +71,9 @@ public abstract class AbstractRestHandler<T extends 
RestfulGateway, R extends Re
                        CompletableFuture<String> localRestAddress,
                        GatewayRetriever<? extends T> leaderRetriever,
                        Time timeout,
+                       Map<String, String> responseHeaders,
                        MessageHeaders<R, P, M> messageHeaders) {
-               super(localRestAddress, leaderRetriever, timeout);
+               super(localRestAddress, leaderRetriever, timeout, 
responseHeaders);
                this.messageHeaders = messageHeaders;
        }
 
@@ -92,7 +94,12 @@ public abstract class AbstractRestHandler<T extends 
RestfulGateway, R extends Re
                                // The RestServerEndpoint defines a 
HttpObjectAggregator in the pipeline that always returns
                                // FullHttpRequests.
                                log.error("Implementation error: Received a 
request that wasn't a FullHttpRequest.");
-                               HandlerUtils.sendErrorResponse(ctx, 
httpRequest, new ErrorResponseBody("Bad request received."), 
HttpResponseStatus.BAD_REQUEST);
+                               HandlerUtils.sendErrorResponse(
+                                       ctx,
+                                       httpRequest,
+                                       new ErrorResponseBody("Bad request 
received."),
+                                       HttpResponseStatus.BAD_REQUEST,
+                                       responseHeaders);
                                return;
                        }
 
@@ -104,7 +111,12 @@ public abstract class AbstractRestHandler<T extends 
RestfulGateway, R extends Re
                                        request = mapper.readValue("{}", 
messageHeaders.getRequestClass());
                                } catch (JsonParseException | 
JsonMappingException je) {
                                        log.error("Request did not conform to 
expected format.", je);
-                                       HandlerUtils.sendErrorResponse(ctx, 
httpRequest, new ErrorResponseBody("Bad request received."), 
HttpResponseStatus.BAD_REQUEST);
+                                       HandlerUtils.sendErrorResponse(
+                                               ctx,
+                                               httpRequest,
+                                               new ErrorResponseBody("Bad 
request received."),
+                                               HttpResponseStatus.BAD_REQUEST,
+                                               responseHeaders);
                                        return;
                                }
                        } else {
@@ -113,7 +125,12 @@ public abstract class AbstractRestHandler<T extends 
RestfulGateway, R extends Re
                                        request = mapper.readValue(in, 
messageHeaders.getRequestClass());
                                } catch (JsonParseException | 
JsonMappingException je) {
                                        log.error("Failed to read request.", 
je);
-                                       HandlerUtils.sendErrorResponse(ctx, 
httpRequest, new ErrorResponseBody(String.format("Request did not match 
expected format %s.", messageHeaders.getRequestClass().getSimpleName())), 
HttpResponseStatus.BAD_REQUEST);
+                                       HandlerUtils.sendErrorResponse(
+                                               ctx,
+                                               httpRequest,
+                                               new 
ErrorResponseBody(String.format("Request did not match expected format %s.", 
messageHeaders.getRequestClass().getSimpleName())),
+                                               HttpResponseStatus.BAD_REQUEST,
+                                               responseHeaders);
                                        return;
                                }
                        }
@@ -129,7 +146,8 @@ public abstract class AbstractRestHandler<T extends 
RestfulGateway, R extends Re
                                        ctx,
                                        httpRequest,
                                        new 
ErrorResponseBody(String.format("Bad request, could not parse parameters: %s", 
hre.getMessage())),
-                                       HttpResponseStatus.BAD_REQUEST);
+                                       HttpResponseStatus.BAD_REQUEST,
+                                       responseHeaders);
                                return;
                        }
 
@@ -151,18 +169,38 @@ public abstract class AbstractRestHandler<T extends 
RestfulGateway, R extends Re
 
                                                log.error("Exception occurred 
in REST handler.", error);
 
-                                               
HandlerUtils.sendErrorResponse(ctx, httpRequest, new 
ErrorResponseBody(rhe.getMessage()), rhe.getHttpResponseStatus());
+                                               HandlerUtils.sendErrorResponse(
+                                                       ctx,
+                                                       httpRequest,
+                                                       new 
ErrorResponseBody(rhe.getMessage()),
+                                                       
rhe.getHttpResponseStatus(),
+                                                       responseHeaders);
                                        } else {
                                                log.error("Implementation 
error: Unhandled exception.", error);
-                                               
HandlerUtils.sendErrorResponse(ctx, httpRequest, new 
ErrorResponseBody("Internal server error."), 
HttpResponseStatus.INTERNAL_SERVER_ERROR);
+                                               HandlerUtils.sendErrorResponse(
+                                                       ctx,
+                                                       httpRequest,
+                                                       new 
ErrorResponseBody("Internal server error."),
+                                                       
HttpResponseStatus.INTERNAL_SERVER_ERROR,
+                                                       responseHeaders);
                                        }
                                } else {
-                                       HandlerUtils.sendResponse(ctx, 
httpRequest, resp, messageHeaders.getResponseStatusCode());
+                                       HandlerUtils.sendResponse(
+                                               ctx,
+                                               httpRequest,
+                                               resp,
+                                               
messageHeaders.getResponseStatusCode(),
+                                               responseHeaders);
                                }
                        });
                } catch (Throwable e) {
                        log.error("Request processing failed.", e);
-                       HandlerUtils.sendErrorResponse(ctx, httpRequest, new 
ErrorResponseBody("Internal server error."), 
HttpResponseStatus.INTERNAL_SERVER_ERROR);
+                       HandlerUtils.sendErrorResponse(
+                               ctx,
+                               httpRequest,
+                               new ErrorResponseBody("Internal server error."),
+                               HttpResponseStatus.INTERNAL_SERVER_ERROR,
+                               responseHeaders);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandlerAdapter.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandlerAdapter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandlerAdapter.java
index e9eaff7..7ff9d3a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandlerAdapter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/LegacyRestHandlerAdapter.java
@@ -29,6 +29,7 @@ import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -46,9 +47,10 @@ public class LegacyRestHandlerAdapter<T extends 
RestfulGateway, R extends Respon
                        CompletableFuture<String> localRestAddress,
                        GatewayRetriever<T> leaderRetriever,
                        Time timeout,
+                       Map<String, String> headers,
                        MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
                        LegacyRestHandler<T, R, M> legacyRestHandler) {
-               super(localRestAddress, leaderRetriever, timeout, 
messageHeaders);
+               super(localRestAddress, leaderRetriever, timeout, headers, 
messageHeaders);
 
                this.legacyRestHandler = 
Preconditions.checkNotNull(legacyRestHandler);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
index b43afdc..046118a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/PipelineErrorHandler.java
@@ -29,6 +29,8 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 
 import org.slf4j.Logger;
 
+import java.util.Collections;
+
 /**
  * This is the last handler in the pipeline. It logs all error messages.
  */
@@ -50,7 +52,8 @@ public class PipelineErrorHandler extends 
SimpleChannelInboundHandler<HttpReques
                        ctx,
                        message,
                        new ErrorResponseBody("Bad request received."),
-                       HttpResponseStatus.BAD_REQUEST);
+                       HttpResponseStatus.BAD_REQUEST,
+                       Collections.emptyMap());
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
index 40b6776..6a3fb7d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RedirectHandler.java
@@ -41,6 +41,7 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.Nonnull;
 
+import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
@@ -62,15 +63,19 @@ public abstract class RedirectHandler<T extends 
RestfulGateway> extends SimpleCh
 
        protected final Time timeout;
 
+       protected final Map<String, String> responseHeaders;
+
        private String localAddress;
 
        protected RedirectHandler(
                        @Nonnull CompletableFuture<String> localAddressFuture,
                        @Nonnull GatewayRetriever<? extends T> leaderRetriever,
-                       @Nonnull Time timeout) {
+                       @Nonnull Time timeout,
+                       @Nonnull Map<String, String> responseHeaders) {
                this.localAddressFuture = 
Preconditions.checkNotNull(localAddressFuture);
                this.leaderRetriever = 
Preconditions.checkNotNull(leaderRetriever);
                this.timeout = Preconditions.checkNotNull(timeout);
+               this.responseHeaders = 
Preconditions.checkNotNull(responseHeaders);
                localAddress = null;
        }
 
@@ -90,7 +95,8 @@ public abstract class RedirectHandler<T extends 
RestfulGateway> extends SimpleCh
                                                channelHandlerContext,
                                                routed.request(),
                                                new ErrorResponseBody("Fatal 
error. Could not obtain local address. Please try to refresh."),
-                                               
HttpResponseStatus.INTERNAL_SERVER_ERROR);
+                                               
HttpResponseStatus.INTERNAL_SERVER_ERROR,
+                                               responseHeaders);
 
                                        return;
                                }
@@ -120,7 +126,8 @@ public abstract class RedirectHandler<T extends 
RestfulGateway> extends SimpleCh
                                                                                
channelHandlerContext,
                                                                                
routed.request(),
                                                                                
new ErrorResponseBody("Could not retrieve the redirect address of the current 
leader. Please try to refresh."),
-                                                                               
HttpResponseStatus.INTERNAL_SERVER_ERROR);
+                                                                               
HttpResponseStatus.INTERNAL_SERVER_ERROR,
+                                                                               
responseHeaders);
                                                                        } else 
if (optRedirectAddress.isPresent()) {
                                                                                
response = HandlerRedirectUtils.getRedirectResponse(
                                                                                
        optRedirectAddress.get(),
@@ -136,7 +143,8 @@ public abstract class RedirectHandler<T extends 
RestfulGateway> extends SimpleCh
                                                                                
                channelHandlerContext,
                                                                                
                routed.request(),
                                                                                
        new ErrorResponseBody("Error while responding to the request."),
-                                                                               
        HttpResponseStatus.INTERNAL_SERVER_ERROR);
+                                                                               
        HttpResponseStatus.INTERNAL_SERVER_ERROR,
+                                                                               
        responseHeaders);
                                                                                
}
                                                                        }
                                                                } finally {
@@ -152,7 +160,8 @@ public abstract class RedirectHandler<T extends 
RestfulGateway> extends SimpleCh
                                                        channelHandlerContext,
                                                        routed.request(),
                                                        new 
ErrorResponseBody("Service temporarily unavailable due to an ongoing leader 
election. Please refresh."),
-                                                       
HttpResponseStatus.SERVICE_UNAVAILABLE));
+                                                       
HttpResponseStatus.SERVICE_UNAVAILABLE,
+                                                       responseHeaders));
 
                        } catch (Throwable throwable) {
                                logger.warn("Error occurred while processing 
web request.", throwable);
@@ -161,14 +170,16 @@ public abstract class RedirectHandler<T extends 
RestfulGateway> extends SimpleCh
                                        channelHandlerContext,
                                        routed.request(),
                                        new ErrorResponseBody("Error occurred 
in RedirectHandler: " + throwable.getMessage() + '.'),
-                                       
HttpResponseStatus.INTERNAL_SERVER_ERROR);
+                                       
HttpResponseStatus.INTERNAL_SERVER_ERROR,
+                                       responseHeaders);
                        }
                } else {
                        HandlerUtils.sendErrorResponse(
                                channelHandlerContext,
                                routed.request(),
                                new ErrorResponseBody("Local address has not 
been resolved. This indicates an internal error."),
-                               HttpResponseStatus.INTERNAL_SERVER_ERROR);
+                               HttpResponseStatus.INTERNAL_SERVER_ERROR,
+                               responseHeaders);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
index 06d71f8..acdd63c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RestHandlerConfiguration.java
@@ -23,7 +23,11 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.WebOptions;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders;
+
 import java.io.File;
+import java.util.Collections;
+import java.util.Map;
 import java.util.UUID;
 
 /**
@@ -39,11 +43,14 @@ public class RestHandlerConfiguration {
 
        private final File tmpDir;
 
+       private final Map<String, String> responseHeaders;
+
        public RestHandlerConfiguration(
                        long refreshInterval,
                        int maxCheckpointStatisticCacheEntries,
                        Time timeout,
-                       File tmpDir) {
+                       File tmpDir,
+                       Map<String, String> responseHeaders) {
                Preconditions.checkArgument(refreshInterval > 0L, "The refresh 
interval (ms) should be larger than 0.");
                this.refreshInterval = refreshInterval;
 
@@ -51,6 +58,8 @@ public class RestHandlerConfiguration {
 
                this.timeout = Preconditions.checkNotNull(timeout);
                this.tmpDir = Preconditions.checkNotNull(tmpDir);
+
+               this.responseHeaders = 
Preconditions.checkNotNull(responseHeaders);
        }
 
        public long getRefreshInterval() {
@@ -69,6 +78,10 @@ public class RestHandlerConfiguration {
                return tmpDir;
        }
 
+       public Map<String, String> getResponseHeaders() {
+               return Collections.unmodifiableMap(responseHeaders);
+       }
+
        public static RestHandlerConfiguration fromConfiguration(Configuration 
configuration) {
                final long refreshInterval = 
configuration.getLong(WebOptions.REFRESH_INTERVAL);
 
@@ -79,6 +92,15 @@ public class RestHandlerConfiguration {
                final String rootDir = "flink-web-" + UUID.randomUUID();
                final File tmpDir = new 
File(configuration.getString(WebOptions.TMP_DIR), rootDir);
 
-               return new RestHandlerConfiguration(refreshInterval, 
maxCheckpointStatisticCacheEntries, timeout, tmpDir);
+               final Map<String, String> responseHeaders = 
Collections.singletonMap(
+                       HttpHeaders.Names.ACCESS_CONTROL_ALLOW_ORIGIN,
+                       
configuration.getString(WebOptions.ACCESS_CONTROL_ALLOW_ORIGIN));
+
+               return new RestHandlerConfiguration(
+                       refreshInterval,
+                       maxCheckpointStatisticCacheEntries,
+                       timeout,
+                       tmpDir,
+                       responseHeaders);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
index cfc456f..d1d0837 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/RouterHandler.java
@@ -30,6 +30,8 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.util.Collections;
+
 /**
  * This class is an extension of {@link Handler} that replaces the standard 
error response to be identical with those
  * sent by the {@link AbstractRestHandler}.
@@ -43,6 +45,11 @@ public class RouterHandler extends Handler {
 
        @Override
        protected void respondNotFound(ChannelHandlerContext ctx, HttpRequest 
request) {
-               HandlerUtils.sendErrorResponse(ctx, request, new 
ErrorResponseBody("Not found."), HttpResponseStatus.NOT_FOUND);
+               HandlerUtils.sendErrorResponse(
+                       ctx,
+                       request,
+                       new ErrorResponseBody("Not found."),
+                       HttpResponseStatus.NOT_FOUND,
+                       Collections.emptyMap());
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
index 63c3e35..7192832 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/AbstractExecutionGraphHandler.java
@@ -36,6 +36,7 @@ import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nonnull;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.Executor;
@@ -55,10 +56,11 @@ public abstract class AbstractExecutionGraphHandler<R 
extends ResponseBody, M ex
                        CompletableFuture<String> localRestAddress,
                        GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
                        Time timeout,
+                       Map<String, String> responseHeaders,
                        MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
                        ExecutionGraphCache executionGraphCache,
                        Executor executor) {
-               super(localRestAddress, leaderRetriever, timeout, 
messageHeaders);
+               super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders);
 
                this.executionGraphCache = 
Preconditions.checkNotNull(executionGraphCache);
                this.executor = Preconditions.checkNotNull(executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
index cdf562f..1d14563 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandler.java
@@ -34,6 +34,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 
 import javax.annotation.Nonnull;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 
@@ -42,8 +43,12 @@ import java.util.concurrent.CompletionException;
  */
 public final class BlobServerPortHandler extends 
AbstractRestHandler<DispatcherGateway, EmptyRequestBody, 
BlobServerPortResponseBody, EmptyMessageParameters> {
 
-       public BlobServerPortHandler(CompletableFuture<String> 
localRestAddress, GatewayRetriever<DispatcherGateway> leaderRetriever, Time 
timeout) {
-               super(localRestAddress, leaderRetriever, timeout, 
BlobServerPortHeaders.getInstance());
+       public BlobServerPortHandler(
+                       CompletableFuture<String> localRestAddress,
+                       GatewayRetriever<DispatcherGateway> leaderRetriever,
+                       Time timeout,
+                       Map<String, String> headers) {
+               super(localRestAddress, leaderRetriever, timeout, headers, 
BlobServerPortHeaders.getInstance());
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
index f27d84f..7154246 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobConfigHandler.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
@@ -42,6 +43,7 @@ public class JobConfigHandler extends 
AbstractExecutionGraphHandler<JobConfigInf
                        CompletableFuture<String> localRestAddress,
                        GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
                        Time timeout,
+                       Map<String, String> responseHeaders,
                        MessageHeaders<EmptyRequestBody, JobConfigInfo, 
JobMessageParameters> messageHeaders,
                        ExecutionGraphCache executionGraphCache,
                        Executor executor) {
@@ -50,6 +52,7 @@ public class JobConfigHandler extends 
AbstractExecutionGraphHandler<JobConfigInf
                        localRestAddress,
                        leaderRetriever,
                        timeout,
+                       responseHeaders,
                        messageHeaders,
                        executionGraphCache,
                        executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
index feabbea..62f3e5e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobExceptionsHandler.java
@@ -36,6 +36,7 @@ import org.apache.flink.util.ExceptionUtils;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
@@ -50,6 +51,7 @@ public class JobExceptionsHandler extends 
AbstractExecutionGraphHandler<JobExcep
                        CompletableFuture<String> localRestAddress,
                        GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
                        Time timeout,
+                       Map<String, String> responseHeaders,
                        MessageHeaders<EmptyRequestBody, JobExceptionsInfo, 
JobMessageParameters> messageHeaders,
                        ExecutionGraphCache executionGraphCache,
                        Executor executor) {
@@ -58,6 +60,7 @@ public class JobExceptionsHandler extends 
AbstractExecutionGraphHandler<JobExcep
                        localRestAddress,
                        leaderRetriever,
                        timeout,
+                       responseHeaders,
                        messageHeaders,
                        executionGraphCache,
                        executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
index c8e6f8b..e7a30fb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobPlanHandler.java
@@ -29,6 +29,7 @@ import org.apache.flink.runtime.rest.messages.MessageHeaders;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
@@ -41,6 +42,7 @@ public class JobPlanHandler extends 
AbstractExecutionGraphHandler<JobPlanInfo, J
                CompletableFuture<String> localRestAddress,
                GatewayRetriever<? extends RestfulGateway> leaderRetriever,
                Time timeout,
+               Map<String, String> headers,
                MessageHeaders<EmptyRequestBody, JobPlanInfo, 
JobMessageParameters> messageHeaders,
                ExecutionGraphCache executionGraphCache,
                Executor executor) {
@@ -49,6 +51,7 @@ public class JobPlanHandler extends 
AbstractExecutionGraphHandler<JobPlanInfo, J
                        localRestAddress,
                        leaderRetriever,
                        timeout,
+                       headers,
                        messageHeaders,
                        executionGraphCache,
                        executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
index f810b5a..efb162e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandler.java
@@ -36,6 +36,7 @@ import javax.annotation.Nonnull;
 
 import java.io.ByteArrayInputStream;
 import java.io.ObjectInputStream;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 /**
@@ -43,8 +44,12 @@ import java.util.concurrent.CompletableFuture;
  */
 public final class JobSubmitHandler extends 
AbstractRestHandler<DispatcherGateway, JobSubmitRequestBody, 
JobSubmitResponseBody, EmptyMessageParameters> {
 
-       public JobSubmitHandler(CompletableFuture<String> localRestAddress, 
GatewayRetriever<DispatcherGateway> leaderRetriever, Time timeout) {
-               super(localRestAddress, leaderRetriever, timeout, 
JobSubmitHeaders.getInstance());
+       public JobSubmitHandler(
+                       CompletableFuture<String> localRestAddress,
+                       GatewayRetriever<DispatcherGateway> leaderRetriever,
+                       Time timeout,
+                       Map<String, String> headers) {
+               super(localRestAddress, leaderRetriever, timeout, headers, 
JobSubmitHeaders.getInstance());
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobTerminationHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobTerminationHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobTerminationHandler.java
index ffd5b4c..0998177 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobTerminationHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobTerminationHandler.java
@@ -39,6 +39,7 @@ import org.apache.flink.util.ExceptionUtils;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.TimeoutException;
@@ -52,8 +53,9 @@ public class JobTerminationHandler extends 
AbstractRestHandler<DispatcherGateway
                        CompletableFuture<String> localRestAddress,
                        GatewayRetriever<DispatcherGateway> leaderRetriever,
                        Time timeout,
+                       Map<String, String> headers,
                        MessageHeaders<EmptyRequestBody, EmptyResponseBody, 
JobTerminationMessageParameters> messageHeaders) {
-               super(localRestAddress, leaderRetriever, timeout, 
messageHeaders);
+               super(localRestAddress, leaderRetriever, timeout, headers, 
messageHeaders);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
index f7c9b49..55c465c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/JobVertexAccumulatorsHandler.java
@@ -37,6 +37,7 @@ import 
org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
 import java.util.ArrayList;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
@@ -49,10 +50,18 @@ public class JobVertexAccumulatorsHandler extends 
AbstractExecutionGraphHandler<
                        CompletableFuture<String> localRestAddress,
                        GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
                        Time timeout,
+                       Map<String, String> responseHeaders,
                        MessageHeaders<EmptyRequestBody, 
JobVertexAccumulatorsInfo, JobVertexMessageParameters> messageHeaders,
                        ExecutionGraphCache executionGraphCache,
                        Executor executor) {
-               super(localRestAddress, leaderRetriever, timeout, 
messageHeaders, executionGraphCache, executor);
+               super(
+                       localRestAddress,
+                       leaderRetriever,
+                       timeout,
+                       responseHeaders,
+                       messageHeaders,
+                       executionGraphCache,
+                       executor);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
index 23d09f6..dcc16fd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/AbstractCheckpointHandler.java
@@ -37,6 +37,7 @@ import org.apache.flink.util.Preconditions;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
@@ -53,11 +54,12 @@ public abstract class AbstractCheckpointHandler<R extends 
ResponseBody, M extend
                        CompletableFuture<String> localRestAddress,
                        GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
                        Time timeout,
+                       Map<String, String> responseHeaders,
                        MessageHeaders<EmptyRequestBody, R, M> messageHeaders,
                        ExecutionGraphCache executionGraphCache,
                        Executor executor,
                        CheckpointStatsCache checkpointStatsCache) {
-               super(localRestAddress, leaderRetriever, timeout, 
messageHeaders, executionGraphCache, executor);
+               super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders, executionGraphCache, executor);
 
                this.checkpointStatsCache = 
Preconditions.checkNotNull(checkpointStatsCache);
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
index 1efa7af..91c9ae1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointConfigHandler.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
 import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
@@ -47,6 +48,7 @@ public class CheckpointConfigHandler extends 
AbstractExecutionGraphHandler<Check
                        CompletableFuture<String> localRestAddress,
                        GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
                        Time timeout,
+                       Map<String, String> responseHeaders,
                        MessageHeaders<EmptyRequestBody, CheckpointConfigInfo, 
JobMessageParameters> messageHeaders,
                        ExecutionGraphCache executionGraphCache,
                        Executor executor) {
@@ -54,6 +56,7 @@ public class CheckpointConfigHandler extends 
AbstractExecutionGraphHandler<Check
                        localRestAddress,
                        leaderRetriever,
                        timeout,
+                       responseHeaders,
                        messageHeaders,
                        executionGraphCache,
                        executor);

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
index e6b9f6b..2816336 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointStatisticDetailsHandler.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.runtime.rest.messages.checkpoints.CheckpointStatistics;
 import org.apache.flink.runtime.webmonitor.RestfulGateway;
 import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
 
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
@@ -41,11 +42,20 @@ public class CheckpointStatisticDetailsHandler extends 
AbstractCheckpointHandler
                        CompletableFuture<String> localRestAddress,
                        GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
                        Time timeout,
+                       Map<String, String> responseHeaders,
                        MessageHeaders<EmptyRequestBody, CheckpointStatistics, 
CheckpointMessageParameters> messageHeaders,
                        ExecutionGraphCache executionGraphCache,
                        Executor executor,
                        CheckpointStatsCache checkpointStatsCache) {
-               super(localRestAddress, leaderRetriever, timeout, 
messageHeaders, executionGraphCache, executor, checkpointStatsCache);
+               super(
+                       localRestAddress,
+                       leaderRetriever,
+                       timeout,
+                       responseHeaders,
+                       messageHeaders,
+                       executionGraphCache,
+                       executor,
+                       checkpointStatsCache);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
index a8514d5..b9db367 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/CheckpointingStatisticsHandler.java
@@ -44,6 +44,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
@@ -56,10 +57,11 @@ public class CheckpointingStatisticsHandler extends 
AbstractExecutionGraphHandle
                        CompletableFuture<String> localRestAddress,
                        GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
                        Time timeout,
+                       Map<String, String> responseHeaders,
                        MessageHeaders<EmptyRequestBody, 
CheckpointingStatistics, JobMessageParameters> messageHeaders,
                        ExecutionGraphCache executionGraphCache,
                        Executor executor) {
-               super(localRestAddress, leaderRetriever, timeout, 
messageHeaders, executionGraphCache, executor);
+               super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders, executionGraphCache, executor);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
index 5479159..cff3bf0 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/checkpoints/TaskCheckpointStatisticDetailsHandler.java
@@ -41,6 +41,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
@@ -53,11 +54,20 @@ public class TaskCheckpointStatisticDetailsHandler extends 
AbstractCheckpointHan
                        CompletableFuture<String> localRestAddress,
                        GatewayRetriever<? extends RestfulGateway> 
leaderRetriever,
                        Time timeout,
+                       Map<String, String> responseHeaders,
                        MessageHeaders<EmptyRequestBody, 
TaskCheckpointStatisticsWithSubtaskDetails, TaskCheckpointMessageParameters> 
messageHeaders,
                        ExecutionGraphCache executionGraphCache,
                        Executor executor,
                        CheckpointStatsCache checkpointStatsCache) {
-               super(localRestAddress, leaderRetriever, timeout, 
messageHeaders, executionGraphCache, executor, checkpointStatsCache);
+               super(
+                       localRestAddress,
+                       leaderRetriever,
+                       timeout,
+                       responseHeaders,
+                       messageHeaders,
+                       executionGraphCache,
+                       executor,
+                       checkpointStatsCache);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
index 118e356..e4ad461 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/TaskManagerLogHandler.java
@@ -69,6 +69,7 @@ import java.io.UnsupportedEncodingException;
 import java.net.InetSocketAddress;
 import java.net.URLDecoder;
 import java.nio.channels.FileChannel;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Objects;
 import java.util.Optional;
@@ -125,7 +126,7 @@ public class TaskManagerLogHandler extends 
RedirectHandler<JobManagerGateway> im
                Time timeout,
                FileMode fileMode,
                Configuration config) {
-               super(localJobManagerAddressPromise, retriever, timeout);
+               super(localJobManagerAddressPromise, retriever, timeout, 
Collections.emptyMap());
 
                this.executor = checkNotNull(executor);
                this.config = config;

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
index da115ee..650647b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/files/StaticFileServerHandler.java
@@ -64,6 +64,7 @@ import java.nio.file.Files;
 import java.text.ParseException;
 import java.text.SimpleDateFormat;
 import java.util.Calendar;
+import java.util.Collections;
 import java.util.Date;
 import java.util.GregorianCalendar;
 import java.util.Locale;
@@ -114,7 +115,7 @@ public class StaticFileServerHandler<T extends 
RestfulGateway> extends RedirectH
                        Time timeout,
                        File rootPath) throws IOException {
 
-               super(localJobManagerAddressFuture, retriever, timeout);
+               super(localJobManagerAddressFuture, retriever, timeout, 
Collections.emptyMap());
 
                this.rootPath = checkNotNull(rootPath).getCanonicalFile();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
index ea0b934..604c0b8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerUtils.java
@@ -43,6 +43,7 @@ import javax.annotation.Nonnull;
 
 import java.io.IOException;
 import java.io.StringWriter;
+import java.util.Map;
 
 import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONNECTION;
 import static 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpHeaders.Names.CONTENT_TYPE;
@@ -64,22 +65,34 @@ public class HandlerUtils {
         * @param httpRequest originating http request
         * @param response which should be sent
         * @param statusCode of the message to send
+        * @param headers additional header values
         * @param <P> type of the response
         */
        public static <P extends ResponseBody> void sendResponse(
                        ChannelHandlerContext channelHandlerContext,
                        HttpRequest httpRequest,
                        P response,
-                       HttpResponseStatus statusCode) {
+                       HttpResponseStatus statusCode,
+                       Map<String, String> headers) {
                StringWriter sw = new StringWriter();
                try {
                        mapper.writeValue(sw, response);
                } catch (IOException ioe) {
                        LOG.error("Internal server error. Could not map 
response to JSON.", ioe);
-                       sendErrorResponse(channelHandlerContext, httpRequest, 
new ErrorResponseBody("Internal server error. Could not map response to 
JSON."), HttpResponseStatus.INTERNAL_SERVER_ERROR);
+                       sendErrorResponse(
+                               channelHandlerContext,
+                               httpRequest,
+                               new ErrorResponseBody("Internal server error. 
Could not map response to JSON."),
+                               HttpResponseStatus.INTERNAL_SERVER_ERROR,
+                               headers);
                        return;
                }
-               sendResponse(channelHandlerContext, httpRequest, sw.toString(), 
statusCode);
+               sendResponse(
+                       channelHandlerContext,
+                       httpRequest,
+                       sw.toString(),
+                       statusCode,
+                       headers);
        }
 
        /**
@@ -89,12 +102,14 @@ public class HandlerUtils {
         * @param httpRequest originating http request
         * @param errorMessage which should be sent
         * @param statusCode of the message to send
+        * @param headers additional header values
         */
        public static void sendErrorResponse(
                        ChannelHandlerContext channelHandlerContext,
                        HttpRequest httpRequest,
                        ErrorResponseBody errorMessage,
-                       HttpResponseStatus statusCode) {
+                       HttpResponseStatus statusCode,
+                       Map<String, String> headers) {
 
                StringWriter sw = new StringWriter();
                try {
@@ -102,9 +117,19 @@ public class HandlerUtils {
                } catch (IOException e) {
                        // this should never happen
                        LOG.error("Internal server error. Could not map error 
response to JSON.", e);
-                       sendResponse(channelHandlerContext, httpRequest, 
"Internal server error. Could not map error response to JSON.", 
HttpResponseStatus.INTERNAL_SERVER_ERROR);
+                       sendResponse(
+                               channelHandlerContext,
+                               httpRequest,
+                               "Internal server error. Could not map error 
response to JSON.",
+                               HttpResponseStatus.INTERNAL_SERVER_ERROR,
+                               headers);
                }
-               sendResponse(channelHandlerContext, httpRequest, sw.toString(), 
statusCode);
+               sendResponse(
+                       channelHandlerContext,
+                       httpRequest,
+                       sw.toString(),
+                       statusCode,
+                       headers);
        }
 
        /**
@@ -114,16 +139,22 @@ public class HandlerUtils {
         * @param httpRequest originating http request
         * @param message which should be sent
         * @param statusCode of the message to send
+        * @param headers additional header values
         */
        public static void sendResponse(
                        @Nonnull ChannelHandlerContext channelHandlerContext,
                        @Nonnull HttpRequest httpRequest,
                        @Nonnull String message,
-                       @Nonnull HttpResponseStatus statusCode) {
+                       @Nonnull HttpResponseStatus statusCode,
+                       @Nonnull Map<String, String> headers) {
                HttpResponse response = new DefaultHttpResponse(HTTP_1_1, 
statusCode);
 
                response.headers().set(CONTENT_TYPE, "application/json");
 
+               for (Map.Entry<String, String> headerEntry : 
headers.entrySet()) {
+                       response.headers().set(headerEntry.getKey(), 
headerEntry.getValue());
+               }
+
                if (HttpHeaders.isKeepAlive(httpRequest)) {
                        response.headers().set(CONNECTION, 
HttpHeaders.Values.KEEP_ALIVE);
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
index 00fda57..4d3c6b5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java
@@ -222,6 +222,7 @@ public class RestEndpointITCase extends TestLogger {
                                localAddressFuture,
                                leaderRetriever,
                                timeout,
+                               Collections.emptyMap(),
                                new TestHeaders());
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
index 0ea18db..15c2eb4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/BlobServerPortHandlerTest.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
@@ -58,7 +59,8 @@ public class BlobServerPortHandlerTest extends TestLogger {
                BlobServerPortHandler handler = new BlobServerPortHandler(
                        
CompletableFuture.completedFuture("http://localhost:1234"),
                        mockGatewayRetriever,
-                       RpcUtils.INF_TIMEOUT);
+                       RpcUtils.INF_TIMEOUT,
+                       Collections.emptyMap());
 
                BlobServerPortResponseBody portResponse = handler
                        .handleRequest(new 
HandlerRequest<>(EmptyRequestBody.getInstance(), 
EmptyMessageParameters.getInstance()), mockGateway)
@@ -77,7 +79,8 @@ public class BlobServerPortHandlerTest extends TestLogger {
                BlobServerPortHandler handler = new BlobServerPortHandler(
                        
CompletableFuture.completedFuture("http://localhost:1234"),
                        mockGatewayRetriever,
-                       RpcUtils.INF_TIMEOUT);
+                       RpcUtils.INF_TIMEOUT,
+                       Collections.emptyMap());
 
                try {
                        handler
@@ -93,5 +96,6 @@ public class BlobServerPortHandlerTest extends TestLogger {
        }
 
        private static class TestException extends Exception {
+               private static final long serialVersionUID = 
-7064446788277853899L;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/eddb5b0a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
index 1196d40..212af5f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
@@ -35,6 +35,7 @@ import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseSt
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 
 import static org.mockito.Matchers.any;
@@ -55,7 +56,8 @@ public class JobSubmitHandlerTest extends TestLogger {
                JobSubmitHandler handler = new JobSubmitHandler(
                        
CompletableFuture.completedFuture("http://localhost:1234"),
                        mockGatewayRetriever,
-                       RpcUtils.INF_TIMEOUT);
+                       RpcUtils.INF_TIMEOUT,
+                       Collections.emptyMap());
 
                JobSubmitRequestBody request = new JobSubmitRequestBody(new 
byte[0]);
 
@@ -76,7 +78,8 @@ public class JobSubmitHandlerTest extends TestLogger {
                JobSubmitHandler handler = new JobSubmitHandler(
                        
CompletableFuture.completedFuture("http://localhost:1234"),
                        mockGatewayRetriever,
-                       RpcUtils.INF_TIMEOUT);
+                       RpcUtils.INF_TIMEOUT,
+                       Collections.emptyMap());
 
                JobGraph job = new JobGraph("testjob");
                JobSubmitRequestBody request = new JobSubmitRequestBody(job);

Reply via email to