This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.3
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.3 by this push:
     new 0840c60e54 For HTTP/1 unary mode, use Content-Length instead of chunk (#13979)
0840c60e54 is described below

commit 0840c60e54d752ab53bf1517adf178ff260240fb
Author: TomlongTK <longqian...@maoyan.com>
AuthorDate: Sun Apr 7 13:50:02 2024 +0800

    For HTTP/1 unary mode, use Content-Length instead of chunk (#13979)
    
    * For HTTP/1 unary mode, use Content-Length instead of chunk
    
    * Fix format issue
    
    * Http1 unary
    
    * Fix unit test
    
    * refine
    
    * refine
    
    * Fix status
    
    ---------
    
    Co-authored-by: TomlongTK <18779116...@163.com>
    Co-authored-by: Sean Yang <oxs...@gmail.com>
---
 .../http12/AbstractServerHttpChannelObserver.java  | 174 ++++++++++++---------
 .../http12/h1/Http1ServerChannelObserver.java      |   2 -
 .../h1/Http1ServerStreamChannelObserver.java       |  10 ++
 .../http12/h1/Http1ServerUnaryChannelObserver.java |  58 +++++++
 .../DefaultHttp11ServerTransportListener.java      |   3 +-
 5 files changed, 169 insertions(+), 78 deletions(-)

diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
index a4086cc08f..515afb2d50 100644
--- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
+++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/AbstractServerHttpChannelObserver.java
@@ -21,33 +21,27 @@ import org.apache.dubbo.remoting.http12.exception.HttpResultPayloadException;
 import org.apache.dubbo.remoting.http12.exception.HttpStatusException;
 import org.apache.dubbo.remoting.http12.message.HttpMessageEncoder;
 
-import java.util.List;
-import java.util.Map;
-
 public abstract class AbstractServerHttpChannelObserver implements 
CustomizableHttpChannelObserver<Object> {
 
+    private final HttpChannel httpChannel;
+
     private HeadersCustomizer headersCustomizer = HeadersCustomizer.NO_OP;
 
     private TrailersCustomizer trailersCustomizer = TrailersCustomizer.NO_OP;
 
     private ErrorResponseCustomizer errorResponseCustomizer = 
ErrorResponseCustomizer.NO_OP;
 
-    private final HttpChannel httpChannel;
+    private HttpMessageEncoder responseEncoder;
 
     private boolean headerSent;
 
-    private HttpMessageEncoder responseEncoder;
-
-    public AbstractServerHttpChannelObserver(HttpChannel httpChannel) {
+    protected AbstractServerHttpChannelObserver(HttpChannel httpChannel) {
         this.httpChannel = httpChannel;
     }
 
-    public void setResponseEncoder(HttpMessageEncoder responseEncoder) {
-        this.responseEncoder = responseEncoder;
-    }
-
-    public HttpMessageEncoder getResponseEncoder() {
-        return responseEncoder;
+    @Override
+    public HttpChannel getHttpChannel() {
+        return httpChannel;
     }
 
     @Override
@@ -65,71 +59,38 @@ public abstract class AbstractServerHttpChannelObserver 
implements CustomizableH
         this.errorResponseCustomizer = errorResponseCustomizer;
     }
 
-    protected HeadersCustomizer getHeadersCustomizer() {
-        return headersCustomizer;
+    public HttpMessageEncoder getResponseEncoder() {
+        return responseEncoder;
     }
 
-    protected TrailersCustomizer getTrailersCustomizer() {
-        return trailersCustomizer;
+    public void setResponseEncoder(HttpMessageEncoder responseEncoder) {
+        this.responseEncoder = responseEncoder;
     }
 
     @Override
-    public void onNext(Object data) {
+    public final void onNext(Object data) {
         try {
-            if (data instanceof HttpResult) {
-                HttpResult<?> result = (HttpResult<?>) data;
-                if (!headerSent) {
-                    doSendHeaders(String.valueOf(result.getStatus()), 
result.getHeaders());
-                }
-                data = result.getBody();
-            } else if (!headerSent) {
-                doSendHeaders(HttpStatus.OK.getStatusString(), null);
-            }
-            HttpOutputMessage outputMessage = encodeHttpOutputMessage(data);
-            preOutputMessage(outputMessage);
-            responseEncoder.encode(outputMessage.getBody(), data);
-            getHttpChannel().writeMessage(outputMessage);
-            postOutputMessage(outputMessage);
+            doOnNext(data);
         } catch (Throwable e) {
             onError(e);
         }
     }
 
-    protected void preOutputMessage(HttpOutputMessage outputMessage) throws 
Throwable {}
-
-    protected void postOutputMessage(HttpOutputMessage outputMessage) throws 
Throwable {}
-
-    protected abstract HttpMetadata encodeHttpMetadata();
-
-    protected HttpOutputMessage encodeHttpOutputMessage(Object data) {
-        return getHttpChannel().newOutputMessage();
-    }
-
-    protected HttpMetadata encodeTrailers(Throwable throwable) {
-        return null;
+    protected void doOnNext(Object data) throws Throwable {
+        if (!headerSent) {
+            sendHeader(buildMetadata(resolveStatusCode(data), data, null));
+        }
+        sendMessage(buildMessage(data));
     }
 
     @Override
-    public void onError(Throwable throwable) {
+    public final void onError(Throwable throwable) {
         if (throwable instanceof HttpResultPayloadException) {
             onNext(((HttpResultPayloadException) throwable).getResult());
             return;
         }
-        int httpStatusCode = HttpStatus.INTERNAL_SERVER_ERROR.getCode();
-        if (throwable instanceof HttpStatusException) {
-            httpStatusCode = ((HttpStatusException) throwable).getStatusCode();
-        }
-        if (!headerSent) {
-            doSendHeaders(String.valueOf(httpStatusCode), null);
-        }
         try {
-            ErrorResponse errorResponse = new ErrorResponse();
-            errorResponse.setStatus(String.valueOf(httpStatusCode));
-            errorResponse.setMessage(throwable.getMessage());
-            errorResponseCustomizer.accept(errorResponse, throwable);
-            HttpOutputMessage httpOutputMessage = 
encodeHttpOutputMessage(errorResponse);
-            responseEncoder.encode(httpOutputMessage.getBody(), errorResponse);
-            getHttpChannel().writeMessage(httpOutputMessage);
+            doOnError(throwable);
         } catch (Throwable ex) {
             throwable = new EncodeException(ex);
         } finally {
@@ -137,35 +98,98 @@ public abstract class AbstractServerHttpChannelObserver 
implements CustomizableH
         }
     }
 
+    protected void doOnError(Throwable throwable) throws Throwable {
+        String statusCode = resolveStatusCode(throwable);
+        Object data = buildErrorResponse(statusCode, throwable);
+        if (!headerSent) {
+            sendHeader(buildMetadata(statusCode, data, null));
+        }
+        sendMessage(buildMessage(data));
+    }
+
     @Override
-    public void onCompleted() {
+    public final void onCompleted() {
         doOnCompleted(null);
     }
 
-    @Override
-    public HttpChannel getHttpChannel() {
-        return httpChannel;
+    protected void doOnCompleted(Throwable throwable) {
+        HttpMetadata httpMetadata = encodeTrailers(throwable);
+        if (httpMetadata == null) {
+            return;
+        }
+        trailersCustomizer.accept(httpMetadata.headers(), throwable);
+        getHttpChannel().writeHeader(httpMetadata);
+    }
+
+    protected HttpMetadata encodeTrailers(Throwable throwable) {
+        return null;
     }
 
-    private void doSendHeaders(String statusCode, Map<String, List<String>> 
additionalHeaders) {
+    protected HttpOutputMessage encodeHttpOutputMessage(Object data) {
+        return getHttpChannel().newOutputMessage();
+    }
+
+    protected abstract HttpMetadata encodeHttpMetadata();
+
+    protected void preOutputMessage(HttpOutputMessage outputMessage) throws 
Throwable {}
+
+    protected void postOutputMessage(HttpOutputMessage outputMessage) throws 
Throwable {}
+
+    protected void preMetadata(HttpMetadata httpMetadata, HttpOutputMessage 
outputMessage) {}
+
+    protected final String resolveStatusCode(Object data) {
+        return data instanceof HttpResult
+                ? String.valueOf(((HttpResult<?>) data).getStatus())
+                : HttpStatus.OK.getStatusString();
+    }
+
+    protected final String resolveStatusCode(Throwable throwable) {
+        return throwable instanceof HttpStatusException
+                ? String.valueOf(((HttpStatusException) 
throwable).getStatusCode())
+                : HttpStatus.INTERNAL_SERVER_ERROR.getStatusString();
+    }
+
+    protected final ErrorResponse buildErrorResponse(String statusCode, 
Throwable throwable) {
+        ErrorResponse errorResponse = new ErrorResponse();
+        errorResponse.setStatus(statusCode);
+        errorResponse.setMessage(throwable.getMessage());
+        errorResponseCustomizer.accept(errorResponse, throwable);
+        return errorResponse;
+    }
+
+    protected final HttpOutputMessage buildMessage(Object data) throws 
Throwable {
+        if (data instanceof HttpResult) {
+            data = ((HttpResult<?>) data).getBody();
+        }
+        HttpOutputMessage outputMessage = encodeHttpOutputMessage(data);
+        preOutputMessage(outputMessage);
+        responseEncoder.encode(outputMessage.getBody(), data);
+        return outputMessage;
+    }
+
+    protected final void sendMessage(HttpOutputMessage outputMessage) throws 
Throwable {
+        getHttpChannel().writeMessage(outputMessage);
+        postOutputMessage(outputMessage);
+    }
+
+    protected final HttpMetadata buildMetadata(String statusCode, Object data, 
HttpOutputMessage httpOutputMessage) {
         HttpMetadata httpMetadata = encodeHttpMetadata();
         HttpHeaders headers = httpMetadata.headers();
         headers.set(HttpHeaderNames.STATUS.getName(), statusCode);
         headers.set(HttpHeaderNames.CONTENT_TYPE.getName(), 
responseEncoder.contentType());
-        headersCustomizer.accept(headers);
-        if (additionalHeaders != null) {
-            headers.putAll(additionalHeaders);
+        if (data instanceof HttpResult) {
+            HttpResult<?> result = (HttpResult<?>) data;
+            if (result.getHeaders() != null) {
+                headers.putAll(result.getHeaders());
+            }
         }
-        getHttpChannel().writeHeader(httpMetadata);
-        headerSent = true;
+        preMetadata(httpMetadata, httpOutputMessage);
+        headersCustomizer.accept(headers);
+        return httpMetadata;
     }
 
-    protected void doOnCompleted(Throwable throwable) {
-        HttpMetadata httpMetadata = encodeTrailers(throwable);
-        if (httpMetadata == null) {
-            return;
-        }
-        trailersCustomizer.accept(httpMetadata.headers(), throwable);
+    protected final void sendHeader(HttpMetadata httpMetadata) {
         getHttpChannel().writeHeader(httpMetadata);
+        headerSent = true;
     }
 }
diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java
index 6d92b86554..e62fbeb804 100644
--- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java
+++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerChannelObserver.java
@@ -19,7 +19,6 @@ package org.apache.dubbo.remoting.http12.h1;
 import org.apache.dubbo.remoting.http12.AbstractServerHttpChannelObserver;
 import org.apache.dubbo.remoting.http12.HttpChannel;
 import org.apache.dubbo.remoting.http12.HttpChannelObserver;
-import org.apache.dubbo.remoting.http12.HttpHeaderNames;
 import org.apache.dubbo.remoting.http12.HttpHeaders;
 import org.apache.dubbo.remoting.http12.HttpMetadata;
 import org.apache.dubbo.remoting.http12.HttpOutputMessage;
@@ -34,7 +33,6 @@ public class Http1ServerChannelObserver extends 
AbstractServerHttpChannelObserve
     @Override
     protected HttpMetadata encodeHttpMetadata() {
         HttpHeaders httpHeaders = new HttpHeaders();
-        httpHeaders.set(HttpHeaderNames.TRANSFER_ENCODING.getName(), 
"chunked");
         return new Http1Metadata(httpHeaders);
     }
 
diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java
index e5ee7aff44..8c3af9d684 100644
--- a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java
+++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerStreamChannelObserver.java
@@ -17,6 +17,9 @@
 package org.apache.dubbo.remoting.http12.h1;
 
 import org.apache.dubbo.remoting.http12.HttpChannel;
+import org.apache.dubbo.remoting.http12.HttpHeaderNames;
+import org.apache.dubbo.remoting.http12.HttpHeaders;
+import org.apache.dubbo.remoting.http12.HttpMetadata;
 import org.apache.dubbo.remoting.http12.HttpOutputMessage;
 
 import java.io.IOException;
@@ -33,6 +36,13 @@ public class Http1ServerStreamChannelObserver extends 
Http1ServerChannelObserver
         super(httpChannel);
     }
 
+    @Override
+    protected HttpMetadata encodeHttpMetadata() {
+        HttpHeaders httpHeaders = new HttpHeaders();
+        httpHeaders.set(HttpHeaderNames.TRANSFER_ENCODING.getName(), 
"chunked");
+        return new Http1Metadata(httpHeaders);
+    }
+
     @Override
     protected void preOutputMessage(HttpOutputMessage httpMessage) throws 
IOException {
         HttpOutputMessage httpOutputMessage = 
this.getHttpChannel().newOutputMessage();
diff --git a/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java
new file mode 100644
index 0000000000..8cb2a36edd
--- /dev/null
+++ b/dubbo-remoting/dubbo-remoting-http12/src/main/java/org/apache/dubbo/remoting/http12/h1/Http1ServerUnaryChannelObserver.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.remoting.http12.h1;
+
+import org.apache.dubbo.remoting.http12.HttpChannel;
+import org.apache.dubbo.remoting.http12.HttpHeaderNames;
+import org.apache.dubbo.remoting.http12.HttpMetadata;
+import org.apache.dubbo.remoting.http12.HttpOutputMessage;
+
+import java.io.OutputStream;
+
+import io.netty.buffer.ByteBufOutputStream;
+
+public class Http1ServerUnaryChannelObserver extends 
Http1ServerChannelObserver {
+
+    public Http1ServerUnaryChannelObserver(HttpChannel httpChannel) {
+        super(httpChannel);
+    }
+
+    @Override
+    protected void doOnNext(Object data) throws Throwable {
+        HttpOutputMessage httpOutputMessage = buildMessage(data);
+        sendHeader(buildMetadata(resolveStatusCode(data), data, 
httpOutputMessage));
+        sendMessage(httpOutputMessage);
+    }
+
+    @Override
+    protected void doOnError(Throwable throwable) throws Throwable {
+        String statusCode = resolveStatusCode(throwable);
+        Object data = buildErrorResponse(statusCode, throwable);
+        HttpOutputMessage httpOutputMessage = buildMessage(data);
+        sendHeader(buildMetadata(statusCode, data, httpOutputMessage));
+        sendMessage(httpOutputMessage);
+    }
+
+    @Override
+    protected void preMetadata(HttpMetadata httpMetadata, HttpOutputMessage 
outputMessage) {
+        OutputStream body = outputMessage.getBody();
+        if (body instanceof ByteBufOutputStream) {
+            int contentLength = ((ByteBufOutputStream) body).writtenBytes();
+            
httpMetadata.headers().set(HttpHeaderNames.CONTENT_LENGTH.getName(), 
String.valueOf(contentLength));
+        }
+    }
+}
diff --git a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
index 11f3012c22..950a2d9350 100644
--- a/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
+++ b/dubbo-rpc/dubbo-rpc-triple/src/main/java/org/apache/dubbo/rpc/protocol/tri/h12/http1/DefaultHttp11ServerTransportListener.java
@@ -27,6 +27,7 @@ import org.apache.dubbo.remoting.http12.RequestMetadata;
 import org.apache.dubbo.remoting.http12.h1.Http1ServerChannelObserver;
 import org.apache.dubbo.remoting.http12.h1.Http1ServerStreamChannelObserver;
 import org.apache.dubbo.remoting.http12.h1.Http1ServerTransportListener;
+import org.apache.dubbo.remoting.http12.h1.Http1ServerUnaryChannelObserver;
 import org.apache.dubbo.remoting.http12.message.DefaultListeningDecoder;
 import org.apache.dubbo.remoting.http12.message.MediaType;
 import org.apache.dubbo.remoting.http12.message.codec.JsonCodec;
@@ -58,7 +59,7 @@ public class DefaultHttp11ServerTransportListener
         executorSupport = 
ExecutorRepository.getInstance(url.getOrDefaultApplicationModel())
                 .getExecutorSupport(url);
         this.httpChannel = httpChannel;
-        serverChannelObserver = new Http1ServerChannelObserver(httpChannel);
+        serverChannelObserver = new 
Http1ServerUnaryChannelObserver(httpChannel);
         serverChannelObserver.setResponseEncoder(JsonCodec.INSTANCE);
     }
 

Reply via email to