This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c2e7d2dd6d [core] Support format reader and writer use File IO
directly (#5760)
c2e7d2dd6d is described below
commit c2e7d2dd6dcda132a9dc7b03c291371015698dcb
Author: YeJunHao <[email protected]>
AuthorDate: Fri Jun 27 17:11:57 2025 +0800
[core] Support format reader and writer use File IO directly (#5760)
---
.../apache/paimon/format/SupportsDirectWrite.java | 30 ++++++++++++++++++++++
.../org/apache/paimon/io/SingleFileWriter.java | 13 +++++++---
.../java/org/apache/paimon/utils/ObjectsFile.java | 27 +++++++++++++------
.../java/org/apache/paimon/jindo/JindoFileIO.java | 4 +++
.../main/java/org/apache/paimon/oss/OSSFileIO.java | 4 +++
5 files changed, 66 insertions(+), 12 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java
b/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java
new file mode 100644
index 0000000000..76ac9a8ba1
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+
+import java.io.IOException;
+
+/** Create a FormatWriter which has full control over file IO. */
+public interface SupportsDirectWrite {
+
+ FormatWriter create(FileIO fileIO, Path path, String compression) throws
IOException;
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
index ec80244113..c9bf104249 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.BundleFormatWriter;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.SupportsDirectWrite;
import org.apache.paimon.fs.AsyncPositionOutputStream;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
@@ -68,11 +69,15 @@ public abstract class SingleFileWriter<T, R> implements
FileWriter<T, R> {
this.converter = converter;
try {
- out = fileIO.newOutputStream(path, false);
- if (asyncWrite) {
- out = new AsyncPositionOutputStream(out);
+ if (factory instanceof SupportsDirectWrite) {
+ writer = ((SupportsDirectWrite) factory).create(fileIO, path,
compression);
+ } else {
+ out = fileIO.newOutputStream(path, false);
+ if (asyncWrite) {
+ out = new AsyncPositionOutputStream(out);
+ }
+ writer = factory.create(out, compression);
}
- writer = factory.create(out, compression);
} catch (IOException e) {
LOG.warn(
"Failed to open the bulk writer, closing the output stream
and throw the error.",
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
index 241fd5eb90..5e777ad19f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalRow;
import org.apache.paimon.format.FormatReaderFactory;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.SupportsDirectWrite;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
@@ -156,19 +157,29 @@ public class ObjectsFile<T> implements
SimpleFileReader<T> {
protected Pair<String, Long> writeWithoutRolling(Iterator<T> records) {
Path path = pathFactory.newPath();
try {
- PositionOutputStream out = fileIO.newOutputStream(path, false);
- long pos;
- try {
- try (FormatWriter writer = writerFactory.create(out,
compression)) {
+ if (writerFactory instanceof SupportsDirectWrite) {
+ try (FormatWriter writer =
+ ((SupportsDirectWrite) writerFactory).create(fileIO,
path, compression)) {
while (records.hasNext()) {
writer.addElement(serializer.toRow(records.next()));
}
}
- } finally {
- pos = out.getPos();
- out.close();
+ return Pair.of(path.getName(), fileIO.getFileSize(path));
+ } else {
+ PositionOutputStream out = fileIO.newOutputStream(path, false);
+ long pos;
+ try {
+ try (FormatWriter writer = writerFactory.create(out,
compression)) {
+ while (records.hasNext()) {
+ writer.addElement(serializer.toRow(records.next()));
+ }
+ }
+ } finally {
+ pos = out.getPos();
+ out.close();
+ }
+ return Pair.of(path.getName(), pos);
}
- return Pair.of(path.getName(), pos);
} catch (Throwable e) {
fileIO.deleteQuietly(path);
throw new RuntimeException(
diff --git
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
index 80a03b2e95..9273952fed 100644
---
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
+++
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
@@ -125,6 +125,10 @@ public class JindoFileIO extends HadoopCompliantFileIO {
}
}
+ public Options hadoopOptions() {
+ return hadoopOptions;
+ }
+
@Override
protected Pair<JindoHadoopSystem, String>
createFileSystem(org.apache.hadoop.fs.Path path) {
final String scheme = path.toUri().getScheme();
diff --git
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
index bbaf2d672c..f4e28bc14c 100644
---
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
+++
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
@@ -115,6 +115,10 @@ public class OSSFileIO extends HadoopCompliantFileIO {
}
}
+ public Options hadoopOptions() {
+ return hadoopOptions;
+ }
+
@Override
protected AliyunOSSFileSystem createFileSystem(org.apache.hadoop.fs.Path
path) {
final String scheme = path.toUri().getScheme();