[ 
https://issues.apache.org/jira/browse/KNOX-3065?focusedWorklogId=942744&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-942744
 ]

ASF GitHub Bot logged work on KNOX-3065:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 08/Nov/24 09:55
            Start Date: 08/Nov/24 09:55
    Worklog Time Spent: 10m 
      Work Description: hanicz commented on code in PR #947:
URL: https://github.com/apache/knox/pull/947#discussion_r1834046591


##########
gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.protocol.HttpContext;
+import org.apache.knox.gateway.audit.api.Action;
+import org.apache.knox.gateway.audit.api.ActionOutcome;
+import org.apache.knox.gateway.audit.api.ResourceType;
+import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
+import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory;
+import org.apache.knox.gateway.dispatch.HttpAsyncClientFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.FilterConfig;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class SSEDispatch extends ConfigurableDispatch {
+
+    private final HttpAsyncClient asyncClient;
+    private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream";
+
+    public SSEDispatch(FilterConfig filterConfig) {
+        HttpAsyncClientFactory asyncClientFactory = new 
DefaultHttpAsyncClientFactory();
+        this.asyncClient = 
asyncClientFactory.createAsyncHttpClient(filterConfig);
+
+        if (asyncClient instanceof CloseableHttpAsyncClient) {
+            ((CloseableHttpAsyncClient) this.asyncClient).start();
+        }
+    }
+
+    @Override
+    public void doGet(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpGet httpGetRequest = new HttpGet(url);
+        this.commonBaseMethod(httpGetRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPost(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException, URISyntaxException {
+        final HttpPost httpPostRequest = new HttpPost(url);
+        httpPostRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPostRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPut(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpPut httpPutRequest = new HttpPut(url);
+        httpPutRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPutRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPatch(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpPatch httpPatchRequest = new HttpPatch(url);
+        httpPatchRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPatchRequest, inboundRequest, 
outboundResponse);
+    }
+
+    private void commonBaseMethod(HttpUriRequest httpMethod, 
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws 
IOException {

Review Comment:
   Changed it.



##########
gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.protocol.HttpContext;
+import org.apache.knox.gateway.audit.api.Action;
+import org.apache.knox.gateway.audit.api.ActionOutcome;
+import org.apache.knox.gateway.audit.api.ResourceType;
+import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
+import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory;
+import org.apache.knox.gateway.dispatch.HttpAsyncClientFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.FilterConfig;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class SSEDispatch extends ConfigurableDispatch {
+
+    private final HttpAsyncClient asyncClient;
+    private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream";
+
+    public SSEDispatch(FilterConfig filterConfig) {
+        HttpAsyncClientFactory asyncClientFactory = new 
DefaultHttpAsyncClientFactory();
+        this.asyncClient = 
asyncClientFactory.createAsyncHttpClient(filterConfig);
+
+        if (asyncClient instanceof CloseableHttpAsyncClient) {
+            ((CloseableHttpAsyncClient) this.asyncClient).start();
+        }
+    }
+
+    @Override
+    public void doGet(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpGet httpGetRequest = new HttpGet(url);
+        this.commonBaseMethod(httpGetRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPost(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException, URISyntaxException {
+        final HttpPost httpPostRequest = new HttpPost(url);
+        httpPostRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPostRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPut(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpPut httpPutRequest = new HttpPut(url);
+        httpPutRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPutRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPatch(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpPatch httpPatchRequest = new HttpPatch(url);
+        httpPatchRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPatchRequest, inboundRequest, 
outboundResponse);
+    }
+
+    private void commonBaseMethod(HttpUriRequest httpMethod, 
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws 
IOException {
+        this.addAcceptHeader(httpMethod);
+        this.copyRequestHeaderFields(httpMethod, inboundRequest);
+        this.executeRequest(httpMethod, outboundResponse, inboundRequest);
+    }
+
+    private void executeRequest(HttpUriRequest outboundRequest, 
HttpServletResponse outboundResponse, HttpServletRequest inboundRequest) {
+        AsyncContext asyncContext = inboundRequest.startAsync();
+        //No timeout
+        asyncContext.setTimeout(0L);
+
+        HttpAsyncRequestProducer producer = 
HttpAsyncMethods.create(outboundRequest);
+        AsyncCharConsumer<SSEResponse> consumer = new 
SSECharConsumer(outboundResponse, outboundRequest.getURI(), asyncContext);
+        this.getSSEConnection(producer, consumer, outboundRequest);
+    }
+
+    private void getSSEConnection(HttpAsyncRequestProducer producer, 
AsyncCharConsumer<SSEResponse> consumer, HttpUriRequest outboundRequest) {

Review Comment:
   Changed it.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 942744)
    Time Spent: 1h 40m  (was: 1.5h)

> Support Server Sent Events (SSE) in Knox
> ----------------------------------------
>
>                 Key: KNOX-3065
>                 URL: https://issues.apache.org/jira/browse/KNOX-3065
>             Project: Apache Knox
>          Issue Type: New Feature
>          Components: Server
>    Affects Versions: 2.1.0
>            Reporter: Tamás Hanicz
>            Priority: Major
>          Time Spent: 1h 40m
>  Remaining Estimate: 0h
>
> Currently Knox does not work with SSE. With SSE, the response carries the 
> header Content-Type: text/event-stream, and the server sends a stream of 
> messages, each terminated by \n\n. Knox should forward these messages to the 
> client as they arrive. Instead, Knox currently buffers all of them and, only 
> once the server closes the connection, sends them to the client concatenated.
> Requirement:
>  * Full support in Knox for Server Sent Events.
> Definition of done:
>  * Clients can connect to SSE endpoints in the backend via Knox.
>  * Server Sent Events 
> ([https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events])
>  are forwarded by Knox from the server to the client as soon as they are 
> received.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to