[ https://issues.apache.org/jira/browse/KNOX-3065?focusedWorklogId=942746&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-942746 ]
ASF GitHub Bot logged work on KNOX-3065: ---------------------------------------- Author: ASF GitHub Bot Created on: 08/Nov/24 09:55 Start Date: 08/Nov/24 09:55 Worklog Time Spent: 10m Work Description: hanicz commented on code in PR #947: URL: https://github.com/apache/knox/pull/947#discussion_r1834047682 ########## gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.knox.gateway.sse; + +import org.apache.http.HttpHeaders; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPatch; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.concurrent.FutureCallback; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.nio.IOControl; +import org.apache.http.nio.client.HttpAsyncClient; +import org.apache.http.nio.client.methods.AsyncCharConsumer; +import org.apache.http.nio.client.methods.HttpAsyncMethods; +import org.apache.http.nio.protocol.HttpAsyncRequestProducer; +import org.apache.http.protocol.HttpContext; +import org.apache.knox.gateway.audit.api.Action; +import org.apache.knox.gateway.audit.api.ActionOutcome; +import org.apache.knox.gateway.audit.api.ResourceType; +import org.apache.knox.gateway.dispatch.ConfigurableDispatch; +import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory; +import org.apache.knox.gateway.dispatch.HttpAsyncClientFactory; + +import javax.servlet.AsyncContext; +import javax.servlet.FilterConfig; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.CharBuffer; +import java.nio.charset.StandardCharsets; + +public class SSEDispatch extends ConfigurableDispatch { + + private final HttpAsyncClient asyncClient; + private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream"; + + public SSEDispatch(FilterConfig filterConfig) { + HttpAsyncClientFactory asyncClientFactory = new DefaultHttpAsyncClientFactory(); + this.asyncClient = asyncClientFactory.createAsyncHttpClient(filterConfig); + + if (asyncClient instanceof CloseableHttpAsyncClient) { + ((CloseableHttpAsyncClient) this.asyncClient).start(); + } + } + + @Override + public void doGet(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) + throws IOException { + final HttpGet httpGetRequest = new HttpGet(url); + this.commonBaseMethod(httpGetRequest, inboundRequest, outboundResponse); + } + + @Override + public void doPost(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) + throws IOException, URISyntaxException { + final HttpPost httpPostRequest = new HttpPost(url); + httpPostRequest.setEntity(this.createRequestEntity(inboundRequest)); + this.commonBaseMethod(httpPostRequest, inboundRequest, outboundResponse); + } + + @Override + public void doPut(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) + throws IOException { + final HttpPut httpPutRequest = new HttpPut(url); + httpPutRequest.setEntity(this.createRequestEntity(inboundRequest)); + this.commonBaseMethod(httpPutRequest, inboundRequest, outboundResponse); + } + + @Override + public void doPatch(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) + throws IOException { + final HttpPatch httpPatchRequest = new HttpPatch(url); + httpPatchRequest.setEntity(this.createRequestEntity(inboundRequest)); + this.commonBaseMethod(httpPatchRequest, inboundRequest, outboundResponse); + } + + private void commonBaseMethod(HttpUriRequest httpMethod, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws IOException { + this.addAcceptHeader(httpMethod); + this.copyRequestHeaderFields(httpMethod, inboundRequest); + this.executeRequest(httpMethod, outboundResponse, inboundRequest); + } + + private void executeRequest(HttpUriRequest outboundRequest, HttpServletResponse outboundResponse, HttpServletRequest inboundRequest) { + AsyncContext asyncContext = inboundRequest.startAsync(); + //No timeout + asyncContext.setTimeout(0L); + + HttpAsyncRequestProducer producer = HttpAsyncMethods.create(outboundRequest); + AsyncCharConsumer<SSEResponse> consumer = new SSECharConsumer(outboundResponse, outboundRequest.getURI(), asyncContext); + this.getSSEConnection(producer, consumer, outboundRequest); + } + + private void getSSEConnection(HttpAsyncRequestProducer producer, AsyncCharConsumer<SSEResponse> consumer, HttpUriRequest outboundRequest) { + LOG.dispatchRequest(outboundRequest.getMethod(), outboundRequest.getURI()); + auditor.audit(Action.DISPATCH, outboundRequest.getURI().toString(), ResourceType.URI, ActionOutcome.UNAVAILABLE, RES.requestMethod(outboundRequest.getMethod())); + asyncClient.execute(producer, consumer, new FutureCallback<SSEResponse>() { + + @Override + public void completed(final SSEResponse response) { + closeProducer(producer); + LOG.sseConnectionDone(); + } + + @Override + public void failed(final Exception ex) { + closeProducer(producer); + LOG.sseConnectionError(ex.getMessage()); + } + + @Override + public void cancelled() { + closeProducer(producer); + LOG.sseConnectionCancelled(); + } + }); + } + + private void addAcceptHeader(HttpUriRequest outboundRequest) { + outboundRequest.setHeader(HttpHeaders.ACCEPT, SSEDispatch.TEXT_EVENT_STREAM_VALUE); + } + + private void handleOkResponse(HttpServletResponse outboundResponse, URI url, HttpResponse inboundResponse) { + this.prepareServletResponse(outboundResponse, inboundResponse.getStatusLine().getStatusCode()); + this.copyResponseHeaderFields(outboundResponse, inboundResponse); + auditor.audit(Action.DISPATCH, url.toString(), ResourceType.URI, ActionOutcome.SUCCESS, RES.responseStatus(HttpStatus.SC_OK)); + } + + private void handleErrorResponse(HttpServletResponse outboundResponse, URI url, HttpResponse httpResponse) { + int statusCode = httpResponse.getStatusLine().getStatusCode(); + outboundResponse.setStatus(statusCode); + LOG.dispatchResponseStatusCode(statusCode); + auditor.audit(Action.DISPATCH, url.toString(), ResourceType.URI, ActionOutcome.FAILURE, RES.responseStatus(statusCode)); + } + + private void prepareServletResponse(HttpServletResponse outboundResponse, int statusCode) { + LOG.dispatchResponseStatusCode(statusCode); + outboundResponse.setStatus(statusCode); + outboundResponse.setCharacterEncoding(StandardCharsets.UTF_8.name()); + } + + private boolean isSuccessful(int statusCode) { + return (statusCode >= HttpStatus.SC_OK && statusCode < 300); + } + + private void closeProducer(HttpAsyncRequestProducer producer) { + try { + producer.close(); + } catch (IOException e) { + LOG.sseProducerCloseError(e); + } + } + + private class SSECharConsumer extends AsyncCharConsumer<SSEResponse> { + private SSEResponse sseResponse; + private final HttpServletResponse outboundResponse; + private final URI url; + private final AsyncContext asyncContext; + + SSECharConsumer(HttpServletResponse outboundResponse, URI url, AsyncContext asyncContext) { + this.outboundResponse = outboundResponse; + this.url = url; + this.asyncContext = asyncContext; + } + + @Override + protected void onResponseReceived(final HttpResponse inboundResponse) { + this.sseResponse = new SSEResponse(inboundResponse); + if (isSuccessful(inboundResponse.getStatusLine().getStatusCode())) { + handleOkResponse(outboundResponse, url, inboundResponse); Review Comment: Renamed it to handleSuccessResponse Issue Time Tracking ------------------- Worklog Id: (was: 942746) Time Spent: 2h (was: 1h 50m) > Support Server Sent Events (SSE) in Knox > ---------------------------------------- > > Key: KNOX-3065 > URL: https://issues.apache.org/jira/browse/KNOX-3065 > Project: Apache Knox > Issue Type: New Feature > Components: Server > Affects Versions: 2.1.0 > Reporter: Tamás Hanicz > Priority: Major > Time Spent: 2h > Remaining Estimate: 0h > > Currently Knox is not working with SSE. In case of SSE the response header > contains Content-Type = text/event-stream, in which case the server will send > messages terminated by \n\n. Knox should send these messages to the client as > they arrive. Currently Knox collects them all, and once the server closes the > connection sends all of them concatenated to each other to the client. > Requirement: > * Full support in Knox for Server Sent Events. > Definition of done: > * Clients can connect to SSE endpoints in the backend via Knox. > * Server Sent Events > ([https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events]) > are forwarded by Knox from the server to the client as soon as they are > received. -- This message was sent by Atlassian Jira (v8.20.10#820010)