pzampino commented on code in PR #947:
URL: https://github.com/apache/knox/pull/947#discussion_r1831518329


##########
gateway-spi/src/main/java/org/apache/knox/gateway/SpiGatewayMessages.java:
##########
@@ -123,4 +123,19 @@ public interface SpiGatewayMessages {
 
   @Message( level = MessageLevel.ERROR, text = "No valid principal found" )
   void noPrincipalFound();
+
+  @Message( level = MessageLevel.INFO, text = "Every event was read from the 
stream" )

Review Comment:
   The content of these messages should indicate that they're in the context of 
SSE since the logger itself does not give enough context IMO.



##########
gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.protocol.HttpContext;
+import org.apache.knox.gateway.audit.api.Action;
+import org.apache.knox.gateway.audit.api.ActionOutcome;
+import org.apache.knox.gateway.audit.api.ResourceType;
+import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
+import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory;
+import org.apache.knox.gateway.dispatch.HttpAsyncClientFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.FilterConfig;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class SSEDispatch extends ConfigurableDispatch {
+
+    private final HttpAsyncClient asyncClient;
+    private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream";
+
+    public SSEDispatch(FilterConfig filterConfig) {
+        HttpAsyncClientFactory asyncClientFactory = new 
DefaultHttpAsyncClientFactory();
+        this.asyncClient = 
asyncClientFactory.createAsyncHttpClient(filterConfig);
+
+        if (asyncClient instanceof CloseableHttpAsyncClient) {
+            ((CloseableHttpAsyncClient) this.asyncClient).start();
+        }
+    }
+
+    @Override
+    public void doGet(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpGet httpGetRequest = new HttpGet(url);
+        this.commonBaseMethod(httpGetRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPost(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException, URISyntaxException {
+        final HttpPost httpPostRequest = new HttpPost(url);
+        httpPostRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPostRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPut(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpPut httpPutRequest = new HttpPut(url);
+        httpPutRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPutRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPatch(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpPatch httpPatchRequest = new HttpPatch(url);
+        httpPatchRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPatchRequest, inboundRequest, 
outboundResponse);
+    }
+
+    private void commonBaseMethod(HttpUriRequest httpMethod, 
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws 
IOException {

Review Comment:
   nit: I don't love this method name. Maybe something like doHttpMethod() is 
better?



##########
gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.protocol.HttpContext;
+import org.apache.knox.gateway.audit.api.Action;
+import org.apache.knox.gateway.audit.api.ActionOutcome;
+import org.apache.knox.gateway.audit.api.ResourceType;
+import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
+import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory;
+import org.apache.knox.gateway.dispatch.HttpAsyncClientFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.FilterConfig;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class SSEDispatch extends ConfigurableDispatch {
+
+    private final HttpAsyncClient asyncClient;
+    private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream";
+
+    public SSEDispatch(FilterConfig filterConfig) {
+        HttpAsyncClientFactory asyncClientFactory = new 
DefaultHttpAsyncClientFactory();
+        this.asyncClient = 
asyncClientFactory.createAsyncHttpClient(filterConfig);
+
+        if (asyncClient instanceof CloseableHttpAsyncClient) {
+            ((CloseableHttpAsyncClient) this.asyncClient).start();
+        }
+    }
+
+    @Override
+    public void doGet(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpGet httpGetRequest = new HttpGet(url);
+        this.commonBaseMethod(httpGetRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPost(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException, URISyntaxException {
+        final HttpPost httpPostRequest = new HttpPost(url);
+        httpPostRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPostRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPut(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpPut httpPutRequest = new HttpPut(url);
+        httpPutRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPutRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPatch(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpPatch httpPatchRequest = new HttpPatch(url);
+        httpPatchRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPatchRequest, inboundRequest, 
outboundResponse);
+    }
+
+    private void commonBaseMethod(HttpUriRequest httpMethod, 
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws 
IOException {
+        this.addAcceptHeader(httpMethod);
+        this.copyRequestHeaderFields(httpMethod, inboundRequest);
+        this.executeRequest(httpMethod, outboundResponse, inboundRequest);
+    }
+
+    private void executeRequest(HttpUriRequest outboundRequest, 
HttpServletResponse outboundResponse, HttpServletRequest inboundRequest) {
+        AsyncContext asyncContext = inboundRequest.startAsync();
+        //No timeout
+        asyncContext.setTimeout(0L);
+
+        HttpAsyncRequestProducer producer = 
HttpAsyncMethods.create(outboundRequest);
+        AsyncCharConsumer<SSEResponse> consumer = new 
SSECharConsumer(outboundResponse, outboundRequest.getURI(), asyncContext);
+        this.getSSEConnection(producer, consumer, outboundRequest);
+    }
+
+    private void getSSEConnection(HttpAsyncRequestProducer producer, 
AsyncCharConsumer<SSEResponse> consumer, HttpUriRequest outboundRequest) {
+        LOG.dispatchRequest(outboundRequest.getMethod(), 
outboundRequest.getURI());
+        auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), 
ResourceType.URI, ActionOutcome.UNAVAILABLE, 
RES.requestMethod(outboundRequest.getMethod()));
+        asyncClient.execute(producer, consumer, new 
FutureCallback<SSEResponse>() {
+
+            @Override
+            public void completed(final SSEResponse response) {
+                closeProducer(producer);
+                LOG.sseConnectionDone();
+            }
+
+            @Override
+            public void failed(final Exception ex) {
+                closeProducer(producer);
+                LOG.sseConnectionError(ex.getMessage());
+            }
+
+            @Override
+            public void cancelled() {
+                closeProducer(producer);
+                LOG.sseConnectionCancelled();
+            }
+        });
+    }
+
+    private void addAcceptHeader(HttpUriRequest outboundRequest) {
+        outboundRequest.setHeader(HttpHeaders.ACCEPT, 
SSEDispatch.TEXT_EVENT_STREAM_VALUE);
+    }
+
+    private void handleOkResponse(HttpServletResponse outboundResponse, URI 
url, HttpResponse inboundResponse) {
+        this.prepareServletResponse(outboundResponse, 
inboundResponse.getStatusLine().getStatusCode());
+        this.copyResponseHeaderFields(outboundResponse, inboundResponse);
+        auditor.audit(Action.DISPATCH, url.toString(), ResourceType.URI, 
ActionOutcome.SUCCESS, RES.responseStatus(HttpStatus.SC_OK));
+    }
+
+    private void handleErrorResponse(HttpServletResponse outboundResponse, URI 
url, HttpResponse httpResponse) {
+        int statusCode = httpResponse.getStatusLine().getStatusCode();
+        outboundResponse.setStatus(statusCode);
+        LOG.dispatchResponseStatusCode(statusCode);
+        auditor.audit(Action.DISPATCH, url.toString(), ResourceType.URI, 
ActionOutcome.FAILURE, RES.responseStatus(statusCode));
+    }
+
+    private void prepareServletResponse(HttpServletResponse outboundResponse, 
int statusCode) {
+        LOG.dispatchResponseStatusCode(statusCode);
+        outboundResponse.setStatus(statusCode);
+        outboundResponse.setCharacterEncoding(StandardCharsets.UTF_8.name());
+    }
+
+    private boolean isSuccessful(int statusCode) {
+        return (statusCode >= HttpStatus.SC_OK && statusCode < 300);
+    }
+
+    private void closeProducer(HttpAsyncRequestProducer producer) {
+        try {
+            producer.close();
+        } catch (IOException e) {
+            LOG.sseProducerCloseError(e);
+        }
+    }
+
+    private class SSECharConsumer extends AsyncCharConsumer<SSEResponse> {
+        private SSEResponse sseResponse;
+        private final HttpServletResponse outboundResponse;
+        private final URI url;
+        private final AsyncContext asyncContext;
+
+        SSECharConsumer(HttpServletResponse outboundResponse, URI url, 
AsyncContext asyncContext) {
+            this.outboundResponse = outboundResponse;
+            this.url = url;
+            this.asyncContext = asyncContext;
+        }
+
+        @Override
+        protected void onResponseReceived(final HttpResponse inboundResponse) {
+            this.sseResponse = new SSEResponse(inboundResponse);
+            if (isSuccessful(inboundResponse.getStatusLine().getStatusCode())) 
{
+                handleOkResponse(outboundResponse, url, inboundResponse);

Review Comment:
   isSuccessful() returns true for any 20x status code, not only 200 OK. 
However, the handleOkResponse() method name implies it is only for 200 OK 
responses. Could rather be simply handleResponse(), or even 
handleSuccessResponse() in my opinion.



##########
gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.AbstractHttpEntity;
+
+import javax.servlet.AsyncContext;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.CharBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class SSEEntity extends AbstractHttpEntity {
+
+    private static final String SSE_DELIMITER = ":";
+
+    private final BlockingQueue<SSEvent> eventQueue;
+    private final StringBuilder eventBuilder = new StringBuilder();
+    private final HttpEntity httpEntity;
+    private char previousChar = '0';
+
+    public SSEEntity(HttpEntity httpEntity) {
+        this.httpEntity = httpEntity;
+        this.eventQueue = new LinkedBlockingQueue<>();
+    }
+
+    public boolean readCharBuffer(CharBuffer charBuffer) {
+        while (charBuffer.hasRemaining()) {
+            processChar(charBuffer.get());
+        }
+        return !this.eventQueue.isEmpty();
+    }
+
+    //Two new line chars (\n\n) after each other means the event is finished 
streaming
+    //We can process it and add it to the event queue
+    private void processChar(char nextChar) {
+        if (isNewLineChar(nextChar) && isNewLineChar(this.previousChar)) {
+            this.processEvent();

Review Comment:
   Is the "this" necessary in these statements?



##########
gateway-spi/src/main/java/org/apache/knox/gateway/dispatch/GatewayDispatchFilter.java:
##########
@@ -217,15 +218,28 @@ public void doMethod(Dispatch dispatch, 
HttpServletRequest request, HttpServletR
     }
   }
 
-  private <T> T newInstanceFromName(String dispatchImpl) throws 
ServletException {
+  private <T> T newInstanceFromName(String dispatchImpl, FilterConfig 
filterConfig) throws ServletException {
     try {
       Class<T> clazz = loadClass(dispatchImpl);

Review Comment:
   Can this ever return null? Or in every case, an Exception will be thrown?



##########
gateway-release/home/conf/gateway-site.xml:
##########
@@ -208,4 +208,10 @@ limitations under the License.
         <description>Add service name to x-forward-context header for the list 
of services defined above.</description>
     </property>
 
+    <!-- Async support required for SSE -->

Review Comment:
   Why adding this explicitly disabled? Does it not default to false?



##########
gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.protocol.HttpContext;
+import org.apache.knox.gateway.audit.api.Action;
+import org.apache.knox.gateway.audit.api.ActionOutcome;
+import org.apache.knox.gateway.audit.api.ResourceType;
+import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
+import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory;
+import org.apache.knox.gateway.dispatch.HttpAsyncClientFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.FilterConfig;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class SSEDispatch extends ConfigurableDispatch {
+
+    private final HttpAsyncClient asyncClient;
+    private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream";
+
+    public SSEDispatch(FilterConfig filterConfig) {
+        HttpAsyncClientFactory asyncClientFactory = new 
DefaultHttpAsyncClientFactory();
+        this.asyncClient = 
asyncClientFactory.createAsyncHttpClient(filterConfig);
+
+        if (asyncClient instanceof CloseableHttpAsyncClient) {
+            ((CloseableHttpAsyncClient) this.asyncClient).start();
+        }
+    }
+
+    @Override
+    public void doGet(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpGet httpGetRequest = new HttpGet(url);
+        this.commonBaseMethod(httpGetRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPost(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException, URISyntaxException {
+        final HttpPost httpPostRequest = new HttpPost(url);
+        httpPostRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPostRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPut(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpPut httpPutRequest = new HttpPut(url);
+        httpPutRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPutRequest, inboundRequest, 
outboundResponse);
+    }
+
+    @Override
+    public void doPatch(URI url, HttpServletRequest inboundRequest, 
HttpServletResponse outboundResponse)
+            throws IOException {
+        final HttpPatch httpPatchRequest = new HttpPatch(url);
+        httpPatchRequest.setEntity(this.createRequestEntity(inboundRequest));
+        this.commonBaseMethod(httpPatchRequest, inboundRequest, 
outboundResponse);
+    }
+
+    private void commonBaseMethod(HttpUriRequest httpMethod, 
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws 
IOException {
+        this.addAcceptHeader(httpMethod);
+        this.copyRequestHeaderFields(httpMethod, inboundRequest);
+        this.executeRequest(httpMethod, outboundResponse, inboundRequest);
+    }
+
+    private void executeRequest(HttpUriRequest outboundRequest, 
HttpServletResponse outboundResponse, HttpServletRequest inboundRequest) {
+        AsyncContext asyncContext = inboundRequest.startAsync();
+        //No timeout
+        asyncContext.setTimeout(0L);
+
+        HttpAsyncRequestProducer producer = 
HttpAsyncMethods.create(outboundRequest);
+        AsyncCharConsumer<SSEResponse> consumer = new 
SSECharConsumer(outboundResponse, outboundRequest.getURI(), asyncContext);
+        this.getSSEConnection(producer, consumer, outboundRequest);
+    }
+
+    private void getSSEConnection(HttpAsyncRequestProducer producer, 
AsyncCharConsumer<SSEResponse> consumer, HttpUriRequest outboundRequest) {

Review Comment:
   This method is named getSSEConnection(), but it does not return a connection 
(or anything). Maybe executeAsyncRequest()?



##########
gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEEntity.java:
##########
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpEntity;
+import org.apache.http.entity.AbstractHttpEntity;
+
+import javax.servlet.AsyncContext;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.CharBuffer;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+public class SSEEntity extends AbstractHttpEntity {
+
+    private static final String SSE_DELIMITER = ":";
+
+    private final BlockingQueue<SSEvent> eventQueue;
+    private final StringBuilder eventBuilder = new StringBuilder();
+    private final HttpEntity httpEntity;
+    private char previousChar = '0';
+
+    public SSEEntity(HttpEntity httpEntity) {
+        this.httpEntity = httpEntity;
+        this.eventQueue = new LinkedBlockingQueue<>();
+    }
+
+    public boolean readCharBuffer(CharBuffer charBuffer) {
+        while (charBuffer.hasRemaining()) {
+            processChar(charBuffer.get());
+        }
+        return !this.eventQueue.isEmpty();

Review Comment:
   Is the "this" keyword necessary here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscr...@knox.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to