This is an automated email from the ASF dual-hosted git repository.

xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git


The following commit(s) were added to refs/heads/main by this push:
     new a9d0ec658 feat: impl configurable OperatorOutputStream maxBytes (#5422)
a9d0ec658 is described below

commit a9d0ec658449cdcb60e2c59e5ac7c0163e00ca31
Author: tison <[email protected]>
AuthorDate: Wed Dec 18 11:17:41 2024 +0800

    feat: impl configurable OperatorOutputStream maxBytes (#5422)
    
    * feat: impl configurable OperatorOutputStream maxBytes
    
    Signed-off-by: tison <[email protected]>
    
    * test: testAzblobLargeFile
    
    Signed-off-by: tison <[email protected]>
    
    * do fix test
    
    Signed-off-by: tison <[email protected]>
    
    ---------
    
    Signed-off-by: tison <[email protected]>
---
 .../src/main/java/org/apache/opendal/Operator.java |  4 ++
 .../org/apache/opendal/OperatorOutputStream.java   | 19 ++++++---
 .../opendal/test/behavior/BehaviorExtension.java   |  3 ++
 .../opendal/test/behavior/BehaviorTestBase.java    | 14 +++++++
 .../opendal/test/behavior/RegressionTest.java      | 46 ++++++++++++++++++++++
 5 files changed, 80 insertions(+), 6 deletions(-)

diff --git a/bindings/java/src/main/java/org/apache/opendal/Operator.java b/bindings/java/src/main/java/org/apache/opendal/Operator.java
index 63ede3829..bb08c87a9 100644
--- a/bindings/java/src/main/java/org/apache/opendal/Operator.java
+++ b/bindings/java/src/main/java/org/apache/opendal/Operator.java
@@ -72,6 +72,10 @@ public class Operator extends NativeObject {
         return new OperatorOutputStream(this, path);
     }
 
+    public OperatorOutputStream createOutputStream(String path, int maxBytes) {
+        return new OperatorOutputStream(this, path, maxBytes);
+    }
+
     public byte[] read(String path) {
         return read(nativeHandle, path);
     }
diff --git a/bindings/java/src/main/java/org/apache/opendal/OperatorOutputStream.java b/bindings/java/src/main/java/org/apache/opendal/OperatorOutputStream.java
index 0f315174f..05afac416 100644
--- a/bindings/java/src/main/java/org/apache/opendal/OperatorOutputStream.java
+++ b/bindings/java/src/main/java/org/apache/opendal/OperatorOutputStream.java
@@ -35,31 +35,38 @@ public class OperatorOutputStream extends OutputStream {
         }
     }
 
-    private static final int MAX_BYTES = 16384;
+    private static final int DEFAULT_MAX_BYTES = 16384;
 
     private final Writer writer;
-    private final byte[] bytes = new byte[MAX_BYTES];
+    private final byte[] bytes;
+    private final int maxBytes;
 
     private int offset = 0;
 
     public OperatorOutputStream(Operator operator, String path) {
+        this(operator, path, DEFAULT_MAX_BYTES);
+    }
+
+    public OperatorOutputStream(Operator operator, String path, int maxBytes) {
         final long op = operator.nativeHandle;
         this.writer = new Writer(constructWriter(op, path));
+        this.maxBytes = maxBytes;
+        this.bytes = new byte[maxBytes];
     }
 
     @Override
     public void write(int b) throws IOException {
         bytes[offset++] = (byte) b;
-        if (offset >= MAX_BYTES) {
+        if (offset >= maxBytes) {
             flush();
         }
     }
 
     @Override
     public void flush() throws IOException {
-        if (offset > MAX_BYTES) {
-            throw new IOException("INTERNAL ERROR: " + offset + " > " + 
MAX_BYTES);
-        } else if (offset < MAX_BYTES) {
+        if (offset > maxBytes) {
+            throw new IOException("INTERNAL ERROR: " + offset + " > " + 
maxBytes);
+        } else if (offset < maxBytes) {
             final byte[] bytes = Arrays.copyOf(this.bytes, offset);
             writeBytes(writer.nativeHandle, bytes);
         } else {
diff --git a/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorExtension.java b/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorExtension.java
index 8812f3b4b..d48c645a4 100644
--- a/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorExtension.java
+++ b/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorExtension.java
@@ -40,6 +40,7 @@ import org.junit.jupiter.api.extension.TestWatcher;
 public class BehaviorExtension implements BeforeAllCallback, AfterAllCallback, 
TestWatcher {
     private String testName;
 
+    public String scheme;
     public AsyncOperator asyncOperator;
     public Operator operator;
 
@@ -67,6 +68,7 @@ public class BehaviorExtension implements BeforeAllCallback, AfterAllCallback, T
             this.asyncOperator = op.layer(RetryLayer.builder().build());
             this.operator = this.asyncOperator.blocking();
 
+            this.scheme = scheme;
             this.testName = String.format("%s(%s)", context.getDisplayName(), 
scheme);
             log.info(
                     
"\n================================================================================"
@@ -94,6 +96,7 @@ public class BehaviorExtension implements BeforeAllCallback, AfterAllCallback, T
             operator = null;
         }
 
+        this.scheme = null;
         this.testName = null;
     }
 
diff --git a/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorTestBase.java b/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorTestBase.java
index 78a138cf0..8380ed6e0 100644
--- a/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorTestBase.java
+++ b/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorTestBase.java
@@ -46,6 +46,10 @@ public abstract class BehaviorTestBase {
         return behaviorExtension.operator;
     }
 
+    protected String scheme() {
+        return behaviorExtension.scheme;
+    }
+
     /**
      * Generates a byte array of random content.
      */
@@ -57,6 +61,16 @@ public abstract class BehaviorTestBase {
         return content;
     }
 
+    /**
+     * Generates a byte array of random content with a specific size.
+     */
+    public static byte[] generateBytes(int size) {
+        final Random random = new Random();
+        final byte[] content = new byte[size];
+        random.nextBytes(content);
+        return content;
+    }
+
     /**
      * Calculate SHA256 digest of input bytes
      *
diff --git a/bindings/java/src/test/java/org/apache/opendal/test/behavior/RegressionTest.java b/bindings/java/src/test/java/org/apache/opendal/test/behavior/RegressionTest.java
new file mode 100644
index 000000000..2a9721b69
--- /dev/null
+++ b/bindings/java/src/test/java/org/apache/opendal/test/behavior/RegressionTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.opendal.test.behavior;
+
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+import java.util.UUID;
+import org.apache.opendal.OperatorOutputStream;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class RegressionTest extends BehaviorTestBase {
+    // @see https://github.com/apache/opendal/issues/5421
+    @Test
+    public void testAzblobLargeFile() throws Exception {
+        assumeTrue(scheme() != null && scheme().equalsIgnoreCase("azblob"));
+
+        final String path = UUID.randomUUID().toString();
+        final int size = 16384 * 10; // 10 x 
OperatorOutputStream.DEFAULT_MAX_BYTES (10 flushes per write)
+        final byte[] content = generateBytes(size);
+
+        try (OperatorOutputStream operatorOutputStream = 
op().createOutputStream(path, size)) {
+            for (int i = 0; i < 20000; i++) {
+                // More iterations in case BlockCountExceedsLimit doesn't pop 
up exactly after 100K blocks.
+                operatorOutputStream.write(content);
+            }
+        }
+    }
+}

Reply via email to