[ 
https://issues.apache.org/jira/browse/KNOX-3065?focusedWorklogId=942558&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-942558
 ]

ASF GitHub Bot logged work on KNOX-3065:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 07/Nov/24 20:34
            Start Date: 07/Nov/24 20:34
    Worklog Time Spent: 10m 
      Work Description: pzampino commented on code in PR #947:
URL: https://github.com/apache/knox/pull/947#discussion_r1831518329


##########
gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java:
##########
@@ -123,4 +123,19 @@ public interface SpiGatewayMessages {
 
   @Message( level = MessageLevel.ERROR, text = "No valid principal found" )
   void noPrincipalFound();
+
+  @Message( level = MessageLevel.INFO, text = "Every event was read from the 
stream" )

Review Comment:
   The content of these messages should indicate that they're in the context of 
SSE since the logger itself does not give enough context IMO.



##########
gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.protocol.HttpContext;
+import org.apache.knox.gateway.audit.api.Action;
+import org.apache.knox.gateway.audit.api.ActionOutcome;
+import org.apache.knox.gateway.audit.api.ResourceType;
+import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
+import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory;
+import org.apache.knox.gateway.dispatch.HttpAsyncClientFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.FilterConfig;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class SSEDispatch extends ConfigurableDispatch {
+
+    private final HttpAsyncClient asyncClient;
+    private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream";
+
+    public SSEDispatch(FilterConfig filterConfig) {
+        HttpAsyncClientFactory asyncClientFactory = new 
DefaultHttpAsyncClientFactory();
+        this.asyncClient = 
asyncClientFactory.createAsyncHttpClient(filterConfig);
+
+        if (asyncClient instanceof CloseableHttpAsyncClient) {
+            ((CloseableHttpAsyncClient) this.asyncClient).start();
+        }
+    }
+
+    @Override
+    public void doGet(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpGet httpGetRequest = new HttpGet(url);
+        this.commonBaseMethod(httpGetRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPost(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException, URISyntaxException {
+        final HttpPost httpPostRequest = new HttpPost(url);
+        httpPostRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPostRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPut(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpPut httpPutRequest = new HttpPut(url);
+        httpPutRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPutRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPatch(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpPatch httpPatchRequest = new HttpPatch(url);
+        httpPatchRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPatchRequest, inboundRequest, 
outboundResponse);
+    }
+
+    private void commonBaseMethod(HttpUriRequest httpMethod, 
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws 
IOException {

Review Comment:
   nit: I don't love this method name. Maybe something like doHttpMethod() is 
better?



##########
gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.protocol.HttpContext;
+import org.apache.knox.gateway.audit.api.Action;
+import org.apache.knox.gateway.audit.api.ActionOutcome;
+import org.apache.knox.gateway.audit.api.ResourceType;
+import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
+import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory;
+import org.apache.knox.gateway.dispatch.HttpAsyncClientFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.FilterConfig;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class SSEDispatch extends ConfigurableDispatch {
+
+    private final HttpAsyncClient asyncClient;
+    private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream";
+
+    public SSEDispatch(FilterConfig filterConfig) {
+        HttpAsyncClientFactory asyncClientFactory = new 
DefaultHttpAsyncClientFactory();
+        this.asyncClient = 
asyncClientFactory.createAsyncHttpClient(filterConfig);
+
+        if (asyncClient instanceof CloseableHttpAsyncClient) {
+            ((CloseableHttpAsyncClient) this.asyncClient).start();
+        }
+    }
+
+    @Override
+    public void doGet(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpGet httpGetRequest = new HttpGet(url);
+        this.commonBaseMethod(httpGetRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPost(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException, URISyntaxException {
+        final HttpPost httpPostRequest = new HttpPost(url);
+        httpPostRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPostRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPut(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpPut httpPutRequest = new HttpPut(url);
+        httpPutRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPutRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPatch(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpPatch httpPatchRequest = new HttpPatch(url);
+        httpPatchRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPatchRequest, inboundRequest, 
outboundResponse);
+    }
+
+    private void commonBaseMethod(HttpUriRequest httpMethod, 
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws 
IOException {
+        this.addAcceptHeader(httpMethod);
+        this.copyRequestHeaderFields(httpMethod, inboundRequest);
+        this.executeRequest(httpMethod, outboundResponse, inboundRequest);
+    }
+
+    private void executeRequest(HttpUriRequest outboundRequest, 
HttpServletResponse outboundResponse, HttpServletRequest inboundRequest) {
+        AsyncContext asyncContext = inboundRequest.startAsync();
+        //No timeout
+        asyncContext.setTimeout(0L);
+
+        HttpAsyncRequestProducer producer = 
HttpAsyncMethods.create(outboundRequest);
+        AsyncCharConsumer<SSEResponse> consumer = new 
SSECharConsumer(outboundResponse, outboundRequest.getURI(), asyncContext);
+        this.getSSEConnection(producer, consumer, outboundRequest);
+    }
+
+    private void getSSEConnection(HttpAsyncRequestProducer producer, 
AsyncCharConsumer<SSEResponse> consumer, HttpUriRequest outboundRequest) {
+        LOG.dispatchRequest(outboundRequest.getMethod(), 
outboundRequest.getURI());
+        auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), 
ResourceType.URI, ActionOutcome.UNAVAILABLE, 
RES.requestMethod(outboundRequest.getMethod()));
+        asyncClient.execute(producer, consumer, new 
FutureCallback<SSEResponse>() {
+
+            @Override
+            public void completed(final SSEResponse response) {
+                closeProducer(producer);
+                LOG.sseConnectionDone();
+            }
+
+            @Override
+            public void failed(final Exception ex) {
+                closeProducer(producer);
+                LOG.sseConnectionError(ex.getMessage());
+            }
+
+            @Override
+            public void cancelled() {
+                closeProducer(producer);
+                LOG.sseConnectionCancelled();
+            }
+        });
+    }
+
+    private void addAcceptHeader(HttpUriRequest outboundRequest) {
+        outboundRequest.setHeader(HttpHeaders.ACCEPT, 
SSEDispatch.TEXT_EVENT_STREAM_VALUE);
+    }
+
+    private void handleOkResponse(HttpServletResponse outboundResponse, URI 
url, HttpResponse inboundResponse) {
+        this.prepareServletResponse(outboundResponse, 
inboundResponse.getStatusLine().getStatusCode());
+        this.copyResponseHeaderFields(outboundResponse, inboundResponse);
+        auditor.audit(Action.DISPATCH, url.toString(), ResourceType.URI, 
ActionOutcome.SUCCESS, RES.responseStatus(HttpStatus.SC_OK));
+    }
+
+    private void handleErrorResponse(HttpServletResponse outboundResponse, URI 
url, HttpResponse httpResponse) {
+        int statusCode = httpResponse.getStatusLine().getStatusCode();
+        outboundResponse.setStatus(statusCode);
+        LOG.dispatchResponseStatusCode(statusCode);
+        auditor.audit(Action.DISPATCH, url.toString(), ResourceType.URI, 
ActionOutcome.FAILURE, RES.responseStatus(statusCode));
+    }
+
+    private void prepareServletResponse(HttpServletResponse outboundResponse, 
int statusCode) {
+        LOG.dispatchResponseStatusCode(statusCode);
+        outboundResponse.setStatus(statusCode);
+        outboundResponse.setCharacterEncoding(StandardCharsets.UTF_8.name());
+    }
+
+    private boolean isSuccessful(int statusCode) {
+        return (statusCode >= HttpStatus.SC_OK && statusCode < 300);
+    }
+
+    private void closeProducer(HttpAsyncRequestProducer producer) {
+        try {
+            producer.close();
+        } catch (IOException e) {
+            LOG.sseProducerCloseError(e);
+        }
+    }
+
+    private class SSECharConsumer extends AsyncCharConsumer<SSEResponse> {
+        private SSEResponse sseResponse;
+        private final HttpServletResponse outboundResponse;
+        private final URI url;
+        private final AsyncContext asyncContext;
+
+        SSECharConsumer(HttpServletResponse outboundResponse, URI url, 
AsyncContext asyncContext) {
+            this.outboundResponse = outboundResponse;
+            this.url = url;
+            this.asyncContext = asyncContext;
+        }
+
+        @Override
+        protected void onResponseReceived(final HttpResponse inboundResponse) {
+            this.sseResponse = new SSEResponse(inboundResponse);
+            if (isSuccessful(inboundResponse.getStatusLine().getStatusCode())) 
{
+                handleOkResponse(outboundResponse, url, inboundResponse);

Review Comment:
   isSuccessful() returns true for any 20x status code, not only 200 OK. 
However, the handleOkResponse() method name implies it is only for 200 OK 
responses. Could rather be simply handleResponse(), or even 
handleSuccessResponse() in my opinion.



##########
gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.AbstractHttpEntity;
+
+import javax.servlet.AsyncContext;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.CharBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class SSEEntity extends AbstractHttpEntity {
+
+    private static final String SSE_DELIMITER = ":";
+
+    private final BlockingQueue<SSEvent> eventQueue;
+    private final StringBuilder eventBuilder = new StringBuilder();
+    private final HttpEntity httpEntity;
+    private char previousChar = '0';
+
+    public SSEEntity(HttpEntity httpEntity) {
+        this.httpEntity = httpEntity;
+        this.eventQueue = new LinkedBlockingQueue<>();
+    }
+
+    public boolean readCharBuffer(CharBuffer charBuffer) {
+        while (charBuffer.hasRemaining()) {
+            processChar(charBuffer.get());
+        }
+        return !this.eventQueue.isEmpty();
+    }
+
+    //Two new line chars (\n\n) after each other means the event is finished 
streaming
+    //We can process it and add it to the event queue
+    private void processChar(char nextChar) {
+        if (isNewLineChar(nextChar) && isNewLineChar(this.previousChar)) {
+            this.processEvent();

Review Comment:
   Is the "this" necessary in these statements?



##########
gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/GatewayDispatchFilter.java:
##########
@@ -217,15 +218,28 @@ public void doMethod(Dispatch dispatch, 
HttpServletRequest request, HttpServletR
     }
   }
 
-  private <T> T newInstanceFromName(String dispatchImpl) throws 
ServletException {
+  private <T> T newInstanceFromName(String dispatchImpl, FilterConfig 
filterConfig) throws ServletException {
     try {
       Class<T> clazz = loadClass(dispatchImpl);

Review Comment:
   Can this ever return null? Or in every case, an Exception will be thrown?



##########
gateway-release/home/conf/gateway-site.xml:
##########
@@ -208,4 +208,10 @@ limitations under the License.
         <description>Add service name to x-forward-context header for the list 
of services defined above.</description>
     </property>
 
+    <!

Issue Time Tracking
-------------------

    Worklog Id:     (was: 942558)
    Time Spent: 1h 10m  (was: 1h)

> Support Server Sent Events (SSE) in Knox
> ----------------------------------------
>
>                 Key: KNOX-3065
>                 URL: https://issues.apache.org/jira/browse/KNOX-3065
>             Project: Apache Knox
>          Issue Type: New Feature
>          Components: Server
>    Affects Versions: 2.1.0
>            Reporter: Tamás Hanicz
>            Priority: Major
>          Time Spent: 1h 10m
>  Remaining Estimate: 0h
>
> Currently Knox is not working with SSE. In case of SSE the response header 
> contains Content-Type = text/event-stream, in which case the server will send 
> messages terminated by \n\n. Knox should send these messages to the client as 
> they arrive. Currently Knox collects them all, and once the server closes the 
> connection sends all of them concatenated to each other to the client.
> Requirement:
>  * Full support in Knox for Server Sent Events.
> Definition of done:
>  * Clients can connect to SSE endpoints in the backend via Knox.
>  * Server Sent Events 
> ([https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events])
>  are forwarded by Knox from the server to the client as soon as they are 
> received.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to