[ 
https://issues.apache.org/jira/browse/KNOX-3065?focusedWorklogId=942746&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-942746
 ]

ASF GitHub Bot logged work on KNOX-3065:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Nov/24 09:55
            Start Date: 08/Nov/24 09:55
    Worklog Time Spent: 10m 
      Work Description: hanicz commented on code in PR #947:
URL: https://github.com/apache/knox/pull/947#discussion_r1834047682


##########
gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.protocol.HttpContext;
+import org.apache.knox.gateway.audit.api.Action;
+import org.apache.knox.gateway.audit.api.ActionOutcome;
+import org.apache.knox.gateway.audit.api.ResourceType;
+import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
+import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory;
+import org.apache.knox.gateway.dispatch.HttpAsyncClientFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.FilterConfig;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class SSEDispatch extends ConfigurableDispatch {
+
+    private final HttpAsyncClient asyncClient;
+    private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream";
+
+    public SSEDispatch(FilterConfig filterConfig) {
+        HttpAsyncClientFactory asyncClientFactory = new 
DefaultHttpAsyncClientFactory();
+        this.asyncClient = 
asyncClientFactory.createAsyncHttpClient(filterConfig);
+
+        if (asyncClient instanceof CloseableHttpAsyncClient) {
+            ((CloseableHttpAsyncClient) this.asyncClient).start();
+        }
+    }
+
+    @Override
+    public void doGet(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpGet httpGetRequest = new HttpGet(url);
+        this.commonBaseMethod(httpGetRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPost(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException, URISyntaxException {
+        final HttpPost httpPostRequest = new HttpPost(url);
+        httpPostRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPostRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPut(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpPut httpPutRequest = new HttpPut(url);
+        httpPutRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPutRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPatch(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpPatch httpPatchRequest = new HttpPatch(url);
+        httpPatchRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPatchRequest, inboundRequest, 
outboundResponse);
+    }
+
+    private void commonBaseMethod(HttpUriRequest httpMethod, 
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws 
IOException {
+        this.addAcceptHeader(httpMethod);
+        this.copyRequestHeaderFields(httpMethod, inboundRequest);
+        this.executeRequest(httpMethod, outboundResponse, inboundRequest);
+    }
+
+    private void executeRequest(HttpUriRequest outboundRequest, 
HttpServletResponse outboundResponse, HttpServletRequest inboundRequest) {
+        AsyncContext asyncContext = inboundRequest.startAsync();
+        //No timeout
+        asyncContext.setTimeout(0L);
+
+        HttpAsyncRequestProducer producer = 
HttpAsyncMethods.create(outboundRequest);
+        AsyncCharConsumer<SSEResponse> consumer = new 
SSECharConsumer(outboundResponse, outboundRequest.getURI(), asyncContext);
+        this.getSSEConnection(producer, consumer, outboundRequest);
+    }
+
+    private void getSSEConnection(HttpAsyncRequestProducer producer, 
AsyncCharConsumer<SSEResponse> consumer, HttpUriRequest outboundRequest) {
+        LOG.dispatchRequest(outboundRequest.getMethod(), 
outboundRequest.getURI());
+        auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), 
ResourceType.URI, ActionOutcome.UNAVAILABLE, 
RES.requestMethod(outboundRequest.getMethod()));
+        asyncClient.execute(producer, consumer, new 
FutureCallback<SSEResponse>() {
+
+            @Override
+            public void completed(final SSEResponse response) {
+                closeProducer(producer);
+                LOG.sseConnectionDone();
+            }
+
+            @Override
+            public void failed(final Exception ex) {
+                closeProducer(producer);
+                LOG.sseConnectionError(ex.getMessage());
+            }
+
+            @Override
+            public void cancelled() {
+                closeProducer(producer);
+                LOG.sseConnectionCancelled();
+            }
+        });
+    }
+
+    private void addAcceptHeader(HttpUriRequest outboundRequest) {
+        outboundRequest.setHeader(HttpHeaders.ACCEPT, 
SSEDispatch.TEXT_EVENT_STREAM_VALUE);
+    }
+
+    private void handleOkResponse(HttpServletResponse outboundResponse, URI 
url, HttpResponse inboundResponse) {
+        this.prepareServletResponse(outboundResponse, 
inboundResponse.getStatusLine().getStatusCode());
+        this.copyResponseHeaderFields(outboundResponse, inboundResponse);
+        auditor.audit(Action.DISPATCH, url.toString(), ResourceType.URI, 
ActionOutcome.SUCCESS, RES.responseStatus(HttpStatus.SC_OK));
+    }
+
+    private void handleErrorResponse(HttpServletResponse outboundResponse, URI 
url, HttpResponse httpResponse) {
+        int statusCode = httpResponse.getStatusLine().getStatusCode();
+        outboundResponse.setStatus(statusCode);
+        LOG.dispatchResponseStatusCode(statusCode);
+        auditor.audit(Action.DISPATCH, url.toString(), ResourceType.URI, 
ActionOutcome.FAILURE, RES.responseStatus(statusCode));
+    }
+
+    private void prepareServletResponse(HttpServletResponse outboundResponse, 
int statusCode) {
+        LOG.dispatchResponseStatusCode(statusCode);
+        outboundResponse.setStatus(statusCode);
+        outboundResponse.setCharacterEncoding(StandardCharsets.UTF_8.name());
+    }
+
+    private boolean isSuccessful(int statusCode) {
+        return (statusCode >= HttpStatus.SC_OK && statusCode < 300);
+    }
+
+    private void closeProducer(HttpAsyncRequestProducer producer) {
+        try {
+            producer.close();
+        } catch (IOException e) {
+            LOG.sseProducerCloseError(e);
+        }
+    }
+
+    private class SSECharConsumer extends AsyncCharConsumer<SSEResponse> {
+        private SSEResponse sseResponse;
+        private final HttpServletResponse outboundResponse;
+        private final URI url;
+        private final AsyncContext asyncContext;
+
+        SSECharConsumer(HttpServletResponse outboundResponse, URI url, 
AsyncContext asyncContext) {
+            this.outboundResponse = outboundResponse;
+            this.url = url;
+            this.asyncContext = asyncContext;
+        }
+
+        @Override
+        protected void onResponseReceived(final HttpResponse inboundResponse) {
+            this.sseResponse = new SSEResponse(inboundResponse);
+            if (isSuccessful(inboundResponse.getStatusLine().getStatusCode())) 
{
+                handleOkResponse(outboundResponse, url, inboundResponse);

Review Comment:
   Renamed it to handleSuccessResponse





Issue Time Tracking
-------------------

    Worklog Id:     (was: 942746)
    Time Spent: 2h  (was: 1h 50m)

> Support Server Sent Events (SSE) in Knox
> ----------------------------------------
>
>                 Key: KNOX-3065
>                 URL: https://issues.apache.org/jira/browse/KNOX-3065
>             Project: Apache Knox
>          Issue Type: New Feature
>          Components: Server
>    Affects Versions: 2.1.0
>            Reporter: Tamás Hanicz
>            Priority: Major
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> Currently Knox is not working with SSE. In case of SSE the response header 
> contains Content-Type = text/event-stream, in which case the server will send 
> messages terminated by \n\n. Knox should send these messages to the client as 
> they arrive. Currently Knox collects them all, and once the server closes the 
> connection sends all of them concatenated to each other to the client.
> Requirement:
>  * Full support in Knox for Server Sent Events.
> Definition of done:
>  * Clients can connect to SSE endpoints in the backend via Knox.
>  * Server Sent Events 
> ([https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events])
>  are forwarded by Knox from the server to the client as soon as they are 
> received.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to