This is an automated email from the ASF dual-hosted git repository.
earthchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-samples.git
The following commit(s) were added to refs/heads/master by this push:
new fb2f15186 sample for triple backpressure (#1278)
fb2f15186 is described below
commit fb2f151860abe5b4985c15924ad58538912bc841
Author: Wang Chengming <[email protected]>
AuthorDate: Thu Jan 8 17:56:20 2026 +0800
sample for triple backpressure (#1278)
* sample for triple backpressure
* Add dubbo version comment in pom.xml
Updated dubbo.version to 3.3.7-SNAPSHOT in pom.xml
* Migrate log4j.properties to log4j2.xml format
* add it
* change grpc pattern api
* filter build version
---------
Co-authored-by: chengming.wang <[email protected]>
Co-authored-by: zrlw <[email protected]>
Co-authored-by: earthchen <[email protected]>
Co-authored-by: earthchen <[email protected]>
---
.github/workflows/dubbo-3_2.yml | 8 +-
.github/workflows/dubbo-3_3.yml | 8 +-
.../case-configuration.yml | 48 ++
.../case-versions.conf | 23 +
.../dubbo-samples-triple-backpressure/pom.xml | 117 ++++
.../samples/backpressure/BackpressureConsumer.java | 212 ++++++
.../samples/backpressure/BackpressureProvider.java | 56 ++
.../backpressure/api/BackpressureService.java | 60 ++
.../dubbo/samples/backpressure/api/DataChunk.java | 82 +++
.../samples/backpressure/api/StreamRequest.java | 68 ++
.../samples/backpressure/api/StreamResponse.java | 82 +++
.../backpressure/impl/BackpressureServiceImpl.java | 363 +++++++++++
.../src/main/resources/log4j2.xml | 31 +
.../dubbo/samples/backpressure/BackpressureIT.java | 720 +++++++++++++++++++++
2-advanced/pom.xml | 1 +
test/scripts/filter-build-modules.sh | 303 +++++++++
16 files changed, 2180 insertions(+), 2 deletions(-)
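The new module demonstrates Triple-protocol flow control on both ends of a stream. As a quick orientation before the diff, here is a minimal sketch of the send-side pattern the added code uses throughout, assuming only the ServerCallStreamObserver calls (isReady, setOnReadyHandler) that appear in BackpressureServiceImpl below; the class and method names (SendBackpressureSketch, drainTo) and the total/payload parameters are illustrative and not part of the commit.

    import java.util.concurrent.atomic.AtomicBoolean;
    import java.util.concurrent.atomic.AtomicInteger;

    import org.apache.dubbo.common.stream.ServerCallStreamObserver;
    import org.apache.dubbo.common.stream.StreamObserver;
    import org.apache.dubbo.samples.backpressure.api.DataChunk;

    // Send-side backpressure: write only while the transport reports it is ready,
    // and resume from the onReady callback once the stream becomes writable again.
    class SendBackpressureSketch {
        void drainTo(StreamObserver<DataChunk> responseObserver, int total, byte[] payload) {
            ServerCallStreamObserver<DataChunk> out = (ServerCallStreamObserver<DataChunk>) responseObserver;
            AtomicInteger sent = new AtomicInteger();
            AtomicBoolean done = new AtomicBoolean(false);
            Runnable drain = () -> {
                while (out.isReady() && sent.get() < total) {
                    out.onNext(new DataChunk(sent.getAndIncrement(), payload, System.currentTimeMillis()));
                }
                if (sent.get() >= total && done.compareAndSet(false, true)) {
                    out.onCompleted(); // complete exactly once, after the last chunk is written
                }
            };
            out.setOnReadyHandler(drain); // re-invoked whenever the stream becomes writable
            drain.run();                  // initial attempt in case the stream is already writable
        }
    }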
diff --git a/.github/workflows/dubbo-3_2.yml b/.github/workflows/dubbo-3_2.yml
index 647073140..1d38e91ab 100644
--- a/.github/workflows/dubbo-3_2.yml
+++ b/.github/workflows/dubbo-3_2.yml
@@ -59,9 +59,15 @@ jobs:
with:
distribution: 'zulu'
java-version: 21
+ - name: Filter modules by Dubbo version
+ id: filter-modules
+ run: |
+          EXCLUDED=$(bash ./test/scripts/filter-build-modules.sh ${{ needs.build-dubbo.outputs.version }})
+ echo "excluded_modules=$EXCLUDED" >> $GITHUB_OUTPUT
+ echo "Excluded modules: $EXCLUDED"
- name: Build with Maven
run: |
- ./mvnw $BUILD_OPTS
+          ./mvnw $BUILD_OPTS ${{ steps.filter-modules.outputs.excluded_modules }}
- name: Clean with Maven
run: |
          ./mvnw --batch-mode --no-snapshot-updates --no-transfer-progress --settings ${{github.workspace}}/.mvn/settings.xml clean
diff --git a/.github/workflows/dubbo-3_3.yml b/.github/workflows/dubbo-3_3.yml
index 85ef3ff35..6f56312be 100644
--- a/.github/workflows/dubbo-3_3.yml
+++ b/.github/workflows/dubbo-3_3.yml
@@ -62,9 +62,15 @@ jobs:
with:
distribution: 'zulu'
java-version: 21
+ - name: Filter modules by Dubbo version
+ id: filter-modules
+ run: |
+          EXCLUDED=$(bash ./test/scripts/filter-build-modules.sh ${{ needs.build-dubbo.outputs.version }})
+ echo "excluded_modules=$EXCLUDED" >> $GITHUB_OUTPUT
+ echo "Excluded modules: $EXCLUDED"
- name: Build with Maven
run: |
- ./mvnw $BUILD_OPTS
+          ./mvnw $BUILD_OPTS ${{ steps.filter-modules.outputs.excluded_modules }}
- name: Clean with Maven
run: |
          ./mvnw --batch-mode --no-snapshot-updates --no-transfer-progress --settings ${{github.workspace}}/.mvn/settings.xml clean
diff --git a/2-advanced/dubbo-samples-triple-backpressure/case-configuration.yml b/2-advanced/dubbo-samples-triple-backpressure/case-configuration.yml
new file mode 100644
index 000000000..396ffb97b
--- /dev/null
+++ b/2-advanced/dubbo-samples-triple-backpressure/case-configuration.yml
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+services:
+ zookeeper:
+ image: zookeeper:latest
+
+ dubbo-samples-triple-backpressure:
+ type: app
+ basedir: .
+ mainClass: org.apache.dubbo.samples.backpressure.BackpressureProvider
+ systemProps:
+ - zookeeper.address=zookeeper
+ - zookeeper.port=2181
+ waitPortsBeforeRun:
+ - zookeeper:2181
+ checkPorts:
+ - 50051
+ checkLog: "BackpressureProvider started"
+
+ dubbo-samples-triple-backpressure-test:
+ type: test
+ basedir: .
+ tests:
+ - "**/*IT.class"
+ systemProps:
+ - zookeeper.address=zookeeper
+ - zookeeper.port=2181
+ - dubbo.address=dubbo-samples-triple-backpressure
+ - dubbo.port=50051
+ waitPortsBeforeRun:
+ - zookeeper:2181
+ - dubbo-samples-triple-backpressure:50051
+ depends_on:
+ - dubbo-samples-triple-backpressure
diff --git a/2-advanced/dubbo-samples-triple-backpressure/case-versions.conf b/2-advanced/dubbo-samples-triple-backpressure/case-versions.conf
new file mode 100644
index 000000000..709a6f8dc
--- /dev/null
+++ b/2-advanced/dubbo-samples-triple-backpressure/case-versions.conf
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Supported component versions of the test case
+
+# Spring app
+dubbo.version=[ > 3.3.6 ]
+spring.version=6.*
+java.version= [>= 8]
diff --git a/2-advanced/dubbo-samples-triple-backpressure/pom.xml b/2-advanced/dubbo-samples-triple-backpressure/pom.xml
new file mode 100644
index 000000000..c38992247
--- /dev/null
+++ b/2-advanced/dubbo-samples-triple-backpressure/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <groupId>org.apache</groupId>
+ <artifactId>apache</artifactId>
+ <version>23</version>
+ <relativePath/>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>dubbo-samples-triple-backpressure</artifactId>
+
+ <properties>
+ <source.level>1.8</source.level>
+ <target.level>1.8</target.level>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <!-- It should be replaced by the release version after Dubbo 3.3.7 is released. -->
+ <dubbo.version>3.3.7-SNAPSHOT</dubbo.version>
+ <log4j2.version>2.20.0</log4j2.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo</artifactId>
+ <version>${dubbo.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.dubbo</groupId>
+ <artifactId>dubbo-zookeeper-curator5-spring-boot-starter</artifactId>
+ <version>${dubbo.version}</version>
+ </dependency>
+ <!-- Embedded Zookeeper Server Dependencies -->
+ <dependency>
+ <groupId>io.dropwizard.metrics</groupId>
+ <artifactId>metrics-core</artifactId>
+ <version>4.1.12.1</version>
+ </dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>1.1.10.5</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.logging.log4j</groupId>
+ <artifactId>log4j-slf4j-impl</artifactId>
+ <version>${log4j2.version}</version>
+ </dependency>
+ <!-- Test dependencies -->
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <version>4.13.2</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+        <!-- JavaEE annotations for JDK 11 and above -->
+ <profile>
+ <id>javax.annotation</id>
+ <activation>
+ <jdk>[1.11,)</jdk>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>javax.annotation</groupId>
+ <artifactId>javax.annotation-api</artifactId>
+ <version>1.3.2</version>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <version>3.8.1</version>
+ <configuration>
+ <source>${source.level}</source>
+ <target>${target.level}</target>
+ </configuration>
+ </plugin>
+ <!-- Skip remote-resources-plugin from parent pom -->
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-remote-resources-plugin</artifactId>
+ <configuration>
+ <skip>true</skip>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
+
diff --git a/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/BackpressureConsumer.java b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/BackpressureConsumer.java
new file mode 100644
index 000000000..d49de51a1
--- /dev/null
+++ b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/BackpressureConsumer.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.samples.backpressure;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.config.ReferenceConfig;
+import org.apache.dubbo.common.stream.ClientCallStreamObserver;
+import org.apache.dubbo.config.RegistryConfig;
+import org.apache.dubbo.config.bootstrap.DubboBootstrap;
+import org.apache.dubbo.samples.backpressure.api.BackpressureService;
+import org.apache.dubbo.samples.backpressure.api.DataChunk;
+import org.apache.dubbo.samples.backpressure.api.StreamRequest;
+import org.apache.dubbo.samples.backpressure.api.StreamResponse;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BackpressureConsumer {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BackpressureConsumer.class);
+
+ private static final String ZOOKEEPER_HOST =
System.getProperty("zookeeper.address", "127.0.0.1");
+ private static final String ZOOKEEPER_PORT =
System.getProperty("zookeeper.port", "2181");
+
+ public static void main(String[] args) throws Exception {
+ ReferenceConfig<BackpressureService> reference = new
ReferenceConfig<>();
+ reference.setInterface(BackpressureService.class);
+ reference.setProtocol("tri");
+
+ String zkAddress = "zookeeper://" + ZOOKEEPER_HOST + ":" +
ZOOKEEPER_PORT;
+ LOGGER.info("Using ZooKeeper: {}", zkAddress);
+
+ DubboBootstrap bootstrap = DubboBootstrap.getInstance();
+ bootstrap.application(new ApplicationConfig("backpressure-consumer"))
+ .registry(new RegistryConfig(zkAddress))
+ .reference(reference)
+ .start();
+
+ BackpressureService service = reference.get();
+
+ LOGGER.info("=== Test 1: Basic Echo ===");
+ testEcho(service);
+
+ LOGGER.info("\n=== Test 2: Client-side Backpressure (clientStream)
===");
+ testClientSideBackpressure(service);
+
+ LOGGER.info("\n=== Test 3: Server-side Backpressure (serverStream)
===");
+ testServerSideBackpressure(service);
+
+ LOGGER.info("\n✅ All tests completed!");
+ System.exit(0);
+ }
+
+ /**
+ * Test basic echo functionality.
+ */
+ public static void testEcho(BackpressureService service) {
+ String response = service.echo("Hello Backpressure");
+ LOGGER.info("Echo response: {}", response);
+ }
+
+ /**
+ * Test server-side backpressure.
+ * Server uses isReady() and setOnReadyHandler() to control sending rate.
+ */
+ public static void testServerSideBackpressure(BackpressureService service)
throws InterruptedException {
+ final int expectedCount = 50;
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicInteger received = new AtomicInteger(0);
+
+ StreamRequest request = new StreamRequest(expectedCount, 1024);
+
+ LOGGER.info("[Server-Backpressure] Requesting {} chunks from server",
expectedCount);
+
+ service.serverStream(request, new StreamObserver<DataChunk>() {
+ @Override
+ public void onNext(DataChunk chunk) {
+ int count = received.incrementAndGet();
+ if (count % 10 == 0) {
+ LOGGER.info("[Server-Backpressure] Received {} chunks",
count);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOGGER.error("[Server-Backpressure] Error: {}",
throwable.getMessage());
+ latch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ LOGGER.info("[Server-Backpressure] Stream completed, total:
{}", received.get());
+ latch.countDown();
+ }
+ });
+
+ latch.await(60, TimeUnit.SECONDS);
+ LOGGER.info("[Server-Backpressure] Test finished, received: {}",
received.get());
+ }
+
+ /**
+ * Test client-side backpressure.
+ *
+ * <p>Client uses CallStreamObserver's isReady() and setOnReadyHandler()
+ * to control sending rate based on transport layer capacity.
+ */
+ public static void testClientSideBackpressure(BackpressureService service)
throws InterruptedException {
+ final int sendCount = 100;
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicInteger sent = new AtomicInteger(0);
+ final AtomicBoolean completed = new AtomicBoolean(false);
+ final byte[] data = new byte[1024];
+
+ LOGGER.info("[Client-Backpressure] Sending {} chunks to server using
backpressure", sendCount);
+
+ // Response observer
+ StreamObserver<StreamResponse> responseObserver = new
StreamObserver<StreamResponse>() {
+ @Override
+ public void onNext(StreamResponse response) {
+ LOGGER.info("[Client-Backpressure] Server received: {} chunks,
{} bytes in {}ms",
+ response.getTotalChunks(), response.getTotalBytes(),
response.getDurationMs());
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOGGER.error("[Client-Backpressure] Error: {}",
throwable.getMessage());
+ latch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ LOGGER.info("[Client-Backpressure] Response completed");
+ latch.countDown();
+ }
+ };
+
+ // Get request StreamObserver
+ StreamObserver<DataChunk> requestObserver =
service.clientStream(responseObserver);
+
+ // Cast to ClientCallStreamObserver interface to use backpressure API
for client side
+ ClientCallStreamObserver<DataChunk> callObserver =
(ClientCallStreamObserver<DataChunk>) requestObserver;
+
+ // Disable auto flow control for manual backpressure
+ callObserver.disableAutoFlowControl();
+ LOGGER.info("[Client-Backpressure] Disabled auto flow control");
+
+ // Set ready callback - called when stream becomes writable
+ callObserver.setOnReadyHandler(() -> {
+ LOGGER.info("[Client-Backpressure] onReadyHandler triggered, sent
so far: {}", sent.get());
+
+ // Send data while stream is ready and we have more to send
+ while (callObserver.isReady() && sent.get() < sendCount &&
!completed.get()) {
+ int seq = sent.getAndIncrement();
+ DataChunk chunk = new DataChunk(seq, data,
System.currentTimeMillis());
+ callObserver.onNext(chunk);
+
+ if ((seq + 1) % 20 == 0) {
+ LOGGER.info("[Client-Backpressure] Sent {} chunks
(isReady={})", seq + 1, callObserver.isReady());
+ }
+ }
+
+ // Complete if all data sent
+ if (sent.get() >= sendCount && !completed.getAndSet(true)) {
+ callObserver.onCompleted();
+ LOGGER.info("[Client-Backpressure] Completed via
onReadyHandler, total: {}", sent.get());
+ }
+ });
+
+ // Initial send (if writable)
+ LOGGER.info("[Client-Backpressure] Initial send, isReady={}",
callObserver.isReady());
+ while (callObserver.isReady() && sent.get() < sendCount) {
+ int seq = sent.getAndIncrement();
+ DataChunk chunk = new DataChunk(seq, data,
System.currentTimeMillis());
+ callObserver.onNext(chunk);
+
+ if ((seq + 1) % 20 == 0) {
+ LOGGER.info("[Client-Backpressure] Initial sent {} chunks",
seq + 1);
+ }
+ }
+
+ // Complete if all sent in initial phase
+ if (sent.get() >= sendCount && !completed.getAndSet(true)) {
+ callObserver.onCompleted();
+ LOGGER.info("[Client-Backpressure] All sent in initial phase,
total: {}", sent.get());
+ } else {
+ LOGGER.info("[Client-Backpressure] Paused at {} chunks, waiting
for onReadyHandler", sent.get());
+ }
+
+ latch.await(120, TimeUnit.SECONDS);
+ LOGGER.info("[Client-Backpressure] Test finished, total sent: {}",
sent.get());
+ }
+}
diff --git a/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/BackpressureProvider.java b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/BackpressureProvider.java
new file mode 100644
index 000000000..a4f214a8f
--- /dev/null
+++ b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/BackpressureProvider.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.samples.backpressure;
+
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.config.ProtocolConfig;
+import org.apache.dubbo.config.RegistryConfig;
+import org.apache.dubbo.config.ServiceConfig;
+import org.apache.dubbo.config.bootstrap.DubboBootstrap;
+import org.apache.dubbo.samples.backpressure.api.BackpressureService;
+import org.apache.dubbo.samples.backpressure.impl.BackpressureServiceImpl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BackpressureProvider {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BackpressureProvider.class);
+
+ private static final String ZOOKEEPER_HOST =
System.getProperty("zookeeper.address", "127.0.0.1");
+ private static final String ZOOKEEPER_PORT =
System.getProperty("zookeeper.port", "2181");
+
+ public static void main(String[] args) {
+ ServiceConfig<BackpressureService> service = new ServiceConfig<>();
+ service.setInterface(BackpressureService.class);
+ service.setRef(new BackpressureServiceImpl());
+
+ String zkAddress = "zookeeper://" + ZOOKEEPER_HOST + ":" +
ZOOKEEPER_PORT;
+ LOGGER.info("Using ZooKeeper: {}", zkAddress);
+
+ DubboBootstrap bootstrap = DubboBootstrap.getInstance();
+ bootstrap.application(new ApplicationConfig("backpressure-provider"))
+ .registry(new RegistryConfig(zkAddress))
+ .protocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051))
+ .service(service)
+ .start();
+
+ LOGGER.info("BackpressureProvider started on port 50051, waiting for
requests...");
+ bootstrap.await();
+ }
+}
diff --git a/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/BackpressureService.java b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/BackpressureService.java
new file mode 100644
index 000000000..f97b63414
--- /dev/null
+++ b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/BackpressureService.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.samples.backpressure.api;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+
+/**
+ * Service interface for demonstrating backpressure with Triple protocol.
+ */
+public interface BackpressureService {
+
+ /**
+ * Unary call - for verifying basic functionality is not affected.
+ */
+ String echo(String message);
+
+ /**
+ * Server streaming - server sends multiple responses.
+ * Used to test client-side backpressure.
+ *
+ * @param request stream request containing count and chunk size
+ * @param responseObserver observer to receive chunks
+ */
+ void serverStream(StreamRequest request, StreamObserver<DataChunk>
responseObserver);
+
+ /**
+ * Client streaming - client sends multiple requests.
+ * Used to test server-side backpressure.
+ *
+ * @param responseObserver observer to receive final response
+ * @return observer to send chunks
+ */
+ StreamObserver<DataChunk> clientStream(StreamObserver<StreamResponse>
responseObserver);
+
+ /**
+ * Bidirectional streaming - both sides send and receive.
+ * Used to test bidirectional backpressure.
+ *
+ * @param responseObserver observer to receive response chunks
+ * @return observer to send request chunks
+ */
+ StreamObserver<DataChunk> biStream(StreamObserver<DataChunk>
responseObserver);
+
+}
+
+
diff --git a/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/DataChunk.java b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/DataChunk.java
new file mode 100644
index 000000000..45d04cd17
--- /dev/null
+++ b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/DataChunk.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.samples.backpressure.api;
+
+import java.io.Serializable;
+
+/**
+ * A chunk of data in the stream.
+ */
+public class DataChunk implements Serializable {
+
+ private static final long serialVersionUID = -1205830810422530386L;
+
+ /**
+ * Sequence number of this chunk.
+ */
+ private int sequenceNumber;
+
+ /**
+ * The data payload.
+ */
+ private byte[] data;
+
+ /**
+ * Timestamp when this chunk was created (for latency measurement).
+ */
+ private long timestamp;
+
+ public DataChunk() {
+ }
+
+ public DataChunk(int sequenceNumber, byte[] data, long timestamp) {
+ this.sequenceNumber = sequenceNumber;
+ this.data = data;
+ this.timestamp = timestamp;
+ }
+
+ public int getSequenceNumber() {
+ return sequenceNumber;
+ }
+
+ public void setSequenceNumber(int sequenceNumber) {
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ public byte[] getData() {
+ return data;
+ }
+
+ public void setData(byte[] data) {
+ this.data = data;
+ }
+
+ public long getTimestamp() {
+ return timestamp;
+ }
+
+ public void setTimestamp(long timestamp) {
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public String toString() {
+ return "DataChunk{seq=" + sequenceNumber + ", dataLen=" + (data !=
null ? data.length : 0) + ", ts=" + timestamp + "}";
+ }
+}
+
+
diff --git a/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/StreamRequest.java b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/StreamRequest.java
new file mode 100644
index 000000000..98835894a
--- /dev/null
+++ b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/StreamRequest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.samples.backpressure.api;
+
+import java.io.Serializable;
+
+/**
+ * Request to start a stream.
+ */
+public class StreamRequest implements Serializable {
+
+ private static final long serialVersionUID = -7146758652012698927L;
+
+ /**
+ * Number of chunks to send.
+ */
+ private int count;
+
+ /**
+ * Size of each chunk data in bytes.
+ */
+ private int chunkSize;
+
+ public StreamRequest() {
+ }
+
+ public StreamRequest(int count, int chunkSize) {
+ this.count = count;
+ this.chunkSize = chunkSize;
+ }
+
+ public int getCount() {
+ return count;
+ }
+
+ public void setCount(int count) {
+ this.count = count;
+ }
+
+ public int getChunkSize() {
+ return chunkSize;
+ }
+
+ public void setChunkSize(int chunkSize) {
+ this.chunkSize = chunkSize;
+ }
+
+ @Override
+ public String toString() {
+ return "StreamRequest{count=" + count + ", chunkSize=" + chunkSize +
"}";
+ }
+}
+
+
diff --git a/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/StreamResponse.java b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/StreamResponse.java
new file mode 100644
index 000000000..c77279d08
--- /dev/null
+++ b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/StreamResponse.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.samples.backpressure.api;
+
+import java.io.Serializable;
+
+/**
+ * Response after stream completes.
+ */
+public class StreamResponse implements Serializable {
+
+ private static final long serialVersionUID = -5423429601473151879L;
+
+ /**
+ * Total number of chunks received.
+ */
+ private int totalChunks;
+
+ /**
+ * Total bytes received.
+ */
+ private long totalBytes;
+
+ /**
+ * Duration in milliseconds.
+ */
+ private long durationMs;
+
+ public StreamResponse() {
+ }
+
+ public StreamResponse(int totalChunks, long totalBytes, long durationMs) {
+ this.totalChunks = totalChunks;
+ this.totalBytes = totalBytes;
+ this.durationMs = durationMs;
+ }
+
+ public int getTotalChunks() {
+ return totalChunks;
+ }
+
+ public void setTotalChunks(int totalChunks) {
+ this.totalChunks = totalChunks;
+ }
+
+ public long getTotalBytes() {
+ return totalBytes;
+ }
+
+ public void setTotalBytes(long totalBytes) {
+ this.totalBytes = totalBytes;
+ }
+
+ public long getDurationMs() {
+ return durationMs;
+ }
+
+ public void setDurationMs(long durationMs) {
+ this.durationMs = durationMs;
+ }
+
+ @Override
+ public String toString() {
+ return "StreamResponse{totalChunks=" + totalChunks + ", totalBytes=" +
totalBytes + ", durationMs=" + durationMs + "}";
+ }
+}
+
+
diff --git a/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/impl/BackpressureServiceImpl.java b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/impl/BackpressureServiceImpl.java
new file mode 100644
index 000000000..b8b892af8
--- /dev/null
+++ b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/impl/BackpressureServiceImpl.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.samples.backpressure.impl;
+
+import org.apache.dubbo.common.stream.CallStreamObserver;
+import org.apache.dubbo.common.stream.ServerCallStreamObserver;
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.samples.backpressure.api.BackpressureService;
+import org.apache.dubbo.samples.backpressure.api.DataChunk;
+import org.apache.dubbo.samples.backpressure.api.StreamRequest;
+import org.apache.dubbo.samples.backpressure.api.StreamResponse;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of BackpressureService demonstrating streaming with
backpressure.
+ */
+public class BackpressureServiceImpl implements BackpressureService {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BackpressureServiceImpl.class);
+
+ @Override
+ public String echo(String message) {
+ LOGGER.info("[Server] Echo received: {}", message);
+ return "Echo: " + message;
+ }
+
+ /**
+ * Server streaming with SERVER-SIDE backpressure.
+ */
+ @Override
+ public void serverStream(StreamRequest request, StreamObserver<DataChunk>
responseObserver) {
+ int totalCount = request.getCount();
+ int chunkSize = request.getChunkSize();
+ LOGGER.info("[Server-Backpressure] Starting server stream, count={},
chunkSize={}", totalCount, chunkSize);
+
+ // Use ServerCallStreamObserver interface for backpressure
+ if (responseObserver instanceof ServerCallStreamObserver) {
+ ServerCallStreamObserver<DataChunk> serverObserver =
+ (ServerCallStreamObserver<DataChunk>) responseObserver;
+ final AtomicInteger sentCount = new AtomicInteger(0);
+ final AtomicBoolean completed = new AtomicBoolean(false);
+ final byte[] data = new byte[chunkSize];
+
+ // Set ready callback - called when the stream becomes writable
+ serverObserver.setOnReadyHandler(() -> {
+ LOGGER.info("[Server-Backpressure] onReadyHandler triggered,
sent so far: {}", sentCount.get());
+
+ // Send data while stream is ready and we have more to send
+ while (serverObserver.isReady() && sentCount.get() <
totalCount && !completed.get()) {
+ int seq = sentCount.getAndIncrement();
+ DataChunk chunk = new DataChunk(seq, data,
System.currentTimeMillis());
+ serverObserver.onNext(chunk);
+
+ if ((seq + 1) % 10 == 0) {
+ LOGGER.info("[Server-Backpressure] Sent {} chunks
(isReady={})", seq + 1, serverObserver.isReady());
+ }
+ }
+
+ // Complete if all data sent
+ if (sentCount.get() >= totalCount &&
!completed.getAndSet(true)) {
+ serverObserver.onCompleted();
+ LOGGER.info("[Server-Backpressure] Completed via
onReadyHandler, total: {}", sentCount.get());
+ }
+ });
+
+ // Initial send (if writable)
+ LOGGER.info("[Server-Backpressure] Initial send, isReady={}",
serverObserver.isReady());
+ while (serverObserver.isReady() && sentCount.get() < totalCount) {
+ int seq = sentCount.getAndIncrement();
+ DataChunk chunk = new DataChunk(seq, data,
System.currentTimeMillis());
+ serverObserver.onNext(chunk);
+
+ if ((seq + 1) % 10 == 0) {
+ LOGGER.info("[Server-Backpressure] Initial sent {}
chunks", seq + 1);
+ }
+ }
+
+ // Complete if all sent in initial phase
+ if (sentCount.get() >= totalCount && !completed.getAndSet(true)) {
+ serverObserver.onCompleted();
+ LOGGER.info("[Server-Backpressure] All sent in initial phase,
total: {}", sentCount.get());
+ } else {
+ LOGGER.info("[Server-Backpressure] Paused at {} chunks,
waiting for onReadyHandler", sentCount.get());
+ }
+ }
+
+ }
+
+ /**
+ * Client streaming with SERVER-SIDE receive backpressure.
+ *
+     * <p>This demonstrates how the server controls the rate at which it receives data from the client
+     * using the {@code disableAutoRequest()} and {@code request(int)} APIs.
+ *
+ * <p>Dubbo API reference (aligned with gRPC):
+ * <ul>
+ * <li>{@code ServerCallStreamObserver.disableAutoRequest()} - Disable
automatic message requesting</li>
+ * <li>{@code ServerCallStreamObserver.request(int)} - Manually request
messages from client</li>
+ * </ul>
+ */
+ @Override
+ public StreamObserver<DataChunk>
clientStream(StreamObserver<StreamResponse> responseObserver) {
+ LOGGER.info("[Server-ReceiveBackpressure] Client stream started -
receiving data from client with backpressure");
+ final long startTime = System.currentTimeMillis();
+ final AtomicInteger chunkCount = new AtomicInteger(0);
+ final AtomicLong totalBytes = new AtomicLong(0);
+
+        // Cast to ServerCallStreamObserver for receive backpressure control
+ if (responseObserver instanceof ServerCallStreamObserver) {
+ ServerCallStreamObserver<StreamResponse> serverObserver =
+ (ServerCallStreamObserver<StreamResponse>)
responseObserver;
+
+ // Disable automatic message requesting - server will manually
control receive rate
+ serverObserver.disableAutoRequest();
+ LOGGER.info("[Server-ReceiveBackpressure] Disabled auto request,
server will control receive rate");
+
+ // Request initial batch of messages
+ final int initialRequest = 5;
+ serverObserver.request(initialRequest);
+ LOGGER.info("[Server-ReceiveBackpressure] Requested initial {}
messages", initialRequest);
+
+ return new StreamObserver<DataChunk>() {
+ @Override
+ public void onNext(DataChunk chunk) {
+ int count = chunkCount.incrementAndGet();
+ if (chunk.getData() != null) {
+ totalBytes.addAndGet(chunk.getData().length);
+ }
+
+ if (count % 10 == 0) {
+ LOGGER.info("[Server-ReceiveBackpressure] Received {}
chunks from client", count);
+ }
+
+ // After processing each message, request the next one
+ // This is the key to server-side receive backpressure
control
+ serverObserver.request(1);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOGGER.error("[Server-ReceiveBackpressure] Client stream
error: {}", throwable.getMessage());
+ }
+
+ @Override
+ public void onCompleted() {
+ long duration = System.currentTimeMillis() - startTime;
+ StreamResponse response = new
StreamResponse(chunkCount.get(), totalBytes.get(), duration);
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ LOGGER.info("[Server-ReceiveBackpressure] Client stream
completed: {} chunks, {} bytes in {}ms",
+ chunkCount.get(), totalBytes.get(), duration);
+ }
+ };
+ }
+
+ // Fallback: basic implementation without backpressure control
+ LOGGER.warn("[Server] Fallback to basic implementation without receive
backpressure");
+ return new StreamObserver<DataChunk>() {
+ @Override
+ public void onNext(DataChunk chunk) {
+ int count = chunkCount.incrementAndGet();
+ if (chunk.getData() != null) {
+ totalBytes.addAndGet(chunk.getData().length);
+ }
+ if (count % 10 == 0) {
+ LOGGER.info("[Server] Received {} chunks from client",
count);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOGGER.error("[Server] Client stream error: {}",
throwable.getMessage());
+ }
+
+ @Override
+ public void onCompleted() {
+ long duration = System.currentTimeMillis() - startTime;
+ StreamResponse response = new StreamResponse(chunkCount.get(),
totalBytes.get(), duration);
+ responseObserver.onNext(response);
+ responseObserver.onCompleted();
+ LOGGER.info("[Server] Client stream completed: {} chunks, {}
bytes in {}ms",
+ chunkCount.get(), totalBytes.get(), duration);
+ }
+ };
+ }
+
+ /**
+ * Bidirectional streaming with SERVER-SIDE send and receive backpressure.
+ *
+ * <p>This demonstrates complete server-side backpressure control:
+ * <ul>
+ * <li>Send backpressure: Using {@code setOnReadyHandler()} and {@code
isReady()} to control sending rate</li>
+ * <li>Receive backpressure: Using {@code disableAutoRequest()} and
{@code request(int)} to control receiving rate</li>
+ * </ul>
+ *
+ * <p>Dubbo API reference (aligned with gRPC):
+ * <ul>
+ * <li>{@code ServerCallStreamObserver.setOnReadyHandler(Runnable)} -
Callback when stream becomes writable</li>
+ * <li>{@code ServerCallStreamObserver.isReady()} - Check if stream is
ready to accept more data</li>
+ * <li>{@code ServerCallStreamObserver.disableAutoRequest()} - Disable
automatic message requesting</li>
+ * <li>{@code ServerCallStreamObserver.request(int)} - Manually request
messages from client</li>
+ * </ul>
+ */
+ @Override
+ public StreamObserver<DataChunk> biStream(StreamObserver<DataChunk>
responseObserver) {
+ LOGGER.info("[Server-BiStream-Backpressure] BiStream started -
bidirectional streaming with full backpressure control");
+ final AtomicInteger receivedCount = new AtomicInteger(0);
+ final AtomicInteger sentCount = new AtomicInteger(0);
+
+ // Cast to ServerCallStreamObserver for full backpressure control
+ if (responseObserver instanceof ServerCallStreamObserver) {
+ ServerCallStreamObserver<DataChunk> serverObserver =
+ (ServerCallStreamObserver<DataChunk>) responseObserver;
+
+ // Queue to buffer chunks that couldn't be sent immediately due to
backpressure
+ java.util.concurrent.ConcurrentLinkedQueue<DataChunk>
pendingChunks =
+ new java.util.concurrent.ConcurrentLinkedQueue<>();
+ final AtomicBoolean streamCompleted = new AtomicBoolean(false);
+
+ // === Send Backpressure Control ===
+ // Set onReadyHandler to send pending chunks when stream becomes
writable
+ serverObserver.setOnReadyHandler(() -> {
+ LOGGER.info("[Server-BiStream-Backpressure] onReadyHandler
triggered, isReady={}, pending={}",
+ serverObserver.isReady(), pendingChunks.size());
+
+ // Send pending chunks while stream is ready
+ while (serverObserver.isReady() && !pendingChunks.isEmpty()) {
+ DataChunk chunk = pendingChunks.poll();
+ if (chunk != null) {
+ serverObserver.onNext(chunk);
+ int sent = sentCount.incrementAndGet();
+ if (sent % 10 == 0) {
+ LOGGER.info("[Server-BiStream-Backpressure] Sent
{} response chunks via onReadyHandler", sent);
+ }
+ }
+ }
+
+ // Complete the stream if all chunks sent and stream is
completed
+ if (streamCompleted.get() && pendingChunks.isEmpty()) {
+ serverObserver.onCompleted();
+ LOGGER.info("[Server-BiStream-Backpressure] Completed via
onReadyHandler: received={}, sent={}",
+ receivedCount.get(), sentCount.get());
+ }
+ });
+
+ // === Receive Backpressure Control ===
+ // Disable automatic message requesting - server will manually
control receive rate
+ serverObserver.disableAutoRequest();
+ LOGGER.info("[Server-BiStream-Backpressure] Disabled auto request
for receive backpressure");
+
+ // Request initial batch of messages
+ final int initialRequest = 5;
+ serverObserver.request(initialRequest);
+ LOGGER.info("[Server-BiStream-Backpressure] Requested initial {}
messages, configured send backpressure with setOnReadyHandler", initialRequest);
+
+ return new StreamObserver<DataChunk>() {
+ @Override
+ public void onNext(DataChunk chunk) {
+ int count = receivedCount.incrementAndGet();
+ if (count % 10 == 0) {
+ LOGGER.info("[Server-BiStream-Backpressure] Received
{} chunks, isReady={}",
+ count, serverObserver.isReady());
+ }
+
+ // Prepare response chunk
+ DataChunk response = new DataChunk(
+ chunk.getSequenceNumber(),
+ chunk.getData(),
+ System.currentTimeMillis()
+ );
+
+ // Try to send immediately if stream is ready, otherwise
buffer
+ if (serverObserver.isReady()) {
+ serverObserver.onNext(response);
+ int sent = sentCount.incrementAndGet();
+ if (sent % 10 == 0) {
+ LOGGER.info("[Server-BiStream-Backpressure] Sent
{} response chunks directly", sent);
+ }
+ } else {
+ // Buffer the chunk - onReadyHandler will send it when
stream becomes writable
+ pendingChunks.offer(response);
+ LOGGER.debug("[Server-BiStream-Backpressure] Buffered
chunk, pending={}", pendingChunks.size());
+ }
+
+ // Request next message (receive backpressure control)
+ serverObserver.request(1);
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOGGER.error("[Server-BiStream-Backpressure] Error: {}",
throwable.getMessage());
+ }
+
+ @Override
+ public void onCompleted() {
+ streamCompleted.set(true);
+ // If all pending chunks sent, complete now; otherwise
onReadyHandler will complete
+ if (pendingChunks.isEmpty()) {
+ serverObserver.onCompleted();
+ LOGGER.info("[Server-BiStream-Backpressure] Completed
immediately: received={}, sent={}",
+ receivedCount.get(), sentCount.get());
+ } else {
+ LOGGER.info("[Server-BiStream-Backpressure] Waiting
for pending chunks to be sent: {}",
+ pendingChunks.size());
+ }
+ }
+ };
+ }
+
+ // Fallback: basic implementation without backpressure control
+ LOGGER.warn("[Server-BiStream] Fallback to basic implementation
without backpressure");
+ CallStreamObserver<DataChunk> callObserver =
(CallStreamObserver<DataChunk>) responseObserver;
+
+ return new StreamObserver<DataChunk>() {
+ @Override
+ public void onNext(DataChunk chunk) {
+ int count = receivedCount.incrementAndGet();
+ if (count % 10 == 0) {
+ LOGGER.info("[Server-BiStream] Received {} chunks", count);
+ }
+ DataChunk response = new DataChunk(
+ chunk.getSequenceNumber(),
+ chunk.getData(),
+ System.currentTimeMillis()
+ );
+ callObserver.onNext(response);
+ sentCount.incrementAndGet();
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOGGER.error("[Server-BiStream] Error: {}",
throwable.getMessage());
+ }
+
+ @Override
+ public void onCompleted() {
+ callObserver.onCompleted();
+ LOGGER.info("[Server-BiStream] Completed: received={},
sent={}",
+ receivedCount.get(), sentCount.get());
+ }
+ };
+ }
+}
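The integration test added next exercises the consumer-side counterpart of the receive control shown above. A condensed sketch, assuming the ClientResponseObserver / ClientCallStreamObserver API that BackpressureIT uses below; the class name PullingConsumer and its process() helper are illustrative only.

    import org.apache.dubbo.common.stream.ClientCallStreamObserver;
    import org.apache.dubbo.common.stream.ClientResponseObserver;
    import org.apache.dubbo.samples.backpressure.api.DataChunk;
    import org.apache.dubbo.samples.backpressure.api.StreamRequest;

    // Receive-side backpressure on the client: disable automatic requesting before the
    // stream starts, then pull one chunk at a time as each one finishes processing.
    class PullingConsumer implements ClientResponseObserver<StreamRequest, DataChunk> {
        private ClientCallStreamObserver<StreamRequest> requestStream;

        @Override
        public void beforeStart(ClientCallStreamObserver<StreamRequest> requestStream) {
            this.requestStream = requestStream;
            requestStream.disableAutoRequestWithInitial(1); // ask for the first chunk only
        }

        @Override
        public void onNext(DataChunk chunk) {
            process(chunk);           // placeholder for per-chunk work
            requestStream.request(1); // ask for the next chunk once this one is handled
        }

        @Override
        public void onError(Throwable throwable) {
            // surface the failure to the caller
        }

        @Override
        public void onCompleted() {
            // stream finished
        }

        private void process(DataChunk chunk) {
            // illustrative processing step
        }
    }
    // Usage: service.serverStream(new StreamRequest(100, 1024), new PullingConsumer());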
diff --git a/2-advanced/dubbo-samples-triple-backpressure/src/main/resources/log4j2.xml b/2-advanced/dubbo-samples-triple-backpressure/src/main/resources/log4j2.xml
new file mode 100644
index 000000000..06efcb47f
--- /dev/null
+++ b/2-advanced/dubbo-samples-triple-backpressure/src/main/resources/log4j2.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT" follow="true">
+ <PatternLayout pattern="%style{%d{HH:mm:ss.SSS}}{Magenta}
%style{|-}{White}%highlight{%-5p} [%t] %style{%40.40c}{Cyan}:%style{%-3L}{Blue}
%style{-|}{White}
%m%n%rEx{filters(jdk.internal.reflect,java.lang.reflect,sun.reflect)}"
disableAnsi="false" charset="UTF-8"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="info">
+ <AppenderRef ref="Console"/>
+ </Root>
+ </Loggers>
+</Configuration>
+
+
diff --git a/2-advanced/dubbo-samples-triple-backpressure/src/test/java/org/apache/dubbo/samples/backpressure/BackpressureIT.java b/2-advanced/dubbo-samples-triple-backpressure/src/test/java/org/apache/dubbo/samples/backpressure/BackpressureIT.java
new file mode 100644
index 000000000..752e61415
--- /dev/null
+++ b/2-advanced/dubbo-samples-triple-backpressure/src/test/java/org/apache/dubbo/samples/backpressure/BackpressureIT.java
@@ -0,0 +1,720 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.samples.backpressure;
+
+import org.apache.dubbo.common.stream.ClientCallStreamObserver;
+import org.apache.dubbo.common.stream.ClientResponseObserver;
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.config.ProtocolConfig;
+import org.apache.dubbo.config.ReferenceConfig;
+import org.apache.dubbo.config.RegistryConfig;
+import org.apache.dubbo.config.ServiceConfig;
+import org.apache.dubbo.config.bootstrap.DubboBootstrap;
+import org.apache.dubbo.samples.backpressure.api.BackpressureService;
+import org.apache.dubbo.samples.backpressure.api.DataChunk;
+import org.apache.dubbo.samples.backpressure.api.StreamRequest;
+import org.apache.dubbo.samples.backpressure.api.StreamResponse;
+import org.apache.dubbo.samples.backpressure.impl.BackpressureServiceImpl;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for Triple protocol backpressure functionality.
+ *
+ * <p>This test covers all streaming scenarios with backpressure APIs:
+ *
+ * <h3>High-Level API (setOnReadyHandler) - Controlling Send Rate</h3>
+ * <ul>
+ * <li>{@link #testServerStreamWithOnReadyHandler()} - Server uses isReady()
+ setOnReadyHandler() to control sending</li>
+ * <li>{@link #testClientStreamWithOnReadyHandler()} - Client uses isReady()
+ setOnReadyHandler() to control sending</li>
+ * <li>{@link #testBiStreamWithOnReadyHandler()} - Both sides use
setOnReadyHandler()</li>
+ * </ul>
+ *
+ * <h3>Low-Level API (disableAutoRequestWithInitial) - Controlling Receive
Rate (Client-Side)</h3>
+ * <ul>
+ * <li>{@link #testServerStreamWithManualRequest()} - Client uses
ClientResponseObserver.beforeStart() to control receive rate</li>
+ * <li>{@link #testClientStreamWithDisableAutoRequestWithInitial()} - Client
uses ClientResponseObserver.beforeStart()</li>
+ * <li>{@link #testBiStreamWithDisableAutoRequestWithInitial()} - Client
uses ClientResponseObserver.beforeStart()</li>
+ * </ul>
+ *
+ * <h3>Server-Side Backpressure API (disableAutoRequest) - Controlling Receive
Rate (Server-Side)</h3>
+ * <ul>
+ * <li>{@link #testClientStreamWithServerReceiveBackpressure()} - Server
uses disableAutoRequest() + request() to control receive rate</li>
+ * <li>{@link #testBiStreamWithServerFullBackpressure()} - Server uses full
backpressure control (send + receive)</li>
+ * </ul>
+ *
+ * <p>Note: For client-side, Dubbo uses {@code
disableAutoRequestWithInitial(int)} which combines gRPC's
+ * {@code disableAutoRequest()} and {@code request(int)} into a single method
call.
+ * For server-side, Dubbo uses {@code disableAutoRequest()} followed by {@code
request(int)} separately.
+ */
+public class BackpressureIT {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(BackpressureIT.class);
+
+ // Use random port to avoid conflicts between consecutive test runs
+ private static final int PORT = 50052 + (int)(Math.random() * 1000);
+
+ private static BackpressureService service;
+ private static DubboBootstrap bootstrap;
+
+ @BeforeClass
+ public static void setup() {
+ // Provider config
+ ServiceConfig<BackpressureService> serviceConfig = new
ServiceConfig<>();
+ serviceConfig.setInterface(BackpressureService.class);
+ serviceConfig.setRef(new BackpressureServiceImpl());
+
+ // Consumer config with direct connection (no registry needed)
+ ReferenceConfig<BackpressureService> reference = new
ReferenceConfig<>();
+ reference.setInterface(BackpressureService.class);
+ reference.setUrl("tri://127.0.0.1:" + PORT);
+ reference.setTimeout(60000);
+
+        // Start both provider and consumer in the same DubboBootstrap instance
+ bootstrap = DubboBootstrap.getInstance();
+ bootstrap.application(new ApplicationConfig("backpressure-test"))
+ .registry(new RegistryConfig("N/A")) // No registry needed
+ .protocol(new ProtocolConfig("tri", PORT))
+ .service(serviceConfig)
+ .reference(reference)
+ .start();
+
+ service = reference.get();
+ LOGGER.info("Provider and Consumer started on port {}", PORT);
+
+ // Warm up the connection to ensure resources are properly initialized
+ // This prevents timing issues when running individual tests
+ try {
+ service.echo("warmup");
+ LOGGER.info("Connection warmup completed");
+ } catch (Exception e) {
+ LOGGER.warn("Warmup failed, but continuing: {}", e.getMessage());
+ }
+ }
+
+ @AfterClass
+ public static void teardown() {
+ if (bootstrap != null) {
+ bootstrap.stop();
+ }
+ }
+
+ // ==================== Basic Test ====================
+
+ @Test
+ public void testEcho() {
+ String result = service.echo("Hello Backpressure");
+ LOGGER.info("Echo response: {}", result);
+ Assert.assertNotNull(result);
+ Assert.assertTrue(result.contains("Hello Backpressure"));
+ }
+
+ // ==================== Server Stream Tests ====================
+
+ /**
+ * Test server streaming with HIGH-LEVEL API (setOnReadyHandler).
+ * Server uses isReady() and setOnReadyHandler() to control sending rate.
+ * This demonstrates SERVER-SIDE send backpressure.
+ */
+ @Test
+ public void testServerStreamWithOnReadyHandler() throws
InterruptedException {
+ final int requestCount = 30;
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicInteger receivedCount = new AtomicInteger(0);
+
+ StreamRequest request = new StreamRequest(requestCount, 1024);
+
+ service.serverStream(request, new StreamObserver<DataChunk>() {
+ @Override
+ public void onNext(DataChunk chunk) {
+ int count = receivedCount.incrementAndGet();
+ if (count % 10 == 0) {
+ LOGGER.info("[ServerStream-OnReady] Received {} chunks",
count);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOGGER.error("[ServerStream-OnReady] Error: {}",
throwable.getMessage());
+ latch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ LOGGER.info("[ServerStream-OnReady] Completed, received: {}",
receivedCount.get());
+ latch.countDown();
+ }
+ });
+
+ boolean completed = latch.await(30, TimeUnit.SECONDS);
+ Assert.assertTrue("Stream should complete within timeout", completed);
+ Assert.assertEquals("Should receive all chunks", requestCount,
receivedCount.get());
+ LOGGER.info("✅ Server stream with onReadyHandler test passed!");
+ }
+
+ /**
+ * Test server streaming with LOW-LEVEL API
(disableAutoRequestWithInitial).
+ * Client uses ClientResponseObserver.beforeStart() to configure receive
backpressure
+ * BEFORE the stream starts, following gRPC's pattern.
+ * This demonstrates CLIENT-SIDE receive backpressure.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testServerStreamWithManualRequest() throws
InterruptedException {
+ final int requestCount = 30;
+ final int initialRequest = 5;
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicInteger receivedCount = new AtomicInteger(0);
+
+ StreamRequest request = new StreamRequest(requestCount, 1024);
+
+        // Use ClientResponseObserver to configure receive backpressure BEFORE the stream starts
+ ClientResponseObserver<StreamRequest, DataChunk> responseObserver =
+ new ClientResponseObserver<StreamRequest, DataChunk>() {
+ // Member variable to hold the stream observer for manual request
+ private ClientCallStreamObserver<StreamRequest> requestStream;
+
+ @Override
+ public void beforeStart(ClientCallStreamObserver<StreamRequest>
requestStream) {
+ this.requestStream = requestStream;
+ // Configure receive backpressure - disable auto request and
set initial request
+ requestStream.disableAutoRequestWithInitial(initialRequest);
+ LOGGER.info("[ServerStream-ManualRequest] beforeStart:
configured receive backpressure, initialRequest={}",
+ initialRequest);
+ }
+
+ @Override
+ public void onNext(DataChunk chunk) {
+ int count = receivedCount.incrementAndGet();
+ if (count % 5 == 0) {
+ LOGGER.info("[ServerStream-ManualRequest] Received {}
chunks", count);
+ }
+
+ // After processing each chunk, request more data
+ // This simulates controlled consumption rate
+ if (requestStream != null) {
+ requestStream.request(1);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOGGER.error("[ServerStream-ManualRequest] Error: {}",
throwable.getMessage());
+ latch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ LOGGER.info("[ServerStream-ManualRequest] Completed, received:
{}", receivedCount.get());
+ latch.countDown();
+ }
+ };
+
+ // Start the stream - beforeStart() is called inside
+ service.serverStream(request, responseObserver);
+
+ boolean completed = latch.await(60, TimeUnit.SECONDS);
+ Assert.assertTrue("Stream should complete within timeout", completed);
+ Assert.assertEquals("Should receive all chunks", requestCount,
receivedCount.get());
+ LOGGER.info("✅ Server stream with manual request test passed!");
+ }
+
+ // ==================== Client Stream Tests ====================
+
+ /**
+ * Test client streaming with HIGH-LEVEL API (setOnReadyHandler).
+ * Uses ClientResponseObserver.beforeStart() to configure send backpressure
+ * BEFORE the stream starts, following gRPC's pattern.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testClientStreamWithOnReadyHandler() throws
InterruptedException {
+ final int sendCount = 50;
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicInteger sent = new AtomicInteger(0);
+ final AtomicBoolean sendCompleted = new AtomicBoolean(false);
+ final AtomicInteger serverReceivedChunks = new AtomicInteger(0);
+ final byte[] data = new byte[1024];
+
+ // Use ClientResponseObserver to configure backpressure BEFORE the stream starts
+ ClientResponseObserver<DataChunk, StreamResponse> responseObserver =
+ new ClientResponseObserver<DataChunk, StreamResponse>() {
+ @Override
+ public void beforeStart(ClientCallStreamObserver<DataChunk>
requestStream) {
+ LOGGER.info("[ClientStream-OnReady] beforeStart called");
+
+ // Disable auto flow control for manual send control
+ requestStream.disableAutoFlowControl();
+
+ // Set onReadyHandler BEFORE the stream starts
+ requestStream.setOnReadyHandler(() -> {
+ LOGGER.info("[ClientStream-OnReady] onReadyHandler
triggered, isReady={}, sent={}",
+ requestStream.isReady(), sent.get());
+ while (requestStream.isReady() && sent.get() < sendCount
&& !sendCompleted.get()) {
+ int seq = sent.getAndIncrement();
+ requestStream.onNext(new DataChunk(seq, data,
System.currentTimeMillis()));
+ }
+
+ if (sent.get() >= sendCount &&
!sendCompleted.getAndSet(true)) {
+ requestStream.onCompleted();
+ LOGGER.info("[ClientStream-OnReady] onCompleted
called");
+ }
+ });
+ }
+
+ @Override
+ public void onNext(StreamResponse response) {
+ serverReceivedChunks.set(response.getTotalChunks());
+ LOGGER.info("[ClientStream-OnReady] Server received: {}
chunks", response.getTotalChunks());
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOGGER.error("[ClientStream-OnReady] Error: {}",
throwable.getMessage());
+ latch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ LOGGER.info("[ClientStream-OnReady] Completed");
+ latch.countDown();
+ }
+ };
+
+ // Start the stream - beforeStart() is called inside
+ StreamObserver<DataChunk> requestObserver =
service.clientStream(responseObserver);
+
+ // Note: We don't configure anything here because it's already done in
beforeStart()
+
+ boolean completed = latch.await(60, TimeUnit.SECONDS);
+ Assert.assertTrue("Stream should complete within timeout", completed);
+ Assert.assertEquals("Server should receive all chunks", sendCount,
serverReceivedChunks.get());
+ LOGGER.info("✅ Client stream with onReadyHandler test passed!");
+ }
+
+ /**
+ * Test client streaming with LOW-LEVEL API
(disableAutoRequestWithInitial).
+ * Uses ClientResponseObserver.beforeStart() to set up both send and
receive backpressure
+ * BEFORE the stream starts, following gRPC's pattern.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testClientStreamWithDisableAutoRequestWithInitial() throws
InterruptedException {
+ final int sendCount = 30;
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicInteger sent = new AtomicInteger(0);
+ final AtomicInteger serverReceivedChunks = new AtomicInteger(0);
+ final AtomicBoolean responseReceived = new AtomicBoolean(false);
+ final byte[] data = new byte[1024];
+
+ // Use ClientResponseObserver to configure backpressure BEFORE the stream starts
+ ClientResponseObserver<DataChunk, StreamResponse> responseObserver =
+ new ClientResponseObserver<DataChunk, StreamResponse>() {
+ @Override
+ public void beforeStart(ClientCallStreamObserver<DataChunk>
requestStream) {
+ LOGGER.info("[ClientStream-DisableAutoRequestWithInitial]
beforeStart called");
+
+ // Configure receive backpressure
+ requestStream.disableAutoRequestWithInitial(10);
+
+ // Configure send backpressure
+ requestStream.disableAutoFlowControl();
+
+ // Set onReadyHandler BEFORE the stream starts
+ requestStream.setOnReadyHandler(() -> {
+ LOGGER.info("[ClientStream-DisableAutoRequestWithInitial]
onReadyHandler triggered, isReady={}, sent={}",
+ requestStream.isReady(), sent.get());
+ while (requestStream.isReady() && sent.get() < sendCount) {
+ int seq = sent.getAndIncrement();
+ requestStream.onNext(new DataChunk(seq, data,
System.currentTimeMillis()));
+ }
+
+ if (sent.get() >= sendCount) {
+ requestStream.onCompleted();
+
LOGGER.info("[ClientStream-DisableAutoRequestWithInitial] onCompleted called");
+ }
+ });
+ }
+
+ @Override
+ public void onNext(StreamResponse response) {
+ serverReceivedChunks.set(response.getTotalChunks());
+ responseReceived.set(true);
+ LOGGER.info("[ClientStream-DisableAutoRequestWithInitial]
Response: {} chunks",
+ response.getTotalChunks());
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOGGER.error("[ClientStream-DisableAutoRequestWithInitial]
Error: {}", throwable.getMessage());
+ latch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ LOGGER.info("[ClientStream-DisableAutoRequestWithInitial]
Completed");
+ latch.countDown();
+ }
+ };
+
+ // Start the stream - beforeStart() is called inside
+ StreamObserver<DataChunk> requestObserver =
service.clientStream(responseObserver);
+
+ // Note: We don't configure anything here because it's already done in
beforeStart()
+
+ boolean completed = latch.await(60, TimeUnit.SECONDS);
+ Assert.assertTrue("Stream should complete within timeout", completed);
+ Assert.assertEquals("Server should receive all chunks", sendCount,
serverReceivedChunks.get());
+ Assert.assertTrue("Should receive response", responseReceived.get());
+ LOGGER.info("✅ Client stream with disableAutoRequestWithInitial test
passed!");
+ }
+
+ /**
+ * Test client streaming with SERVER-SIDE receive backpressure.
+ * Server uses disableAutoRequest() and request(int) to control receiving
rate.
+ * This demonstrates SERVER-SIDE receive backpressure.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testClientStreamWithServerReceiveBackpressure() throws
InterruptedException {
+ final int sendCount = 50;
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicInteger sent = new AtomicInteger(0);
+ final AtomicBoolean sendCompleted = new AtomicBoolean(false);
+ final AtomicInteger serverReceivedChunks = new AtomicInteger(0);
+ final byte[] data = new byte[1024];
+
+ // Use ClientResponseObserver to configure send backpressure
+ ClientResponseObserver<DataChunk, StreamResponse> responseObserver =
+ new ClientResponseObserver<DataChunk, StreamResponse>() {
+ @Override
+ public void beforeStart(ClientCallStreamObserver<DataChunk>
requestStream) {
+ LOGGER.info("[ClientStream-ServerReceiveBackpressure]
beforeStart called");
+
+ // Disable auto flow control for manual send control
+ requestStream.disableAutoFlowControl();
+
+ // Set onReadyHandler to send data when ready
+ requestStream.setOnReadyHandler(() -> {
+ LOGGER.info("[ClientStream-ServerReceiveBackpressure]
onReadyHandler triggered, isReady={}, sent={}",
+ requestStream.isReady(), sent.get());
+ while (requestStream.isReady() && sent.get() < sendCount
&& !sendCompleted.get()) {
+ int seq = sent.getAndIncrement();
+ requestStream.onNext(new DataChunk(seq, data,
System.currentTimeMillis()));
+ }
+
+ if (sent.get() >= sendCount &&
!sendCompleted.getAndSet(true)) {
+ requestStream.onCompleted();
+ LOGGER.info("[ClientStream-ServerReceiveBackpressure]
onCompleted called");
+ }
+ });
+ }
+
+ @Override
+ public void onNext(StreamResponse response) {
+ serverReceivedChunks.set(response.getTotalChunks());
+ LOGGER.info("[ClientStream-ServerReceiveBackpressure] Server
received: {} chunks, {} bytes in {}ms",
+ response.getTotalChunks(), response.getTotalBytes(),
response.getDurationMs());
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOGGER.error("[ClientStream-ServerReceiveBackpressure] Error:
{}", throwable.getMessage());
+ latch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ LOGGER.info("[ClientStream-ServerReceiveBackpressure]
Completed");
+ latch.countDown();
+ }
+ };
+
+ // Start the stream - server will use disableAutoRequest() to control
receive rate
+ StreamObserver<DataChunk> requestObserver =
service.clientStream(responseObserver);
+
+ boolean completed = latch.await(60, TimeUnit.SECONDS);
+ Assert.assertTrue("Stream should complete within timeout", completed);
+ Assert.assertEquals("Server should receive all chunks", sendCount,
serverReceivedChunks.get());
+ LOGGER.info("✅ Client stream with server receive backpressure test
passed!");
+ }
+
+ // ==================== Bidirectional Stream Tests ====================
+
+ /**
+ * Test bidirectional streaming with HIGH-LEVEL API (setOnReadyHandler).
+ * Uses ClientResponseObserver.beforeStart() to configure send backpressure
+ * BEFORE the stream starts, following gRPC's pattern.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testBiStreamWithOnReadyHandler() throws InterruptedException {
+ final int sendCount = 30;
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicInteger sent = new AtomicInteger(0);
+ final AtomicInteger received = new AtomicInteger(0);
+ final AtomicBoolean sendCompleted = new AtomicBoolean(false);
+ final byte[] data = new byte[1024];
+
+ // Use AtomicReference to access the request stream from the
onReadyHandler
+ final AtomicReference<ClientCallStreamObserver<DataChunk>>
requestStreamRef = new AtomicReference<>();
+
+ // Use ClientResponseObserver to configure backpressure BEFORE the stream starts
+ ClientResponseObserver<DataChunk, DataChunk> responseObserver =
+ new ClientResponseObserver<DataChunk, DataChunk>() {
+ @Override
+ public void beforeStart(ClientCallStreamObserver<DataChunk>
requestStream) {
+ requestStreamRef.set(requestStream);
+
+ // Disable auto flow control for manual send control
+ requestStream.disableAutoFlowControl();
+
+ // Set onReadyHandler BEFORE the stream starts
+ requestStream.setOnReadyHandler(() -> {
+ LOGGER.info("[BiStream-OnReady] onReadyHandler triggered,
isReady={}, sent={}",
+ requestStream.isReady(), sent.get());
+ while (requestStream.isReady() && sent.get() < sendCount
&& !sendCompleted.get()) {
+ int seq = sent.getAndIncrement();
+ requestStream.onNext(new DataChunk(seq, data,
System.currentTimeMillis()));
+ }
+
+ if (sent.get() >= sendCount &&
!sendCompleted.getAndSet(true)) {
+ requestStream.onCompleted();
+ LOGGER.info("[BiStream-OnReady] onCompleted called");
+ }
+ });
+ LOGGER.info("[BiStream-OnReady] beforeStart: configured send
backpressure");
+ }
+
+ @Override
+ public void onNext(DataChunk chunk) {
+ int count = received.incrementAndGet();
+ if (count % 10 == 0) {
+ LOGGER.info("[BiStream-OnReady] Received {} response
chunks", count);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOGGER.error("[BiStream-OnReady] Error: {}",
throwable.getMessage());
+ latch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ LOGGER.info("[BiStream-OnReady] Completed, sent: {}, received:
{}",
+ sent.get(), received.get());
+ latch.countDown();
+ }
+ };
+
+ // Start the stream - beforeStart() is called inside
+ StreamObserver<DataChunk> requestObserver =
service.biStream(responseObserver);
+
+ // Note: We don't need to set onReadyHandler here because it's already
set in beforeStart()
+ // The onReadyHandler will be triggered by the framework when the
stream becomes ready
+
+ boolean completed = latch.await(60, TimeUnit.SECONDS);
+ Assert.assertTrue("BiStream should complete within timeout",
completed);
+ Assert.assertEquals("Should send all chunks", sendCount, sent.get());
+ Assert.assertTrue("Should receive response chunks", received.get() >
0);
+ LOGGER.info("✅ BiStream with onReadyHandler test passed! sent={},
received={}",
+ sent.get(), received.get());
+ }
+
+ /**
+ * Test bidirectional streaming with LOW-LEVEL API
(disableAutoRequestWithInitial).
+ * Uses ClientResponseObserver.beforeStart() to set up BOTH send and
receive backpressure
+ * BEFORE the stream starts, following gRPC's pattern.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testBiStreamWithDisableAutoRequestWithInitial() throws
InterruptedException {
+ final int sendCount = 30;
+ final int initialRequest = 10;
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicInteger sent = new AtomicInteger(0);
+ final AtomicInteger received = new AtomicInteger(0);
+ final AtomicBoolean sendCompleted = new AtomicBoolean(false);
+ final byte[] data = new byte[1024];
+
+ // Use ClientResponseObserver to configure BOTH send and receive backpressure BEFORE the stream starts
+ ClientResponseObserver<DataChunk, DataChunk> responseObserver =
+ new ClientResponseObserver<DataChunk, DataChunk>() {
+ // Member variable to hold the request stream - same pattern as
gRPC
+ private ClientCallStreamObserver<DataChunk> requestStream;
+
+ @Override
+ public void beforeStart(ClientCallStreamObserver<DataChunk>
requestStream) {
+ this.requestStream = requestStream;
+
+ // Configure receive backpressure (LOW-LEVEL API)
+ requestStream.disableAutoRequestWithInitial(initialRequest);
+
+ // Configure send backpressure (HIGH-LEVEL API)
+ requestStream.disableAutoFlowControl();
+
+ // Set onReadyHandler BEFORE the stream starts
+ requestStream.setOnReadyHandler(() -> {
+ LOGGER.info("[BiStream-DisableAutoRequestWithInitial]
onReadyHandler triggered, isReady={}, sent={}",
+ requestStream.isReady(), sent.get());
+ while (requestStream.isReady() && sent.get() < sendCount
&& !sendCompleted.get()) {
+ int seq = sent.getAndIncrement();
+ requestStream.onNext(new DataChunk(seq, data,
System.currentTimeMillis()));
+ }
+
+ if (sent.get() >= sendCount &&
!sendCompleted.getAndSet(true)) {
+ requestStream.onCompleted();
+ LOGGER.info("[BiStream-DisableAutoRequestWithInitial]
onCompleted called");
+ }
+ });
+
+ LOGGER.info("[BiStream-DisableAutoRequestWithInitial]
beforeStart: configured both send and receive backpressure");
+ }
+
+ @Override
+ public void onNext(DataChunk chunk) {
+ int count = received.incrementAndGet();
+ if (count % 10 == 0) {
+ LOGGER.info("[BiStream-DisableAutoRequestWithInitial]
Received {} chunks", count);
+ }
+
+ // Request more after processing - using member variable
directly
+ if (requestStream != null) {
+ requestStream.request(1);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOGGER.error("[BiStream-DisableAutoRequestWithInitial] Error:
{}", throwable.getMessage());
+ latch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ LOGGER.info("[BiStream-DisableAutoRequestWithInitial]
Completed, sent: {}, received: {}",
+ sent.get(), received.get());
+ latch.countDown();
+ }
+ };
+
+ // Start the stream - beforeStart() is called inside, which sets up
all backpressure configuration
+ StreamObserver<DataChunk> requestObserver =
service.biStream(responseObserver);
+
+ // Note: We don't configure anything here because it's already done in
beforeStart()
+
+ boolean completed = latch.await(60, TimeUnit.SECONDS);
+ Assert.assertTrue("BiStream should complete within timeout",
completed);
+ Assert.assertEquals("Should send all chunks", sendCount, sent.get());
+ LOGGER.info("✅ BiStream with disableAutoRequestWithInitial test
passed! sent={}, received={}",
+ sent.get(), received.get());
+ }
+
+ /**
+ * Test bidirectional streaming with SERVER-SIDE full backpressure control.
+ * Server uses both send backpressure (setOnReadyHandler) and receive
backpressure (disableAutoRequest).
+ * This demonstrates the complete SERVER-SIDE backpressure pattern.
+ */
+ @Test
+ @SuppressWarnings("unchecked")
+ public void testBiStreamWithServerFullBackpressure() throws
InterruptedException {
+ final int sendCount = 50;
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicInteger sent = new AtomicInteger(0);
+ final AtomicInteger received = new AtomicInteger(0);
+ final AtomicBoolean sendCompleted = new AtomicBoolean(false);
+ final byte[] data = new byte[1024];
+
+ // Use ClientResponseObserver to configure client-side send
backpressure
+ ClientResponseObserver<DataChunk, DataChunk> responseObserver =
+ new ClientResponseObserver<DataChunk, DataChunk>() {
+ private ClientCallStreamObserver<DataChunk> requestStream;
+
+ @Override
+ public void beforeStart(ClientCallStreamObserver<DataChunk>
requestStream) {
+ this.requestStream = requestStream;
+
+ // Disable auto flow control for manual send control
+ requestStream.disableAutoFlowControl();
+
+ // Set onReadyHandler for client-side send backpressure
+ requestStream.setOnReadyHandler(() -> {
+ LOGGER.info("[BiStream-ServerFullBackpressure] Client
onReadyHandler triggered, isReady={}, sent={}",
+ requestStream.isReady(), sent.get());
+ while (requestStream.isReady() && sent.get() < sendCount
&& !sendCompleted.get()) {
+ int seq = sent.getAndIncrement();
+ requestStream.onNext(new DataChunk(seq, data,
System.currentTimeMillis()));
+ }
+
+ if (sent.get() >= sendCount &&
!sendCompleted.getAndSet(true)) {
+ requestStream.onCompleted();
+ LOGGER.info("[BiStream-ServerFullBackpressure] Client
onCompleted called");
+ }
+ });
+
+ LOGGER.info("[BiStream-ServerFullBackpressure] Client
beforeStart configured");
+ }
+
+ @Override
+ public void onNext(DataChunk chunk) {
+ int count = received.incrementAndGet();
+ if (count % 10 == 0) {
+ LOGGER.info("[BiStream-ServerFullBackpressure] Client
received {} response chunks", count);
+ }
+ }
+
+ @Override
+ public void onError(Throwable throwable) {
+ LOGGER.error("[BiStream-ServerFullBackpressure] Error: {}",
throwable.getMessage());
+ latch.countDown();
+ }
+
+ @Override
+ public void onCompleted() {
+ LOGGER.info("[BiStream-ServerFullBackpressure] Completed,
sent: {}, received: {}",
+ sent.get(), received.get());
+ latch.countDown();
+ }
+ };
+
+ // Start the stream - server will use full backpressure control:
+ // - setOnReadyHandler() for send backpressure
+ // - disableAutoRequest() + request() for receive backpressure
+ StreamObserver<DataChunk> requestObserver =
service.biStream(responseObserver);
+
+ boolean completed = latch.await(60, TimeUnit.SECONDS);
+ Assert.assertTrue("BiStream should complete within timeout",
completed);
+ Assert.assertEquals("Should send all chunks", sendCount, sent.get());
+ Assert.assertTrue("Should receive response chunks", received.get() >
0);
+ LOGGER.info("✅ BiStream with server full backpressure test passed!
sent={}, received={}",
+ sent.get(), received.get());
+ }
+}
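
For quick reference, the two idioms that the tests above repeat boil down to the sketches below. They are illustrative only and not part of the patch: they assume the surrounding BackpressureIT class (its service field, the StreamObserver / ClientResponseObserver / ClientCallStreamObserver imports, and the StreamRequest / DataChunk / StreamResponse API types shown earlier in this diff); the helper names, counts and payload sizes are invented for the sketch.

    // Receive-side backpressure (client): disable automatic demand, open a small
    // initial window, then request one more message per chunk actually processed.
    static void consumeWithManualRequest(BackpressureService service) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        AtomicInteger processed = new AtomicInteger();

        service.serverStream(new StreamRequest(30, 1024),
                new ClientResponseObserver<StreamRequest, DataChunk>() {
                    private ClientCallStreamObserver<StreamRequest> call;

                    @Override
                    public void beforeStart(ClientCallStreamObserver<StreamRequest> call) {
                        this.call = call;
                        call.disableAutoRequestWithInitial(5);
                    }

                    @Override
                    public void onNext(DataChunk chunk) {
                        processed.incrementAndGet();
                        call.request(1); // one unit of new demand per consumed chunk
                    }

                    @Override
                    public void onError(Throwable t) {
                        done.countDown();
                    }

                    @Override
                    public void onCompleted() {
                        done.countDown();
                    }
                });

        done.await(60, TimeUnit.SECONDS);
    }

    // Send-side backpressure (client): disable automatic flow control and write only
    // while the transport reports isReady(); the handler runs again once the window reopens.
    static void sendWhileReady(BackpressureService service) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        AtomicInteger sent = new AtomicInteger();
        AtomicBoolean finished = new AtomicBoolean(false);
        byte[] payload = new byte[1024];

        service.clientStream(new ClientResponseObserver<DataChunk, StreamResponse>() {
            @Override
            public void beforeStart(ClientCallStreamObserver<DataChunk> call) {
                call.disableAutoFlowControl();
                call.setOnReadyHandler(() -> {
                    while (call.isReady() && sent.get() < 50 && !finished.get()) {
                        call.onNext(new DataChunk(sent.getAndIncrement(), payload, System.currentTimeMillis()));
                    }
                    if (sent.get() >= 50 && !finished.getAndSet(true)) {
                        call.onCompleted();
                    }
                });
            }

            @Override
            public void onNext(StreamResponse response) {
                // server-side summary; nothing to do in this sketch
            }

            @Override
            public void onError(Throwable t) {
                done.countDown();
            }

            @Override
            public void onCompleted() {
                done.countDown();
            }
        });

        done.await(60, TimeUnit.SECONDS);
    }

With this shape the receive-side window stays at roughly the initial five chunks, because each request(1) replaces exactly one consumed message, while the send-side loop never queues more than the transport buffer accepts.
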
diff --git a/2-advanced/pom.xml b/2-advanced/pom.xml
index 5cefff4d7..eb8c61cb7 100644
--- a/2-advanced/pom.xml
+++ b/2-advanced/pom.xml
@@ -73,5 +73,6 @@
<module>dubbo-samples-multiple-protocols</module>
<module>dubbo-samples-triple-websocket</module>
<module>dubbo-samples-broadcast</module>
+ <module>dubbo-samples-triple-backpressure</module>
</modules>
</project>
diff --git a/test/scripts/filter-build-modules.sh
b/test/scripts/filter-build-modules.sh
new file mode 100755
index 000000000..c385dc6e5
--- /dev/null
+++ b/test/scripts/filter-build-modules.sh
@@ -0,0 +1,303 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script filters modules based on dubbo.version configuration
+# in case-versions.conf files. The logic is consistent with Java
VersionMatcher.
+#
+# Usage: ./filter-build-modules.sh <dubbo_version>
+# Output: Maven -pl exclude list (e.g., "-pl !module1,!module2")
+#
+# Supported dubbo.version formats (same as Java VersionMatcher):
+# dubbo.version=2.7*, 3.* -> wildcard match
+# dubbo.version=[ >= 3.3.6 ] -> range match
+# dubbo.version=3.3.* -> wildcard match
+# dubbo.version=!2.7.8* -> exclusion
+# dubbo.version=>=2.7.7 <3.0 -> combined range
+
+set -e
+
+SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+BASE_DIR="$( cd "$SCRIPT_DIR/../.." && pwd )"
+
+DUBBO_VERSION=$1
+
+if [ -z "$DUBBO_VERSION" ]; then
+ echo "Usage: $0 <dubbo_version>" >&2
+ echo "Example: $0 3.2.6" >&2
+ exit 1
+fi
+
+# Trim version suffix like '-SNAPSHOT'
+trim_version() {
+ local version=$1
+ echo "$version" | sed 's/-.*$//'
+}
+
+# Convert version string to comparable array (e.g., "3.2.6" -> "3 2 6 0")
+# Returns space-separated integers for comparison
+version_to_ints() {
+ local version=$1
+ version=$(trim_version "$version")
+
+ local IFS='.'
+ read -ra parts <<< "$version"
+
+ local result=""
+ for i in 0 1 2 3; do
+ if [ $i -lt ${#parts[@]} ]; then
+ # Extract numeric part only
+ local num=$(echo "${parts[$i]}" | grep -oE '^[0-9]+' || echo "0")
+ result="$result $num"
+ else
+ result="$result 0"
+ fi
+ done
+ echo $result
+}
+
+# Compare two versions: returns 0 if v1 > v2, 1 if v1 < v2, 2 if equal
+compare_versions() {
+ local v1_ints=($1)
+ local v2_ints=($2)
+
+ for i in 0 1 2 3; do
+ if [ "${v1_ints[$i]}" -gt "${v2_ints[$i]}" ]; then
+ return 0 # v1 > v2
+ elif [ "${v1_ints[$i]}" -lt "${v2_ints[$i]}" ]; then
+ return 1 # v1 < v2
+ fi
+ done
+ return 2 # equal
+}
+
+# Check if version matches a wildcard pattern (e.g., "3.*", "2.7*")
+match_wildcard() {
+ local version=$1
+ local pattern=$2
+
+ # Convert wildcard pattern to regex
+ # "3.*" -> "^3\..*$", "2.7*" -> "^2\.7.*$"
+ local regex=$(echo "$pattern" | sed 's/\./\\./g' | sed 's/\*/.*/g')
+ regex="^${regex}$"
+
+ if [[ "$version" =~ $regex ]]; then
+ return 0 # match
+ else
+ return 1 # no match
+ fi
+}
+
+# Check if version satisfies a range condition (e.g., ">=3.3.6", "<3.0")
+match_range() {
+ local version=$1
+ local operator=$2
+ local target=$3
+
+ local v_ints=$(version_to_ints "$version")
+ local t_ints=$(version_to_ints "$target")
+
+ compare_versions "$v_ints" "$t_ints"
+ local cmp=$?
+
+ case "$operator" in
+ ">=")
+ [ $cmp -eq 0 ] || [ $cmp -eq 2 ]
+ return $?
+ ;;
+ ">")
+ [ $cmp -eq 0 ]
+ return $?
+ ;;
+ "<=")
+ [ $cmp -eq 1 ] || [ $cmp -eq 2 ]
+ return $?
+ ;;
+ "<")
+ [ $cmp -eq 1 ]
+ return $?
+ ;;
+ esac
+ return 1
+}
+
+# Parse and check a single rule against a version
+# Returns: 0=included, 1=excluded, 2=no match
+check_rule() {
+ local version=$1
+ local rule=$2
+ local excluded=0
+
+ # Trim leading/trailing whitespace first
+ rule=$(echo "$rule" | xargs)
+
+ # Check for exclusion prefix
+ if [[ "$rule" == !* ]]; then
+ excluded=1
+ rule="${rule:1}"
+ rule=$(echo "$rule" | xargs) # trim again after removing !
+ fi
+
+ # Check for range operators (>=, >, <=, <)
+ if [[ "$rule" =~ ^(>=|>|<=|<)[[:space:]]*([0-9]+\.[0-9]+(\.[0-9]+)?) ]];
then
+ local operator="${BASH_REMATCH[1]}"
+ local target="${BASH_REMATCH[2]}"
+
+ if match_range "$version" "$operator" "$target"; then
+ [ $excluded -eq 1 ] && return 1 || return 0
+ fi
+ return 2
+ fi
+
+ # Check for wildcard
+ if [[ "$rule" == *"*"* ]]; then
+ if match_wildcard "$version" "$rule"; then
+ [ $excluded -eq 1 ] && return 1 || return 0
+ fi
+ return 2
+ fi
+
+ # Plain version match
+ local trimmed_version=$(trim_version "$version")
+ if [ "$trimmed_version" == "$rule" ]; then
+ [ $excluded -eq 1 ] && return 1 || return 0
+ fi
+
+ return 2
+}
+
+# Check if a version matches a set of rules (from dubbo.version line)
+# Returns: 0 if version is included, 1 if excluded or no match
+check_version_rules() {
+ local version=$1
+ local rules_str=$2
+
+ # Trim leading/trailing whitespace
+ rules_str=$(echo "$rules_str" | xargs)
+
+ # Remove brackets [ ]
+ rules_str=$(echo "$rules_str" | sed 's/^\[//' | sed 's/\]$//' | xargs)
+
+ # Handle combined range rules like ">=2.7.7 <3.0" (space-separated without
comma)
+ # First, check if it's a combined range (contains space but no comma
between operators)
+ if [[ "$rules_str" =~
^[[:space:]]*(\>=|\>|\<=|\<)[^,]+[[:space:]]+(\>=|\>|\<=|\<) ]]; then
+ # Combined range rule - all conditions must match
+ local all_match=true
+ while [[ "$rules_str" =~ (\>=|\>|\<=|\<)([^[:space:]\>=\<]+) ]]; do
+ local operator="${BASH_REMATCH[1]}"
+ local target="${BASH_REMATCH[2]}"
+ # Remove quotes
+ target=$(echo "$target" | tr -d "\"'")
+
+ if ! match_range "$version" "$operator" "$target"; then
+ all_match=false
+ break
+ fi
+ # Remove matched part
+ rules_str="${rules_str#*${BASH_REMATCH[0]}}"
+ done
+
+ if $all_match; then
+ return 0
+ fi
+ return 1
+ fi
+
+ # Split by comma for multiple rules
+ local included=false
+
+ IFS=',' read -ra rules <<< "$rules_str"
+ for rule in "${rules[@]}"; do
+ # Remove quotes
+ rule=$(echo "$rule" | tr -d "\"'" | xargs)
+
+ # Skip empty rules
+ [ -z "$rule" ] && continue
+
+ # Check for combined range within a single rule (e.g., ">=2.7.7 <3.0")
+ if [[ "$rule" =~
^[[:space:]]*(\>=|\>|\<=|\<).+[[:space:]]+(\>=|\>|\<=|\<) ]]; then
+ local all_match=true
+ local temp_rule="$rule"
+ while [[ "$temp_rule" =~ (\>=|\>|\<=|\<)([^[:space:]\>=\<]+) ]]; do
+ local operator="${BASH_REMATCH[1]}"
+ local target="${BASH_REMATCH[2]}"
+
+ if ! match_range "$version" "$operator" "$target"; then
+ all_match=false
+ break
+ fi
+ temp_rule="${temp_rule#*${BASH_REMATCH[0]}}"
+ done
+
+ if $all_match; then
+ included=true
+ fi
+ continue
+ fi
+
+ check_rule "$version" "$rule"
+ local result=$?
+
+ if [ $result -eq 1 ]; then
+ # Excluded - immediate return
+ return 1
+ elif [ $result -eq 0 ]; then
+ included=true
+ fi
+ done
+
+ if $included; then
+ return 0
+ fi
+ return 1
+}
+
+# Get the trimmed version for matching
+CURRENT_VERSION=$(trim_version "$DUBBO_VERSION")
+
+# Find all case-versions.conf files and check dubbo.version
+EXCLUDED_MODULES=""
+
+while IFS= read -r conf_file; do
+ # Read dubbo.version from the config file
+ RAW_VERSION=$(grep -E "^dubbo\.version\s*=" "$conf_file" 2>/dev/null | sed
's/^dubbo\.version\s*=\s*//' || echo "")
+
+ if [ -n "$RAW_VERSION" ]; then
+ # Check if current version matches the rules
+ if ! check_version_rules "$CURRENT_VERSION" "$RAW_VERSION"; then
+ # Version does not match - exclude this module
+ MODULE_DIR=$(dirname "$conf_file")
+ RELATIVE_PATH="${MODULE_DIR#$BASE_DIR/}"
+
+ if [ -n "$EXCLUDED_MODULES" ]; then
+ EXCLUDED_MODULES="$EXCLUDED_MODULES,$RELATIVE_PATH"
+ else
+ EXCLUDED_MODULES="$RELATIVE_PATH"
+ fi
+
+ echo "Excluding module: $RELATIVE_PATH
(dubbo.version=$RAW_VERSION, current: $CURRENT_VERSION)" >&2
+ fi
+ fi
+done < <(find "$BASE_DIR" -name "case-versions.conf" -type f)
+
+# Output the Maven exclude parameter
+if [ -n "$EXCLUDED_MODULES" ]; then
+ # Format: -pl !module1,!module2
+ echo "-pl !${EXCLUDED_MODULES//,/,!}"
+else
+ echo ""
+fi
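
As the usage note at the top of the script states, it takes a single Dubbo version argument, prints a Maven "-pl" exclusion list (or an empty string) on stdout, and reports each excluded module on stderr. A local invocation could look like the sketch below; the version argument and the Maven goals are placeholders.

    #!/bin/bash
    # Sketch only: the version argument and Maven goals are example values.
    EXCLUDED_ARGS=$(bash test/scripts/filter-build-modules.sh 3.2.6)
    echo "Maven exclusion arguments: '${EXCLUDED_ARGS}'"

    # Word splitting of $EXCLUDED_ARGS is intentional: "-pl" and the
    # "!module1,!module2" list must reach Maven as separate arguments, and an
    # empty result must add no arguments at all.
    ./mvnw --batch-mode clean package $EXCLUDED_ARGS
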
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]