This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 9f5565833a6 HDDS-12992. Clients should not use gRPC port for Streaming 
(#9988)
9f5565833a6 is described below

commit 9f5565833a6bfc875bea4bc3fe040b6282b63e4e
Author: Russole <[email protected]>
AuthorDate: Sat Mar 28 23:43:32 2026 +0800

    HDDS-12992. Clients should not use gRPC port for Streaming (#9988)
---
 .../hdds/scm/storage/BlockDataStreamOutput.java    |   8 ++
 .../hadoop/hdds/protocol/DatanodeDetails.java      |  11 ++
 .../ozone/client/io/KeyDataStreamOutput.java       |   6 +-
 ...oneFileSystemWithStreamingDisabledDatanode.java | 144 +++++++++++++++++++++
 4 files changed, 167 insertions(+), 2 deletions(-)

diff --git 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
index e570ba30885..1ababc7a1d7 100644
--- 
a/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
+++ 
b/hadoop-hdds/client/src/main/java/org/apache/hadoop/hdds/scm/storage/BlockDataStreamOutput.java
@@ -194,6 +194,14 @@ public BlockDataStreamOutput(
   }
 
   private DataStreamOutput setupStream(Pipeline pipeline) throws IOException {
+    for (DatanodeDetails dn : pipeline.getNodes()) {
+      if (!dn.hasPort(DatanodeDetails.Port.Name.RATIS_DATASTREAM)) {
+        throw new IOException("RATIS_DATASTREAM port is missing for datanode "
+            + dn + " in pipeline " + pipeline.getId()
+            + "; datastream is disabled for this pipeline");
+      }
+    }
+
     // Execute a dummy WriteChunk request to get the path of the target file,
     // but does NOT write any data to it.
     ContainerProtos.WriteChunkRequestProto.Builder writeChunkRequest =
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
index 2585f593611..d72e010535d 100644
--- 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/protocol/DatanodeDetails.java
@@ -384,6 +384,17 @@ public synchronized Port getPort(Port.Name name) {
     return null;
   }
 
+  // CHANGE: add a helper to check whether a port is explicitly present
+  // without applying compatibility fallback.
+  public synchronized boolean hasPort(Port.Name name) {
+    for (Port port : ports) {
+      if (port.getName().equals(name)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
   /**
    * Helper method to get the Ratis port.
    * 
diff --git 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
index 21384d488dd..ceacd624e93 100644
--- 
a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
+++ 
b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyDataStreamOutput.java
@@ -306,8 +306,10 @@ private void handleException(BlockDataStreamOutputEntry 
streamEntry,
           BlockDataStreamOutput blockDataStreamOutput =
               (BlockDataStreamOutput) currentStreamEntry
                   .getByteBufStreamOutput();
-          blockDataStreamOutput.executePutBlock(false, false);
-          blockDataStreamOutput.watchForCommit(false);
+          if (blockDataStreamOutput != null) {
+            blockDataStreamOutput.executePutBlock(false, false);
+            blockDataStreamOutput.watchForCommit(false);
+          }
         } catch (IOException e) {
           LOG.error(
               "Failed to execute putBlock/watchForCommit. " +
diff --git 
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreamingDisabledDatanode.java
 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreamingDisabledDatanode.java
new file mode 100644
index 00000000000..cb59a3ae4da
--- /dev/null
+++ 
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/fs/ozone/TestOzoneFileSystemWithStreamingDisabledDatanode.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.ozone;
+
+import static 
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_RATIS_PIPELINE_LIMIT;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_AUTO_THRESHOLD;
+import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_FS_DATASTREAM_ENABLED;
+import static org.apache.hadoop.ozone.OzoneConsts.OZONE_URI_SCHEME;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.io.IOException;
+import java.util.concurrent.ThreadLocalRandom;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.utils.IOUtils;
+import org.apache.hadoop.ozone.ClientConfigForTesting;
+import org.apache.hadoop.ozone.MiniOzoneCluster;
+import org.apache.hadoop.ozone.TestDataUtil;
+import org.apache.hadoop.ozone.client.OzoneBucket;
+import org.apache.hadoop.ozone.client.OzoneClient;
+import org.apache.hadoop.ozone.om.helpers.BucketLayout;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Verifies that when DataNode-side datastream is disabled, the client fails
+ * fast instead of falling back to the gRPC port for streaming writes.
+ */
+public class TestOzoneFileSystemWithStreamingDisabledDatanode {
+
+  private static MiniOzoneCluster cluster;
+  private static OzoneClient client;
+  private static OzoneBucket bucket;
+  private static OzoneConfiguration conf;
+
+  @BeforeAll
+  public static void init() throws Exception {
+    conf = new OzoneConfiguration();
+
+    final BucketLayout layout = BucketLayout.FILE_SYSTEM_OPTIMIZED;
+
+    // DataNode side: disable datastream, so pipeline should not explicitly
+    // carry RATIS_DATASTREAM ports.
+    conf.setBoolean(HDDS_CONTAINER_RATIS_DATASTREAM_ENABLED, false);
+
+    // Client side: enable datastream so the write path will attempt streaming.
+    conf.setBoolean(OZONE_FS_DATASTREAM_ENABLED, true);
+
+    conf.set(OZONE_FS_DATASTREAM_AUTO_THRESHOLD, "1B");
+
+    conf.setInt(OZONE_SCM_RATIS_PIPELINE_LIMIT, 10);
+
+    ClientConfigForTesting.newBuilder(StorageUnit.BYTES)
+        .setChunkSize(16 << 10)
+        .setStreamBufferFlushSize(32 << 10)
+        .setStreamBufferMaxSize(64 << 10)
+        .setBlockSize(128 << 10)
+        .applyTo(conf);
+
+    cluster = MiniOzoneCluster.newBuilder(conf)
+        .setNumDatanodes(3)
+        .build();
+    cluster.waitForClusterToBeReady();
+
+    client = cluster.newClient();
+    bucket = TestDataUtil.createVolumeAndBucket(client, layout);
+  }
+
+  @AfterAll
+  public static void teardown() {
+    IOUtils.closeQuietly(client);
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testDatastreamWriteFailsFastWhenDatanodeStreamingDisabled()
+      throws Exception {
+    final String rootPath = String.format("%s://%s.%s/",
+        OZONE_URI_SCHEME, bucket.getName(), bucket.getVolumeName());
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, rootPath);
+
+    final byte[] bytes = new byte[3 << 20];
+    ThreadLocalRandom.current().nextBytes(bytes);
+
+    try (FileSystem fs = FileSystem.get(conf)) {
+      final Path file = new Path("/streaming-disabled-dn.dat");
+
+      IOException ex = assertThrows(IOException.class, () -> {
+        try (FSDataOutputStream out = fs.create(file, true)) {
+          out.write(bytes);
+        }
+      });
+
+      final String msg = collectMessages(ex).toLowerCase();
+
+      // Expect a clear fail-fast error from our validation.
+      assertTrue(
+          msg.contains("ratis_datastream port is missing")
+              || msg.contains("datastream is disabled")
+              || msg.contains("ratis_datastream"),
+          () -> "Expected a clear fail-fast datastream error, but got: " + ex);
+
+      // Ensure we did not fall back to the old gRPC/HTTP2 failure path.
+      assertTrue(
+          !msg.contains("http/2") && !msg.contains("timeout"),
+          () -> "Should not fall back to gRPC/HTTP2 path, but got: " + ex);
+    }
+  }
+
+  private static String collectMessages(Throwable t) {
+    StringBuilder sb = new StringBuilder();
+    while (t != null) {
+      if (t.getMessage() != null) {
+        sb.append(t.getMessage()).append('\n');
+      }
+      t = t.getCause();
+    }
+    return sb.toString();
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to