This is an automated email from the ASF dual-hosted git repository.

xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu.git


The following commit(s) were added to refs/heads/master by this push:
     new 29cb757  [type: bug] fix 2225, optimize client response builder. 
(#2302)
29cb757 is described below

commit 29cb757d23306e2f7dda6f71f45ac2e76545e709
Author: Qicz <[email protected]>
AuthorDate: Fri Nov 5 10:36:56 2021 +0800

    [type: bug] fix 2225, optimize client response builder. (#2302)
    
    * [type: bug] fix 2225, optimize client response builder.
    
    * optimize client response utils.
---
 .../plugin/base/utils/ClientResponseUtils.java     | 69 ++++++++++++++++++++++
 .../cryptor/decorator/ResponseDecorator.java       | 48 +++++++++++++--
 .../cryptor/response/CryptorResponsePlugin.java    | 45 +-------------
 .../modify/response/ModifyResponsePlugin.java      | 22 ++-----
 4 files changed, 120 insertions(+), 64 deletions(-)

diff --git 
a/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/utils/ClientResponseUtils.java
 
b/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/utils/ClientResponseUtils.java
new file mode 100644
index 0000000..2e7b482
--- /dev/null
+++ 
b/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/utils/ClientResponseUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.base.utils;
+
+import org.apache.shenyu.plugin.base.support.CachedBodyOutputMessage;
+import org.reactivestreams.Publisher;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.codec.ServerCodecConfigurer;
+import org.springframework.http.server.reactive.ServerHttpResponse;
+import org.springframework.web.reactive.function.client.ClientResponse;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Objects;
+
+/**
+ * ClientResponseUtils.
+ */
+public final class ClientResponseUtils {
+
+    /**
+     * build client response with current response data.
+     *
+     * @param response current response
+     * @param body current response body
+     * @return the client respone
+     */
+    public static ClientResponse buildClientResponse(final ServerHttpResponse 
response,
+                                                     final Publisher<? extends 
DataBuffer> body) {
+        ClientResponse.Builder builder = 
ClientResponse.create(Objects.requireNonNull(response.getStatusCode()),
+                ServerCodecConfigurer.create().getReaders());
+        return builder.headers(headers -> 
headers.putAll(response.getHeaders())).body(Flux.from(body)).build();
+    }
+
+    /**
+     * fix the body message.
+     *
+     * @param response current response
+     * @param outputMessage cache message
+     * @return fixed body message
+     */
+    public static Mono<DataBuffer> fixBodyMessage(final ServerHttpResponse 
response,
+                                                  final 
CachedBodyOutputMessage outputMessage) {
+        Mono<DataBuffer> messageBody = 
DataBufferUtils.join(outputMessage.getBody());
+        HttpHeaders headers = response.getHeaders();
+        if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)
+                || headers.containsKey(HttpHeaders.CONTENT_LENGTH)) {
+            messageBody = messageBody.doOnNext(data -> 
headers.setContentLength(data.readableByteCount()));
+        }
+        return messageBody;
+    }
+}
diff --git 
a/shenyu-plugin/shenyu-plugin-cryptor/src/main/java/org/apache/shenyu/plugin/cryptor/decorator/ResponseDecorator.java
 
b/shenyu-plugin/shenyu-plugin-cryptor/src/main/java/org/apache/shenyu/plugin/cryptor/decorator/ResponseDecorator.java
index 2a80bf8..aa2b1d9 100644
--- 
a/shenyu-plugin/shenyu-plugin-cryptor/src/main/java/org/apache/shenyu/plugin/cryptor/decorator/ResponseDecorator.java
+++ 
b/shenyu-plugin/shenyu-plugin-cryptor/src/main/java/org/apache/shenyu/plugin/cryptor/decorator/ResponseDecorator.java
@@ -17,30 +17,70 @@
 
 package org.apache.shenyu.plugin.cryptor.decorator;
 
+import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.plugin.base.support.BodyInserterContext;
 import org.apache.shenyu.plugin.base.support.CachedBodyOutputMessage;
+import org.apache.shenyu.plugin.base.utils.ClientResponseUtils;
+import org.apache.shenyu.plugin.cryptor.dto.CryptorRuleHandle;
+import org.apache.shenyu.plugin.cryptor.strategy.CryptorStrategyFactory;
+import org.apache.shenyu.plugin.cryptor.utils.HttpUtil;
+import org.apache.shenyu.plugin.cryptor.utils.JsonUtil;
 import org.reactivestreams.Publisher;
 import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.http.ReactiveHttpOutputMessage;
 import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
+import org.springframework.web.reactive.function.BodyInserter;
+import org.springframework.web.reactive.function.BodyInserters;
+import org.springframework.web.reactive.function.client.ClientResponse;
 import org.springframework.web.server.ServerWebExchange;
 import reactor.core.publisher.Mono;
 import reactor.util.annotation.NonNull;
 
+import java.util.function.Function;
+
 /**
  * Build and modify the response class.
  */
 public class ResponseDecorator extends ServerHttpResponseDecorator {
 
-    private final CachedBodyOutputMessage cachedBodyOutputMessage;
+    private final ServerWebExchange exchange;
+
+    private final CryptorRuleHandle ruleHandle;
 
     public ResponseDecorator(final ServerWebExchange exchange,
-                             final CachedBodyOutputMessage 
cachedBodyOutputMessage) {
+                             final CryptorRuleHandle ruleHandle) {
         super(exchange.getResponse());
-        this.cachedBodyOutputMessage = cachedBodyOutputMessage;
+        this.exchange = exchange;
+        this.ruleHandle = ruleHandle;
     }
 
     @Override
     @NonNull
+    @SuppressWarnings("unchecked")
     public Mono<Void> writeWith(@NonNull final Publisher<? extends DataBuffer> 
body) {
-        return getDelegate().writeWith(cachedBodyOutputMessage.getBody());
+        ClientResponse clientResponse = 
ClientResponseUtils.buildClientResponse(this.getDelegate(), body);
+        Mono<String> mono = 
clientResponse.bodyToMono(String.class).flatMap(originalBody ->
+                        strategyMatch(originalBody, this.ruleHandle, 
this.exchange));
+        BodyInserter<Mono<String>, ReactiveHttpOutputMessage> bodyInserter = 
BodyInserters.fromPublisher(mono, String.class);
+        CachedBodyOutputMessage outputMessage = 
HttpUtil.newCachedBodyOutputMessage(exchange);
+        return bodyInserter.insert(outputMessage, new BodyInserterContext())
+                .then(Mono.defer(() -> {
+                    Mono<DataBuffer> messageBody = 
ClientResponseUtils.fixBodyMessage(this.getDelegate(), outputMessage);
+                    
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, clientResponse);
+                    return getDelegate().writeWith(messageBody);
+                })).onErrorResume((Function<Throwable, Mono<Void>>) throwable 
-> HttpUtil.release(outputMessage, throwable));
+    }
+
+    @SuppressWarnings("rawtypes")
+    private Mono strategyMatch(final String originalBody, final 
CryptorRuleHandle ruleHandle, final ServerWebExchange exchange) {
+        String parseBody = JsonUtil.parser(originalBody, 
ruleHandle.getFieldNames());
+        if (parseBody == null) {
+            return Mono.just(originalBody);
+        }
+        String modifiedBody = CryptorStrategyFactory.match(ruleHandle, 
parseBody);
+        if (modifiedBody == null) {
+            return HttpUtil.fail(ruleHandle.getWay(), exchange);
+        }
+        return HttpUtil.success(originalBody, modifiedBody, 
ruleHandle.getWay(), ruleHandle.getFieldNames());
     }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-cryptor/src/main/java/org/apache/shenyu/plugin/cryptor/response/CryptorResponsePlugin.java
 
b/shenyu-plugin/shenyu-plugin-cryptor/src/main/java/org/apache/shenyu/plugin/cryptor/response/CryptorResponsePlugin.java
index f88217e..88ce0e4 100644
--- 
a/shenyu-plugin/shenyu-plugin-cryptor/src/main/java/org/apache/shenyu/plugin/cryptor/response/CryptorResponsePlugin.java
+++ 
b/shenyu-plugin/shenyu-plugin-cryptor/src/main/java/org/apache/shenyu/plugin/cryptor/response/CryptorResponsePlugin.java
@@ -17,36 +17,25 @@
 
 package org.apache.shenyu.plugin.cryptor.response;
 
-import org.apache.shenyu.common.constant.Constants;
 import org.apache.shenyu.common.dto.RuleData;
 import org.apache.shenyu.common.dto.SelectorData;
-import org.apache.shenyu.plugin.cryptor.dto.CryptorRuleHandle;
 import org.apache.shenyu.common.enums.PluginEnum;
 import org.apache.shenyu.plugin.api.ShenyuPluginChain;
 import org.apache.shenyu.plugin.api.result.ShenyuResultEnum;
 import org.apache.shenyu.plugin.api.result.ShenyuResultWrap;
 import org.apache.shenyu.plugin.api.utils.WebFluxResultUtils;
 import org.apache.shenyu.plugin.base.AbstractShenyuPlugin;
-import org.apache.shenyu.plugin.base.support.BodyInserterContext;
-import org.apache.shenyu.plugin.base.support.CachedBodyOutputMessage;
 import org.apache.shenyu.plugin.base.utils.CacheKeyUtils;
 import org.apache.shenyu.plugin.cryptor.decorator.ResponseDecorator;
+import org.apache.shenyu.plugin.cryptor.dto.CryptorRuleHandle;
 import 
org.apache.shenyu.plugin.cryptor.handler.CryptorResponsePluginDataHandler;
-import org.apache.shenyu.plugin.cryptor.strategy.CryptorStrategyFactory;
-import org.apache.shenyu.plugin.cryptor.utils.HttpUtil;
 import org.apache.shenyu.plugin.cryptor.utils.JsonUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import org.springframework.http.ReactiveHttpOutputMessage;
-import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
-import org.springframework.web.reactive.function.BodyInserter;
-import org.springframework.web.reactive.function.BodyInserters;
-import org.springframework.web.reactive.function.client.ClientResponse;
 import org.springframework.web.server.ServerWebExchange;
 import reactor.core.publisher.Mono;
 
 import java.util.Objects;
-import java.util.function.Function;
 
 /**
  * Cryptor response plugin.
@@ -56,7 +45,6 @@ public class CryptorResponsePlugin extends 
AbstractShenyuPlugin {
     private static final Logger LOG = 
LoggerFactory.getLogger(CryptorResponsePlugin.class);
 
     @Override
-    @SuppressWarnings("unchecked")
     protected Mono<Void> doExecute(final ServerWebExchange exchange, final 
ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
         CryptorRuleHandle ruleHandle = 
CryptorResponsePluginDataHandler.CACHED_HANDLE.get().obtainHandle(CacheKeyUtils.INST.getKey(rule));
         if (Objects.isNull(ruleHandle)) {
@@ -68,22 +56,8 @@ public class CryptorResponsePlugin extends 
AbstractShenyuPlugin {
                     
ShenyuResultEnum.CRYPTOR_RESPONSE_ERROR_CONFIGURATION.getMsg() + "[" + 
JsonUtil.getErrorCollector() + "]", null);
             return WebFluxResultUtils.result(exchange, error);
         }
-
-        CachedBodyOutputMessage outputMessage = 
HttpUtil.newCachedBodyOutputMessage(exchange);
-        ClientResponse clientResponse = 
exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
-        if (clientResponse == null) {
-            return Mono.empty();
-        }
-        Mono<String> mono = clientResponse.bodyToMono(String.class)
-                .flatMap(originalBody ->
-                        strategyMatch(originalBody, ruleHandle, exchange));
-        BodyInserter<Mono<String>, ReactiveHttpOutputMessage> bodyInserter = 
BodyInserters.fromPublisher(mono, String.class);
-        return bodyInserter.insert(outputMessage, new BodyInserterContext())
-                .then(Mono.defer(() -> {
-                    ServerHttpResponseDecorator decorator = new 
ResponseDecorator(exchange, outputMessage);
-                    return 
chain.execute(exchange.mutate().response(decorator).build());
-                })).onErrorResume((Function<Throwable, Mono<Void>>) throwable 
-> HttpUtil.release(outputMessage, throwable));
-
+        return chain.execute(exchange.mutate()
+                .response(new ResponseDecorator(exchange, 
ruleHandle)).build());
     }
 
     @Override
@@ -95,17 +69,4 @@ public class CryptorResponsePlugin extends 
AbstractShenyuPlugin {
     public String named() {
         return PluginEnum.CRYPTOR_RESPONSE.getName();
     }
-    
-    @SuppressWarnings("rawtypes")
-    private Mono strategyMatch(final String originalBody, final 
CryptorRuleHandle ruleHandle, final ServerWebExchange exchange) {
-        String parseBody = JsonUtil.parser(originalBody, 
ruleHandle.getFieldNames());
-        if (parseBody == null) {
-            return Mono.just(originalBody);
-        }
-        String modifiedBody = CryptorStrategyFactory.match(ruleHandle, 
parseBody);
-        if (modifiedBody == null) {
-            return HttpUtil.fail(ruleHandle.getWay(), exchange);
-        }
-        return HttpUtil.success(originalBody, modifiedBody, 
ruleHandle.getWay(), ruleHandle.getFieldNames());
-    }
 }
diff --git 
a/shenyu-plugin/shenyu-plugin-modify-response/src/main/java/org/apache/shenyu/plugin/modify/response/ModifyResponsePlugin.java
 
b/shenyu-plugin/shenyu-plugin-modify-response/src/main/java/org/apache/shenyu/plugin/modify/response/ModifyResponsePlugin.java
index bba4aa1..78ae208 100644
--- 
a/shenyu-plugin/shenyu-plugin-modify-response/src/main/java/org/apache/shenyu/plugin/modify/response/ModifyResponsePlugin.java
+++ 
b/shenyu-plugin/shenyu-plugin-modify-response/src/main/java/org/apache/shenyu/plugin/modify/response/ModifyResponsePlugin.java
@@ -32,10 +32,10 @@ import org.apache.shenyu.plugin.base.AbstractShenyuPlugin;
 import org.apache.shenyu.plugin.base.support.BodyInserterContext;
 import org.apache.shenyu.plugin.base.support.CachedBodyOutputMessage;
 import org.apache.shenyu.plugin.base.utils.CacheKeyUtils;
+import org.apache.shenyu.plugin.base.utils.ClientResponseUtils;
 import 
org.apache.shenyu.plugin.modify.response.handler.ModifyResponsePluginDataHandler;
 import org.reactivestreams.Publisher;
 import org.springframework.core.io.buffer.DataBuffer;
-import org.springframework.core.io.buffer.DataBufferUtils;
 import org.springframework.http.HttpHeaders;
 import org.springframework.http.HttpStatus;
 import org.springframework.http.ReactiveHttpOutputMessage;
@@ -104,7 +104,6 @@ public class ModifyResponsePlugin extends 
AbstractShenyuPlugin {
                     .build();
             exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, 
modifyResponse);
         }
-
         return chain.execute(exchange.mutate()
                 .response(new ModifyServerHttpResponse(exchange, 
modifyResponseRuleHandle)).build());
     }
@@ -134,10 +133,7 @@ public class ModifyResponsePlugin extends 
AbstractShenyuPlugin {
         @Override
         @NonNull
         public Mono<Void> writeWith(@NonNull final Publisher<? extends 
DataBuffer> body) {
-            ClientResponse clientResponse = 
exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
-            if (Objects.isNull(clientResponse)) {
-                clientResponse = prepareClientResponse(body, 
this.getHeaders());
-            }
+            ClientResponse clientResponse = 
ClientResponseUtils.buildClientResponse(this.getDelegate(), body);
             Mono<byte[]> modifiedBody = clientResponse.bodyToMono(byte[].class)
                     .flatMap(originalBody -> 
Mono.just(updateResponse(originalBody)));
 
@@ -146,12 +142,8 @@ public class ModifyResponsePlugin extends 
AbstractShenyuPlugin {
             CachedBodyOutputMessage outputMessage = new 
CachedBodyOutputMessage(exchange,
                     exchange.getResponse().getHeaders());
             return bodyInserter.insert(outputMessage, new 
BodyInserterContext()).then(Mono.defer(() -> {
-                Mono<DataBuffer> messageBody = 
DataBufferUtils.join(outputMessage.getBody());
-                HttpHeaders headers = getDelegate().getHeaders();
-                if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)
-                        || headers.containsKey(HttpHeaders.CONTENT_LENGTH)) {
-                    messageBody = messageBody.doOnNext(data -> 
headers.setContentLength(data.readableByteCount()));
-                }
+                Mono<DataBuffer> messageBody = 
ClientResponseUtils.fixBodyMessage(this.getDelegate(), outputMessage);
+                exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, 
clientResponse);
                 return getDelegate().writeWith(messageBody);
             }));
 
@@ -183,11 +175,5 @@ public class ModifyResponsePlugin extends 
AbstractShenyuPlugin {
             }
             return context.jsonString();
         }
-
-        private ClientResponse prepareClientResponse(final Publisher<? extends 
DataBuffer> body, final HttpHeaders httpHeaders) {
-            ClientResponse.Builder builder;
-            builder = 
ClientResponse.create(this.getDelegate().getStatusCode(), 
ServerCodecConfigurer.create().getReaders());
-            return builder.headers(headers -> 
headers.putAll(httpHeaders)).body(Flux.from(body)).build();
-        }
     }
 }

Reply via email to