This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c2e7d2dd6d [core] Support format reader and writer use File IO 
directly (#5760)
c2e7d2dd6d is described below

commit c2e7d2dd6dcda132a9dc7b03c291371015698dcb
Author: YeJunHao <[email protected]>
AuthorDate: Fri Jun 27 17:11:57 2025 +0800

    [core] Support format reader and writer use File IO directly (#5760)
---
 .../apache/paimon/format/SupportsDirectWrite.java  | 30 ++++++++++++++++++++++
 .../org/apache/paimon/io/SingleFileWriter.java     | 13 +++++++---
 .../java/org/apache/paimon/utils/ObjectsFile.java  | 27 +++++++++++++------
 .../java/org/apache/paimon/jindo/JindoFileIO.java  |  4 +++
 .../main/java/org/apache/paimon/oss/OSSFileIO.java |  4 +++
 5 files changed, 66 insertions(+), 12 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java 
b/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java
new file mode 100644
index 0000000000..76ac9a8ba1
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/format/SupportsDirectWrite.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+
+import java.io.IOException;
+
+/** Create a FormatWriter which has full control over file IO. */
+public interface SupportsDirectWrite {
+
+    FormatWriter create(FileIO fileIO, Path path, String compression) throws 
IOException;
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java 
b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
index ec80244113..c9bf104249 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.BundleFormatWriter;
 import org.apache.paimon.format.FormatWriter;
 import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.SupportsDirectWrite;
 import org.apache.paimon.fs.AsyncPositionOutputStream;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
@@ -68,11 +69,15 @@ public abstract class SingleFileWriter<T, R> implements 
FileWriter<T, R> {
         this.converter = converter;
 
         try {
-            out = fileIO.newOutputStream(path, false);
-            if (asyncWrite) {
-                out = new AsyncPositionOutputStream(out);
+            if (factory instanceof SupportsDirectWrite) {
+                writer = ((SupportsDirectWrite) factory).create(fileIO, path, 
compression);
+            } else {
+                out = fileIO.newOutputStream(path, false);
+                if (asyncWrite) {
+                    out = new AsyncPositionOutputStream(out);
+                }
+                writer = factory.create(out, compression);
             }
-            writer = factory.create(out, compression);
         } catch (IOException e) {
             LOG.warn(
                     "Failed to open the bulk writer, closing the output stream 
and throw the error.",
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
index 241fd5eb90..5e777ad19f 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
@@ -22,6 +22,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.format.FormatWriter;
 import org.apache.paimon.format.FormatWriterFactory;
+import org.apache.paimon.format.SupportsDirectWrite;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.PositionOutputStream;
@@ -156,19 +157,29 @@ public class ObjectsFile<T> implements 
SimpleFileReader<T> {
     protected Pair<String, Long> writeWithoutRolling(Iterator<T> records) {
         Path path = pathFactory.newPath();
         try {
-            PositionOutputStream out = fileIO.newOutputStream(path, false);
-            long pos;
-            try {
-                try (FormatWriter writer = writerFactory.create(out, 
compression)) {
+            if (writerFactory instanceof SupportsDirectWrite) {
+                try (FormatWriter writer =
+                        ((SupportsDirectWrite) writerFactory).create(fileIO, 
path, compression)) {
                     while (records.hasNext()) {
                         writer.addElement(serializer.toRow(records.next()));
                     }
                 }
-            } finally {
-                pos = out.getPos();
-                out.close();
+                return Pair.of(path.getName(), fileIO.getFileSize(path));
+            } else {
+                PositionOutputStream out = fileIO.newOutputStream(path, false);
+                long pos;
+                try {
+                    try (FormatWriter writer = writerFactory.create(out, 
compression)) {
+                        while (records.hasNext()) {
+                            
writer.addElement(serializer.toRow(records.next()));
+                        }
+                    }
+                } finally {
+                    pos = out.getPos();
+                    out.close();
+                }
+                return Pair.of(path.getName(), pos);
             }
-            return Pair.of(path.getName(), pos);
         } catch (Throwable e) {
             fileIO.deleteQuietly(path);
             throw new RuntimeException(
diff --git 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
index 80a03b2e95..9273952fed 100644
--- 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
+++ 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
@@ -125,6 +125,10 @@ public class JindoFileIO extends HadoopCompliantFileIO {
         }
     }
 
+    public Options hadoopOptions() {
+        return hadoopOptions;
+    }
+
     @Override
     protected Pair<JindoHadoopSystem, String> 
createFileSystem(org.apache.hadoop.fs.Path path) {
         final String scheme = path.toUri().getScheme();
diff --git 
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
 
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
index bbaf2d672c..f4e28bc14c 100644
--- 
a/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
+++ 
b/paimon-filesystems/paimon-oss-impl/src/main/java/org/apache/paimon/oss/OSSFileIO.java
@@ -115,6 +115,10 @@ public class OSSFileIO extends HadoopCompliantFileIO {
         }
     }
 
+    public Options hadoopOptions() {
+        return hadoopOptions;
+    }
+
     @Override
     protected AliyunOSSFileSystem createFileSystem(org.apache.hadoop.fs.Path 
path) {
         final String scheme = path.toUri().getScheme();

Reply via email to