This is an automated email from the ASF dual-hosted git repository.

earthchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo-samples.git


The following commit(s) were added to refs/heads/master by this push:
     new fb2f15186 sample for triple backpressure (#1278)
fb2f15186 is described below

commit fb2f151860abe5b4985c15924ad58538912bc841
Author: Wang Chengming <[email protected]>
AuthorDate: Thu Jan 8 17:56:20 2026 +0800

    sample for triple backpressure (#1278)
    
    * sample for triple backpressure
    
    * sample for triple backpressure
    
    * sample for triple backpressure
    
    * sample for triple backpressure
    
    * sample for triple backpressure
    
    * sample for triple backpressure
    
    * Add dubbo version comment in pom.xml
    
    Updated dubbo.version to 3.3.7-SNAPSHOT in pom.xml
    
    * Migrate log4j.properties to log4j2.xml format
    
    * add it
    
    * change grpc pattern api
    
    * filter build version
    
    * filter build version
    
    * filter build version
    
    ---------
    
    Co-authored-by: chengming.wang <[email protected]>
    Co-authored-by: zrlw <[email protected]>
    Co-authored-by: earthchen <[email protected]>
    Co-authored-by: earthchen <[email protected]>
---
 .github/workflows/dubbo-3_2.yml                    |   8 +-
 .github/workflows/dubbo-3_3.yml                    |   8 +-
 .../case-configuration.yml                         |  48 ++
 .../case-versions.conf                             |  23 +
 .../dubbo-samples-triple-backpressure/pom.xml      | 117 ++++
 .../samples/backpressure/BackpressureConsumer.java | 212 ++++++
 .../samples/backpressure/BackpressureProvider.java |  56 ++
 .../backpressure/api/BackpressureService.java      |  60 ++
 .../dubbo/samples/backpressure/api/DataChunk.java  |  82 +++
 .../samples/backpressure/api/StreamRequest.java    |  68 ++
 .../samples/backpressure/api/StreamResponse.java   |  82 +++
 .../backpressure/impl/BackpressureServiceImpl.java | 363 +++++++++++
 .../src/main/resources/log4j2.xml                  |  31 +
 .../dubbo/samples/backpressure/BackpressureIT.java | 720 +++++++++++++++++++++
 2-advanced/pom.xml                                 |   1 +
 test/scripts/filter-build-modules.sh               | 303 +++++++++
 16 files changed, 2180 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/dubbo-3_2.yml b/.github/workflows/dubbo-3_2.yml
index 647073140..1d38e91ab 100644
--- a/.github/workflows/dubbo-3_2.yml
+++ b/.github/workflows/dubbo-3_2.yml
@@ -59,9 +59,15 @@ jobs:
         with:
           distribution: 'zulu'
           java-version: 21
+      - name: Filter modules by Dubbo version
+        id: filter-modules
+        run: |
+          EXCLUDED=$(bash ./test/scripts/filter-build-modules.sh ${{ 
needs.build-dubbo.outputs.version }})
+          echo "excluded_modules=$EXCLUDED" >> $GITHUB_OUTPUT
+          echo "Excluded modules: $EXCLUDED"
       - name: Build with Maven
         run: |
-          ./mvnw $BUILD_OPTS
+          ./mvnw $BUILD_OPTS ${{ steps.filter-modules.outputs.excluded_modules 
}}
       - name: Clean with Maven
         run: |
           ./mvnw --batch-mode --no-snapshot-updates --no-transfer-progress 
--settings ${{github.workspace}}/.mvn/settings.xml clean
diff --git a/.github/workflows/dubbo-3_3.yml b/.github/workflows/dubbo-3_3.yml
index 85ef3ff35..6f56312be 100644
--- a/.github/workflows/dubbo-3_3.yml
+++ b/.github/workflows/dubbo-3_3.yml
@@ -62,9 +62,15 @@ jobs:
         with:
           distribution: 'zulu'
           java-version: 21
+      - name: Filter modules by Dubbo version
+        id: filter-modules
+        run: |
+          EXCLUDED=$(bash ./test/scripts/filter-build-modules.sh ${{ 
needs.build-dubbo.outputs.version }})
+          echo "excluded_modules=$EXCLUDED" >> $GITHUB_OUTPUT
+          echo "Excluded modules: $EXCLUDED"
       - name: Build with Maven
         run: |
-          ./mvnw $BUILD_OPTS
+          ./mvnw $BUILD_OPTS ${{ steps.filter-modules.outputs.excluded_modules 
}}
       - name: Clean with Maven
         run: |
           ./mvnw --batch-mode --no-snapshot-updates --no-transfer-progress 
--settings ${{github.workspace}}/.mvn/settings.xml clean
diff --git 
a/2-advanced/dubbo-samples-triple-backpressure/case-configuration.yml 
b/2-advanced/dubbo-samples-triple-backpressure/case-configuration.yml
new file mode 100644
index 000000000..396ffb97b
--- /dev/null
+++ b/2-advanced/dubbo-samples-triple-backpressure/case-configuration.yml
@@ -0,0 +1,48 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+services:
+  zookeeper:
+    image: zookeeper:latest
+
+  dubbo-samples-triple-backpressure:
+    type: app
+    basedir: .
+    mainClass: org.apache.dubbo.samples.backpressure.BackpressureProvider
+    systemProps:
+      - zookeeper.address=zookeeper
+      - zookeeper.port=2181
+    waitPortsBeforeRun:
+      - zookeeper:2181
+    checkPorts:
+      - 50051
+    checkLog: "BackpressureProvider started"
+
+  dubbo-samples-triple-backpressure-test:
+    type: test
+    basedir: .
+    tests:
+      - "**/*IT.class"
+    systemProps:
+      - zookeeper.address=zookeeper
+      - zookeeper.port=2181
+      - dubbo.address=dubbo-samples-triple-backpressure
+      - dubbo.port=50051
+    waitPortsBeforeRun:
+      - zookeeper:2181
+      - dubbo-samples-triple-backpressure:50051
+    depends_on:
+      - dubbo-samples-triple-backpressure
diff --git a/2-advanced/dubbo-samples-triple-backpressure/case-versions.conf 
b/2-advanced/dubbo-samples-triple-backpressure/case-versions.conf
new file mode 100644
index 000000000..709a6f8dc
--- /dev/null
+++ b/2-advanced/dubbo-samples-triple-backpressure/case-versions.conf
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# Supported component versions of the test case
+
+# Spring app
+dubbo.version=[ > 3.3.6 ]
+spring.version=6.*
+java.version= [>= 8]
diff --git a/2-advanced/dubbo-samples-triple-backpressure/pom.xml 
b/2-advanced/dubbo-samples-triple-backpressure/pom.xml
new file mode 100644
index 000000000..c38992247
--- /dev/null
+++ b/2-advanced/dubbo-samples-triple-backpressure/pom.xml
@@ -0,0 +1,117 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~  Licensed to the Apache Software Foundation (ASF) under one or more
+  ~  contributor license agreements.  See the NOTICE file distributed with
+  ~  this work for additional information regarding copyright ownership.
+  ~  The ASF licenses this file to You under the Apache License, Version 2.0
+  ~  (the "License"); you may not use this file except in compliance with
+  ~  the License.  You may obtain a copy of the License at
+  ~
+  ~      http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~  Unless required by applicable law or agreed to in writing, software
+  ~  distributed under the License is distributed on an "AS IS" BASIS,
+  ~  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~  See the License for the specific language governing permissions and
+  ~  limitations under the License.
+  -->
+
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+      <groupId>org.apache</groupId>
+      <artifactId>apache</artifactId>
+      <version>23</version>
+      <relativePath/>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>dubbo-samples-triple-backpressure</artifactId>
+
+    <properties>
+        <source.level>1.8</source.level>
+        <target.level>1.8</target.level>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <!-- Replace with the release version once Dubbo 3.3.7 is released. -->
+        <dubbo.version>3.3.7-SNAPSHOT</dubbo.version>
+        <log4j2.version>2.20.0</log4j2.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.dubbo</groupId>
+            <artifactId>dubbo</artifactId>
+            <version>${dubbo.version}</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.dubbo</groupId>
+          <artifactId>dubbo-zookeeper-curator5-spring-boot-starter</artifactId>
+          <version>${dubbo.version}</version>
+        </dependency>
+        <!-- Embedded Zookeeper Server Dependencies -->
+        <dependency>
+          <groupId>io.dropwizard.metrics</groupId>
+          <artifactId>metrics-core</artifactId>
+          <version>4.1.12.1</version>
+        </dependency>
+        <dependency>
+          <groupId>org.xerial.snappy</groupId>
+          <artifactId>snappy-java</artifactId>
+          <version>1.1.10.5</version>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.logging.log4j</groupId>
+          <artifactId>log4j-slf4j-impl</artifactId>
+          <version>${log4j2.version}</version>
+        </dependency>
+        <!-- Test dependencies -->
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.13.2</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <profiles>
+        <!-- Adds the javax.annotation API on JDK 11 and above -->
+        <profile>
+            <id>javax.annotation</id>
+            <activation>
+                <jdk>[1.11,)</jdk>
+            </activation>
+            <dependencies>
+                <dependency>
+                    <groupId>javax.annotation</groupId>
+                    <artifactId>javax.annotation-api</artifactId>
+                    <version>1.3.2</version>
+                </dependency>
+            </dependencies>
+        </profile>
+    </profiles>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.1</version>
+                <configuration>
+                    <source>${source.level}</source>
+                    <target>${target.level}</target>
+                </configuration>
+            </plugin>
+            <!-- Skip remote-resources-plugin from parent pom -->
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-remote-resources-plugin</artifactId>
+                <configuration>
+                    <skip>true</skip>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
+
diff --git 
a/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/BackpressureConsumer.java
 
b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/BackpressureConsumer.java
new file mode 100644
index 000000000..d49de51a1
--- /dev/null
+++ 
b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/BackpressureConsumer.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.samples.backpressure;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.config.ReferenceConfig;
+import org.apache.dubbo.common.stream.ClientCallStreamObserver;
+import org.apache.dubbo.config.RegistryConfig;
+import org.apache.dubbo.config.bootstrap.DubboBootstrap;
+import org.apache.dubbo.samples.backpressure.api.BackpressureService;
+import org.apache.dubbo.samples.backpressure.api.DataChunk;
+import org.apache.dubbo.samples.backpressure.api.StreamRequest;
+import org.apache.dubbo.samples.backpressure.api.StreamResponse;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BackpressureConsumer {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(BackpressureConsumer.class);
+
+    private static final String ZOOKEEPER_HOST = 
System.getProperty("zookeeper.address", "127.0.0.1");
+    private static final String ZOOKEEPER_PORT = 
System.getProperty("zookeeper.port", "2181");
+
+    public static void main(String[] args) throws Exception {
+        ReferenceConfig<BackpressureService> reference = new 
ReferenceConfig<>();
+        reference.setInterface(BackpressureService.class);
+        reference.setProtocol("tri");
+
+        String zkAddress = "zookeeper://" + ZOOKEEPER_HOST + ":" + 
ZOOKEEPER_PORT;
+        LOGGER.info("Using ZooKeeper: {}", zkAddress);
+
+        DubboBootstrap bootstrap = DubboBootstrap.getInstance();
+        bootstrap.application(new ApplicationConfig("backpressure-consumer"))
+                .registry(new RegistryConfig(zkAddress))
+                .reference(reference)
+                .start();
+
+        BackpressureService service = reference.get();
+
+        LOGGER.info("=== Test 1: Basic Echo ===");
+        testEcho(service);
+
+        LOGGER.info("\n=== Test 2: Client-side Backpressure (clientStream) 
===");
+        testClientSideBackpressure(service);
+
+        LOGGER.info("\n=== Test 3: Server-side Backpressure (serverStream) 
===");
+        testServerSideBackpressure(service);
+
+        LOGGER.info("\n✅ All tests completed!");
+        System.exit(0);
+    }
+
+    /**
+     * Test basic echo functionality.
+     */
+    public static void testEcho(BackpressureService service) {
+        String response = service.echo("Hello Backpressure");
+        LOGGER.info("Echo response: {}", response);
+    }
+
+    /**
+     * Test server-side backpressure.
+     * Server uses isReady() and setOnReadyHandler() to control sending rate.
+     */
+    public static void testServerSideBackpressure(BackpressureService service) 
throws InterruptedException {
+        final int expectedCount = 50;
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicInteger received = new AtomicInteger(0);
+
+        StreamRequest request = new StreamRequest(expectedCount, 1024);
+
+        LOGGER.info("[Server-Backpressure] Requesting {} chunks from server", 
expectedCount);
+
+        service.serverStream(request, new StreamObserver<DataChunk>() {
+            @Override
+            public void onNext(DataChunk chunk) {
+                int count = received.incrementAndGet();
+                if (count % 10 == 0) {
+                    LOGGER.info("[Server-Backpressure] Received {} chunks", 
count);
+                }
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                LOGGER.error("[Server-Backpressure] Error: {}", 
throwable.getMessage());
+                latch.countDown();
+            }
+
+            @Override
+            public void onCompleted() {
+                LOGGER.info("[Server-Backpressure] Stream completed, total: 
{}", received.get());
+                latch.countDown();
+            }
+        });
+
+        latch.await(60, TimeUnit.SECONDS);
+        LOGGER.info("[Server-Backpressure] Test finished, received: {}", 
received.get());
+    }
+
+    /**
+     * Test client-side backpressure.
+     *
+     * <p>Client uses CallStreamObserver's isReady() and setOnReadyHandler()
+     * to control sending rate based on transport layer capacity.
+     */
+    public static void testClientSideBackpressure(BackpressureService service) 
throws InterruptedException {
+        final int sendCount = 100;
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicInteger sent = new AtomicInteger(0);
+        final AtomicBoolean completed = new AtomicBoolean(false);
+        final byte[] data = new byte[1024];
+
+        LOGGER.info("[Client-Backpressure] Sending {} chunks to server using 
backpressure", sendCount);
+
+        // Response observer
+        StreamObserver<StreamResponse> responseObserver = new 
StreamObserver<StreamResponse>() {
+            @Override
+            public void onNext(StreamResponse response) {
+                LOGGER.info("[Client-Backpressure] Server received: {} chunks, 
{} bytes in {}ms",
+                        response.getTotalChunks(), response.getTotalBytes(), 
response.getDurationMs());
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                LOGGER.error("[Client-Backpressure] Error: {}", 
throwable.getMessage());
+                latch.countDown();
+            }
+
+            @Override
+            public void onCompleted() {
+                LOGGER.info("[Client-Backpressure] Response completed");
+                latch.countDown();
+            }
+        };
+
+        // Get request StreamObserver
+        StreamObserver<DataChunk> requestObserver = 
service.clientStream(responseObserver);
+
+        // Cast to ClientCallStreamObserver interface to use backpressure API 
for client side
+        ClientCallStreamObserver<DataChunk> callObserver = 
(ClientCallStreamObserver<DataChunk>) requestObserver;
+
+        // Disable auto flow control for manual backpressure
+        callObserver.disableAutoFlowControl();
+        LOGGER.info("[Client-Backpressure] Disabled auto flow control");
+
+        // Set ready callback - called when stream becomes writable
+        callObserver.setOnReadyHandler(() -> {
+            LOGGER.info("[Client-Backpressure] onReadyHandler triggered, sent 
so far: {}", sent.get());
+
+            // Send data while stream is ready and we have more to send
+            while (callObserver.isReady() && sent.get() < sendCount && 
!completed.get()) {
+                int seq = sent.getAndIncrement();
+                DataChunk chunk = new DataChunk(seq, data, 
System.currentTimeMillis());
+                callObserver.onNext(chunk);
+
+                if ((seq + 1) % 20 == 0) {
+                    LOGGER.info("[Client-Backpressure] Sent {} chunks 
(isReady={})", seq + 1, callObserver.isReady());
+                }
+            }
+
+            // Complete if all data sent
+            if (sent.get() >= sendCount && !completed.getAndSet(true)) {
+                callObserver.onCompleted();
+                LOGGER.info("[Client-Backpressure] Completed via 
onReadyHandler, total: {}", sent.get());
+            }
+        });
+
+        // Initial send (if writable)
+        LOGGER.info("[Client-Backpressure] Initial send, isReady={}", 
callObserver.isReady());
+        while (callObserver.isReady() && sent.get() < sendCount) {
+            int seq = sent.getAndIncrement();
+            DataChunk chunk = new DataChunk(seq, data, 
System.currentTimeMillis());
+            callObserver.onNext(chunk);
+
+            if ((seq + 1) % 20 == 0) {
+                LOGGER.info("[Client-Backpressure] Initial sent {} chunks", 
seq + 1);
+            }
+        }
+
+        // Complete if all sent in initial phase
+        if (sent.get() >= sendCount && !completed.getAndSet(true)) {
+            callObserver.onCompleted();
+            LOGGER.info("[Client-Backpressure] All sent in initial phase, 
total: {}", sent.get());
+        } else {
+            LOGGER.info("[Client-Backpressure] Paused at {} chunks, waiting 
for onReadyHandler", sent.get());
+        }
+
+        latch.await(120, TimeUnit.SECONDS);
+        LOGGER.info("[Client-Backpressure] Test finished, total sent: {}", 
sent.get());
+    }
+}
diff --git 
a/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/BackpressureProvider.java
 
b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/BackpressureProvider.java
new file mode 100644
index 000000000..a4f214a8f
--- /dev/null
+++ 
b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/BackpressureProvider.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.samples.backpressure;
+
+import org.apache.dubbo.common.constants.CommonConstants;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.config.ProtocolConfig;
+import org.apache.dubbo.config.RegistryConfig;
+import org.apache.dubbo.config.ServiceConfig;
+import org.apache.dubbo.config.bootstrap.DubboBootstrap;
+import org.apache.dubbo.samples.backpressure.api.BackpressureService;
+import org.apache.dubbo.samples.backpressure.impl.BackpressureServiceImpl;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BackpressureProvider {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(BackpressureProvider.class);
+
+    private static final String ZOOKEEPER_HOST = 
System.getProperty("zookeeper.address", "127.0.0.1");
+    private static final String ZOOKEEPER_PORT = 
System.getProperty("zookeeper.port", "2181");
+
+    public static void main(String[] args) {
+        ServiceConfig<BackpressureService> service = new ServiceConfig<>();
+        service.setInterface(BackpressureService.class);
+        service.setRef(new BackpressureServiceImpl());
+
+        String zkAddress = "zookeeper://" + ZOOKEEPER_HOST + ":" + 
ZOOKEEPER_PORT;
+        LOGGER.info("Using ZooKeeper: {}", zkAddress);
+
+        DubboBootstrap bootstrap = DubboBootstrap.getInstance();
+        bootstrap.application(new ApplicationConfig("backpressure-provider"))
+                .registry(new RegistryConfig(zkAddress))
+                .protocol(new ProtocolConfig(CommonConstants.TRIPLE, 50051))
+                .service(service)
+                .start();
+
+        LOGGER.info("BackpressureProvider started on port 50051, waiting for 
requests...");
+        bootstrap.await();
+    }
+}
diff --git 
a/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/BackpressureService.java
 
b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/BackpressureService.java
new file mode 100644
index 000000000..f97b63414
--- /dev/null
+++ 
b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/BackpressureService.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.samples.backpressure.api;
+
+import org.apache.dubbo.common.stream.StreamObserver;
+
+/**
+ * Service interface for demonstrating backpressure with Triple protocol.
+ */
+public interface BackpressureService {
+
+    /**
+     * Unary call - for verifying basic functionality is not affected.
+     */
+    String echo(String message);
+
+    /**
+     * Server streaming - server sends multiple responses.
+     * Used by the consumer's server-side backpressure test (the server is the sender).
+     *
+     * @param request stream request containing count and chunk size
+     * @param responseObserver observer to receive chunks
+     */
+    void serverStream(StreamRequest request, StreamObserver<DataChunk> 
responseObserver);
+
+    /**
+     * Client streaming - client sends multiple requests.
+     * Used by the consumer's client-side backpressure test (the client is the sender).
+     *
+     * @param responseObserver observer to receive final response
+     * @return observer to send chunks
+     */
+    StreamObserver<DataChunk> clientStream(StreamObserver<StreamResponse> 
responseObserver);
+
+    /**
+     * Bidirectional streaming - both sides send and receive.
+     * Used to test bidirectional backpressure.
+     *
+     * @param responseObserver observer to receive response chunks
+     * @return observer to send request chunks
+     */
+    StreamObserver<DataChunk> biStream(StreamObserver<DataChunk> 
responseObserver);
+
+}
+
+
diff --git 
a/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/DataChunk.java
 
b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/DataChunk.java
new file mode 100644
index 000000000..45d04cd17
--- /dev/null
+++ 
b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/DataChunk.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.samples.backpressure.api;
+
+import java.io.Serializable;
+
+/**
+ * A chunk of data in the stream.
+ */
+public class DataChunk implements Serializable {
+
+    private static final long serialVersionUID = -1205830810422530386L;
+
+    /**
+     * Sequence number of this chunk.
+     */
+    private int sequenceNumber;
+
+    /**
+     * The data payload.
+     */
+    private byte[] data;
+
+    /**
+     * Timestamp when this chunk was created (for latency measurement).
+     */
+    private long timestamp;
+
+    public DataChunk() {
+    }
+
+    public DataChunk(int sequenceNumber, byte[] data, long timestamp) {
+        this.sequenceNumber = sequenceNumber;
+        this.data = data;
+        this.timestamp = timestamp;
+    }
+
+    public int getSequenceNumber() {
+        return sequenceNumber;
+    }
+
+    public void setSequenceNumber(int sequenceNumber) {
+        this.sequenceNumber = sequenceNumber;
+    }
+
+    public byte[] getData() {
+        return data;
+    }
+
+    public void setData(byte[] data) {
+        this.data = data;
+    }
+
+    public long getTimestamp() {
+        return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+        this.timestamp = timestamp;
+    }
+
+    @Override
+    public String toString() {
+        return "DataChunk{seq=" + sequenceNumber + ", dataLen=" + (data != 
null ? data.length : 0) + ", ts=" + timestamp + "}";
+    }
+}
+
+
diff --git 
a/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/StreamRequest.java
 
b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/StreamRequest.java
new file mode 100644
index 000000000..98835894a
--- /dev/null
+++ 
b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/StreamRequest.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.samples.backpressure.api;
+
+import java.io.Serializable;
+
+/**
+ * Parameters a caller supplies to start a stream: how many chunks to produce
+ * and how large each chunk's payload should be.
+ */
+public class StreamRequest implements Serializable {
+
+    private static final long serialVersionUID = -7146758652012698927L;
+
+    /** How many chunks the stream should contain. */
+    private int count;
+
+    /** Payload size of every chunk, in bytes. */
+    private int chunkSize;
+
+    /** Creates a request with default field values; populate it through the setters. */
+    public StreamRequest() {
+        super();
+    }
+
+    /**
+     * Creates a fully populated request.
+     *
+     * @param count     number of chunks to send
+     * @param chunkSize payload size of each chunk, in bytes
+     */
+    public StreamRequest(int count, int chunkSize) {
+        this.chunkSize = chunkSize;
+        this.count = count;
+    }
+
+    /** @return number of chunks to send */
+    public int getCount() {
+        return this.count;
+    }
+
+    /** @param count number of chunks to send */
+    public void setCount(int count) {
+        this.count = count;
+    }
+
+    /** @return payload size of each chunk, in bytes */
+    public int getChunkSize() {
+        return this.chunkSize;
+    }
+
+    /** @param chunkSize payload size of each chunk, in bytes */
+    public void setChunkSize(int chunkSize) {
+        this.chunkSize = chunkSize;
+    }
+
+    /** Renders both fields for logging. */
+    @Override
+    public String toString() {
+        return new StringBuilder("StreamRequest{count=")
+                .append(this.count)
+                .append(", chunkSize=")
+                .append(this.chunkSize)
+                .append('}')
+                .toString();
+    }
+}
+
+
diff --git 
a/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/StreamResponse.java
 
b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/StreamResponse.java
new file mode 100644
index 000000000..c77279d08
--- /dev/null
+++ 
b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/api/StreamResponse.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.samples.backpressure.api;
+
+import java.io.Serializable;
+
+/**
+ * Summary returned once a stream has finished: how many chunks and bytes were
+ * received and how long the stream took.
+ */
+public class StreamResponse implements Serializable {
+
+    private static final long serialVersionUID = -5423429601473151879L;
+
+    /** Count of chunks received over the whole stream. */
+    private int totalChunks;
+
+    /** Sum of the payload sizes of all received chunks, in bytes. */
+    private long totalBytes;
+
+    /** Elapsed time of the stream, in milliseconds. */
+    private long durationMs;
+
+    /** Creates a response with default field values; populate it through the setters. */
+    public StreamResponse() {
+        super();
+    }
+
+    /**
+     * Creates a fully populated response.
+     *
+     * @param totalChunks number of chunks received
+     * @param totalBytes  number of payload bytes received
+     * @param durationMs  elapsed time in milliseconds
+     */
+    public StreamResponse(int totalChunks, long totalBytes, long durationMs) {
+        this.durationMs = durationMs;
+        this.totalBytes = totalBytes;
+        this.totalChunks = totalChunks;
+    }
+
+    /** @return number of chunks received */
+    public int getTotalChunks() {
+        return this.totalChunks;
+    }
+
+    /** @param totalChunks number of chunks received */
+    public void setTotalChunks(int totalChunks) {
+        this.totalChunks = totalChunks;
+    }
+
+    /** @return number of payload bytes received */
+    public long getTotalBytes() {
+        return this.totalBytes;
+    }
+
+    /** @param totalBytes number of payload bytes received */
+    public void setTotalBytes(long totalBytes) {
+        this.totalBytes = totalBytes;
+    }
+
+    /** @return elapsed time in milliseconds */
+    public long getDurationMs() {
+        return this.durationMs;
+    }
+
+    /** @param durationMs elapsed time in milliseconds */
+    public void setDurationMs(long durationMs) {
+        this.durationMs = durationMs;
+    }
+
+    /** Renders all three fields for logging. */
+    @Override
+    public String toString() {
+        return new StringBuilder("StreamResponse{totalChunks=")
+                .append(this.totalChunks)
+                .append(", totalBytes=")
+                .append(this.totalBytes)
+                .append(", durationMs=")
+                .append(this.durationMs)
+                .append('}')
+                .toString();
+    }
+}
+
+
diff --git 
a/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/impl/BackpressureServiceImpl.java
 
b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/impl/BackpressureServiceImpl.java
new file mode 100644
index 000000000..b8b892af8
--- /dev/null
+++ 
b/2-advanced/dubbo-samples-triple-backpressure/src/main/java/org/apache/dubbo/samples/backpressure/impl/BackpressureServiceImpl.java
@@ -0,0 +1,363 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.samples.backpressure.impl;
+
+import org.apache.dubbo.common.stream.CallStreamObserver;
+import org.apache.dubbo.common.stream.ServerCallStreamObserver;
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.samples.backpressure.api.BackpressureService;
+import org.apache.dubbo.samples.backpressure.api.DataChunk;
+import org.apache.dubbo.samples.backpressure.api.StreamRequest;
+import org.apache.dubbo.samples.backpressure.api.StreamResponse;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of BackpressureService demonstrating streaming with backpressure.
+ *
+ * <p>Send-side backpressure is driven by {@code isReady()} and {@code setOnReadyHandler(Runnable)};
+ * receive-side backpressure by {@code disableAutoRequest()} and {@code request(int)}. Every
+ * streaming method falls back to plain streaming without flow control when the response observer
+ * it is handed is not a {@link ServerCallStreamObserver}.
+ */
+public class BackpressureServiceImpl implements BackpressureService {
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(BackpressureServiceImpl.class);
+
+    /** Number of inbound messages requested up front once auto-request has been disabled. */
+    private static final int INITIAL_REQUEST = 5;
+
+    /** Progress is logged once every this many chunks. */
+    private static final int LOG_INTERVAL = 10;
+
+    /**
+     * Unary echo.
+     *
+     * @param message text sent by the caller
+     * @return the message prefixed with {@code "Echo: "}
+     */
+    @Override
+    public String echo(String message) {
+        LOGGER.info("[Server] Echo received: {}", message);
+        return "Echo: " + message;
+    }
+
+    /**
+     * Server streaming with SERVER-SIDE send backpressure.
+     *
+     * <p>Chunks are written only while {@code isReady()} returns {@code true}; when the stream stops
+     * being ready the method returns and the on-ready handler resumes sending later. The initial
+     * send and the handler share one send routine that is executed through a {@link DrainGate}, so
+     * the two can never interleave and exactly {@code request.getCount()} chunks are sent before the
+     * stream is completed once.
+     *
+     * @param request          number of chunks to send and the payload size of each
+     * @param responseObserver observer the chunks are written to
+     */
+    @Override
+    public void serverStream(StreamRequest request, StreamObserver<DataChunk> responseObserver) {
+        final int totalCount = request.getCount();
+        final int chunkSize = request.getChunkSize();
+        LOGGER.info("[Server-Backpressure] Starting server stream, count={}, chunkSize={}", totalCount, chunkSize);
+
+        // Every chunk shares this one zero-filled payload array.
+        final byte[] data = new byte[chunkSize];
+
+        if (!(responseObserver instanceof ServerCallStreamObserver)) {
+            // Without the flow-control API the stream must still be served and completed,
+            // otherwise the caller would wait forever.
+            LOGGER.warn("[Server-Backpressure] Fallback to basic implementation without send backpressure");
+            for (int seq = 0; seq < totalCount; seq++) {
+                responseObserver.onNext(new DataChunk(seq, data, System.currentTimeMillis()));
+            }
+            responseObserver.onCompleted();
+            LOGGER.info("[Server-Backpressure] Completed without backpressure, total: {}", Math.max(totalCount, 0));
+            return;
+        }
+
+        final ServerCallStreamObserver<DataChunk> serverObserver =
+                (ServerCallStreamObserver<DataChunk>) responseObserver;
+        final AtomicInteger sentCount = new AtomicInteger(0);
+        final AtomicBoolean completed = new AtomicBoolean(false);
+        final DrainGate gate = new DrainGate();
+
+        // Sends while the stream is writable and completes it once everything has been sent.
+        final Runnable sendWhileReady = () -> {
+            while (!completed.get() && sentCount.get() < totalCount && serverObserver.isReady()) {
+                int seq = sentCount.getAndIncrement();
+                serverObserver.onNext(new DataChunk(seq, data, System.currentTimeMillis()));
+
+                if ((seq + 1) % LOG_INTERVAL == 0) {
+                    LOGGER.info("[Server-Backpressure] Sent {} chunks (isReady={})", seq + 1, serverObserver.isReady());
+                }
+            }
+
+            // compareAndSet guarantees onCompleted() is issued at most once.
+            if (sentCount.get() >= totalCount && completed.compareAndSet(false, true)) {
+                serverObserver.onCompleted();
+                LOGGER.info("[Server-Backpressure] Completed, total: {}", sentCount.get());
+            }
+        };
+
+        // Set ready callback - called when the stream becomes writable
+        serverObserver.setOnReadyHandler(() -> {
+            LOGGER.info("[Server-Backpressure] onReadyHandler triggered, sent so far: {}", sentCount.get());
+            gate.run(sendWhileReady);
+        });
+
+        // Initial send (if writable)
+        LOGGER.info("[Server-Backpressure] Initial send, isReady={}", serverObserver.isReady());
+        gate.run(sendWhileReady);
+
+        if (!completed.get()) {
+            LOGGER.info("[Server-Backpressure] Paused at {} chunks, waiting for onReadyHandler", sentCount.get());
+        }
+    }
+
+    /**
+     * Client streaming with SERVER-SIDE receive backpressure.
+     *
+     * <p>This demonstrates how server controls the rate of receiving data from client
+     * using {@code disableAutoRequest()} and {@code request(int)} APIs
+     *
+     * <p>Dubbo API reference (aligned with gRPC):
+     * <ul>
+     *   <li>{@code ServerCallStreamObserver.disableAutoRequest()} - Disable automatic message requesting</li>
+     *   <li>{@code ServerCallStreamObserver.request(int)} - Manually request messages from client</li>
+     * </ul>
+     *
+     * @param responseObserver observer that receives the single {@link StreamResponse} summary
+     * @return observer that consumes the client's chunks
+     */
+    @Override
+    public StreamObserver<DataChunk> clientStream(StreamObserver<StreamResponse> responseObserver) {
+        LOGGER.info("[Server-ReceiveBackpressure] Client stream started - receiving data from client with backpressure");
+        final long startTime = System.currentTimeMillis();
+
+        // Cast to ServerCallStreamObserver for receive backpressure control
+        if (responseObserver instanceof ServerCallStreamObserver) {
+            final ServerCallStreamObserver<StreamResponse> serverObserver =
+                    (ServerCallStreamObserver<StreamResponse>) responseObserver;
+
+            // Disable automatic message requesting - server will manually control receive rate
+            serverObserver.disableAutoRequest();
+            LOGGER.info("[Server-ReceiveBackpressure] Disabled auto request, server will control receive rate");
+
+            // Request initial batch of messages
+            serverObserver.request(INITIAL_REQUEST);
+            LOGGER.info("[Server-ReceiveBackpressure] Requested initial {} messages", INITIAL_REQUEST);
+
+            // After processing each message, request the next one.
+            // This is the key to server-side receive backpressure control.
+            return newChunkCollector("[Server-ReceiveBackpressure]", startTime, responseObserver,
+                    () -> serverObserver.request(1));
+        }
+
+        // Fallback: basic implementation without backpressure control
+        LOGGER.warn("[Server] Fallback to basic implementation without receive backpressure");
+        return newChunkCollector("[Server]", startTime, responseObserver, () -> { });
+    }
+
+    /**
+     * Builds the inbound observer used by {@link #clientStream(StreamObserver)}: it counts chunks
+     * and payload bytes and, when the client completes, replies with one {@link StreamResponse}.
+     *
+     * @param tag              log prefix identifying the mode in use
+     * @param startTime        stream start in epoch milliseconds, used to compute the duration
+     * @param responseObserver observer the summary is written to
+     * @param afterEachChunk   action run after every chunk has been accounted for
+     * @return the inbound chunk observer
+     */
+    private static StreamObserver<DataChunk> newChunkCollector(
+            final String tag,
+            final long startTime,
+            final StreamObserver<StreamResponse> responseObserver,
+            final Runnable afterEachChunk) {
+        final AtomicInteger chunkCount = new AtomicInteger(0);
+        final AtomicLong totalBytes = new AtomicLong(0);
+
+        return new StreamObserver<DataChunk>() {
+            @Override
+            public void onNext(DataChunk chunk) {
+                int count = chunkCount.incrementAndGet();
+                if (chunk.getData() != null) {
+                    totalBytes.addAndGet(chunk.getData().length);
+                }
+
+                if (count % LOG_INTERVAL == 0) {
+                    LOGGER.info("{} Received {} chunks from client", tag, count);
+                }
+
+                afterEachChunk.run();
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                // The throwable is passed as the last argument so the stack trace is logged too.
+                LOGGER.error("{} Client stream error: {}", tag, throwable.getMessage(), throwable);
+            }
+
+            @Override
+            public void onCompleted() {
+                long duration = System.currentTimeMillis() - startTime;
+                StreamResponse response = new StreamResponse(chunkCount.get(), totalBytes.get(), duration);
+                responseObserver.onNext(response);
+                responseObserver.onCompleted();
+                LOGGER.info("{} Client stream completed: {} chunks, {} bytes in {}ms",
+                        tag, chunkCount.get(), totalBytes.get(), duration);
+            }
+        };
+    }
+
+    /**
+     * Bidirectional streaming with SERVER-SIDE send and receive backpressure.
+     *
+     * <p>This demonstrates complete server-side backpressure control:
+     * <ul>
+     *   <li>Send backpressure: Using {@code setOnReadyHandler()} and {@code isReady()} to control sending rate</li>
+     *   <li>Receive backpressure: Using {@code disableAutoRequest()} and {@code request(int)} to control receiving rate</li>
+     * </ul>
+     *
+     * <p>Every received chunk is echoed back. Responses are always placed on a queue and the queue
+     * is flushed through a {@link DrainGate}, which keeps responses in arrival order, prevents a
+     * chunk from being stranded when the stream becomes writable between the readiness check and
+     * the enqueue, and lets the response stream be completed exactly once.
+     *
+     * <p>Dubbo API reference (aligned with gRPC):
+     * <ul>
+     *   <li>{@code ServerCallStreamObserver.setOnReadyHandler(Runnable)} - Callback when stream becomes writable</li>
+     *   <li>{@code ServerCallStreamObserver.isReady()} - Check if stream is ready to accept more data</li>
+     *   <li>{@code ServerCallStreamObserver.disableAutoRequest()} - Disable automatic message requesting</li>
+     *   <li>{@code ServerCallStreamObserver.request(int)} - Manually request messages from client</li>
+     * </ul>
+     *
+     * @param responseObserver observer the echoed chunks are written to
+     * @return observer that consumes the client's chunks
+     */
+    @Override
+    public StreamObserver<DataChunk> biStream(StreamObserver<DataChunk> responseObserver) {
+        LOGGER.info("[Server-BiStream-Backpressure] BiStream started - bidirectional streaming with full backpressure control");
+        final AtomicInteger receivedCount = new AtomicInteger(0);
+        final AtomicInteger sentCount = new AtomicInteger(0);
+
+        if (!(responseObserver instanceof ServerCallStreamObserver)) {
+            // Fallback: basic implementation without backpressure control.
+            // Only StreamObserver methods are needed here, so no cast is performed.
+            LOGGER.warn("[Server-BiStream] Fallback to basic implementation without backpressure");
+            return new StreamObserver<DataChunk>() {
+                @Override
+                public void onNext(DataChunk chunk) {
+                    int count = receivedCount.incrementAndGet();
+                    if (count % LOG_INTERVAL == 0) {
+                        LOGGER.info("[Server-BiStream] Received {} chunks", count);
+                    }
+                    responseObserver.onNext(new DataChunk(
+                            chunk.getSequenceNumber(),
+                            chunk.getData(),
+                            System.currentTimeMillis()));
+                    sentCount.incrementAndGet();
+                }
+
+                @Override
+                public void onError(Throwable throwable) {
+                    LOGGER.error("[Server-BiStream] Error: {}", throwable.getMessage(), throwable);
+                }
+
+                @Override
+                public void onCompleted() {
+                    responseObserver.onCompleted();
+                    LOGGER.info("[Server-BiStream] Completed: received={}, sent={}",
+                            receivedCount.get(), sentCount.get());
+                }
+            };
+        }
+
+        // Cast to ServerCallStreamObserver for full backpressure control
+        final ServerCallStreamObserver<DataChunk> serverObserver =
+                (ServerCallStreamObserver<DataChunk>) responseObserver;
+
+        // Responses waiting to be written. NOTE: the queue is unbounded and request(1) below is
+        // unconditional, so a client that reads slowly lets this queue grow.
+        final java.util.concurrent.ConcurrentLinkedQueue<DataChunk> pendingChunks =
+                new java.util.concurrent.ConcurrentLinkedQueue<>();
+        final AtomicBoolean inboundCompleted = new AtomicBoolean(false);
+        final AtomicBoolean responseCompleted = new AtomicBoolean(false);
+        final DrainGate gate = new DrainGate();
+
+        // Writes queued responses while the stream is writable, then completes the response
+        // stream once the client has finished and nothing is left to write.
+        final Runnable flushPending = () -> {
+            while (!responseCompleted.get() && serverObserver.isReady()) {
+                DataChunk next = pendingChunks.poll();
+                if (next == null) {
+                    break;
+                }
+                serverObserver.onNext(next);
+                int sent = sentCount.incrementAndGet();
+                if (sent % LOG_INTERVAL == 0) {
+                    LOGGER.info("[Server-BiStream-Backpressure] Sent {} response chunks", sent);
+                }
+            }
+
+            // compareAndSet guarantees onCompleted() is issued at most once, no matter how often
+            // the on-ready handler fires after the client has completed.
+            if (inboundCompleted.get() && pendingChunks.isEmpty() && responseCompleted.compareAndSet(false, true)) {
+                serverObserver.onCompleted();
+                LOGGER.info("[Server-BiStream-Backpressure] Completed: received={}, sent={}",
+                        receivedCount.get(), sentCount.get());
+            }
+        };
+
+        // === Send Backpressure Control ===
+        // Set onReadyHandler to send pending chunks when stream becomes writable
+        serverObserver.setOnReadyHandler(() -> {
+            LOGGER.info("[Server-BiStream-Backpressure] onReadyHandler triggered, isReady={}, pending={}",
+                    serverObserver.isReady(), pendingChunks.size());
+            gate.run(flushPending);
+        });
+
+        // === Receive Backpressure Control ===
+        // Disable automatic message requesting - server will manually control receive rate
+        serverObserver.disableAutoRequest();
+        LOGGER.info("[Server-BiStream-Backpressure] Disabled auto request for receive backpressure");
+
+        // Request initial batch of messages
+        serverObserver.request(INITIAL_REQUEST);
+        LOGGER.info("[Server-BiStream-Backpressure] Requested initial {} messages, configured send backpressure with setOnReadyHandler", INITIAL_REQUEST);
+
+        return new StreamObserver<DataChunk>() {
+            @Override
+            public void onNext(DataChunk chunk) {
+                int count = receivedCount.incrementAndGet();
+                if (count % LOG_INTERVAL == 0) {
+                    LOGGER.info("[Server-BiStream-Backpressure] Received {} chunks, isReady={}",
+                            count, serverObserver.isReady());
+                }
+
+                // Enqueue first, then flush: the chunk is visible to whichever thread drains
+                // next, so it is sent in order and cannot be missed by the on-ready handler.
+                pendingChunks.offer(new DataChunk(
+                        chunk.getSequenceNumber(),
+                        chunk.getData(),
+                        System.currentTimeMillis()));
+                gate.run(flushPending);
+
+                // size() walks the whole queue, so only compute it when it will be logged.
+                if (LOGGER.isDebugEnabled() && !pendingChunks.isEmpty()) {
+                    LOGGER.debug("[Server-BiStream-Backpressure] Buffered chunk, pending={}", pendingChunks.size());
+                }
+
+                // Request next message (receive backpressure control)
+                serverObserver.request(1);
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                LOGGER.error("[Server-BiStream-Backpressure] Error: {}", throwable.getMessage(), throwable);
+            }
+
+            @Override
+            public void onCompleted() {
+                inboundCompleted.set(true);
+                // Completes now if nothing is pending; otherwise the on-ready handler completes
+                // the stream after the last queued chunk has been written.
+                gate.run(flushPending);
+                if (!responseCompleted.get()) {
+                    LOGGER.info("[Server-BiStream-Backpressure] Waiting for pending chunks to be sent: {}",
+                            pendingChunks.size());
+                }
+            }
+        };
+    }
+
+    /**
+     * Runs a task so that at most one thread executes it at a time, without blocking.
+     *
+     * <p>A caller that arrives while the task is already running only records that another pass
+     * is needed and returns; the running thread then executes the task again before leaving.
+     * Re-entrant calls made from inside the task are handled the same way.
+     */
+    private static final class DrainGate {
+
+        /** Number of run requests not yet accounted for; non-zero while a thread is draining. */
+        private final AtomicInteger pendingRuns = new AtomicInteger(0);
+
+        /**
+         * Executes {@code task}, or schedules one more pass on the thread currently executing it.
+         *
+         * @param task the work to run; it must be safe to run more often than requested
+         */
+        void run(Runnable task) {
+            if (pendingRuns.getAndIncrement() != 0) {
+                return;
+            }
+            int missed = 1;
+            try {
+                do {
+                    task.run();
+                    missed = pendingRuns.addAndGet(-missed);
+                } while (missed != 0);
+            } catch (RuntimeException | Error e) {
+                // Reopen the gate so later calls are not silently dropped.
+                pendingRuns.set(0);
+                throw e;
+            }
+        }
+    }
+}
diff --git 
a/2-advanced/dubbo-samples-triple-backpressure/src/main/resources/log4j2.xml 
b/2-advanced/dubbo-samples-triple-backpressure/src/main/resources/log4j2.xml
new file mode 100644
index 000000000..06efcb47f
--- /dev/null
+++ b/2-advanced/dubbo-samples-triple-backpressure/src/main/resources/log4j2.xml
@@ -0,0 +1,31 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<!-- Logging setup for the sample: one console appender, attached to the root logger. -->
+<Configuration status="WARN">
+    <Appenders>
+        <!-- Writes to SYSTEM_OUT using the ANSI-styled pattern below (disableAnsi="false"). -->
+        <Console name="Console" target="SYSTEM_OUT" follow="true">
+            <PatternLayout pattern="%style{%d{HH:mm:ss.SSS}}{Magenta} 
%style{|-}{White}%highlight{%-5p} [%t] %style{%40.40c}{Cyan}:%style{%-3L}{Blue} 
%style{-|}{White} 
%m%n%rEx{filters(jdk.internal.reflect,java.lang.reflect,sun.reflect)}" 
disableAnsi="false" charset="UTF-8"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <!-- Root logger at level "info", routed to the Console appender. -->
+        <Root level="info">
+            <AppenderRef ref="Console"/>
+        </Root>
+    </Loggers>
+</Configuration>
+
+
diff --git 
a/2-advanced/dubbo-samples-triple-backpressure/src/test/java/org/apache/dubbo/samples/backpressure/BackpressureIT.java
 
b/2-advanced/dubbo-samples-triple-backpressure/src/test/java/org/apache/dubbo/samples/backpressure/BackpressureIT.java
new file mode 100644
index 000000000..752e61415
--- /dev/null
+++ 
b/2-advanced/dubbo-samples-triple-backpressure/src/test/java/org/apache/dubbo/samples/backpressure/BackpressureIT.java
@@ -0,0 +1,720 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.samples.backpressure;
+
+import org.apache.dubbo.common.stream.ClientCallStreamObserver;
+import org.apache.dubbo.common.stream.ClientResponseObserver;
+import org.apache.dubbo.common.stream.StreamObserver;
+import org.apache.dubbo.config.ApplicationConfig;
+import org.apache.dubbo.config.ProtocolConfig;
+import org.apache.dubbo.config.ReferenceConfig;
+import org.apache.dubbo.config.RegistryConfig;
+import org.apache.dubbo.config.ServiceConfig;
+import org.apache.dubbo.config.bootstrap.DubboBootstrap;
+import org.apache.dubbo.samples.backpressure.api.BackpressureService;
+import org.apache.dubbo.samples.backpressure.api.DataChunk;
+import org.apache.dubbo.samples.backpressure.api.StreamRequest;
+import org.apache.dubbo.samples.backpressure.api.StreamResponse;
+import org.apache.dubbo.samples.backpressure.impl.BackpressureServiceImpl;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test for Triple protocol backpressure functionality.
+ *
+ * <p>This test covers all streaming scenarios with backpressure APIs:
+ *
+ * <h3>High-Level API (setOnReadyHandler) - Controlling Send Rate</h3>
+ * <ul>
+ *   <li>{@link #testServerStreamWithOnReadyHandler()} - Server uses isReady() + setOnReadyHandler() to control sending</li>
+ *   <li>{@link #testClientStreamWithOnReadyHandler()} - Client uses isReady() + setOnReadyHandler() to control sending</li>
+ *   <li>{@link #testBiStreamWithOnReadyHandler()} - Both sides use setOnReadyHandler()</li>
+ * </ul>
+ *
+ * <h3>Low-Level API (disableAutoRequestWithInitial) - Controlling Receive Rate (Client-Side)</h3>
+ * <ul>
+ *   <li>{@link #testServerStreamWithManualRequest()} - Client uses ClientResponseObserver.beforeStart() to control receive rate</li>
+ *   <li>{@link #testClientStreamWithDisableAutoRequestWithInitial()} - Client uses ClientResponseObserver.beforeStart()</li>
+ *   <li>{@link #testBiStreamWithDisableAutoRequestWithInitial()} - Client uses ClientResponseObserver.beforeStart()</li>
+ * </ul>
+ *
+ * <h3>Server-Side Backpressure API (disableAutoRequest) - Controlling Receive Rate (Server-Side)</h3>
+ * <ul>
+ *   <li>{@link #testClientStreamWithServerReceiveBackpressure()} - Server uses disableAutoRequest() + request() to control receive rate</li>
+ *   <li>{@link #testBiStreamWithServerFullBackpressure()} - Server uses full backpressure control (send + receive)</li>
+ * </ul>
+ *
+ * <p>Note: For client-side, Dubbo uses {@code disableAutoRequestWithInitial(int)} which combines gRPC's
+ * {@code disableAutoRequest()} and {@code request(int)} into a single method call.
+ * For server-side, Dubbo uses {@code disableAutoRequest()} followed by {@code request(int)} separately.
+ */
+public class BackpressureIT {
+
+    // Shared SLF4J logger for all test output
+    private static final Logger LOGGER = LoggerFactory.getLogger(BackpressureIT.class);
+
+    // Use random port to avoid conflicts between consecutive test runs
+    // (picked once per class load from the range 50052..51051)
+    private static final int PORT = 50052 + (int)(Math.random() * 1000);
+
+    // Consumer-side proxy; assigned in setup() and used by every test
+    private static BackpressureService service;
+    // Bootstrap hosting both provider and consumer; stopped in teardown()
+    private static DubboBootstrap bootstrap;
+
+    /**
+     * Starts a Triple provider and a directly-connected consumer in the same JVM, once for the
+     * whole test class, and stores the consumer proxy in {@code service}.
+     */
+    @BeforeClass
+    public static void setup() {
+        // Provider config
+        ServiceConfig<BackpressureService> serviceConfig = new ServiceConfig<>();
+        serviceConfig.setInterface(BackpressureService.class);
+        serviceConfig.setRef(new BackpressureServiceImpl());
+
+        // Consumer config with direct connection (no registry needed)
+        ReferenceConfig<BackpressureService> reference = new ReferenceConfig<>();
+        reference.setInterface(BackpressureService.class);
+        reference.setUrl("tri://127.0.0.1:" + PORT);
+        reference.setTimeout(60000);
+
+        // Start both provider and consumer on the shared DubboBootstrap instance; teardown() stops it again
+        bootstrap = DubboBootstrap.getInstance();
+        bootstrap.application(new ApplicationConfig("backpressure-test"))
+                .registry(new RegistryConfig("N/A"))  // No registry needed
+                .protocol(new ProtocolConfig("tri", PORT))
+                .service(serviceConfig)
+                .reference(reference)
+                .start();
+
+        service = reference.get();
+        LOGGER.info("Provider and Consumer started on port {}", PORT);
+
+        // Warm up the connection to ensure resources are properly initialized
+        // This prevents timing issues when running individual tests
+        try {
+            service.echo("warmup");
+            LOGGER.info("Connection warmup completed");
+        } catch (Exception e) {
+            // Best effort only: a failed warmup is logged and the tests still run
+            LOGGER.warn("Warmup failed, but continuing: {}", e.getMessage());
+        }
+    }
+
+    /**
+     * Stops the bootstrap created in {@code setup()}; does nothing when setup never assigned it.
+     */
+    @AfterClass
+    public static void teardown() {
+        if (bootstrap == null) {
+            return;
+        }
+        bootstrap.stop();
+    }
+
+    // ==================== Basic Test ====================
+
+    /**
+     * Unary sanity check: the echo reply must be non-null and contain the message that was sent.
+     */
+    @Test
+    public void testEcho() {
+        final String message = "Hello Backpressure";
+        final String reply = service.echo(message);
+        LOGGER.info("Echo response: {}", reply);
+        Assert.assertNotNull(reply);
+        Assert.assertTrue(reply.contains(message));
+    }
+
+    // ==================== Server Stream Tests ====================
+
+    /**
+     * Test server streaming with HIGH-LEVEL API (setOnReadyHandler).
+     * Server uses isReady() and setOnReadyHandler() to control sending rate.
+     * This demonstrates SERVER-SIDE send backpressure.
+     */
+    @Test
+    public void testServerStreamWithOnReadyHandler() throws 
InterruptedException {
+        final int requestCount = 30;
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicInteger receivedCount = new AtomicInteger(0);
+
+        StreamRequest request = new StreamRequest(requestCount, 1024);
+
+        service.serverStream(request, new StreamObserver<DataChunk>() {
+            @Override
+            public void onNext(DataChunk chunk) {
+                int count = receivedCount.incrementAndGet();
+                if (count % 10 == 0) {
+                    LOGGER.info("[ServerStream-OnReady] Received {} chunks", 
count);
+                }
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                LOGGER.error("[ServerStream-OnReady] Error: {}", 
throwable.getMessage());
+                latch.countDown();
+            }
+
+            @Override
+            public void onCompleted() {
+                LOGGER.info("[ServerStream-OnReady] Completed, received: {}", 
receivedCount.get());
+                latch.countDown();
+            }
+        });
+
+        boolean completed = latch.await(30, TimeUnit.SECONDS);
+        Assert.assertTrue("Stream should complete within timeout", completed);
+        Assert.assertEquals("Should receive all chunks", requestCount, 
receivedCount.get());
+        LOGGER.info("✅ Server stream with onReadyHandler test passed!");
+    }
+
+    /**
+     * Test server streaming with LOW-LEVEL API 
(disableAutoRequestWithInitial).
+     * Client uses ClientResponseObserver.beforeStart() to configure receive 
backpressure
+     * BEFORE the stream starts, following gRPC's pattern.
+     * This demonstrates CLIENT-SIDE receive backpressure.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testServerStreamWithManualRequest() throws 
InterruptedException {
+        final int requestCount = 30;
+        final int initialRequest = 5;
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicInteger receivedCount = new AtomicInteger(0);
+
+        StreamRequest request = new StreamRequest(requestCount, 1024);
+
+        // Use ClientResponseObserver to configure receive backpressure BEFORE 
the stream starts ()
+        ClientResponseObserver<StreamRequest, DataChunk> responseObserver =
+                new ClientResponseObserver<StreamRequest, DataChunk>() {
+            // Member variable to hold the stream observer for manual request
+            private ClientCallStreamObserver<StreamRequest> requestStream;
+
+            @Override
+            public void beforeStart(ClientCallStreamObserver<StreamRequest> 
requestStream) {
+                this.requestStream = requestStream;
+                // Configure receive backpressure - disable auto request and 
set initial request
+                requestStream.disableAutoRequestWithInitial(initialRequest);
+                LOGGER.info("[ServerStream-ManualRequest] beforeStart: 
configured receive backpressure, initialRequest={}",
+                        initialRequest);
+            }
+
+            @Override
+            public void onNext(DataChunk chunk) {
+                int count = receivedCount.incrementAndGet();
+                if (count % 5 == 0) {
+                    LOGGER.info("[ServerStream-ManualRequest] Received {} 
chunks", count);
+                }
+
+                // After processing each chunk, request more data
+                // This simulates controlled consumption rate
+                if (requestStream != null) {
+                    requestStream.request(1);
+                }
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                LOGGER.error("[ServerStream-ManualRequest] Error: {}", 
throwable.getMessage());
+                latch.countDown();
+            }
+
+            @Override
+            public void onCompleted() {
+                LOGGER.info("[ServerStream-ManualRequest] Completed, received: 
{}", receivedCount.get());
+                latch.countDown();
+            }
+        };
+
+        // Start the stream - beforeStart() is called inside
+        service.serverStream(request, responseObserver);
+
+        boolean completed = latch.await(60, TimeUnit.SECONDS);
+        Assert.assertTrue("Stream should complete within timeout", completed);
+        Assert.assertEquals("Should receive all chunks", requestCount, 
receivedCount.get());
+        LOGGER.info("✅ Server stream with manual request test passed!");
+    }
+
+    // ==================== Client Stream Tests ====================
+
+    /**
+     * Test client streaming with HIGH-LEVEL API (setOnReadyHandler).
+     * Uses ClientResponseObserver.beforeStart() to configure send backpressure
+     * BEFORE the stream starts, following gRPC's pattern.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testClientStreamWithOnReadyHandler() throws 
InterruptedException {
+        final int sendCount = 50;
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicInteger sent = new AtomicInteger(0);
+        final AtomicBoolean sendCompleted = new AtomicBoolean(false);
+        final AtomicInteger serverReceivedChunks = new AtomicInteger(0);
+        final byte[] data = new byte[1024];
+
+        // Use ClientResponseObserver to configure backpressure BEFORE the 
stream starts ()
+        ClientResponseObserver<DataChunk, StreamResponse> responseObserver =
+                new ClientResponseObserver<DataChunk, StreamResponse>() {
+            @Override
+            public void beforeStart(ClientCallStreamObserver<DataChunk> 
requestStream) {
+                LOGGER.info("[ClientStream-OnReady] beforeStart called");
+
+                // Disable auto flow control for manual send control
+                requestStream.disableAutoFlowControl();
+
+                // Set onReadyHandler BEFORE the stream starts
+                requestStream.setOnReadyHandler(() -> {
+                    LOGGER.info("[ClientStream-OnReady] onReadyHandler 
triggered, isReady={}, sent={}",
+                            requestStream.isReady(), sent.get());
+                    while (requestStream.isReady() && sent.get() < sendCount 
&& !sendCompleted.get()) {
+                        int seq = sent.getAndIncrement();
+                        requestStream.onNext(new DataChunk(seq, data, 
System.currentTimeMillis()));
+                    }
+
+                    if (sent.get() >= sendCount && 
!sendCompleted.getAndSet(true)) {
+                        requestStream.onCompleted();
+                        LOGGER.info("[ClientStream-OnReady] onCompleted 
called");
+                    }
+                });
+            }
+
+            @Override
+            public void onNext(StreamResponse response) {
+                serverReceivedChunks.set(response.getTotalChunks());
+                LOGGER.info("[ClientStream-OnReady] Server received: {} 
chunks", response.getTotalChunks());
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                LOGGER.error("[ClientStream-OnReady] Error: {}", 
throwable.getMessage());
+                latch.countDown();
+            }
+
+            @Override
+            public void onCompleted() {
+                LOGGER.info("[ClientStream-OnReady] Completed");
+                latch.countDown();
+            }
+        };
+
+        // Start the stream - beforeStart() is called inside
+        StreamObserver<DataChunk> requestObserver = 
service.clientStream(responseObserver);
+
+        // Note: We don't configure anything here because it's already done in 
beforeStart()
+
+        boolean completed = latch.await(60, TimeUnit.SECONDS);
+        Assert.assertTrue("Stream should complete within timeout", completed);
+        Assert.assertEquals("Server should receive all chunks", sendCount, 
serverReceivedChunks.get());
+        LOGGER.info("✅ Client stream with onReadyHandler test passed!");
+    }
+
+    /**
+     * Test client streaming with LOW-LEVEL API 
(disableAutoRequestWithInitial).
+     * Uses ClientResponseObserver.beforeStart() to set up both send and 
receive backpressure
+     * BEFORE the stream starts, following gRPC's pattern.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testClientStreamWithDisableAutoRequestWithInitial() throws 
InterruptedException {
+        final int sendCount = 30;
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicInteger sent = new AtomicInteger(0);
+        final AtomicInteger serverReceivedChunks = new AtomicInteger(0);
+        final AtomicBoolean responseReceived = new AtomicBoolean(false);
+        final byte[] data = new byte[1024];
+
+        // Use ClientResponseObserver to configure backpressure BEFORE the 
stream starts ()
+        ClientResponseObserver<DataChunk, StreamResponse> responseObserver =
+                new ClientResponseObserver<DataChunk, StreamResponse>() {
+            @Override
+            public void beforeStart(ClientCallStreamObserver<DataChunk> 
requestStream) {
+                LOGGER.info("[ClientStream-DisableAutoRequestWithInitial] 
beforeStart called");
+
+                // Configure receive backpressure
+                requestStream.disableAutoRequestWithInitial(10);
+
+                // Configure send backpressure
+                requestStream.disableAutoFlowControl();
+
+                // Set onReadyHandler BEFORE the stream starts
+                requestStream.setOnReadyHandler(() -> {
+                    LOGGER.info("[ClientStream-DisableAutoRequestWithInitial] 
onReadyHandler triggered, isReady={}, sent={}",
+                            requestStream.isReady(), sent.get());
+                    while (requestStream.isReady() && sent.get() < sendCount) {
+                        int seq = sent.getAndIncrement();
+                        requestStream.onNext(new DataChunk(seq, data, 
System.currentTimeMillis()));
+                    }
+
+                    if (sent.get() >= sendCount) {
+                        requestStream.onCompleted();
+                        
LOGGER.info("[ClientStream-DisableAutoRequestWithInitial] onCompleted called");
+                    }
+                });
+            }
+
+            @Override
+            public void onNext(StreamResponse response) {
+                serverReceivedChunks.set(response.getTotalChunks());
+                responseReceived.set(true);
+                LOGGER.info("[ClientStream-DisableAutoRequestWithInitial] 
Response: {} chunks",
+                        response.getTotalChunks());
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                LOGGER.error("[ClientStream-DisableAutoRequestWithInitial] 
Error: {}", throwable.getMessage());
+                latch.countDown();
+            }
+
+            @Override
+            public void onCompleted() {
+                LOGGER.info("[ClientStream-DisableAutoRequestWithInitial] 
Completed");
+                latch.countDown();
+            }
+        };
+
+        // Start the stream - beforeStart() is called inside
+        StreamObserver<DataChunk> requestObserver = 
service.clientStream(responseObserver);
+
+        // Note: We don't configure anything here because it's already done in 
beforeStart()
+
+        boolean completed = latch.await(60, TimeUnit.SECONDS);
+        Assert.assertTrue("Stream should complete within timeout", completed);
+        Assert.assertEquals("Server should receive all chunks", sendCount, 
serverReceivedChunks.get());
+        Assert.assertTrue("Should receive response", responseReceived.get());
+        LOGGER.info("✅ Client stream with disableAutoRequestWithInitial test 
passed!");
+    }
+
+    /**
+     * Test client streaming with SERVER-SIDE receive backpressure.
+     * Server uses disableAutoRequest() and request(int) to control receiving 
rate.
+     * This demonstrates SERVER-SIDE receive backpressure ().
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testClientStreamWithServerReceiveBackpressure() throws 
InterruptedException {
+        final int sendCount = 50;
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicInteger sent = new AtomicInteger(0);
+        final AtomicBoolean sendCompleted = new AtomicBoolean(false);
+        final AtomicInteger serverReceivedChunks = new AtomicInteger(0);
+        final byte[] data = new byte[1024];
+
+        // Use ClientResponseObserver to configure send backpressure
+        ClientResponseObserver<DataChunk, StreamResponse> responseObserver =
+                new ClientResponseObserver<DataChunk, StreamResponse>() {
+            @Override
+            public void beforeStart(ClientCallStreamObserver<DataChunk> 
requestStream) {
+                LOGGER.info("[ClientStream-ServerReceiveBackpressure] 
beforeStart called");
+
+                // Disable auto flow control for manual send control
+                requestStream.disableAutoFlowControl();
+
+                // Set onReadyHandler to send data when ready
+                requestStream.setOnReadyHandler(() -> {
+                    LOGGER.info("[ClientStream-ServerReceiveBackpressure] 
onReadyHandler triggered, isReady={}, sent={}",
+                            requestStream.isReady(), sent.get());
+                    while (requestStream.isReady() && sent.get() < sendCount 
&& !sendCompleted.get()) {
+                        int seq = sent.getAndIncrement();
+                        requestStream.onNext(new DataChunk(seq, data, 
System.currentTimeMillis()));
+                    }
+
+                    if (sent.get() >= sendCount && 
!sendCompleted.getAndSet(true)) {
+                        requestStream.onCompleted();
+                        LOGGER.info("[ClientStream-ServerReceiveBackpressure] 
onCompleted called");
+                    }
+                });
+            }
+
+            @Override
+            public void onNext(StreamResponse response) {
+                serverReceivedChunks.set(response.getTotalChunks());
+                LOGGER.info("[ClientStream-ServerReceiveBackpressure] Server 
received: {} chunks, {} bytes in {}ms",
+                        response.getTotalChunks(), response.getTotalBytes(), 
response.getDurationMs());
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                LOGGER.error("[ClientStream-ServerReceiveBackpressure] Error: 
{}", throwable.getMessage());
+                latch.countDown();
+            }
+
+            @Override
+            public void onCompleted() {
+                LOGGER.info("[ClientStream-ServerReceiveBackpressure] 
Completed");
+                latch.countDown();
+            }
+        };
+
+        // Start the stream - server will use disableAutoRequest() to control 
receive rate
+        StreamObserver<DataChunk> requestObserver = 
service.clientStream(responseObserver);
+
+        boolean completed = latch.await(60, TimeUnit.SECONDS);
+        Assert.assertTrue("Stream should complete within timeout", completed);
+        Assert.assertEquals("Server should receive all chunks", sendCount, 
serverReceivedChunks.get());
+        LOGGER.info("✅ Client stream with server receive backpressure test 
passed!");
+    }
+
+    // ==================== Bidirectional Stream Tests ====================
+
+    /**
+     * Test bidirectional streaming with HIGH-LEVEL API (setOnReadyHandler).
+     * Uses ClientResponseObserver.beforeStart() to configure send backpressure
+     * BEFORE the stream starts, following gRPC's pattern.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testBiStreamWithOnReadyHandler() throws InterruptedException {
+        final int sendCount = 30;
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicInteger sent = new AtomicInteger(0);
+        final AtomicInteger received = new AtomicInteger(0);
+        final AtomicBoolean sendCompleted = new AtomicBoolean(false);
+        final byte[] data = new byte[1024];
+
+        // Use AtomicReference to access the request stream from the 
onReadyHandler
+        final AtomicReference<ClientCallStreamObserver<DataChunk>> 
requestStreamRef = new AtomicReference<>();
+
+        // Use ClientResponseObserver to configure backpressure BEFORE the 
stream starts ()
+        ClientResponseObserver<DataChunk, DataChunk> responseObserver =
+                new ClientResponseObserver<DataChunk, DataChunk>() {
+            @Override
+            public void beforeStart(ClientCallStreamObserver<DataChunk> 
requestStream) {
+                requestStreamRef.set(requestStream);
+
+                // Disable auto flow control for manual send control
+                requestStream.disableAutoFlowControl();
+
+                // Set onReadyHandler BEFORE the stream starts
+                requestStream.setOnReadyHandler(() -> {
+                    LOGGER.info("[BiStream-OnReady] onReadyHandler triggered, 
isReady={}, sent={}",
+                            requestStream.isReady(), sent.get());
+                    while (requestStream.isReady() && sent.get() < sendCount 
&& !sendCompleted.get()) {
+                        int seq = sent.getAndIncrement();
+                        requestStream.onNext(new DataChunk(seq, data, 
System.currentTimeMillis()));
+                    }
+
+                    if (sent.get() >= sendCount && 
!sendCompleted.getAndSet(true)) {
+                        requestStream.onCompleted();
+                        LOGGER.info("[BiStream-OnReady] onCompleted called");
+                    }
+                });
+                LOGGER.info("[BiStream-OnReady] beforeStart: configured send 
backpressure");
+            }
+
+            @Override
+            public void onNext(DataChunk chunk) {
+                int count = received.incrementAndGet();
+                if (count % 10 == 0) {
+                    LOGGER.info("[BiStream-OnReady] Received {} response 
chunks", count);
+                }
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                LOGGER.error("[BiStream-OnReady] Error: {}", 
throwable.getMessage());
+                latch.countDown();
+            }
+
+            @Override
+            public void onCompleted() {
+                LOGGER.info("[BiStream-OnReady] Completed, sent: {}, received: 
{}",
+                        sent.get(), received.get());
+                latch.countDown();
+            }
+        };
+
+        // Start the stream - beforeStart() is called inside
+        StreamObserver<DataChunk> requestObserver = 
service.biStream(responseObserver);
+
+        // Note: We don't need to set onReadyHandler here because it's already 
set in beforeStart()
+        // The onReadyHandler will be triggered by the framework when the 
stream becomes ready
+
+        boolean completed = latch.await(60, TimeUnit.SECONDS);
+        Assert.assertTrue("BiStream should complete within timeout", 
completed);
+        Assert.assertEquals("Should send all chunks", sendCount, sent.get());
+        Assert.assertTrue("Should receive response chunks", received.get() > 
0);
+        LOGGER.info("✅ BiStream with onReadyHandler test passed! sent={}, 
received={}",
+                sent.get(), received.get());
+    }
+
+    /**
+     * Test bidirectional streaming with LOW-LEVEL API 
(disableAutoRequestWithInitial).
+     * Uses ClientResponseObserver.beforeStart() to set up BOTH send and 
receive backpressure
+     * BEFORE the stream starts, following gRPC's pattern.
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testBiStreamWithDisableAutoRequestWithInitial() throws 
InterruptedException {
+        final int sendCount = 30;
+        final int initialRequest = 10;
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicInteger sent = new AtomicInteger(0);
+        final AtomicInteger received = new AtomicInteger(0);
+        final AtomicBoolean sendCompleted = new AtomicBoolean(false);
+        final byte[] data = new byte[1024];
+
+        // Use ClientResponseObserver to configure BOTH send and receive 
backpressure BEFORE the stream starts ()
+        ClientResponseObserver<DataChunk, DataChunk> responseObserver =
+                new ClientResponseObserver<DataChunk, DataChunk>() {
+            // Member variable to hold the request stream - same pattern as 
gRPC
+            private ClientCallStreamObserver<DataChunk> requestStream;
+
+            @Override
+            public void beforeStart(ClientCallStreamObserver<DataChunk> 
requestStream) {
+                this.requestStream = requestStream;
+
+                // Configure receive backpressure (LOW-LEVEL API)
+                requestStream.disableAutoRequestWithInitial(initialRequest);
+
+                // Configure send backpressure (HIGH-LEVEL API)
+                requestStream.disableAutoFlowControl();
+
+                // Set onReadyHandler BEFORE the stream starts ()
+                requestStream.setOnReadyHandler(() -> {
+                    LOGGER.info("[BiStream-DisableAutoRequestWithInitial] 
onReadyHandler triggered, isReady={}, sent={}",
+                            requestStream.isReady(), sent.get());
+                    while (requestStream.isReady() && sent.get() < sendCount 
&& !sendCompleted.get()) {
+                        int seq = sent.getAndIncrement();
+                        requestStream.onNext(new DataChunk(seq, data, 
System.currentTimeMillis()));
+                    }
+
+                    if (sent.get() >= sendCount && 
!sendCompleted.getAndSet(true)) {
+                        requestStream.onCompleted();
+                        LOGGER.info("[BiStream-DisableAutoRequestWithInitial] 
onCompleted called");
+                    }
+                });
+
+                LOGGER.info("[BiStream-DisableAutoRequestWithInitial] 
beforeStart: configured both send and receive backpressure");
+            }
+
+            @Override
+            public void onNext(DataChunk chunk) {
+                int count = received.incrementAndGet();
+                if (count % 10 == 0) {
+                    LOGGER.info("[BiStream-DisableAutoRequestWithInitial] 
Received {} chunks", count);
+                }
+
+                // Request more after processing - using member variable 
directly
+                if (requestStream != null) {
+                    requestStream.request(1);
+                }
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                LOGGER.error("[BiStream-DisableAutoRequestWithInitial] Error: 
{}", throwable.getMessage());
+                latch.countDown();
+            }
+
+            @Override
+            public void onCompleted() {
+                LOGGER.info("[BiStream-DisableAutoRequestWithInitial] 
Completed, sent: {}, received: {}",
+                        sent.get(), received.get());
+                latch.countDown();
+            }
+        };
+
+        // Start the stream - beforeStart() is called inside, which sets up 
all backpressure configuration
+        StreamObserver<DataChunk> requestObserver = 
service.biStream(responseObserver);
+
+        // Note: We don't configure anything here because it's already done in 
beforeStart()
+
+        boolean completed = latch.await(60, TimeUnit.SECONDS);
+        Assert.assertTrue("BiStream should complete within timeout", 
completed);
+        Assert.assertEquals("Should send all chunks", sendCount, sent.get());
+        LOGGER.info("✅ BiStream with disableAutoRequestWithInitial test 
passed! sent={}, received={}",
+                sent.get(), received.get());
+    }
+
+    /**
+     * Test bidirectional streaming with SERVER-SIDE full backpressure control.
+     * Server uses both send backpressure (setOnReadyHandler) and receive 
backpressure (disableAutoRequest).
+     * This demonstrates the complete SERVER-SIDE backpressure pattern ().
+     */
+    @Test
+    @SuppressWarnings("unchecked")
+    public void testBiStreamWithServerFullBackpressure() throws 
InterruptedException {
+        final int sendCount = 50;
+        final CountDownLatch latch = new CountDownLatch(1);
+        final AtomicInteger sent = new AtomicInteger(0);
+        final AtomicInteger received = new AtomicInteger(0);
+        final AtomicBoolean sendCompleted = new AtomicBoolean(false);
+        final byte[] data = new byte[1024];
+
+        // Use ClientResponseObserver to configure client-side send 
backpressure
+        ClientResponseObserver<DataChunk, DataChunk> responseObserver =
+                new ClientResponseObserver<DataChunk, DataChunk>() {
+            private ClientCallStreamObserver<DataChunk> requestStream;
+
+            @Override
+            public void beforeStart(ClientCallStreamObserver<DataChunk> 
requestStream) {
+                this.requestStream = requestStream;
+
+                // Disable auto flow control for manual send control
+                requestStream.disableAutoFlowControl();
+
+                // Set onReadyHandler for client-side send backpressure
+                requestStream.setOnReadyHandler(() -> {
+                    LOGGER.info("[BiStream-ServerFullBackpressure] Client 
onReadyHandler triggered, isReady={}, sent={}",
+                            requestStream.isReady(), sent.get());
+                    while (requestStream.isReady() && sent.get() < sendCount 
&& !sendCompleted.get()) {
+                        int seq = sent.getAndIncrement();
+                        requestStream.onNext(new DataChunk(seq, data, 
System.currentTimeMillis()));
+                    }
+
+                    if (sent.get() >= sendCount && 
!sendCompleted.getAndSet(true)) {
+                        requestStream.onCompleted();
+                        LOGGER.info("[BiStream-ServerFullBackpressure] Client 
onCompleted called");
+                    }
+                });
+
+                LOGGER.info("[BiStream-ServerFullBackpressure] Client 
beforeStart configured");
+            }
+
+            @Override
+            public void onNext(DataChunk chunk) {
+                int count = received.incrementAndGet();
+                if (count % 10 == 0) {
+                    LOGGER.info("[BiStream-ServerFullBackpressure] Client 
received {} response chunks", count);
+                }
+            }
+
+            @Override
+            public void onError(Throwable throwable) {
+                LOGGER.error("[BiStream-ServerFullBackpressure] Error: {}", 
throwable.getMessage());
+                latch.countDown();
+            }
+
+            @Override
+            public void onCompleted() {
+                LOGGER.info("[BiStream-ServerFullBackpressure] Completed, 
sent: {}, received: {}",
+                        sent.get(), received.get());
+                latch.countDown();
+            }
+        };
+
+        // Start the stream - server will use full backpressure control:
+        // - setOnReadyHandler() for send backpressure
+        // - disableAutoRequest() + request() for receive backpressure
+        StreamObserver<DataChunk> requestObserver = 
service.biStream(responseObserver);
+
+        boolean completed = latch.await(60, TimeUnit.SECONDS);
+        Assert.assertTrue("BiStream should complete within timeout", 
completed);
+        Assert.assertEquals("Should send all chunks", sendCount, sent.get());
+        Assert.assertTrue("Should receive response chunks", received.get() > 
0);
+        LOGGER.info("✅ BiStream with server full backpressure test passed! 
sent={}, received={}",
+                sent.get(), received.get());
+    }
+}
diff --git a/2-advanced/pom.xml b/2-advanced/pom.xml
index 5cefff4d7..eb8c61cb7 100644
--- a/2-advanced/pom.xml
+++ b/2-advanced/pom.xml
@@ -73,5 +73,6 @@
         <module>dubbo-samples-multiple-protocols</module>
         <module>dubbo-samples-triple-websocket</module>
         <module>dubbo-samples-broadcast</module>
+        <module>dubbo-samples-triple-backpressure</module>
     </modules>
 </project>
diff --git a/test/scripts/filter-build-modules.sh b/test/scripts/filter-build-modules.sh
new file mode 100755
index 000000000..c385dc6e5
--- /dev/null
+++ b/test/scripts/filter-build-modules.sh
@@ -0,0 +1,303 @@
+#!/bin/bash
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# This script filters modules based on dubbo.version configuration
+# in case-versions.conf files. The logic is consistent with Java VersionMatcher.
+#
+# Usage: ./filter-build-modules.sh <dubbo_version>
+# Output: Maven -pl exclude list (e.g., "-pl !module1,!module2")
+#
+# Supported dubbo.version formats (same as Java VersionMatcher):
+#   dubbo.version=2.7*, 3.*           -> wildcard match
+#   dubbo.version=[ >= 3.3.6 ]        -> range match
+#   dubbo.version=3.3.*               -> wildcard match
+#   dubbo.version=!2.7.8*             -> exclusion
+#   dubbo.version=>=2.7.7 <3.0        -> combined range
+
+set -e
+
+SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
+BASE_DIR="$( cd "$SCRIPT_DIR/../.." && pwd )"
+
+DUBBO_VERSION=$1
+
+if [ -z "$DUBBO_VERSION" ]; then
+    echo "Usage: $0 <dubbo_version>" >&2
+    echo "Example: $0 3.2.6" >&2
+    exit 1
+fi
+
+# Trim version suffix like '-SNAPSHOT'
+trim_version() {
+    local version=$1
+    echo "$version" | sed 's/-.*$//'
+}
+
+# Normalise a version string into exactly four integer fields so that two
+# versions can be compared field by field (e.g. "3.2.6" -> "3 2 6 0").
+# The '-suffix' is removed first; a missing field, or a field that does not
+# start with a digit, becomes 0.
+# Prints the space-separated integers on stdout.
+version_to_ints() {
+    local trimmed
+    trimmed=$(trim_version "$1")
+
+    # Split on dots; IFS is changed for this single read only
+    local -a fields
+    IFS='.' read -ra fields <<< "$trimmed"
+
+    local out=""
+    local idx digits
+    for idx in 0 1 2 3; do
+        digits=0
+        if [ "$idx" -lt "${#fields[@]}" ]; then
+            # Keep only the leading run of digits of this field
+            digits="${fields[$idx]%%[!0-9]*}"
+            digits="${digits:-0}"
+        fi
+        out="$out $digits"
+    done
+    echo "$out"
+}
+
+# Compare two versions given as space-separated integer lists (the format
+# printed by version_to_ints). Only the first four fields are examined.
+# Returns: 0 if $1 > $2, 1 if $1 < $2, 2 if both are equal
+compare_versions() {
+    local -a left=($1)
+    local -a right=($2)
+    local pos
+
+    # The most significant field that differs decides the ordering
+    for pos in 0 1 2 3; do
+        [ "${left[$pos]}" -gt "${right[$pos]}" ] && return 0
+        [ "${left[$pos]}" -lt "${right[$pos]}" ] && return 1
+    done
+
+    # No field differed
+    return 2
+}
+
+# Test a version against a wildcard pattern in which '*' stands for any run
+# of characters and '.' is a literal dot (e.g. "3.*", "2.7*").
+# Returns: 0 on match, 1 otherwise
+match_wildcard() {
+    local version=$1
+    local pattern=$2
+
+    # Translate the pattern character by character into an anchored regex:
+    # "3.*" -> "^3\..*$", "2.7*" -> "^2\.7.*$"
+    local regex='^'
+    local pos ch
+    for ((pos = 0; pos < ${#pattern}; pos++)); do
+        ch="${pattern:pos:1}"
+        case "$ch" in
+            '.') regex+='\.' ;;
+            '*') regex+='.*' ;;
+            *)   regex+="$ch" ;;
+        esac
+    done
+    regex+='$'
+
+    [[ "$version" =~ $regex ]] && return 0
+    return 1
+}
+
+# Evaluate a single relational condition (e.g. ">=3.3.6", "<3.0").
+# Arguments: $1 version under test, $2 operator (>=, >, <=, <), $3 bound
+# Returns: 0 if "version operator bound" holds, 1 otherwise (also for an
+#          operator that is not one of the four above)
+match_range() {
+    local version=$1
+    local operator=$2
+    local target=$3
+
+    local lhs rhs
+    lhs=$(version_to_ints "$version")
+    rhs=$(version_to_ints "$target")
+
+    # compare_versions reports 0 = greater, 1 = less, 2 = equal
+    compare_versions "$lhs" "$rhs"
+    local cmp=$?
+
+    # Accept exactly the operator/outcome pairs for which the relation holds
+    case "$operator:$cmp" in
+        ">=:0" | ">=:2" | ">:0" | "<=:1" | "<=:2" | "<:1")
+            return 0
+            ;;
+    esac
+    return 1
+}
+
+# Parse and check a single rule against a version
+# Returns: 0=included, 1=excluded, 2=no match
+check_rule() {
+    local version=$1
+    local rule=$2
+    local excluded=0
+    
+    # Trim leading/trailing whitespace first
+    rule=$(echo "$rule" | xargs)
+    
+    # Check for exclusion prefix
+    if [[ "$rule" == !* ]]; then
+        excluded=1
+        rule="${rule:1}"
+        rule=$(echo "$rule" | xargs)  # trim again after removing !
+    fi
+    
+    # Check for range operators (>=, >, <=, <)
+    if [[ "$rule" =~ ^(>=|>|<=|<)[[:space:]]*([0-9]+\.[0-9]+(\.[0-9]+)?) ]]; 
then
+        local operator="${BASH_REMATCH[1]}"
+        local target="${BASH_REMATCH[2]}"
+        
+        if match_range "$version" "$operator" "$target"; then
+            [ $excluded -eq 1 ] && return 1 || return 0
+        fi
+        return 2
+    fi
+    
+    # Check for wildcard
+    if [[ "$rule" == *"*"* ]]; then
+        if match_wildcard "$version" "$rule"; then
+            [ $excluded -eq 1 ] && return 1 || return 0
+        fi
+        return 2
+    fi
+    
+    # Plain version match
+    local trimmed_version=$(trim_version "$version")
+    if [ "$trimmed_version" == "$rule" ]; then
+        [ $excluded -eq 1 ] && return 1 || return 0
+    fi
+    
+    return 2
+}
+
+# Check if a version matches a set of rules (from dubbo.version line)
+# Returns: 0 if version is included, 1 if excluded or no match
+check_version_rules() {
+    local version=$1
+    local rules_str=$2
+    
+    # Trim leading/trailing whitespace
+    rules_str=$(echo "$rules_str" | xargs)
+    
+    # Remove brackets [ ]
+    rules_str=$(echo "$rules_str" | sed 's/^\[//' | sed 's/\]$//' | xargs)
+    
+    # Handle combined range rules like ">=2.7.7 <3.0" (space-separated without 
comma)
+    # First, check if it's a combined range (contains space but no comma 
between operators)
+    if [[ "$rules_str" =~ 
^[[:space:]]*(\>=|\>|\<=|\<)[^,]+[[:space:]]+(\>=|\>|\<=|\<) ]]; then
+        # Combined range rule - all conditions must match
+        local all_match=true
+        while [[ "$rules_str" =~ (\>=|\>|\<=|\<)([^[:space:]\>=\<]+) ]]; do
+            local operator="${BASH_REMATCH[1]}"
+            local target="${BASH_REMATCH[2]}"
+            # Remove quotes
+            target=$(echo "$target" | tr -d "\"'")
+            
+            if ! match_range "$version" "$operator" "$target"; then
+                all_match=false
+                break
+            fi
+            # Remove matched part
+            rules_str="${rules_str#*${BASH_REMATCH[0]}}"
+        done
+        
+        if $all_match; then
+            return 0
+        fi
+        return 1
+    fi
+    
+    # Split by comma for multiple rules
+    local included=false
+    
+    IFS=',' read -ra rules <<< "$rules_str"
+    for rule in "${rules[@]}"; do
+        # Remove quotes
+        rule=$(echo "$rule" | tr -d "\"'" | xargs)
+        
+        # Skip empty rules
+        [ -z "$rule" ] && continue
+        
+        # Check for combined range within a single rule (e.g., ">=2.7.7 <3.0")
+        if [[ "$rule" =~ 
^[[:space:]]*(\>=|\>|\<=|\<).+[[:space:]]+(\>=|\>|\<=|\<) ]]; then
+            local all_match=true
+            local temp_rule="$rule"
+            while [[ "$temp_rule" =~ (\>=|\>|\<=|\<)([^[:space:]\>=\<]+) ]]; do
+                local operator="${BASH_REMATCH[1]}"
+                local target="${BASH_REMATCH[2]}"
+                
+                if ! match_range "$version" "$operator" "$target"; then
+                    all_match=false
+                    break
+                fi
+                temp_rule="${temp_rule#*${BASH_REMATCH[0]}}"
+            done
+            
+            if $all_match; then
+                included=true
+            fi
+            continue
+        fi
+        
+        check_rule "$version" "$rule"
+        local result=$?
+        
+        if [ $result -eq 1 ]; then
+            # Excluded - immediate return
+            return 1
+        elif [ $result -eq 0 ]; then
+            included=true
+        fi
+    done
+    
+    if $included; then
+        return 0
+    fi
+    return 1
+}
+
+# Get the trimmed version for matching
+CURRENT_VERSION=$(trim_version "$DUBBO_VERSION")
+
+# Find all case-versions.conf files and check dubbo.version
+EXCLUDED_MODULES=""
+
+while IFS= read -r conf_file; do
+    # Read dubbo.version from the config file
+    RAW_VERSION=$(grep -E "^dubbo\.version\s*=" "$conf_file" 2>/dev/null | sed 
's/^dubbo\.version\s*=\s*//' || echo "")
+    
+    if [ -n "$RAW_VERSION" ]; then
+        # Check if current version matches the rules
+        if ! check_version_rules "$CURRENT_VERSION" "$RAW_VERSION"; then
+            # Version does not match - exclude this module
+            MODULE_DIR=$(dirname "$conf_file")
+            RELATIVE_PATH="${MODULE_DIR#$BASE_DIR/}"
+            
+            if [ -n "$EXCLUDED_MODULES" ]; then
+                EXCLUDED_MODULES="$EXCLUDED_MODULES,$RELATIVE_PATH"
+            else
+                EXCLUDED_MODULES="$RELATIVE_PATH"
+            fi
+            
+            echo "Excluding module: $RELATIVE_PATH 
(dubbo.version=$RAW_VERSION, current: $CURRENT_VERSION)" >&2
+        fi
+    fi
+done < <(find "$BASE_DIR" -name "case-versions.conf" -type f)
+
+# Output the Maven exclude parameter
+if [ -n "$EXCLUDED_MODULES" ]; then
+    # Format: -pl !module1,!module2
+    echo "-pl !${EXCLUDED_MODULES//,/,!}"
+else
+    echo ""
+fi


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to