[
https://issues.apache.org/jira/browse/KNOX-3065?focusedWorklogId=942746&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-942746
]
ASF GitHub Bot logged work on KNOX-3065:
----------------------------------------
Author: ASF GitHub Bot
Created on: 08/Nov/24 09:55
Start Date: 08/Nov/24 09:55
Worklog Time Spent: 10m
Work Description: hanicz commented on code in PR #947:
URL: https://github.com/apache/knox/pull/947#discussion_r1834047682
##########
gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.knox.gateway.sse;
+
+import org.apache.http.HttpHeaders;
+import org.apache.http.HttpResponse;
+import org.apache.http.HttpStatus;
+import org.apache.http.client.methods.HttpGet;
+import org.apache.http.client.methods.HttpPatch;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.client.methods.HttpPut;
+import org.apache.http.client.methods.HttpUriRequest;
+import org.apache.http.concurrent.FutureCallback;
+import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
+import org.apache.http.nio.IOControl;
+import org.apache.http.nio.client.HttpAsyncClient;
+import org.apache.http.nio.client.methods.AsyncCharConsumer;
+import org.apache.http.nio.client.methods.HttpAsyncMethods;
+import org.apache.http.nio.protocol.HttpAsyncRequestProducer;
+import org.apache.http.protocol.HttpContext;
+import org.apache.knox.gateway.audit.api.Action;
+import org.apache.knox.gateway.audit.api.ActionOutcome;
+import org.apache.knox.gateway.audit.api.ResourceType;
+import org.apache.knox.gateway.dispatch.ConfigurableDispatch;
+import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory;
+import org.apache.knox.gateway.dispatch.HttpAsyncClientFactory;
+
+import javax.servlet.AsyncContext;
+import javax.servlet.FilterConfig;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+
+public class SSEDispatch extends ConfigurableDispatch {
+
+ private final HttpAsyncClient asyncClient;
+ private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream";
+
+ public SSEDispatch(FilterConfig filterConfig) {
+ HttpAsyncClientFactory asyncClientFactory = new
DefaultHttpAsyncClientFactory();
+ this.asyncClient =
asyncClientFactory.createAsyncHttpClient(filterConfig);
+
+ if (asyncClient instanceof CloseableHttpAsyncClient) {
+ ((CloseableHttpAsyncClient) this.asyncClient).start();
+ }
+ }
+
+ @Override
+ public void doGet(URI url, HttpServletRequest inboundRequest,
HttpServletResponse outboundResponse)
+ throws IOException {
+ final HttpGet httpGetRequest = new HttpGet(url);
+ this.commonBaseMethod(httpGetRequest, inboundRequest,
outboundResponse);
+ }
+
+ @Override
+ public void doPost(URI url, HttpServletRequest inboundRequest,
HttpServletResponse outboundResponse)
+ throws IOException, URISyntaxException {
+ final HttpPost httpPostRequest = new HttpPost(url);
+ httpPostRequest.setEntity(this.createRequestEntity(inboundRequest));
+ this.commonBaseMethod(httpPostRequest, inboundRequest,
outboundResponse);
+ }
+
+ @Override
+ public void doPut(URI url, HttpServletRequest inboundRequest,
HttpServletResponse outboundResponse)
+ throws IOException {
+ final HttpPut httpPutRequest = new HttpPut(url);
+ httpPutRequest.setEntity(this.createRequestEntity(inboundRequest));
+ this.commonBaseMethod(httpPutRequest, inboundRequest,
outboundResponse);
+ }
+
+ @Override
+ public void doPatch(URI url, HttpServletRequest inboundRequest,
HttpServletResponse outboundResponse)
+ throws IOException {
+ final HttpPatch httpPatchRequest = new HttpPatch(url);
+ httpPatchRequest.setEntity(this.createRequestEntity(inboundRequest));
+ this.commonBaseMethod(httpPatchRequest, inboundRequest,
outboundResponse);
+ }
+
+ private void commonBaseMethod(HttpUriRequest httpMethod,
HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws
IOException {
+ this.addAcceptHeader(httpMethod);
+ this.copyRequestHeaderFields(httpMethod, inboundRequest);
+ this.executeRequest(httpMethod, outboundResponse, inboundRequest);
+ }
+
+ private void executeRequest(HttpUriRequest outboundRequest,
HttpServletResponse outboundResponse, HttpServletRequest inboundRequest) {
+ AsyncContext asyncContext = inboundRequest.startAsync();
+ //No timeout
+ asyncContext.setTimeout(0L);
+
+ HttpAsyncRequestProducer producer =
HttpAsyncMethods.create(outboundRequest);
+ AsyncCharConsumer<SSEResponse> consumer = new
SSECharConsumer(outboundResponse, outboundRequest.getURI(), asyncContext);
+ this.getSSEConnection(producer, consumer, outboundRequest);
+ }
+
+ private void getSSEConnection(HttpAsyncRequestProducer producer,
AsyncCharConsumer<SSEResponse> consumer, HttpUriRequest outboundRequest) {
+ LOG.dispatchRequest(outboundRequest.getMethod(),
outboundRequest.getURI());
+ auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(),
ResourceType.URI, ActionOutcome.UNAVAILABLE,
RES.requestMethod(outboundRequest.getMethod()));
+ asyncClient.execute(producer, consumer, new
FutureCallback<SSEResponse>() {
+
+ @Override
+ public void completed(final SSEResponse response) {
+ closeProducer(producer);
+ LOG.sseConnectionDone();
+ }
+
+ @Override
+ public void failed(final Exception ex) {
+ closeProducer(producer);
+ LOG.sseConnectionError(ex.getMessage());
+ }
+
+ @Override
+ public void cancelled() {
+ closeProducer(producer);
+ LOG.sseConnectionCancelled();
+ }
+ });
+ }
+
+ private void addAcceptHeader(HttpUriRequest outboundRequest) {
+ outboundRequest.setHeader(HttpHeaders.ACCEPT,
SSEDispatch.TEXT_EVENT_STREAM_VALUE);
+ }
+
+ private void handleOkResponse(HttpServletResponse outboundResponse, URI
url, HttpResponse inboundResponse) {
+ this.prepareServletResponse(outboundResponse,
inboundResponse.getStatusLine().getStatusCode());
+ this.copyResponseHeaderFields(outboundResponse, inboundResponse);
+ auditor.audit(Action.DISPATCH, url.toString(), ResourceType.URI,
ActionOutcome.SUCCESS, RES.responseStatus(HttpStatus.SC_OK));
+ }
+
+ private void handleErrorResponse(HttpServletResponse outboundResponse, URI
url, HttpResponse httpResponse) {
+ int statusCode = httpResponse.getStatusLine().getStatusCode();
+ outboundResponse.setStatus(statusCode);
+ LOG.dispatchResponseStatusCode(statusCode);
+ auditor.audit(Action.DISPATCH, url.toString(), ResourceType.URI,
ActionOutcome.FAILURE, RES.responseStatus(statusCode));
+ }
+
+ private void prepareServletResponse(HttpServletResponse outboundResponse,
int statusCode) {
+ LOG.dispatchResponseStatusCode(statusCode);
+ outboundResponse.setStatus(statusCode);
+ outboundResponse.setCharacterEncoding(StandardCharsets.UTF_8.name());
+ }
+
+ private boolean isSuccessful(int statusCode) {
+ return (statusCode >= HttpStatus.SC_OK && statusCode < 300);
+ }
+
+ private void closeProducer(HttpAsyncRequestProducer producer) {
+ try {
+ producer.close();
+ } catch (IOException e) {
+ LOG.sseProducerCloseError(e);
+ }
+ }
+
+ private class SSECharConsumer extends AsyncCharConsumer<SSEResponse> {
+ private SSEResponse sseResponse;
+ private final HttpServletResponse outboundResponse;
+ private final URI url;
+ private final AsyncContext asyncContext;
+
+ SSECharConsumer(HttpServletResponse outboundResponse, URI url,
AsyncContext asyncContext) {
+ this.outboundResponse = outboundResponse;
+ this.url = url;
+ this.asyncContext = asyncContext;
+ }
+
+ @Override
+ protected void onResponseReceived(final HttpResponse inboundResponse) {
+ this.sseResponse = new SSEResponse(inboundResponse);
+ if (isSuccessful(inboundResponse.getStatusLine().getStatusCode()))
{
+ handleOkResponse(outboundResponse, url, inboundResponse);
Review Comment:
Renamed it to handleSuccessResponse
Issue Time Tracking
-------------------
Worklog Id: (was: 942746)
Time Spent: 2h (was: 1h 50m)
> Support Server Sent Events (SSE) in Knox
> ----------------------------------------
>
> Key: KNOX-3065
> URL: https://issues.apache.org/jira/browse/KNOX-3065
> Project: Apache Knox
> Issue Type: New Feature
> Components: Server
> Affects Versions: 2.1.0
> Reporter: Tamás Hanicz
> Priority: Major
> Time Spent: 2h
> Remaining Estimate: 0h
>
> Currently Knox is not working with SSE. In case of SSE the response header
> contains Content-Type = text/event-stream, in which case the server will send
> messages terminated by \n\n. Knox should send these messages to the client as
> they arrive. Currently Knox collects them all, and once the server closes the
> connection sends all of them concatenated to each other to the client.
> Requirement:
> * Full support in Knox for Server Sent Events.
> Definition of done:
> * Clients can connect to SSE endpoints in the backend via Knox.
> * Server Sent Events
> ([https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events])
> are forwarded by Knox from the server to the client as soon as they are
> received.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)