[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556256#comment-16556256 ] ASF GitHub Bot commented on FLINK-7738: --- EronWright commented on issue #4767: [FLINK-7738] [flip-6] Create WebSocket handler (server, client) URL: https://github.com/apache/flink/pull/4767#issuecomment-407890658 @tillrohrmann closing this due to inactivity. Ping me if you want me to take another crack at it. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Major > Labels: pull-request-available > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16556257#comment-16556257 ] ASF GitHub Bot commented on FLINK-7738: --- EronWright closed pull request #4767: [FLINK-7738] [flip-6] Create WebSocket handler (server, client) URL: https://github.com/apache/flink/pull/4767 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java index 4808781c7b8..cf465294f5a 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/RedirectHandlerTest.java @@ -29,17 +29,24 @@ import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.DefaultFullHttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpMethod; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponse; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpVersion; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.KeepAliveWrite; import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed; import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Router; +import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil; import org.junit.Assert; import org.junit.Test; import javax.annotation.Nonnull; +import java.util.HashMap; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -137,6 +144,35 @@ public void testRedirectHandler() throws Exception { } } + /** +* Tests the approach of using the redirect handler as a standalone handler. +*/ + @Test + public void testUserEvent() { + final String correctAddress = "foobar:21345"; + final CompletableFuture localAddressFuture = CompletableFuture.completedFuture(correctAddress); + final Time timeout = Time.seconds(10L); + + final RestfulGateway localGateway = mock(RestfulGateway.class); + when(localGateway.requestRestAddress(any(Time.class))).thenReturn(CompletableFuture.completedFuture(correctAddress)); + final GatewayRetriever gatewayRetriever = mock(GatewayRetriever.class); + when(gatewayRetriever.getNow()).thenReturn(Optional.of(localGateway)); + + final RedirectHandler redirectHandler = new RedirectHandler<>( + localAddressFuture, + gatewayRetriever, + timeout); + final UserEventHandler eventHandler = new UserEventHandler(); + EmbeddedChannel channel = new EmbeddedChannel(redirectHandler, eventHandler); + + // write a (routed) HTTP request, then validate that a user event was propagated + DefaultFullHttpRequest request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, "/"); + Routed routed = new Routed(null, false, request, "/", new HashMap<>(), new HashMap<>()); + channel.writeInbound(routed); + Assert.assertNotNull(eventHandler.gateway); + Assert.assertNotNull(eventHandler.routed); + } + private static class TestingHandler extends RedirectHandler { protected TestingHandler( @@ -154,4 +190,25 @@ protected void respondAsLeader(ChannelHandlerContext channelHandlerContext, Rout } } + private static class UserEventHandler extends 
ChannelInboundHandlerAdapter { + + public volatile T gateway; + + public volatile Routed routed; + + @Override + @SuppressWarnings("unchecked") + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof RedirectHandler.GatewayRetrieved) { + gateway = ((RedirectHandler.GatewayRetrieved) evt).getGateway(); + } + super.use
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16406457#comment-16406457 ] ASF GitHub Bot commented on FLINK-7738: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/4767 @tillrohrmann do you need a websocket yet? > Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Major > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16326322#comment-16326322 ] ASF GitHub Bot commented on FLINK-7738: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4767 Hi @EronWright, yes I think we still need support for web sockets. The first REST based client won't use this but later on we should definitely add this functionality. At the moment we try hard to make Flip-6 feature equivalent to the old distributed architecture and therefore we couldn't make progress here. But once this has been done, we should re-iterate over this PR again. > Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright >Priority: Major > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16324419#comment-16324419 ] ASF GitHub Bot commented on FLINK-7738: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/4767 @tillrohrmann are you still interested in this websocket code for the REST server? Aside from rebasing, any 'must fix' issues here? > Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16202661#comment-16202661 ] ASF GitHub Bot commented on FLINK-7738: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/4767 I don't much like the use of `RequestBody` and `ResponseBody` here, or even that the WebSocket distinguishes between client and server messages. Honestly a `MessageBody` marker interface may suffice. WDYT? > Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16202468#comment-16202468 ] ASF GitHub Bot commented on FLINK-7738: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/4767#discussion_r144381682 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractWebSocketHandler.java --- @@ -0,0 +1,301 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.rest.handler; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.rest.handler.util.HandlerUtils; +import org.apache.flink.runtime.rest.messages.ErrorResponseBody; +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.RequestBody; +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.rest.messages.WebSocketSpecification; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext; +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpRequest; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.router.Routed; +import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; +import org.apache.flink.shaded.netty4.io.netty.util.AttributeKey; +import org.apache.flink.shaded.netty4.io.netty.util.ReferenceCountUtil; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.concurrent.CompletableFuture; + +/** + * A channel handler for WebSocket resources. + * + * This handler handles handshaking and ongoing messaging with a WebSocket client, + * based on a {@link WebSocketSpecification} that describes the REST resource location, + * parameter type, and message inbound/outbound types. 
Messages are automatically + * encoded from (and decoded to) JSON text. + * + * Subclasses should override the following methods to extend the respective phases. + * + * {@code handshakeInitiated} - occurs upon receipt of a handshake request from an HTTP client. Useful for parameter validation. + * {@code handshakeCompleted} - occurs upon successful completion; WebSocket is ready for I/O. + * {@code messageReceived}: occurs when a WebSocket message is received on the channel. + * + * + * The handler supports gateway availability announcements. + * + * @param The gateway type. + * @param The REST parameter type. + * @param The outbound message type. + * @param The inbound message type. + */ +public abstract class AbstractWebSocketHandler extends ChannelInboundHandlerAdapter { + + protected final Logger log = LoggerFactory.getLogger(getClass()); + + private final RedirectHandler redirectHandler; + + private final AttributeKey gatewayAttr; + + private final WebSocketSpecification specification; + + private final ChannelHandler messageCodec; + + private final AttributeKey parametersAttr; + + /** +* Creates a new handler. +*/ + public AbstractWebSocketHandler( + @Nonnull CompletableFuture localAddressFuture, + @Nonnull GatewayRetriever leaderRet
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16200785#comment-16200785 ] ASF GitHub Bot commented on FLINK-7738: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/4767 Updated the description based on the latest PR. > Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16191474#comment-16191474 ] ASF GitHub Bot commented on FLINK-7738: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/4767#discussion_r142712872 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/websocket/WebSocket.java --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.websocket; + +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; + +/** + * A WebSocket for sending and receiving messages. + */ +public interface WebSocket { + + /** +* Adds a listener for websocket messages. +*/ + void addListener(WebSocketListener listener); + + /** +* Sends a message. 
+*/ + ChannelFuture send(ResponseBody message); --- End diff -- Good catch > Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16191468#comment-16191468 ] ASF GitHub Bot commented on FLINK-7738: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/4767#discussion_r142712091 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java --- @@ -325,6 +406,98 @@ public TestParameters getUnresolvedMessageParameters() { } } + private static class TestWebSocketOperation { + + private static class WsParameters extends MessageParameters { + private final JobIDPathParameter jobIDPathParameter = new JobIDPathParameter(); + + @Override + public Collection> getPathParameters() { + return Collections.singleton(jobIDPathParameter); + } + + @Override + public Collection> getQueryParameters() { + return Collections.emptyList(); + } + } + + static class WsHeaders implements MessageHeaders { + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return "/test/:jobid/subscribe"; + } + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public Class getResponseClass() { + return WebSocketUpgradeResponseBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public WsParameters getUnresolvedMessageParameters() { + return new WsParameters(); + } + } + + static class WsRestHandler extends AbstractRestHandler { + + private final TestEventProvider eventProvider; + + WsRestHandler( + CompletableFuture localAddressFuture, + GatewayRetriever leaderRetriever, + TestEventProvider eventProvider, + Time timeout) { + super(localAddressFuture, leaderRetriever, timeout, new WsHeaders()); + this.eventProvider = eventProvider; + } + + @Override + protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull RestfulGateway gateway) throws RestHandlerException { + JobID jobID = request.getPathParameter(JobIDPathParameter.class); + Assert.assertEquals(PATH_JOB_ID, jobID); + ChannelHandler messageHandler = new WsMessageHandler(eventProvider, jobID); --- End diff -- The main value of `AbstractRestHandler` in this scenario is in decoding the HTTP request into a `HandlerRequest`. By factoring that code into a `MessageToMessageDecoder` we could reuse it and avoid the need for `AbstractRestHandler` in this scenario. In other words, the 'Netty way' would be to use a pipeline of handlers, which is more flexible than an inheritance hierarchy in my opinion. Normal operation: `[HttpCodec] -> [RestRequestDecoder] -> [RestHandler]` WebSocket operation: `[HttpCodec] -> [RestRequestDecoder] -> [WebSocketHandler]` We could go further by encapsulating each operation in a handler that simply adds the appropriate child handlers, similar to how `HttpCodec` simply adds an encoder and decoder to the pipeline. WDYT? > Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16191413#comment-16191413 ] ASF GitHub Bot commented on FLINK-7738: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/4767#discussion_r142700416 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java --- @@ -275,4 +301,121 @@ public HttpResponseStatus getHttpResponseStatus() { return httpResponseStatus; } } + + public , U extends MessageParameters, R extends ResponseBody> CompletableFuture sendWebSocketRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, Class messageClazz, WebSocketListener... listeners) throws IOException { --- End diff -- I too was unhappy about using a special response body, but felt that the alternative required some rework of the REST handler that was best done in a follow-up. With some rework we can eliminate the funky response body. > Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16191393#comment-16191393 ] ASF GitHub Bot commented on FLINK-7738: --- Github user EronWright commented on a diff in the pull request: https://github.com/apache/flink/pull/4767#discussion_r142696121 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java --- @@ -157,7 +160,12 @@ protected void respondAsLeader(final ChannelHandlerContext ctx, Routed routed, T HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR); } } else { - HandlerUtils.sendResponse(ctx, httpRequest, resp, messageHeaders.getResponseStatusCode()); + if (resp instanceof WebSocketUpgradeResponseBody) { + upgradeToWebSocket(ctx, routed, (WebSocketUpgradeResponseBody) resp); --- End diff -- The REST handler is not active after the upgrade is complete, and it would be harmless to remove from the pipeline. The message handler takes over, reading and writing websocket frames based on typed messages. > Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16191130#comment-16191130 ] ASF GitHub Bot commented on FLINK-7738: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4767#discussion_r142641670 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/websocket/WebSocket.java --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.websocket; + +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; + +/** + * A WebSocket for sending and receiving messages. + */ +public interface WebSocket { + + /** +* Adds a listener for websocket messages. +*/ + void addListener(WebSocketListener listener); + + /** +* Sends a message. +*/ + ChannelFuture send(ResponseBody message); --- End diff -- My understanding is that the WebSocket interface is only used on the client, so shouldn't this be typed to `RequestBody`? 
> Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16191131#comment-16191131 ] ASF GitHub Bot commented on FLINK-7738: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4767#discussion_r142640401 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java --- @@ -275,4 +301,121 @@ public HttpResponseStatus getHttpResponseStatus() { return httpResponseStatus; } } + + public , U extends MessageParameters, R extends ResponseBody> CompletableFuture sendWebSocketRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, Class messageClazz, WebSocketListener... listeners) throws IOException { + Preconditions.checkNotNull(targetAddress); + Preconditions.checkArgument(0 <= targetPort && targetPort < 65536, "The target port " + targetPort + " is not in the range (0, 65536]."); + Preconditions.checkNotNull(messageHeaders); + Preconditions.checkNotNull(messageParameters); + Preconditions.checkState(messageParameters.isResolved(), "Message parameters were not resolved."); + + String targetUrl = MessageParameters.resolveUrl(messageHeaders.getTargetRestEndpointURL(), messageParameters); + URI webSocketURL = URI.create("ws://" + targetAddress + ":" + targetPort).resolve(targetUrl); + LOG.debug("Sending WebSocket request to {}", webSocketURL); + + final HttpHeaders headers = new DefaultHttpHeaders() + .add(HttpHeaders.Names.CONTENT_TYPE, RestConstants.REST_CONTENT_TYPE); + + Bootstrap bootstrap1 = bootstrap.clone().handler(new ClientBootstrap() { + @Override + protected void initChannel(SocketChannel channel) throws Exception { + super.initChannel(channel); + channel.pipeline() + .addLast(new WebSocketClientProtocolHandler(webSocketURL, WebSocketVersion.V13, null, false, headers, 65535)) + .addLast(new WsResponseHandler(channel, messageClazz, listeners)); + } + }); + + return 
CompletableFuture.supplyAsync(() -> bootstrap1.connect(targetAddress, targetPort), executor) + .thenApply((channel) -> { + try { + return channel.sync(); + } catch (InterruptedException e) { + throw new FlinkRuntimeException(e); + } + }) + .thenApply((ChannelFuture::channel)) + .thenCompose(channel -> { + WsResponseHandler handler = channel.pipeline().get(WsResponseHandler.class); + return handler.getWebSocketFuture(); + }); + } + + private static class WsResponseHandler extends SimpleChannelInboundHandler implements WebSocket { + + private final Channel channel; + private final Class messageClazz; + private final List listeners = new CopyOnWriteArrayList<>(); + + private final CompletableFuture webSocketFuture = new CompletableFuture<>(); + + CompletableFuture getWebSocketFuture() { + return webSocketFuture; + } + + public WsResponseHandler(Channel channel, Class messageClazz, WebSocketListener[] listeners) { + this.channel = channel; + this.messageClazz = messageClazz; + this.listeners.addAll(Arrays.asList(listeners)); + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + LOG.warn("WebSocket exception", cause); + webSocketFuture.completeExceptionally(cause); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { + if (evt instanceof WebSocketClientProtocolHandler.ClientHandshakeStateEvent) { + WebSocketClientProtocolHandler.ClientHandshakeStateEvent wsevt = (WebSocketClientProtocolHandler.ClientHandshakeStateEvent) evt; + switch(wsevt) { --- End diff -- missing space after switch > Create WebSocket handler (server) > --
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16191129#comment-16191129 ] ASF GitHub Bot commented on FLINK-7738: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4767#discussion_r142637674 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/websocket/WebSocketListener.java --- @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.websocket; + +import org.apache.flink.runtime.rest.messages.ResponseBody; +import org.apache.flink.runtime.util.event.EventListener; + +/** + * A listener for WebSocket messages. + */ +public interface WebSocketListener extends EventListener { } --- End diff -- I would add a proper type parameter. Currently every implementation would be forced to do instanceof+cast checks. 
> Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16191132#comment-16191132 ] ASF GitHub Bot commented on FLINK-7738: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4767#discussion_r142639095 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/websocket/WebSocket.java --- @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.websocket; + +import org.apache.flink.runtime.rest.messages.ResponseBody; + +import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture; + +/** + * A WebSocket for sending and receiving messages. + */ +public interface WebSocket { --- End diff -- similar to the WebSocketListener this should have a type parameter. > Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright > > An abstract handler is needed to support websocket communication. 
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16191099#comment-16191099 ] ASF GitHub Bot commented on FLINK-7738: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4767#discussion_r142633088 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/AbstractRestHandler.java --- @@ -157,7 +160,12 @@ protected void respondAsLeader(final ChannelHandlerContext ctx, Routed routed, T HandlerUtils.sendErrorResponse(ctx, httpRequest, new ErrorResponseBody("Internal server error."), HttpResponseStatus.INTERNAL_SERVER_ERROR); } } else { - HandlerUtils.sendResponse(ctx, httpRequest, resp, messageHeaders.getResponseStatusCode()); + if (resp instanceof WebSocketUpgradeResponseBody) { + upgradeToWebSocket(ctx, routed, (WebSocketUpgradeResponseBody) resp); --- End diff -- help me out here. After the upgrade is complete, which parts of the AbstractRestHandler class are still used? > Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16191100#comment-16191100 ] ASF GitHub Bot commented on FLINK-7738: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4767#discussion_r142634775 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/RestEndpointITCase.java --- @@ -325,6 +406,98 @@ public TestParameters getUnresolvedMessageParameters() { } } + private static class TestWebSocketOperation { + + private static class WsParameters extends MessageParameters { + private final JobIDPathParameter jobIDPathParameter = new JobIDPathParameter(); + + @Override + public Collection> getPathParameters() { + return Collections.singleton(jobIDPathParameter); + } + + @Override + public Collection> getQueryParameters() { + return Collections.emptyList(); + } + } + + static class WsHeaders implements MessageHeaders { + + @Override + public HttpMethodWrapper getHttpMethod() { + return HttpMethodWrapper.GET; + } + + @Override + public String getTargetRestEndpointURL() { + return "/test/:jobid/subscribe"; + } + + @Override + public Class getRequestClass() { + return EmptyRequestBody.class; + } + + @Override + public Class getResponseClass() { + return WebSocketUpgradeResponseBody.class; + } + + @Override + public HttpResponseStatus getResponseStatusCode() { + return HttpResponseStatus.OK; + } + + @Override + public WsParameters getUnresolvedMessageParameters() { + return new WsParameters(); + } + } + + static class WsRestHandler extends AbstractRestHandler { + + private final TestEventProvider eventProvider; + + WsRestHandler( + CompletableFuture localAddressFuture, + GatewayRetriever leaderRetriever, + TestEventProvider eventProvider, + Time timeout) { + super(localAddressFuture, leaderRetriever, timeout, new WsHeaders()); + this.eventProvider = eventProvider; + } + + @Override + protected CompletableFuture 
handleRequest(@Nonnull HandlerRequest request, @Nonnull RestfulGateway gateway) throws RestHandlerException { + JobID jobID = request.getPathParameter(JobIDPathParameter.class); + Assert.assertEquals(PATH_JOB_ID, jobID); + ChannelHandler messageHandler = new WsMessageHandler(eventProvider, jobID); --- End diff -- If this is how an AbstractRestHandler implementation for WebSockets would actually look, I'm questioning the benefit of implementing it as an AbstractRestHandler in the first place. An explicit AbstractWebSocketRestHandler class could have an abstract `initializeWebSocket(HandlerRequest ...)` method instead of hacking it into `AbstractRestHandler#handleRequest` and a separate method for creating the event provider that is typed to the actual response we're sending back. > Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16191101#comment-16191101 ] ASF GitHub Bot commented on FLINK-7738: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4767#discussion_r142634073 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java --- @@ -275,4 +301,121 @@ public HttpResponseStatus getHttpResponseStatus() { return httpResponseStatus; } } + + public , U extends MessageParameters, R extends ResponseBody> CompletableFuture sendWebSocketRequest(String targetAddress, int targetPort, M messageHeaders, U messageParameters, Class messageClazz, WebSocketListener... listeners) throws IOException { --- End diff -- I really dislike how the `WebSocketUpgradeResponseBody` is defined as the response in the headers. Not only is this not the actual response we're getting back (that would be R), we now also introduce an arbitrary response type, which voids the type safety and prevents us from auto-generating proper documentation. The MessageHeaders are very much a high-level user-facing specification, but here we're using it for the setup of the websockets, which is a relatively low-level affair. > Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16190784#comment-16190784 ] ASF GitHub Bot commented on FLINK-7738: --- GitHub user EronWright opened a pull request: https://github.com/apache/flink/pull/4767 [FLINK-7738] [flip-6] Create WebSocket handler (server, client) ## What is the purpose of the change Introduces WebSocket support for the FLIP-6 REST server and client. The basic idea is to use the normal REST handler to initiate a websocket upgrade. In this way, the normal request parsing logic may be used. For example, a REST method of `/jobs/:jobid/subscribe` may be developed using a normal REST handler. The handler responds such that the server initiates the upgrade procedure rather than producing a normal REST response. A new type of handler based on `AbstractWebSocketMessageHandler` is then installed into the pipeline for subsequent interaction. Netty's `ChannelGroup` is leveraged to act as an event bus to easily dispatch a message to one or more channels based on a routing key. In the above example, the routing key might be `jobid`, meaning that a given channel is listening to events related to a certain job. It is expected that a concrete subclass of `RestServerEndpoint` create one or more `KeyedChannelRouter` instances as needed for its handlers, and then write messages as it sees fit. The client was similarly adapted to open a `WebSocket` with associated listeners. Consider the work to be a stop-gap pending further discussion. The `RestEndpointITCase` test was enhanced with an end-to-end demonstration. A separate unit test for `AbstractRestHandler` was also introduced. ## Brief change log - Introduce `AbstractWebSocketMessageHandler` to handle inbound and outbound websocket messages. - Introduce `WebSocketUpgradeResponseBody` as a special REST response that triggers a websocket upgrade. - Update `AbstractRestHandler` to handle websocket upgrades. 
- Introduce `KeyedChannelRouter` to route websocket messages to interested channels. - Update `RestClient` with a new method, `sendWebSocketRequest`. - Introduce `WebSocket` and `WebSocketListener`. - Update `RestEndpointITCase` with end-to-end websocket test. ## Verifying this change This change added tests and can be verified as follows: - `AbstractRestHandlerTest` - `RestEndpointITCase` - `AbstractWebSocketMessageHandlerTest` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no ## Documentation - Does this pull request introduce a new feature? no You can merge this pull request into a Git repository by running: $ git pull https://github.com/EronWright/flink FLINK-7738-2 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4767.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4767 commit f56168846731ab4205a2b04a42285e0b3a3f1972 Author: Wright, Eron Date: 2017-10-04T00:26:56Z [FLINK-7738] [flip-6] Create WebSocket handler (server, client) > Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA (v6.4.14#64029)