This is an automated email from the ASF dual-hosted git repository. albumenj pushed a commit to branch 3.3 in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.3 by this push: new 0840c60e54 For HTTP/1 unary mode, use Content-Length instead of chunk (#13979) 0840c60e54 is described below commit 0840c60e54d752ab53bf1517adf178ff260240fb Author: TomlongTK <longqian...@maoyan.com> AuthorDate: Sun Apr 7 13:50:02 2024 +0800 For HTTP/1 unary mode, use Content-Length instead of chunk (#13979) * For HTTP/1 unary mode, use Content-Length instead of chunk * Fix format issue * Http1 unary * Fix unit test * refine * refine * Fix status --------- Co-authored-by: TomlongTK <18779116...@163.com> Co-authored-by: Sean Yang <oxs...@gmail.com> --- .../http12/AbstractServerHttpChannelObserver.java | 174 ++++++++++++--------- .../http12/h1/Http1ServerChannelObserver.java | 2 - .../h1/Http1ServerStreamChannelObserver.java | 10 ++ .../http12/h1/Http1ServerUnaryChannelObserver.java | 58 +++++++ .../DefaultHttp11ServerTransportListener.java | 3 +- 5 files changed, 169 insertions(+), 78 deletions(-) diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java index a4086cc08f..515afb2d50 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java @@ -21,33 +21,27 @@ import org.apache.dubbo.remoting.http12.exception.HttpResultPayloadException; import org.apache.dubbo.remoting.http12.exception.HttpStatusException; import org.apache.dubbo.remoting.http12.message.HttpMessageEncoder; -import java.util.List; -import java.util.Map; - public abstract class AbstractServerHttpChannelObserver implements CustomizableHttpChannelObserver<Object> { + private final HttpChannel httpChannel; + private HeadersCustomizer headersCustomizer = HeadersCustomizer.NO_OP; private TrailersCustomizer trailersCustomizer = TrailersCustomizer.NO_OP; private ErrorResponseCustomizer errorResponseCustomizer = ErrorResponseCustomizer.NO_OP; - private final HttpChannel httpChannel; + private HttpMessageEncoder responseEncoder; private boolean headerSent; - private HttpMessageEncoder responseEncoder; - - public AbstractServerHttpChannelObserver(HttpChannel httpChannel) { + protected AbstractServerHttpChannelObserver(HttpChannel httpChannel) { this.httpChannel = httpChannel; } - public void setResponseEncoder(HttpMessageEncoder responseEncoder) { - this.responseEncoder = responseEncoder; - } - - public HttpMessageEncoder getResponseEncoder() { - return responseEncoder; + @Override + public HttpChannel getHttpChannel() { + return httpChannel; } @Override @@ -65,71 +59,38 @@ public abstract class AbstractServerHttpChannelObserver implements CustomizableH this.errorResponseCustomizer = errorResponseCustomizer; } - protected HeadersCustomizer getHeadersCustomizer() { - return headersCustomizer; + public HttpMessageEncoder getResponseEncoder() { + return responseEncoder; } - protected TrailersCustomizer getTrailersCustomizer() { - return trailersCustomizer; + public void setResponseEncoder(HttpMessageEncoder responseEncoder) { + this.responseEncoder = responseEncoder; } @Override - public void onNext(Object data) { + public final void onNext(Object data) { try { - if (data instanceof HttpResult) { - HttpResult<?> result = (HttpResult<?>) data; - if (!headerSent) { - doSendHeaders(String.valueOf(result.getStatus()), result.getHeaders()); - } - data = result.getBody(); - } else if (!headerSent) { - doSendHeaders(HttpStatus.OK.getStatusString(), null); - } - HttpOutputMessage outputMessage = encodeHttpOutputMessage(data); - preOutputMessage(outputMessage); - responseEncoder.encode(outputMessage.getBody(), data); - getHttpChannel().writeMessage(outputMessage); - postOutputMessage(outputMessage); + doOnNext(data); } catch (Throwable e) { onError(e); } } - protected void preOutputMessage(HttpOutputMessage outputMessage) throws Throwable {} - - protected void postOutputMessage(HttpOutputMessage outputMessage) throws Throwable {} - - protected abstract HttpMetadata encodeHttpMetadata(); - - protected HttpOutputMessage encodeHttpOutputMessage(Object data) { - return getHttpChannel().newOutputMessage(); - } - - protected HttpMetadata encodeTrailers(Throwable throwable) { - return null; + protected void doOnNext(Object data) throws Throwable { + if (!headerSent) { + sendHeader(buildMetadata(resolveStatusCode(data), data, null)); + } + sendMessage(buildMessage(data)); } @Override - public void onError(Throwable throwable) { + public final void onError(Throwable throwable) { if (throwable instanceof HttpResultPayloadException) { onNext(((HttpResultPayloadException) throwable).getResult()); return; } - int httpStatusCode = HttpStatus.INTERNAL_SERVER_ERROR.getCode(); - if (throwable instanceof HttpStatusException) { - httpStatusCode = ((HttpStatusException) throwable).getStatusCode(); - } - if (!headerSent) { - doSendHeaders(String.valueOf(httpStatusCode), null); - } try { - ErrorResponse errorResponse = new ErrorResponse(); - errorResponse.setStatus(String.valueOf(httpStatusCode)); - errorResponse.setMessage(throwable.getMessage()); - errorResponseCustomizer.accept(errorResponse, throwable); - HttpOutputMessage httpOutputMessage = encodeHttpOutputMessage(errorResponse); - responseEncoder.encode(httpOutputMessage.getBody(), errorResponse); - getHttpChannel().writeMessage(httpOutputMessage); + doOnError(throwable); } catch (Throwable ex) { throwable = new EncodeException(ex); } finally { @@ -137,35 +98,98 @@ public abstract class AbstractServerHttpChannelObserver implements CustomizableH } } + protected void doOnError(Throwable throwable) throws Throwable { + String statusCode = resolveStatusCode(throwable); + Object data = buildErrorResponse(statusCode, throwable); + if (!headerSent) { + sendHeader(buildMetadata(statusCode, data, null)); + } + sendMessage(buildMessage(data)); + } + @Override - public void onCompleted() { + public final void onCompleted() { doOnCompleted(null); } - @Override - public HttpChannel getHttpChannel() { - return httpChannel; + protected void doOnCompleted(Throwable throwable) { + HttpMetadata httpMetadata = encodeTrailers(throwable); + if (httpMetadata == null) { + return; + } + trailersCustomizer.accept(httpMetadata.headers(), throwable); + getHttpChannel().writeHeader(httpMetadata); + } + + protected HttpMetadata encodeTrailers(Throwable throwable) { + return null; } - private void doSendHeaders(String statusCode, Map<String, List<String>> additionalHeaders) { + protected HttpOutputMessage encodeHttpOutputMessage(Object data) { + return getHttpChannel().newOutputMessage(); + } + + protected abstract HttpMetadata encodeHttpMetadata(); + + protected void preOutputMessage(HttpOutputMessage outputMessage) throws Throwable {} + + protected void postOutputMessage(HttpOutputMessage outputMessage) throws Throwable {} + + protected void preMetadata(HttpMetadata httpMetadata, HttpOutputMessage outputMessage) {} + + protected final String resolveStatusCode(Object data) { + return data instanceof HttpResult + ? String.valueOf(((HttpResult<?>) data).getStatus()) + : HttpStatus.OK.getStatusString(); + } + + protected final String resolveStatusCode(Throwable throwable) { + return throwable instanceof HttpStatusException + ? String.valueOf(((HttpStatusException) throwable).getStatusCode()) + : HttpStatus.INTERNAL_SERVER_ERROR.getStatusString(); + } + + protected final ErrorResponse buildErrorResponse(String statusCode, Throwable throwable) { + ErrorResponse errorResponse = new ErrorResponse(); + errorResponse.setStatus(statusCode); + errorResponse.setMessage(throwable.getMessage()); + errorResponseCustomizer.accept(errorResponse, throwable); + return errorResponse; + } + + protected final HttpOutputMessage buildMessage(Object data) throws Throwable { + if (data instanceof HttpResult) { + data = ((HttpResult<?>) data).getBody(); + } + HttpOutputMessage outputMessage = encodeHttpOutputMessage(data); + preOutputMessage(outputMessage); + responseEncoder.encode(outputMessage.getBody(), data); + return outputMessage; + } + + protected final void sendMessage(HttpOutputMessage outputMessage) throws Throwable { + getHttpChannel().writeMessage(outputMessage); + postOutputMessage(outputMessage); + } + + protected final HttpMetadata buildMetadata(String statusCode, Object data, HttpOutputMessage httpOutputMessage) { HttpMetadata httpMetadata = encodeHttpMetadata(); HttpHeaders headers = httpMetadata.headers(); headers.set(HttpHeaderNames.STATUS.getName(), statusCode); headers.set(HttpHeaderNames.CONTENT_TYPE.getName(), responseEncoder.contentType()); - headersCustomizer.accept(headers); - if (additionalHeaders != null) { - headers.putAll(additionalHeaders); + if (data instanceof HttpResult) { + HttpResult<?> result = (HttpResult<?>) data; + if (result.getHeaders() != null) { + headers.putAll(result.getHeaders()); + } } - getHttpChannel().writeHeader(httpMetadata); - headerSent = true; + preMetadata(httpMetadata, httpOutputMessage); + headersCustomizer.accept(headers); + return httpMetadata; } - protected void doOnCompleted(Throwable throwable) { - HttpMetadata httpMetadata = encodeTrailers(throwable); - if (httpMetadata == null) { - return; - } - trailersCustomizer.accept(httpMetadata.headers(), throwable); + protected final void sendHeader(HttpMetadata httpMetadata) { getHttpChannel().writeHeader(httpMetadata); + headerSent = true; } } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java index 6d92b86554..e62fbeb804 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java @@ -19,7 +19,6 @@ package org.apache.dubbo.remoting.http12.h1; import org.apache.dubbo.remoting.http12.AbstractServerHttpChannelObserver; import org.apache.dubbo.remoting.http12.HttpChannel; import org.apache.dubbo.remoting.http12.HttpChannelObserver; -import org.apache.dubbo.remoting.http12.HttpHeaderNames; import org.apache.dubbo.remoting.http12.HttpHeaders; import org.apache.dubbo.remoting.http12.HttpMetadata; import org.apache.dubbo.remoting.http12.HttpOutputMessage; @@ -34,7 +33,6 @@ public class Http1ServerChannelObserver extends AbstractServerHttpChannelObserve @Override protected HttpMetadata encodeHttpMetadata() { HttpHeaders httpHeaders = new HttpHeaders(); - httpHeaders.set(HttpHeaderNames.TRANSFER_ENCODING.getName(), "chunked"); return new Http1Metadata(httpHeaders); } diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java index e5ee7aff44..8c3af9d684 100644 --- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java @@ -17,6 +17,9 @@ package org.apache.dubbo.remoting.http12.h1; import org.apache.dubbo.remoting.http12.HttpChannel; +import org.apache.dubbo.remoting.http12.HttpHeaderNames; +import org.apache.dubbo.remoting.http12.HttpHeaders; +import org.apache.dubbo.remoting.http12.HttpMetadata; import org.apache.dubbo.remoting.http12.HttpOutputMessage; import java.io.IOException; @@ -33,6 +36,13 @@ public class Http1ServerStreamChannelObserver extends Http1ServerChannelObserver super(httpChannel); } + @Override + protected HttpMetadata encodeHttpMetadata() { + HttpHeaders httpHeaders = new HttpHeaders(); + httpHeaders.set(HttpHeaderNames.TRANSFER_ENCODING.getName(), "chunked"); + return new Http1Metadata(httpHeaders); + } + @Override protected void preOutputMessage(HttpOutputMessage httpMessage) throws IOException { HttpOutputMessage httpOutputMessage = this.getHttpChannel().newOutputMessage(); diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java new file mode 100644 index 0000000000..8cb2a36edd --- /dev/null +++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.dubbo.remoting.http12.h1; + +import org.apache.dubbo.remoting.http12.HttpChannel; +import org.apache.dubbo.remoting.http12.HttpHeaderNames; +import org.apache.dubbo.remoting.http12.HttpMetadata; +import org.apache.dubbo.remoting.http12.HttpOutputMessage; + +import java.io.OutputStream; + +import io.netty.buffer.ByteBufOutputStream; + +public class Http1ServerUnaryChannelObserver extends Http1ServerChannelObserver { + + public Http1ServerUnaryChannelObserver(HttpChannel httpChannel) { + super(httpChannel); + } + + @Override + protected void doOnNext(Object data) throws Throwable { + HttpOutputMessage httpOutputMessage = buildMessage(data); + sendHeader(buildMetadata(resolveStatusCode(data), data, httpOutputMessage)); + sendMessage(httpOutputMessage); + } + + @Override + protected void doOnError(Throwable throwable) throws Throwable { + String statusCode = resolveStatusCode(throwable); + Object data = buildErrorResponse(statusCode, throwable); + HttpOutputMessage httpOutputMessage = buildMessage(data); + sendHeader(buildMetadata(statusCode, data, httpOutputMessage)); + sendMessage(httpOutputMessage); + } + + @Override + protected void preMetadata(HttpMetadata httpMetadata, HttpOutputMessage outputMessage) { + OutputStream body = outputMessage.getBody(); + if (body instanceof ByteBufOutputStream) { + int contentLength = ((ByteBufOutputStream) body).writtenBytes(); + httpMetadata.headers().set(HttpHeaderNames.CONTENT_LENGTH.getName(), String.valueOf(contentLength)); + } + } +} diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java index 11f3012c22..950a2d9350 100644 --- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java +++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java @@ -27,6 +27,7 @@ import org.apache.dubbo.remoting.http12.RequestMetadata; import org.apache.dubbo.remoting.http12.h1.Http1ServerChannelObserver; import org.apache.dubbo.remoting.http12.h1.Http1ServerStreamChannelObserver; import org.apache.dubbo.remoting.http12.h1.Http1ServerTransportListener; +import org.apache.dubbo.remoting.http12.h1.Http1ServerUnaryChannelObserver; import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder; import org.apache.dubbo.remoting.http12.message.MediaType; import org.apache.dubbo.remoting.http12.message.codec.JsonCodec; @@ -58,7 +59,7 @@ public class DefaultHttp11ServerTransportListener executorSupport = ExecutorRepository.getInstance(url.getOrDefaultApplicationModel()) .getExecutorSupport(url); this.httpChannel = httpChannel; - serverChannelObserver = new Http1ServerChannelObserver(httpChannel); + serverChannelObserver = new Http1ServerUnaryChannelObserver(httpChannel); serverChannelObserver.setResponseEncoder(JsonCodec.INSTANCE); }