This is an automated email from the ASF dual-hosted git repository.
xuanwo pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/opendal.git
The following commit(s) were added to refs/heads/main by this push:
new a9d0ec658 feat: impl configurable OperatorOutputStream maxBytes (#5422)
a9d0ec658 is described below
commit a9d0ec658449cdcb60e2c59e5ac7c0163e00ca31
Author: tison <[email protected]>
AuthorDate: Wed Dec 18 11:17:41 2024 +0800
feat: impl configurable OperatorOutputStream maxBytes (#5422)
* feat: impl configurable OperatorOutputStream maxBytes
Signed-off-by: tison <[email protected]>
* test: testAzblobLargeFile
Signed-off-by: tison <[email protected]>
* do fix test
Signed-off-by: tison <[email protected]>
---------
Signed-off-by: tison <[email protected]>
---
.../src/main/java/org/apache/opendal/Operator.java | 4 ++
.../org/apache/opendal/OperatorOutputStream.java | 19 ++++++---
.../opendal/test/behavior/BehaviorExtension.java | 3 ++
.../opendal/test/behavior/BehaviorTestBase.java | 14 +++++++
.../opendal/test/behavior/RegressionTest.java | 46 ++++++++++++++++++++++
5 files changed, 80 insertions(+), 6 deletions(-)
diff --git a/bindings/java/src/main/java/org/apache/opendal/Operator.java b/bindings/java/src/main/java/org/apache/opendal/Operator.java
index 63ede3829..bb08c87a9 100644
--- a/bindings/java/src/main/java/org/apache/opendal/Operator.java
+++ b/bindings/java/src/main/java/org/apache/opendal/Operator.java
@@ -72,6 +72,10 @@ public class Operator extends NativeObject {
return new OperatorOutputStream(this, path);
}
+ public OperatorOutputStream createOutputStream(String path, int maxBytes) {
+ return new OperatorOutputStream(this, path, maxBytes);
+ }
+
public byte[] read(String path) {
return read(nativeHandle, path);
}
diff --git a/bindings/java/src/main/java/org/apache/opendal/OperatorOutputStream.java b/bindings/java/src/main/java/org/apache/opendal/OperatorOutputStream.java
index 0f315174f..05afac416 100644
--- a/bindings/java/src/main/java/org/apache/opendal/OperatorOutputStream.java
+++ b/bindings/java/src/main/java/org/apache/opendal/OperatorOutputStream.java
@@ -35,31 +35,38 @@ public class OperatorOutputStream extends OutputStream {
}
}
- private static final int MAX_BYTES = 16384;
+ private static final int DEFAULT_MAX_BYTES = 16384;
private final Writer writer;
- private final byte[] bytes = new byte[MAX_BYTES];
+ private final byte[] bytes;
+ private final int maxBytes;
private int offset = 0;
public OperatorOutputStream(Operator operator, String path) {
+ this(operator, path, DEFAULT_MAX_BYTES);
+ }
+
+ public OperatorOutputStream(Operator operator, String path, int maxBytes) {
final long op = operator.nativeHandle;
this.writer = new Writer(constructWriter(op, path));
+ this.maxBytes = maxBytes;
+ this.bytes = new byte[maxBytes];
}
@Override
public void write(int b) throws IOException {
bytes[offset++] = (byte) b;
- if (offset >= MAX_BYTES) {
+ if (offset >= maxBytes) {
flush();
}
}
@Override
public void flush() throws IOException {
- if (offset > MAX_BYTES) {
- throw new IOException("INTERNAL ERROR: " + offset + " > " + MAX_BYTES);
- } else if (offset < MAX_BYTES) {
+ if (offset > maxBytes) {
+ throw new IOException("INTERNAL ERROR: " + offset + " > " + maxBytes);
+ } else if (offset < maxBytes) {
final byte[] bytes = Arrays.copyOf(this.bytes, offset);
writeBytes(writer.nativeHandle, bytes);
} else {
diff --git a/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorExtension.java b/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorExtension.java
index 8812f3b4b..d48c645a4 100644
--- a/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorExtension.java
+++ b/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorExtension.java
@@ -40,6 +40,7 @@ import org.junit.jupiter.api.extension.TestWatcher;
public class BehaviorExtension implements BeforeAllCallback, AfterAllCallback, TestWatcher {
private String testName;
+ public String scheme;
public AsyncOperator asyncOperator;
public Operator operator;
@@ -67,6 +68,7 @@ public class BehaviorExtension implements BeforeAllCallback, AfterAllCallback, T
this.asyncOperator = op.layer(RetryLayer.builder().build());
this.operator = this.asyncOperator.blocking();
+ this.scheme = scheme;
this.testName = String.format("%s(%s)", context.getDisplayName(), scheme);
log.info(
"\n================================================================================"
@@ -94,6 +96,7 @@ public class BehaviorExtension implements BeforeAllCallback, AfterAllCallback, T
operator = null;
}
+ this.scheme = null;
this.testName = null;
}
diff --git a/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorTestBase.java b/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorTestBase.java
index 78a138cf0..8380ed6e0 100644
--- a/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorTestBase.java
+++ b/bindings/java/src/test/java/org/apache/opendal/test/behavior/BehaviorTestBase.java
@@ -46,6 +46,10 @@ public abstract class BehaviorTestBase {
return behaviorExtension.operator;
}
+ protected String scheme() {
+ return behaviorExtension.scheme;
+ }
+
/**
* Generates a byte array of random content.
*/
@@ -57,6 +61,16 @@ public abstract class BehaviorTestBase {
return content;
}
+ /**
+ * Generates a byte array of random content with a specific size.
+ */
+ public static byte[] generateBytes(int size) {
+ final Random random = new Random();
+ final byte[] content = new byte[size];
+ random.nextBytes(content);
+ return content;
+ }
+
/**
* Calculate SHA256 digest of input bytes
*
diff --git a/bindings/java/src/test/java/org/apache/opendal/test/behavior/RegressionTest.java b/bindings/java/src/test/java/org/apache/opendal/test/behavior/RegressionTest.java
new file mode 100644
index 000000000..2a9721b69
--- /dev/null
+++ b/bindings/java/src/test/java/org/apache/opendal/test/behavior/RegressionTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.opendal.test.behavior;
+
+import static org.junit.jupiter.api.Assumptions.assumeTrue;
+import java.util.UUID;
+import org.apache.opendal.OperatorOutputStream;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInstance;
+
+@TestInstance(TestInstance.Lifecycle.PER_CLASS)
+public class RegressionTest extends BehaviorTestBase {
+ // @see https://github.com/apache/opendal/issues/5421
+ @Test
+ public void testAzblobLargeFile() throws Exception {
+ assumeTrue(scheme() != null && scheme().equalsIgnoreCase("azblob"));
+
+ final String path = UUID.randomUUID().toString();
+ final int size = 16384 * 10; // 10 x OperatorOutputStream.DEFAULT_MAX_BYTES (10 flushes per write)
+ final byte[] content = generateBytes(size);
+
+ try (OperatorOutputStream operatorOutputStream = op().createOutputStream(path, size)) {
+ for (int i = 0; i < 20000; i++) {
+ // More iterations in case BlockCountExceedsLimit doesn't pop up exactly after 100K blocks.
+ operatorOutputStream.write(content);
+ }
+ }
+ }
+}