[ https://issues.apache.org/jira/browse/KNOX-3065?focusedWorklogId=942744&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-942744 ]
ASF GitHub Bot logged work on KNOX-3065: ---------------------------------------- Author: ASF GitHub Bot Created on: 08/Nov/24 09:55 Start Date: 08/Nov/24 09:55 Worklog Time Spent: 10m Work Description: hanicz commented on code in PR #947: URL: https://github.com/apache/knox/pull/947#discussion_r1834046591 ########## gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.knox.gateway.sse; + +import org.apache.http.HttpHeaders; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPatch; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.concurrent.FutureCallback; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.nio.IOControl; +import org.apache.http.nio.client.HttpAsyncClient; +import org.apache.http.nio.client.methods.AsyncCharConsumer; +import org.apache.http.nio.client.methods.HttpAsyncMethods; +import org.apache.http.nio.protocol.HttpAsyncRequestProducer; +import org.apache.http.protocol.HttpContext; +import org.apache.knox.gateway.audit.api.Action; +import org.apache.knox.gateway.audit.api.ActionOutcome; +import org.apache.knox.gateway.audit.api.ResourceType; +import org.apache.knox.gateway.dispatch.ConfigurableDispatch; +import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory; +import org.apache.knox.gateway.dispatch.HttpAsyncClientFactory; + +import javax.servlet.AsyncContext; +import javax.servlet.FilterConfig; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.CharBuffer; +import java.nio.charset.StandardCharsets; + +public class SSEDispatch extends ConfigurableDispatch { + + private final HttpAsyncClient asyncClient; + private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream"; + + public SSEDispatch(FilterConfig filterConfig) { + HttpAsyncClientFactory asyncClientFactory = new DefaultHttpAsyncClientFactory(); + this.asyncClient = asyncClientFactory.createAsyncHttpClient(filterConfig); + + if (asyncClient instanceof CloseableHttpAsyncClient) { + ((CloseableHttpAsyncClient) this.asyncClient).start(); + } + } + + @Override + public void doGet(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) + throws IOException { + final HttpGet httpGetRequest = new HttpGet(url); + this.commonBaseMethod(httpGetRequest, inboundRequest, outboundResponse); + } + + @Override + public void doPost(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) + throws IOException, URISyntaxException { + final HttpPost httpPostRequest = new HttpPost(url); + httpPostRequest.setEntity(this.createRequestEntity(inboundRequest)); + this.commonBaseMethod(httpPostRequest, inboundRequest, outboundResponse); + } + + @Override + public void doPut(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) + throws IOException { + final HttpPut httpPutRequest = new HttpPut(url); + httpPutRequest.setEntity(this.createRequestEntity(inboundRequest)); + this.commonBaseMethod(httpPutRequest, inboundRequest, outboundResponse); + } + + @Override + public void doPatch(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) + throws IOException { + final HttpPatch httpPatchRequest = new HttpPatch(url); + httpPatchRequest.setEntity(this.createRequestEntity(inboundRequest)); + this.commonBaseMethod(httpPatchRequest, inboundRequest, outboundResponse); + } + + private void commonBaseMethod(HttpUriRequest httpMethod, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws IOException { Review Comment: Changed it. ########## gateway-spi/src/main/java/org/apache/knox/gateway/sse/SSEDispatch.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.knox.gateway.sse; + +import org.apache.http.HttpHeaders; +import org.apache.http.HttpResponse; +import org.apache.http.HttpStatus; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPatch; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.concurrent.FutureCallback; +import org.apache.http.impl.nio.client.CloseableHttpAsyncClient; +import org.apache.http.nio.IOControl; +import org.apache.http.nio.client.HttpAsyncClient; +import org.apache.http.nio.client.methods.AsyncCharConsumer; +import org.apache.http.nio.client.methods.HttpAsyncMethods; +import org.apache.http.nio.protocol.HttpAsyncRequestProducer; +import org.apache.http.protocol.HttpContext; +import org.apache.knox.gateway.audit.api.Action; +import org.apache.knox.gateway.audit.api.ActionOutcome; +import org.apache.knox.gateway.audit.api.ResourceType; +import org.apache.knox.gateway.dispatch.ConfigurableDispatch; +import org.apache.knox.gateway.dispatch.DefaultHttpAsyncClientFactory; +import org.apache.knox.gateway.dispatch.HttpAsyncClientFactory; + +import javax.servlet.AsyncContext; +import javax.servlet.FilterConfig; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; +import java.io.IOException; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.CharBuffer; +import java.nio.charset.StandardCharsets; + +public class SSEDispatch extends ConfigurableDispatch { + + private final HttpAsyncClient asyncClient; + private static final String TEXT_EVENT_STREAM_VALUE = "text/event-stream"; + + public SSEDispatch(FilterConfig filterConfig) { + HttpAsyncClientFactory asyncClientFactory = new DefaultHttpAsyncClientFactory(); + this.asyncClient = asyncClientFactory.createAsyncHttpClient(filterConfig); + + if (asyncClient instanceof CloseableHttpAsyncClient) { + ((CloseableHttpAsyncClient) this.asyncClient).start(); + } + } + + @Override + public void doGet(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) + throws IOException { + final HttpGet httpGetRequest = new HttpGet(url); + this.commonBaseMethod(httpGetRequest, inboundRequest, outboundResponse); + } + + @Override + public void doPost(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) + throws IOException, URISyntaxException { + final HttpPost httpPostRequest = new HttpPost(url); + httpPostRequest.setEntity(this.createRequestEntity(inboundRequest)); + this.commonBaseMethod(httpPostRequest, inboundRequest, outboundResponse); + } + + @Override + public void doPut(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) + throws IOException { + final HttpPut httpPutRequest = new HttpPut(url); + httpPutRequest.setEntity(this.createRequestEntity(inboundRequest)); + this.commonBaseMethod(httpPutRequest, inboundRequest, outboundResponse); + } + + @Override + public void doPatch(URI url, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) + throws IOException { + final HttpPatch httpPatchRequest = new HttpPatch(url); + httpPatchRequest.setEntity(this.createRequestEntity(inboundRequest)); + this.commonBaseMethod(httpPatchRequest, inboundRequest, outboundResponse); + } + + private void commonBaseMethod(HttpUriRequest httpMethod, HttpServletRequest inboundRequest, HttpServletResponse outboundResponse) throws IOException { + this.addAcceptHeader(httpMethod); + this.copyRequestHeaderFields(httpMethod, inboundRequest); + this.executeRequest(httpMethod, outboundResponse, inboundRequest); + } + + private void executeRequest(HttpUriRequest outboundRequest, HttpServletResponse outboundResponse, HttpServletRequest inboundRequest) { + AsyncContext asyncContext = inboundRequest.startAsync(); + //No timeout + asyncContext.setTimeout(0L); + + HttpAsyncRequestProducer producer = HttpAsyncMethods.create(outboundRequest); + AsyncCharConsumer<SSEResponse> consumer = new SSECharConsumer(outboundResponse, outboundRequest.getURI(), asyncContext); + this.getSSEConnection(producer, consumer, outboundRequest); + } + + private void getSSEConnection(HttpAsyncRequestProducer producer, AsyncCharConsumer<SSEResponse> consumer, HttpUriRequest outboundRequest) { Review Comment: Changed it. Issue Time Tracking ------------------- Worklog Id: (was: 942744) Time Spent: 1h 40m (was: 1.5h) > Support Server Sent Events (SSE) in Knox > ---------------------------------------- > > Key: KNOX-3065 > URL: https://issues.apache.org/jira/browse/KNOX-3065 > Project: Apache Knox > Issue Type: New Feature > Components: Server > Affects Versions: 2.1.0 > Reporter: Tamás Hanicz > Priority: Major > Time Spent: 1h 40m > Remaining Estimate: 0h > > Currently Knox is not working with SSE. In case of SSE the response header > contains Content-Type = text/event-stream, in which case the server will send > messages terminated by \n\n. Knox should send these messages to the client as > they arrive. Currently Knox collects them all, and once the server closes the > connection sends all of them concatenated to each other to the client. > Requirement: > * Full support in Knox for Server Sent Events. > Definition of done: > * Clients can connect to SSE endpoints in the backend via Knox. > * Server Sent Events > ([https://html.spec.whatwg.org/multipage/server-sent-events.html#server-sent-events]) > are forwarded by Knox from the server to the client as soon as they are > received. -- This message was sent by Atlassian Jira (v8.20.10#820010)