This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new ccd33b1701 [core] Make HadoopFileIO determine isObjectStore based on
the path (#6393)
ccd33b1701 is described below
commit ccd33b1701b619c9d49f77d0bab2fa5b832d0595
Author: Zouxxyy <[email protected]>
AuthorDate: Tue Oct 14 16:28:11 2025 +0800
[core] Make HadoopFileIO determine isObjectStore based on the path (#6393)
---
.../org/apache/paimon/fs/hadoop/HadoopFileIO.java | 16 +++++++++++----
.../paimon/fs/hadoop/HadoopFileIOLoader.java | 2 +-
.../paimon/fs/hadoop/HadoopViewFsFileIOLoader.java | 2 +-
.../java/org/apache/paimon/utils/FileIOUtils.java | 23 ++++++++++++++++++++++
.../paimon/fs/HadoopLocalFileIOBehaviorTest.java | 4 ++--
.../org/apache/paimon/fs/HdfsBehaviorTest.java | 23 ++++++++++++++++++++--
.../fs/hadoop/HadoopSecuredFileSystemTest.java | 3 ++-
.../java/org/apache/paimon/flink/FlinkFileIO.java | 23 ++--------------------
8 files changed, 64 insertions(+), 32 deletions(-)
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
index 918ef5f5c4..49ca2cdc87 100644
--- a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
+++ b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIO.java
@@ -27,6 +27,7 @@ import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.RemoteIterator;
import org.apache.paimon.fs.SeekableInputStream;
import org.apache.paimon.hadoop.SerializableConfiguration;
+import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.FunctionWithException;
import org.apache.paimon.utils.Pair;
import org.apache.paimon.utils.ReflectionUtils;
@@ -43,6 +44,7 @@ import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.URI;
import java.nio.charset.StandardCharsets;
+import java.util.Locale;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
@@ -58,15 +60,21 @@ public class HadoopFileIO implements FileIO {
protected transient volatile Map<Pair<String, String>, FileSystem> fsMap;
+ private final Path path;
+
+ public HadoopFileIO(Path path) {
+ this.path = path;
+ }
+
@VisibleForTesting
- public void setFileSystem(Path path, FileSystem fs) throws IOException {
- org.apache.hadoop.fs.Path hadoopPath = path(path);
- getFileSystem(hadoopPath, p -> fs);
+ public void setFileSystem(FileSystem fs) throws IOException {
+ getFileSystem(path(path), p -> fs);
}
@Override
public boolean isObjectStore() {
- return false;
+ String scheme = path.toUri().getScheme().toLowerCase(Locale.US);
+ return FileIOUtils.isObjectStore(scheme);
}
@Override
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIOLoader.java
b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIOLoader.java
index 00341ac285..2caea822e0 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIOLoader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopFileIOLoader.java
@@ -33,6 +33,6 @@ public class HadoopFileIOLoader implements FileIOLoader {
@Override
public HadoopFileIO load(Path path) {
- return new HadoopFileIO();
+ return new HadoopFileIO(path);
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopViewFsFileIOLoader.java
b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopViewFsFileIOLoader.java
index c60bc35d81..803af22863 100644
---
a/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopViewFsFileIOLoader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/fs/hadoop/HadoopViewFsFileIOLoader.java
@@ -33,6 +33,6 @@ public class HadoopViewFsFileIOLoader implements FileIOLoader {
@Override
public HadoopFileIO load(Path path) {
- return new HadoopFileIO();
+ return new HadoopFileIO(path);
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/FileIOUtils.java
b/paimon-common/src/main/java/org/apache/paimon/utils/FileIOUtils.java
index 31162c3866..0c3559368d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/FileIOUtils.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/FileIOUtils.java
@@ -382,4 +382,27 @@ public class FileIOUtils {
} catch (Exception ignored) {
}
}
+
+ public static boolean isObjectStore(String scheme) {
+ if (scheme.startsWith("s3")
+ || scheme.startsWith("emr")
+ || scheme.startsWith("oss")
+ || scheme.startsWith("wasb")
+ || scheme.startsWith("abfs")
+ || scheme.startsWith("gs")
+ || scheme.startsWith("cosn")) {
+ // the Amazon S3 storage or Aliyun OSS storage or Azure Blob Storage
+ // or Google Cloud Storage
+ return true;
+ } else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
+ // file servers instead of file systems
+ // they might actually be consistent, but we have no hard guarantees
+ // currently to rely on that
+ return true;
+ } else {
+ // the remainder should include hdfs, kosmos, ceph, ...
+ // this also includes federated HDFS (viewfs).
+ return false;
+ }
+ }
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/fs/HadoopLocalFileIOBehaviorTest.java
b/paimon-common/src/test/java/org/apache/paimon/fs/HadoopLocalFileIOBehaviorTest.java
index df94dd2a80..6f6ed70b1a 100644
---
a/paimon-common/src/test/java/org/apache/paimon/fs/HadoopLocalFileIOBehaviorTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/fs/HadoopLocalFileIOBehaviorTest.java
@@ -38,8 +38,8 @@ class HadoopLocalFileIOBehaviorTest extends FileIOBehaviorTestBase {
protected FileIO getFileSystem() throws Exception {
org.apache.hadoop.fs.FileSystem fs = new RawLocalFileSystem();
fs.initialize(URI.create("file:///"), new Configuration());
- HadoopFileIO fileIO = new HadoopFileIO();
- fileIO.setFileSystem(getBasePath(), fs);
+ HadoopFileIO fileIO = new HadoopFileIO(getBasePath());
+ fileIO.setFileSystem(fs);
return fileIO;
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java
b/paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java
index 56bfe052ec..b719038caa 100644
--- a/paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/fs/HdfsBehaviorTest.java
@@ -28,11 +28,14 @@ import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
+import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
+import java.io.ObjectOutputStream;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
/** Behavior tests for HDFS. */
class HdfsBehaviorTest extends FileIOBehaviorTestBase {
@@ -62,8 +65,8 @@ class HdfsBehaviorTest extends FileIOBehaviorTestBase {
org.apache.hadoop.fs.FileSystem hdfs = hdfsCluster.getFileSystem();
basePath = new Path(hdfs.getUri().toString() + "/tests");
- fs = new HadoopFileIO();
- fs.setFileSystem(basePath, hdfs);
+ fs = new HadoopFileIO(basePath);
+ fs.setFileSystem(hdfs);
}
@AfterAll
@@ -102,4 +105,20 @@ class HdfsBehaviorTest extends FileIOBehaviorTestBase {
public void testAtomicWriteMultipleThreads() throws InterruptedException {
FileIOTest.testOverwriteFileUtf8(new Path(getBasePath(),
randomName()), fs);
}
+
+ @Test
+ public void testIsObjectStore() {
+ assertThat(fs.isObjectStore()).isEqualTo(false);
+ }
+
+ @Test
+ public void testSerializable() {
+ assertDoesNotThrow(
+ () -> {
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ObjectOutputStream oos = new ObjectOutputStream(baos)) {
+ oos.writeObject(fs);
+ }
+ });
+ }
}
diff --git
a/paimon-common/src/test/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystemTest.java
b/paimon-common/src/test/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystemTest.java
index 1737434c8a..10d60f2bc4 100644
---
a/paimon-common/src/test/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystemTest.java
+++
b/paimon-common/src/test/java/org/apache/paimon/fs/hadoop/HadoopSecuredFileSystemTest.java
@@ -19,6 +19,7 @@
package org.apache.paimon.fs.hadoop;
import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.Path;
import org.apache.paimon.options.Options;
import org.junit.jupiter.api.Test;
@@ -41,7 +42,7 @@ public class HadoopSecuredFileSystemTest {
options.set("security.kerberos.login.principal", "test-user");
options.set("security.kerberos.login.keytab",
keytabFile.getAbsolutePath());
- HadoopFileIO fileIO = new HadoopFileIO();
+ HadoopFileIO fileIO = new HadoopFileIO(new Path("file:///tmp/test"));
fileIO.configure(CatalogContext.create(options));
assertThat(fileIO.getFileSystem(new
org.apache.hadoop.fs.Path("file:///tmp/test")))
.isInstanceOf(HadoopSecuredFileSystem.class);
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
index 4819967749..24651644c9 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkFileIO.java
@@ -24,6 +24,7 @@ import org.apache.paimon.fs.FileStatus;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.utils.FileIOUtils;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
@@ -50,27 +51,7 @@ public class FlinkFileIO implements FileIO {
try {
FileSystem fs = path.getFileSystem();
String scheme = fs.getUri().getScheme().toLowerCase(Locale.US);
-
- if (scheme.startsWith("s3")
- || scheme.startsWith("emr")
- || scheme.startsWith("oss")
- || scheme.startsWith("wasb")
- || scheme.startsWith("abfs")
- || scheme.startsWith("gs")
- || scheme.startsWith("cosn")) {
- // the Amazon S3 storage or Aliyun OSS storage or Azure Blob Storage
- // or Google Cloud Storage
- return true;
- } else if (scheme.startsWith("http") || scheme.startsWith("ftp")) {
- // file servers instead of file systems
- // they might actually be consistent, but we have no hard guarantees
- // currently to rely on that
- return true;
- } else {
- // the remainder should include hdfs, kosmos, ceph, ...
- // this also includes federated HDFS (viewfs).
- return false;
- }
+ return FileIOUtils.isObjectStore(scheme);
} catch (IOException e) {
throw new UncheckedIOException(e);
}