This is an automated email from the ASF dual-hosted git repository.
youling1128 pushed a commit to branch 2.9.x
in repository https://gitbox.apache.org/repos/asf/servicecomb-java-chassis.git
The following commit(s) were added to refs/heads/2.9.x by this push:
new 1e9504520 [#3934] fixed client not support sets
accept:text/event-stream problem (#4935)
1e9504520 is described below
commit 1e95045207a652fecd99ddc4cd421ffd85d27960
Author: Alex <[email protected]>
AuthorDate: Sat Oct 11 16:43:32 2025 +0800
[#3934] fixed client not support sets accept:text/event-stream problem
(#4935)
---
.../common/rest/definition/RestOperationMeta.java | 12 ++---
.../springmvc/client/ReactiveStreamIT.java | 53 ++++++++++++++++++++++
2 files changed, 59 insertions(+), 6 deletions(-)
diff --git
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java
index 9da6d06f3..be091d8cd 100644
---
a/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java
+++
b/common/common-rest/src/main/java/org/apache/servicecomb/common/rest/definition/RestOperationMeta.java
@@ -31,6 +31,7 @@ import jakarta.ws.rs.core.MediaType;
import org.apache.commons.lang3.StringUtils;
import org.apache.servicecomb.common.rest.codec.RestObjectMapperFactory;
import
org.apache.servicecomb.common.rest.codec.param.FormProcessorCreator.PartProcessor;
+import org.apache.servicecomb.common.rest.codec.produce.ProduceJsonProcessor;
import org.apache.servicecomb.common.rest.codec.produce.ProduceProcessor;
import
org.apache.servicecomb.common.rest.codec.produce.ProduceProcessorManager;
import org.apache.servicecomb.common.rest.definition.path.PathRegExp;
@@ -88,8 +89,6 @@ public class RestOperationMeta {
// 快速构建URL path
private URLPathBuilder pathBuilder;
- protected static final String EVENTS_MEDIA_TYPE =
MediaType.SERVER_SENT_EVENTS;
-
public void init(OperationMeta operationMeta) {
this.operationMeta = operationMeta;
@@ -260,10 +259,11 @@ public class RestOperationMeta {
ProduceProcessorManager.INSTANCE.getOrCreateAcceptMap(serialViewClass));
} else {
for (String produce : produces) {
- if (produce.contains(EVENTS_MEDIA_TYPE)) {
- // When the produce type is event-stream, the
ProduceEventStreamProcessor implementation class corresponding
- // to event-stream is not added, and it is set to the default type
ProduceJsonProcessor.
- // In case of an exception, the response result is parsed.
+ if (produce.contains(MediaType.SERVER_SENT_EVENTS)) {
+ // The event-stream type initializes the ProduceJsonProcessor in
memory to handle parsing results
+ // in exceptional scenarios. If a singleton parsing result is
required in normal scenarios,
+ // adjustments need to be made here.
+ this.produceProcessorMap.put(produce, new ProduceJsonProcessor());
continue;
}
if (produce.contains(";")) {
diff --git
a/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java
b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java
index 4830f872c..06c61501a 100644
---
a/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java
+++
b/demo/demo-spring-boot-transport/demo-spring-boot-springmvc-client/src/main/java/org/apache/servicecomb/springboot/springmvc/client/ReactiveStreamIT.java
@@ -21,6 +21,7 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.StringUtils;
+import org.apache.servicecomb.provider.springmvc.reference.RestTemplateBuilder;
import org.apache.servicecomb.swagger.invocation.sse.SseEventResponseEntity;
import org.apache.servicecomb.demo.CategorizedTestCase;
import org.apache.servicecomb.demo.TestMgr;
@@ -31,7 +32,14 @@ import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
+import org.springframework.http.HttpEntity;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Component;
+import org.springframework.web.client.RestTemplate;
+
+import jakarta.ws.rs.core.MediaType;
@Component
public class ReactiveStreamIT implements CategorizedTestCase {
@@ -39,6 +47,8 @@ public class ReactiveStreamIT implements CategorizedTestCase {
@Qualifier("reactiveStreamProvider")
ReactiveStreamClient reactiveStreamProvider;
+ private RestTemplate restTemplate = RestTemplateBuilder.create();
+
@Override
public void testRestTransport() throws Exception {
testSseString(reactiveStreamProvider);
@@ -46,6 +56,7 @@ public class ReactiveStreamIT implements CategorizedTestCase {
testSseModel(reactiveStreamProvider);
testSseResponseEntity(reactiveStreamProvider);
testSseMultipleData(reactiveStreamProvider);
+ sseStringWithAccept();
}
private void testSseString(ReactiveStreamClient client) throws Exception {
@@ -91,6 +102,48 @@ public class ReactiveStreamIT implements
CategorizedTestCase {
return buffer.toString();
}
+ private void sseStringWithAccept() throws Exception {
+ HttpHeaders headers = new HttpHeaders();
+ headers.add(HttpHeaders.ACCEPT, MediaType.SERVER_SENT_EVENTS);
+ HttpEntity<?> requestEntity = new HttpEntity<>(headers);
+ ResponseEntity<Publisher> responseEntity = restTemplate
+ .exchange("cse://springmvc/sseString", HttpMethod.GET, requestEntity,
Publisher.class);
+ Publisher result = responseEntity.getBody();
+ CountDownLatch countDownLatch = new CountDownLatch(1);
+ StringBuilder buffer = new StringBuilder();
+ result.subscribe(new Subscriber<>() {
+ Subscription subscription;
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ subscription = s;
+ subscription.request(1);
+ }
+
+ @Override
+ public void onNext(Object entity) {
+ SseEventResponseEntity<String> response =
(SseEventResponseEntity<String>) entity;
+ for (String str : response.getData()) {
+ buffer.append(str);
+ }
+ subscription.request(1);
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ subscription.cancel();
+ countDownLatch.countDown();
+ }
+
+ @Override
+ public void onComplete() {
+ countDownLatch.countDown();
+ }
+ });
+ countDownLatch.await(10, TimeUnit.SECONDS);
+ TestMgr.check("abc", buffer.toString());
+ }
+
private void testSseModel(ReactiveStreamClient client) throws Exception {
Publisher<Model> result = client.sseModel();
CountDownLatch countDownLatch = new CountDownLatch(1);