This is an automated email from the ASF dual-hosted git repository.
xiaoyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-shenyu.git
The following commit(s) were added to refs/heads/master by this push:
new 29cb757 [type: bug] fix 2225, optimize client response builder.
(#2302)
29cb757 is described below
commit 29cb757d23306e2f7dda6f71f45ac2e76545e709
Author: Qicz <[email protected]>
AuthorDate: Fri Nov 5 10:36:56 2021 +0800
[type: bug] fix 2225, optimize client response builder. (#2302)
* [type: bug] fix 2225, optimize client response builder.
* optimize client response utils.
---
.../plugin/base/utils/ClientResponseUtils.java | 69 ++++++++++++++++++++++
.../cryptor/decorator/ResponseDecorator.java | 48 +++++++++++++--
.../cryptor/response/CryptorResponsePlugin.java | 45 +-------------
.../modify/response/ModifyResponsePlugin.java | 22 ++-----
4 files changed, 120 insertions(+), 64 deletions(-)
diff --git
a/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/utils/ClientResponseUtils.java
b/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/utils/ClientResponseUtils.java
new file mode 100644
index 0000000..2e7b482
--- /dev/null
+++
b/shenyu-plugin/shenyu-plugin-base/src/main/java/org/apache/shenyu/plugin/base/utils/ClientResponseUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.shenyu.plugin.base.utils;
+
+import org.apache.shenyu.plugin.base.support.CachedBodyOutputMessage;
+import org.reactivestreams.Publisher;
+import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.core.io.buffer.DataBufferUtils;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.codec.ServerCodecConfigurer;
+import org.springframework.http.server.reactive.ServerHttpResponse;
+import org.springframework.web.reactive.function.client.ClientResponse;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.Objects;
+
+/**
+ * Static helpers for rebuilding a ClientResponse from a server response.
+ */
+public final class ClientResponseUtils {
+
+ /**
+ * Build a client response from the current response status, headers and body.
+ *
+ * @param response current response; its status code must not be null
+ * @param body current response body
+ * @return the client response
+ */
+ public static ClientResponse buildClientResponse(final ServerHttpResponse
response,
+ final Publisher<? extends
DataBuffer> body) {
+ ClientResponse.Builder builder =
ClientResponse.create(Objects.requireNonNull(response.getStatusCode()),
+ ServerCodecConfigurer.create().getReaders());
+ return builder.headers(headers ->
headers.putAll(response.getHeaders())).body(Flux.from(body)).build();
+ }
+
+ /**
+ * Join the cached body and set Content-Length unless only Transfer-Encoding is set.
+ *
+ * @param response current response whose headers may receive Content-Length
+ * @param outputMessage cached message holding the body to join
+ * @return the joined body, setting Content-Length on emit when applicable
+ */
+ public static Mono<DataBuffer> fixBodyMessage(final ServerHttpResponse
response,
+ final
CachedBodyOutputMessage outputMessage) {
+ Mono<DataBuffer> messageBody =
DataBufferUtils.join(outputMessage.getBody());
+ HttpHeaders headers = response.getHeaders();
+ if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)
+ || headers.containsKey(HttpHeaders.CONTENT_LENGTH)) {
+ messageBody = messageBody.doOnNext(data ->
headers.setContentLength(data.readableByteCount()));
+ }
+ return messageBody;
+ }
+}
diff --git
a/shenyu-plugin/shenyu-plugin-cryptor/src/main/java/org/apache/shenyu/plugin/cryptor/decorator/ResponseDecorator.java
b/shenyu-plugin/shenyu-plugin-cryptor/src/main/java/org/apache/shenyu/plugin/cryptor/decorator/ResponseDecorator.java
index 2a80bf8..aa2b1d9 100644
---
a/shenyu-plugin/shenyu-plugin-cryptor/src/main/java/org/apache/shenyu/plugin/cryptor/decorator/ResponseDecorator.java
+++
b/shenyu-plugin/shenyu-plugin-cryptor/src/main/java/org/apache/shenyu/plugin/cryptor/decorator/ResponseDecorator.java
@@ -17,30 +17,70 @@
package org.apache.shenyu.plugin.cryptor.decorator;
+import org.apache.shenyu.common.constant.Constants;
+import org.apache.shenyu.plugin.base.support.BodyInserterContext;
import org.apache.shenyu.plugin.base.support.CachedBodyOutputMessage;
+import org.apache.shenyu.plugin.base.utils.ClientResponseUtils;
+import org.apache.shenyu.plugin.cryptor.dto.CryptorRuleHandle;
+import org.apache.shenyu.plugin.cryptor.strategy.CryptorStrategyFactory;
+import org.apache.shenyu.plugin.cryptor.utils.HttpUtil;
+import org.apache.shenyu.plugin.cryptor.utils.JsonUtil;
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
+import org.springframework.http.ReactiveHttpOutputMessage;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
+import org.springframework.web.reactive.function.BodyInserter;
+import org.springframework.web.reactive.function.BodyInserters;
+import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import reactor.util.annotation.NonNull;
+import java.util.function.Function;
+
/**
* Build and modify the response class.
*/
public class ResponseDecorator extends ServerHttpResponseDecorator {
- private final CachedBodyOutputMessage cachedBodyOutputMessage;
+ private final ServerWebExchange exchange;
+
+ private final CryptorRuleHandle ruleHandle;
public ResponseDecorator(final ServerWebExchange exchange,
- final CachedBodyOutputMessage
cachedBodyOutputMessage) {
+ final CryptorRuleHandle ruleHandle) {
super(exchange.getResponse());
- this.cachedBodyOutputMessage = cachedBodyOutputMessage;
+ this.exchange = exchange;
+ this.ruleHandle = ruleHandle;
}
@Override
@NonNull
+ @SuppressWarnings("unchecked")
public Mono<Void> writeWith(@NonNull final Publisher<? extends DataBuffer>
body) {
- return getDelegate().writeWith(cachedBodyOutputMessage.getBody());
+ ClientResponse clientResponse =
ClientResponseUtils.buildClientResponse(this.getDelegate(), body);
+ Mono<String> mono =
clientResponse.bodyToMono(String.class).flatMap(originalBody ->
+ strategyMatch(originalBody, this.ruleHandle,
this.exchange));
+ BodyInserter<Mono<String>, ReactiveHttpOutputMessage> bodyInserter =
BodyInserters.fromPublisher(mono, String.class);
+ CachedBodyOutputMessage outputMessage =
HttpUtil.newCachedBodyOutputMessage(exchange);
+ return bodyInserter.insert(outputMessage, new BodyInserterContext())
+ .then(Mono.defer(() -> {
+ Mono<DataBuffer> messageBody =
ClientResponseUtils.fixBodyMessage(this.getDelegate(), outputMessage);
+
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR, clientResponse);
+ return getDelegate().writeWith(messageBody);
+ })).onErrorResume((Function<Throwable, Mono<Void>>) throwable
-> HttpUtil.release(outputMessage, throwable));
+ }
+
+ @SuppressWarnings("rawtypes")
+ private Mono strategyMatch(final String originalBody, final
CryptorRuleHandle ruleHandle, final ServerWebExchange exchange) {
+ String parseBody = JsonUtil.parser(originalBody,
ruleHandle.getFieldNames());
+ if (parseBody == null) {
+ return Mono.just(originalBody);
+ }
+ String modifiedBody = CryptorStrategyFactory.match(ruleHandle,
parseBody);
+ if (modifiedBody == null) {
+ return HttpUtil.fail(ruleHandle.getWay(), exchange);
+ }
+ return HttpUtil.success(originalBody, modifiedBody,
ruleHandle.getWay(), ruleHandle.getFieldNames());
}
}
diff --git
a/shenyu-plugin/shenyu-plugin-cryptor/src/main/java/org/apache/shenyu/plugin/cryptor/response/CryptorResponsePlugin.java
b/shenyu-plugin/shenyu-plugin-cryptor/src/main/java/org/apache/shenyu/plugin/cryptor/response/CryptorResponsePlugin.java
index f88217e..88ce0e4 100644
---
a/shenyu-plugin/shenyu-plugin-cryptor/src/main/java/org/apache/shenyu/plugin/cryptor/response/CryptorResponsePlugin.java
+++
b/shenyu-plugin/shenyu-plugin-cryptor/src/main/java/org/apache/shenyu/plugin/cryptor/response/CryptorResponsePlugin.java
@@ -17,36 +17,25 @@
package org.apache.shenyu.plugin.cryptor.response;
-import org.apache.shenyu.common.constant.Constants;
import org.apache.shenyu.common.dto.RuleData;
import org.apache.shenyu.common.dto.SelectorData;
-import org.apache.shenyu.plugin.cryptor.dto.CryptorRuleHandle;
import org.apache.shenyu.common.enums.PluginEnum;
import org.apache.shenyu.plugin.api.ShenyuPluginChain;
import org.apache.shenyu.plugin.api.result.ShenyuResultEnum;
import org.apache.shenyu.plugin.api.result.ShenyuResultWrap;
import org.apache.shenyu.plugin.api.utils.WebFluxResultUtils;
import org.apache.shenyu.plugin.base.AbstractShenyuPlugin;
-import org.apache.shenyu.plugin.base.support.BodyInserterContext;
-import org.apache.shenyu.plugin.base.support.CachedBodyOutputMessage;
import org.apache.shenyu.plugin.base.utils.CacheKeyUtils;
import org.apache.shenyu.plugin.cryptor.decorator.ResponseDecorator;
+import org.apache.shenyu.plugin.cryptor.dto.CryptorRuleHandle;
import
org.apache.shenyu.plugin.cryptor.handler.CryptorResponsePluginDataHandler;
-import org.apache.shenyu.plugin.cryptor.strategy.CryptorStrategyFactory;
-import org.apache.shenyu.plugin.cryptor.utils.HttpUtil;
import org.apache.shenyu.plugin.cryptor.utils.JsonUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.springframework.http.ReactiveHttpOutputMessage;
-import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
-import org.springframework.web.reactive.function.BodyInserter;
-import org.springframework.web.reactive.function.BodyInserters;
-import org.springframework.web.reactive.function.client.ClientResponse;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.util.Objects;
-import java.util.function.Function;
/**
* Cryptor response plugin.
@@ -56,7 +45,6 @@ public class CryptorResponsePlugin extends
AbstractShenyuPlugin {
private static final Logger LOG =
LoggerFactory.getLogger(CryptorResponsePlugin.class);
@Override
- @SuppressWarnings("unchecked")
protected Mono<Void> doExecute(final ServerWebExchange exchange, final
ShenyuPluginChain chain, final SelectorData selector, final RuleData rule) {
CryptorRuleHandle ruleHandle =
CryptorResponsePluginDataHandler.CACHED_HANDLE.get().obtainHandle(CacheKeyUtils.INST.getKey(rule));
if (Objects.isNull(ruleHandle)) {
@@ -68,22 +56,8 @@ public class CryptorResponsePlugin extends
AbstractShenyuPlugin {
ShenyuResultEnum.CRYPTOR_RESPONSE_ERROR_CONFIGURATION.getMsg() + "[" +
JsonUtil.getErrorCollector() + "]", null);
return WebFluxResultUtils.result(exchange, error);
}
-
- CachedBodyOutputMessage outputMessage =
HttpUtil.newCachedBodyOutputMessage(exchange);
- ClientResponse clientResponse =
exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
- if (clientResponse == null) {
- return Mono.empty();
- }
- Mono<String> mono = clientResponse.bodyToMono(String.class)
- .flatMap(originalBody ->
- strategyMatch(originalBody, ruleHandle, exchange));
- BodyInserter<Mono<String>, ReactiveHttpOutputMessage> bodyInserter =
BodyInserters.fromPublisher(mono, String.class);
- return bodyInserter.insert(outputMessage, new BodyInserterContext())
- .then(Mono.defer(() -> {
- ServerHttpResponseDecorator decorator = new
ResponseDecorator(exchange, outputMessage);
- return
chain.execute(exchange.mutate().response(decorator).build());
- })).onErrorResume((Function<Throwable, Mono<Void>>) throwable
-> HttpUtil.release(outputMessage, throwable));
-
+ return chain.execute(exchange.mutate()
+ .response(new ResponseDecorator(exchange,
ruleHandle)).build());
}
@Override
@@ -95,17 +69,4 @@ public class CryptorResponsePlugin extends
AbstractShenyuPlugin {
public String named() {
return PluginEnum.CRYPTOR_RESPONSE.getName();
}
-
- @SuppressWarnings("rawtypes")
- private Mono strategyMatch(final String originalBody, final
CryptorRuleHandle ruleHandle, final ServerWebExchange exchange) {
- String parseBody = JsonUtil.parser(originalBody,
ruleHandle.getFieldNames());
- if (parseBody == null) {
- return Mono.just(originalBody);
- }
- String modifiedBody = CryptorStrategyFactory.match(ruleHandle,
parseBody);
- if (modifiedBody == null) {
- return HttpUtil.fail(ruleHandle.getWay(), exchange);
- }
- return HttpUtil.success(originalBody, modifiedBody,
ruleHandle.getWay(), ruleHandle.getFieldNames());
- }
}
diff --git
a/shenyu-plugin/shenyu-plugin-modify-response/src/main/java/org/apache/shenyu/plugin/modify/response/ModifyResponsePlugin.java
b/shenyu-plugin/shenyu-plugin-modify-response/src/main/java/org/apache/shenyu/plugin/modify/response/ModifyResponsePlugin.java
index bba4aa1..78ae208 100644
---
a/shenyu-plugin/shenyu-plugin-modify-response/src/main/java/org/apache/shenyu/plugin/modify/response/ModifyResponsePlugin.java
+++
b/shenyu-plugin/shenyu-plugin-modify-response/src/main/java/org/apache/shenyu/plugin/modify/response/ModifyResponsePlugin.java
@@ -32,10 +32,10 @@ import org.apache.shenyu.plugin.base.AbstractShenyuPlugin;
import org.apache.shenyu.plugin.base.support.BodyInserterContext;
import org.apache.shenyu.plugin.base.support.CachedBodyOutputMessage;
import org.apache.shenyu.plugin.base.utils.CacheKeyUtils;
+import org.apache.shenyu.plugin.base.utils.ClientResponseUtils;
import
org.apache.shenyu.plugin.modify.response.handler.ModifyResponsePluginDataHandler;
import org.reactivestreams.Publisher;
import org.springframework.core.io.buffer.DataBuffer;
-import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ReactiveHttpOutputMessage;
@@ -104,7 +104,6 @@ public class ModifyResponsePlugin extends
AbstractShenyuPlugin {
.build();
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR,
modifyResponse);
}
-
return chain.execute(exchange.mutate()
.response(new ModifyServerHttpResponse(exchange,
modifyResponseRuleHandle)).build());
}
@@ -134,10 +133,7 @@ public class ModifyResponsePlugin extends
AbstractShenyuPlugin {
@Override
@NonNull
public Mono<Void> writeWith(@NonNull final Publisher<? extends
DataBuffer> body) {
- ClientResponse clientResponse =
exchange.getAttribute(Constants.CLIENT_RESPONSE_ATTR);
- if (Objects.isNull(clientResponse)) {
- clientResponse = prepareClientResponse(body,
this.getHeaders());
- }
+ ClientResponse clientResponse =
ClientResponseUtils.buildClientResponse(this.getDelegate(), body);
Mono<byte[]> modifiedBody = clientResponse.bodyToMono(byte[].class)
.flatMap(originalBody ->
Mono.just(updateResponse(originalBody)));
@@ -146,12 +142,8 @@ public class ModifyResponsePlugin extends
AbstractShenyuPlugin {
CachedBodyOutputMessage outputMessage = new
CachedBodyOutputMessage(exchange,
exchange.getResponse().getHeaders());
return bodyInserter.insert(outputMessage, new
BodyInserterContext()).then(Mono.defer(() -> {
- Mono<DataBuffer> messageBody =
DataBufferUtils.join(outputMessage.getBody());
- HttpHeaders headers = getDelegate().getHeaders();
- if (!headers.containsKey(HttpHeaders.TRANSFER_ENCODING)
- || headers.containsKey(HttpHeaders.CONTENT_LENGTH)) {
- messageBody = messageBody.doOnNext(data ->
headers.setContentLength(data.readableByteCount()));
- }
+ Mono<DataBuffer> messageBody =
ClientResponseUtils.fixBodyMessage(this.getDelegate(), outputMessage);
+ exchange.getAttributes().put(Constants.CLIENT_RESPONSE_ATTR,
clientResponse);
return getDelegate().writeWith(messageBody);
}));
@@ -183,11 +175,5 @@ public class ModifyResponsePlugin extends
AbstractShenyuPlugin {
}
return context.jsonString();
}
-
- private ClientResponse prepareClientResponse(final Publisher<? extends
DataBuffer> body, final HttpHeaders httpHeaders) {
- ClientResponse.Builder builder;
- builder =
ClientResponse.create(this.getDelegate().getStatusCode(),
ServerCodecConfigurer.create().getReaders());
- return builder.headers(headers ->
headers.putAll(httpHeaders)).body(Flux.from(body)).build();
- }
}
}