This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 92a6b272a1 [fs] Add JindoFileIO access to OSS without dependency
(#5303)
92a6b272a1 is described below
commit 92a6b272a1b850e60bb2550906c5a3624ae11460
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 19 10:46:59 2025 +0800
[fs] Add JindoFileIO access to OSS without dependency (#5303)
---
docs/content/maintenance/filesystems.md | 9 +
docs/content/project/download.md | 18 +-
paimon-filesystems/paimon-jindo/pom.xml | 92 ++++++
.../apache/paimon/jindo/HadoopCompliantFileIO.java | 312 +++++++++++++++++++++
.../java/org/apache/paimon/jindo/JindoFileIO.java | 203 ++++++++++++++
.../java/org/apache/paimon/jindo/JindoLoader.java | 49 ++++
.../services/org.apache.paimon.fs.FileIOLoader | 16 ++
paimon-filesystems/pom.xml | 1 +
8 files changed, 692 insertions(+), 8 deletions(-)
diff --git a/docs/content/maintenance/filesystems.md
b/docs/content/maintenance/filesystems.md
index f987747fd8..cdc27657af 100644
--- a/docs/content/maintenance/filesystems.md
+++ b/docs/content/maintenance/filesystems.md
@@ -256,6 +256,15 @@ Please note that:
{{< /tab >}}
{{< /tabs >}}
+If your environment has the Jindo SDK dependencies, you can use JindoFS to connect to OSS. Jindo has better read and write efficiency.
+
+{{< stable >}}
+Download [paimon-jindo-{{< version
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-jindo/{{<
version >}}/paimon-jindo-{{< version >}}.jar).
+{{< /stable >}}
+{{< unstable >}}
+Download [paimon-jindo-{{< version
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-jindo/{{<
version >}}/).
+{{< /unstable >}}
+
## S3
{{< stable >}}
diff --git a/docs/content/project/download.md b/docs/content/project/download.md
index 23d0112b09..d0b92107f5 100644
--- a/docs/content/project/download.md
+++ b/docs/content/project/download.md
@@ -82,19 +82,21 @@ This documentation is a guide for downloading Paimon Jars.
{{< unstable >}}
-| Version | Jar
|
-|------------|-------------------------------------------------------------------------------------------------------------------------|
-| paimon-oss | [paimon-oss-{{< version
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-oss/{{<
version >}}/) |
-| paimon-s3 | [paimon-s3-{{< version
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-s3/{{<
version >}}/) |
+| Version | Jar
|
+|--------------|-----------------------------------------------------------------------------------------------------------------------------|
+| paimon-oss | [paimon-oss-{{< version
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-oss/{{<
version >}}/) |
+| paimon-jindo | [paimon-jindo-{{< version
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-jindo/{{<
version >}}/) |
+| paimon-s3 | [paimon-s3-{{< version
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-s3/{{<
version >}}/) |
{{< /unstable >}}
{{< stable >}}
-| Version | Jar
|
-|------------|----------------------------------------------------------------------------------------------------------------------------------------------------|
-| paimon-oss | [paimon-oss-{{< version
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-oss/{{<
version >}}/paimon-oss-{{< version >}}.jar) |
-| paimon-s3 | [paimon-s3-{{< version
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-s3/{{<
version >}}/paimon-s3-{{< version >}}.jar) |
+| Version | Jar
|
+|--------------|----------------------------------------------------------------------------------------------------------------------------------------------------------|
+| paimon-oss | [paimon-oss-{{< version
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-oss/{{<
version >}}/paimon-oss-{{< version >}}.jar) |
+| paimon-jindo | [paimon-jindo-{{< version
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-jindo/{{<
version >}}/paimon-jindo-{{< version >}}.jar) |
+| paimon-s3 | [paimon-s3-{{< version
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-s3/{{<
version >}}/paimon-s3-{{< version >}}.jar) |
{{< /stable >}}
diff --git a/paimon-filesystems/paimon-jindo/pom.xml
b/paimon-filesystems/paimon-jindo/pom.xml
new file mode 100644
index 0000000000..14320c1622
--- /dev/null
+++ b/paimon-filesystems/paimon-jindo/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>paimon-filesystems</artifactId>
+        <groupId>org.apache.paimon</groupId>
+        <version>1.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>paimon-jindo</artifactId>
+    <name>Paimon : FileSystems : Jindo</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <!-- Version shared by the jindo-core and jindo-sdk artifacts below. -->
+        <jindodata.version>6.5.6</jindodata.version>
+    </properties>
+
+    <!-- Extra repository from which the com.aliyun.jindodata artifacts below are resolved. -->
+    <repositories>
+        <repository>
+            <id>jindodata</id>
+            <url>https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/</url>
+        </repository>
+    </repositories>
+
+    <!--
+      All dependencies are "provided" and the third-party ones exclude every transitive
+      dependency: nothing is bundled into this jar, so Hadoop and the Jindo SDK must already be
+      on the runtime classpath.
+    -->
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- NOTE(review): hadoop.version is not defined here; presumably inherited from a parent POM. -->
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.aliyun.jindodata</groupId>
+            <artifactId>jindo-core</artifactId>
+            <version>${jindodata.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.aliyun.jindodata</groupId>
+            <artifactId>jindo-sdk</artifactId>
+            <version>${jindodata.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+</project>
diff --git
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
new file mode 100644
index 0000000000..030d845b3b
--- /dev/null
+++
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jindo;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.VectoredReadable;
+import org.apache.paimon.utils.Pair;
+
+import com.aliyun.jindodata.common.JindoHadoopSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Hadoop-compatible {@link FileIO} backed by Jindo file systems.
+ *
+ * <p>Important: this class is copied from HadoopFileIO to avoid class loader conflicts.
+ *
+ * <p>Subclasses create the underlying {@link JindoHadoopSystem} in {@link
+ * #createFileSystem(org.apache.hadoop.fs.Path)}; one instance is kept per URI authority.
+ */
+public abstract class HadoopCompliantFileIO implements FileIO {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * File systems keyed by URI authority ({@code "DEFAULT"} when a path has none); each value
+     * pairs the file system with its Jindo system type. Created lazily and transient, so it is
+     * rebuilt on first use after deserialization.
+     */
+    protected transient volatile Map<String, Pair<JindoHadoopSystem, String>> fsMap;
+
+    @Override
+    public SeekableInputStream newInputStream(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        Pair<JindoHadoopSystem, String> pair = getFileSystemPair(hadoopPath);
+        JindoHadoopSystem fs = pair.getKey();
+        String sysType = pair.getValue();
+        FSDataInputStream fsInput = fs.open(hadoopPath);
+        // Only streams from the "jobj" system type are exposed as VectoredReadable (pread).
+        return "jobj".equalsIgnoreCase(sysType)
+                ? new VectoredReadableInputStream(fsInput)
+                : new HadoopSeekableInputStream(fsInput);
+    }
+
+    @Override
+    public PositionOutputStream newOutputStream(Path path, boolean overwrite) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        return new HadoopPositionOutputStream(
+                getFileSystem(hadoopPath).create(hadoopPath, overwrite));
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        return new HadoopFileStatus(getFileSystem(hadoopPath).getFileStatus(hadoopPath));
+    }
+
+    /** Lists {@code path}; returns an empty array if the file system returns {@code null}. */
+    @Override
+    public FileStatus[] listStatus(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        FileStatus[] statuses = new FileStatus[0];
+        org.apache.hadoop.fs.FileStatus[] hadoopStatuses =
+                getFileSystem(hadoopPath).listStatus(hadoopPath);
+        if (hadoopStatuses != null) {
+            statuses = new FileStatus[hadoopStatuses.length];
+            for (int i = 0; i < hadoopStatuses.length; i++) {
+                statuses[i] = new HadoopFileStatus(hadoopStatuses[i]);
+            }
+        }
+        return statuses;
+    }
+
+    @Override
+    public boolean exists(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        return getFileSystem(hadoopPath).exists(hadoopPath);
+    }
+
+    @Override
+    public boolean delete(Path path, boolean recursive) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        return getFileSystem(hadoopPath).delete(hadoopPath, recursive);
+    }
+
+    @Override
+    public boolean mkdirs(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        return getFileSystem(hadoopPath).mkdirs(hadoopPath);
+    }
+
+    /** Renames {@code src} to {@code dst} using the file system resolved from {@code src}. */
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+        org.apache.hadoop.fs.Path hadoopSrc = path(src);
+        org.apache.hadoop.fs.Path hadoopDst = path(dst);
+        return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst);
+    }
+
+    /**
+     * Converts a Paimon path into a Hadoop path.
+     *
+     * <p>For {@code oss} URIs that carry user info, the path is rebuilt as {@code "oss:/" +
+     * uri.getPath()} so the user info is not passed on. NOTE(review): this also drops the
+     * authority, e.g. {@code oss://ak:sk@bucket/dir/f} becomes {@code oss://dir/f}; confirm that
+     * this is intended.
+     */
+    private org.apache.hadoop.fs.Path path(Path path) {
+        URI uri = path.toUri();
+        // Constant-first comparison: paths without a scheme have a null scheme.
+        if ("oss".equals(uri.getScheme()) && uri.getUserInfo() != null) {
+            path = new Path("oss:/" + uri.getPath());
+        }
+        return new org.apache.hadoop.fs.Path(path.toUri());
+    }
+
+    private JindoHadoopSystem getFileSystem(org.apache.hadoop.fs.Path path) throws IOException {
+        return getFileSystemPair(path).getKey();
+    }
+
+    /**
+     * Returns the file system (and its Jindo system type) responsible for {@code path}, creating
+     * it via {@link #createFileSystem} the first time the path's authority is seen.
+     *
+     * @throws IOException if creating the file system fails
+     */
+    private Pair<JindoHadoopSystem, String> getFileSystemPair(org.apache.hadoop.fs.Path path)
+            throws IOException {
+        if (fsMap == null) {
+            synchronized (this) {
+                if (fsMap == null) {
+                    fsMap = new ConcurrentHashMap<>();
+                }
+            }
+        }
+
+        Map<String, Pair<JindoHadoopSystem, String>> map = fsMap;
+
+        String authority = path.toUri().getAuthority();
+        if (authority == null) {
+            authority = "DEFAULT";
+        }
+        try {
+            return map.computeIfAbsent(
+                    authority,
+                    k -> {
+                        try {
+                            return createFileSystem(path);
+                        } catch (IOException e) {
+                            // The mapping function cannot throw checked exceptions; unwrapped below.
+                            throw new UncheckedIOException(e);
+                        }
+                    });
+        } catch (UncheckedIOException e) {
+            throw e.getCause();
+        }
+    }
+
+    /**
+     * Creates the file system for {@code path} and returns it paired with its Jindo system type.
+     */
+    protected abstract Pair<JindoHadoopSystem, String> createFileSystem(
+            org.apache.hadoop.fs.Path path) throws IOException;
+
+    /** {@link SeekableInputStream} over a Hadoop {@link FSDataInputStream}. */
+    private static class HadoopSeekableInputStream extends SeekableInputStream {
+
+        /** Forward seeks up to this many bytes (1 MiB) are served by skipping, not seeking. */
+        private static final int MIN_SKIP_BYTES = 1024 * 1024;
+
+        protected final FSDataInputStream in;
+
+        private HadoopSeekableInputStream(FSDataInputStream in) {
+            this.in = in;
+        }
+
+        @Override
+        public void seek(long seekPos) throws IOException {
+            // We do some optimizations to avoid that some implementations of distributed FS
+            // perform expensive seeks when they are actually not needed.
+            long delta = seekPos - getPos();
+
+            if (delta > 0L && delta <= MIN_SKIP_BYTES) {
+                // Instead of a small forward seek, we skip over the gap.
+                skipFully(delta);
+            } else if (delta != 0L) {
+                // For larger gaps and backward seeks, we do a real seek.
+                forceSeek(seekPos);
+            } // Do nothing if delta is zero.
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return in.getPos();
+        }
+
+        @Override
+        public int read() throws IOException {
+            return in.read();
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            return in.read(b, off, len);
+        }
+
+        @Override
+        public void close() throws IOException {
+            in.close();
+        }
+
+        /**
+         * Positions the stream to the given location. In contrast to {@link #seek(long)}, this
+         * method will always issue a "seek" command to the dfs and may not replace it by {@link
+         * #skip(long)} for small seeks.
+         *
+         * <p>Notice that the underlying DFS implementation can still decide to do skip instead of
+         * seek.
+         *
+         * @param seekPos the position to seek to.
+         */
+        public void forceSeek(long seekPos) throws IOException {
+            in.seek(seekPos);
+        }
+
+        /**
+         * Skips over a given amount of bytes in the stream.
+         *
+         * <p>{@link java.io.InputStream#skip(long)} may skip fewer bytes than requested, including
+         * zero. When it makes no progress, a single byte is read instead so that the loop always
+         * terminates.
+         *
+         * @param bytes the number of bytes to skip.
+         * @throws java.io.EOFException if the end of the stream is reached before {@code bytes}
+         *     bytes were skipped.
+         */
+        public void skipFully(long bytes) throws IOException {
+            while (bytes > 0) {
+                long skipped = in.skip(bytes);
+                if (skipped > 0) {
+                    bytes -= skipped;
+                } else if (in.read() >= 0) {
+                    bytes--;
+                } else {
+                    throw new java.io.EOFException(
+                            "Reached end of stream with " + bytes + " bytes left to skip");
+                }
+            }
+        }
+    }
+
+    /** Input stream that additionally supports positional reads ({@link VectoredReadable}). */
+    private static class VectoredReadableInputStream extends HadoopSeekableInputStream
+            implements VectoredReadable {
+
+        private VectoredReadableInputStream(FSDataInputStream in) {
+            super(in);
+        }
+
+        @Override
+        public int pread(long position, byte[] bytes, int off, int len) throws IOException {
+            return in.read(position, bytes, off, len);
+        }
+    }
+
+    /** {@link PositionOutputStream} over a Hadoop {@link FSDataOutputStream}. */
+    private static class HadoopPositionOutputStream extends PositionOutputStream {
+
+        private final FSDataOutputStream out;
+
+        private HadoopPositionOutputStream(FSDataOutputStream out) {
+            this.out = out;
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return out.getPos();
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            out.write(b);
+        }
+
+        @Override
+        public void write(byte[] b) throws IOException {
+            out.write(b);
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            out.write(b, off, len);
+        }
+
+        /** Delegates to {@code hflush()} rather than a plain {@code flush()}. */
+        @Override
+        public void flush() throws IOException {
+            out.hflush();
+        }
+
+        @Override
+        public void close() throws IOException {
+            out.close();
+        }
+    }
+
+    /** {@link FileStatus} view of a Hadoop {@link org.apache.hadoop.fs.FileStatus}. */
+    private static class HadoopFileStatus implements FileStatus {
+
+        private final org.apache.hadoop.fs.FileStatus status;
+
+        private HadoopFileStatus(org.apache.hadoop.fs.FileStatus status) {
+            this.status = status;
+        }
+
+        @Override
+        public long getLen() {
+            return status.getLen();
+        }
+
+        @Override
+        public boolean isDir() {
+            return status.isDirectory();
+        }
+
+        @Override
+        public Path getPath() {
+            return new Path(status.getPath().toUri());
+        }
+
+        @Override
+        public long getModificationTime() {
+            return status.getModificationTime();
+        }
+    }
+}
diff --git
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
new file mode 100644
index 0000000000..423f6fb7ec
--- /dev/null
+++
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jindo;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.Pair;
+
+import com.aliyun.jindodata.common.JindoHadoopSystem;
+import com.aliyun.jindodata.dls.JindoDlsFileSystem;
+import com.aliyun.jindodata.oss.JindoOssFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+
+import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE;
+
+/** Jindo {@link FileIO}: accesses {@code oss} and {@code dls} paths through the Jindo SDK. */
+public class JindoFileIO extends HadoopCompliantFileIO {
+
+    private static final long serialVersionUID = 2L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(JindoFileIO.class);
+
+    /**
+     * To keep things simple, Paimon's OSS configuration keys are the same as those of the Hadoop
+     * OSS module, so every configuration key with one of these prefixes is copied from the Paimon
+     * options into the Hadoop configuration.
+     *
+     * <p>The prefix is {@code fs.} rather than {@code fs.oss.} because this FileIO also serves
+     * other schemes such as {@code dls}.
+     */
+    private static final String[] CONFIG_PREFIXES = {"fs."};
+
+    private static final String OSS_ACCESS_KEY_ID = "fs.oss.accessKeyId";
+    private static final String OSS_ACCESS_KEY_SECRET = "fs.oss.accessKeySecret";
+    private static final String OSS_SECURITY_TOKEN = "fs.oss.securityToken";
+
+    /**
+     * Maps the lower-cased form of each credential key to its canonical camel-case spelling, so a
+     * credential key given in any letter case is passed to Jindo with the expected spelling.
+     */
+    private static final Map<String, String> CASE_SENSITIVE_KEYS =
+            caseSensitiveKeys(OSS_ACCESS_KEY_ID, OSS_ACCESS_KEY_SECRET, OSS_SECURITY_TOKEN);
+
+    /**
+     * Cache of Jindo file systems shared by all instances. At present there is no good mechanism
+     * to ensure that a file system will be shut down, so file systems are cached here to avoid
+     * resource leakage.
+     */
+    private static final Map<CacheKey, Pair<JindoHadoopSystem, String>> CACHE =
+            new ConcurrentHashMap<>();
+
+    private Options hadoopOptions;
+    private boolean allowCache = true;
+
+    @Override
+    public boolean isObjectStore() {
+        return true;
+    }
+
+    /**
+     * Builds the Hadoop options used to initialize Jindo file systems.
+     *
+     * <p>Jindo's OSS implementation classes and an 8 MiB positional-read buffer are set first.
+     * Every catalog option starting with a prefix in {@link #CONFIG_PREFIXES} is then copied (and
+     * may override those defaults), with credential keys normalized to their canonical spelling.
+     */
+    @Override
+    public void configure(CatalogContext context) {
+        allowCache = context.options().get(FILE_IO_ALLOW_CACHE);
+        hadoopOptions = new Options();
+        // https://github.com/aliyun/alibabacloud-jindodata/blob/master/docs/user/4.x/4.6.x/4.6.1/oss/hadoop/jindosdk_ide_hadoop.md
+        hadoopOptions.set("fs.oss.impl", "com.aliyun.jindodata.oss.JindoOssFileSystem");
+        hadoopOptions.set("fs.AbstractFileSystem.oss.impl", "com.aliyun.jindodata.oss.OSS");
+
+        // Misalignment can greatly affect performance, so the maximum buffer is set here.
+        hadoopOptions.set("fs.oss.read.position.buffer.size", "8388608");
+
+        for (String key : context.options().keySet()) {
+            if (!hasConfigPrefix(key)) {
+                continue;
+            }
+            String hadoopKey = CASE_SENSITIVE_KEYS.getOrDefault(lowerCase(key), key);
+            hadoopOptions.set(hadoopKey, context.options().get(key));
+            // Log the key only: values include credentials such as the access key secret.
+            LOG.debug("Adding config entry for {} to Hadoop config", hadoopKey);
+        }
+    }
+
+    /**
+     * Returns the Jindo file system for {@code path}. When caching is allowed, file systems are
+     * shared through {@link #CACHE}, keyed by the Hadoop options, scheme and authority.
+     */
+    @Override
+    protected Pair<JindoHadoopSystem, String> createFileSystem(org.apache.hadoop.fs.Path path) {
+        final String scheme = path.toUri().getScheme();
+        final String authority = path.toUri().getAuthority();
+        Supplier<Pair<JindoHadoopSystem, String>> supplier =
+                () -> newFileSystem(path, scheme, authority);
+
+        if (allowCache) {
+            return CACHE.computeIfAbsent(
+                    new CacheKey(hadoopOptions, scheme, authority), key -> supplier.get());
+        }
+        return supplier.get();
+    }
+
+    /**
+     * Creates and initializes a new Jindo file system for {@code path}.
+     *
+     * @throws RuntimeException if {@code scheme} is neither {@code oss} nor {@code dls}
+     * @throws UncheckedIOException if initialization fails (the file system is closed first)
+     */
+    private Pair<JindoHadoopSystem, String> newFileSystem(
+            org.apache.hadoop.fs.Path path, String scheme, String authority) {
+        Configuration hadoopConf = new Configuration(false);
+        hadoopOptions.toMap().forEach(hadoopConf::set);
+
+        // Fall back to the configured default file system URI when the path does not fully
+        // identify one.
+        URI fsUri = path.toUri();
+        if (scheme == null && authority == null) {
+            fsUri = FileSystem.getDefaultUri(hadoopConf);
+        } else if (scheme != null && authority == null) {
+            URI defaultUri = FileSystem.getDefaultUri(hadoopConf);
+            if (scheme.equals(defaultUri.getScheme()) && defaultUri.getAuthority() != null) {
+                fsUri = defaultUri;
+            }
+        }
+
+        JindoHadoopSystem fs;
+        if ("oss".equals(scheme)) {
+            fs = new JindoOssFileSystem();
+        } else if ("dls".equals(scheme)) {
+            fs = new JindoDlsFileSystem();
+        } else {
+            throw new RuntimeException("Unsupported scheme for Jindo FileSystem: " + scheme);
+        }
+
+        try {
+            fs.initialize(fsUri, hadoopConf);
+        } catch (IOException e) {
+            // Do not leak a file system whose initialization failed.
+            IOUtils.closeQuietly(fs);
+            throw new UncheckedIOException(e);
+        }
+        return Pair.of(fs, fs.getSysType(path).getSysType());
+    }
+
+    /**
+     * Closes the file systems opened by this instance, unless they come from the shared {@link
+     * #CACHE}. Safe to call when no file system was ever opened.
+     */
+    @Override
+    public void close() {
+        if (allowCache) {
+            // Cached file systems are shared across instances and intentionally stay open.
+            return;
+        }
+        Map<String, Pair<JindoHadoopSystem, String>> map = fsMap;
+        if (map == null) {
+            // No file system has been created yet.
+            return;
+        }
+        map.values().stream().map(Pair::getKey).forEach(IOUtils::closeQuietly);
+        map.clear();
+    }
+
+    /** Returns whether {@code key} starts with any of {@link #CONFIG_PREFIXES}. */
+    private static boolean hasConfigPrefix(String key) {
+        for (String prefix : CONFIG_PREFIXES) {
+            if (key.startsWith(prefix)) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** Locale-independent lower-casing, so key matching does not depend on the JVM locale. */
+    private static String lowerCase(String key) {
+        return key.toLowerCase(java.util.Locale.ROOT);
+    }
+
+    /** Builds the lower-case to canonical-spelling lookup for the given keys. */
+    private static Map<String, String> caseSensitiveKeys(String... keys) {
+        Map<String, String> map = new HashMap<>();
+        for (String key : keys) {
+            map.put(lowerCase(key), key);
+        }
+        return map;
+    }
+
+    /** Key of {@link #CACHE}: Hadoop options plus the path's scheme and authority. */
+    private static class CacheKey {
+
+        private final Options options;
+        private final String scheme;
+        private final String authority;
+
+        private CacheKey(Options options, String scheme, String authority) {
+            this.options = options;
+            this.scheme = scheme;
+            this.authority = authority;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            CacheKey cacheKey = (CacheKey) o;
+            return Objects.equals(options, cacheKey.options)
+                    && Objects.equals(scheme, cacheKey.scheme)
+                    && Objects.equals(authority, cacheKey.authority);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(options, scheme, authority);
+        }
+    }
+}
diff --git
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoLoader.java
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoLoader.java
new file mode 100644
index 0000000000..04a5b430f5
--- /dev/null
+++
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoLoader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jindo;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOLoader;
+import org.apache.paimon.fs.Path;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * {@link FileIOLoader} that serves the {@code oss} scheme with a new {@link JindoFileIO}.
+ *
+ * <p>Declares the OSS endpoint and the access key id/secret as required options.
+ */
+public class JindoLoader implements FileIOLoader {
+
+    /** Required option keys, in the order they are reported. */
+    private static final String[] REQUIRED_KEYS = {
+        "fs.oss.endpoint", "fs.oss.accessKeyId", "fs.oss.accessKeySecret"
+    };
+
+    @Override
+    public List<String[]> requiredOptions() {
+        List<String[]> required = new ArrayList<>();
+        for (String key : REQUIRED_KEYS) {
+            // Each key is reported as its own single-element array.
+            required.add(new String[] {key});
+        }
+        return required;
+    }
+
+    @Override
+    public String getScheme() {
+        return "oss";
+    }
+
+    @Override
+    public FileIO load(Path path) {
+        return new JindoFileIO();
+    }
+}
diff --git
a/paimon-filesystems/paimon-jindo/src/main/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
b/paimon-filesystems/paimon-jindo/src/main/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
new file mode 100644
index 0000000000..ce136e2263
--- /dev/null
+++
b/paimon-filesystems/paimon-jindo/src/main/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.jindo.JindoLoader
diff --git a/paimon-filesystems/pom.xml b/paimon-filesystems/pom.xml
index a6c4c310aa..7e431c4919 100644
--- a/paimon-filesystems/pom.xml
+++ b/paimon-filesystems/pom.xml
@@ -39,6 +39,7 @@
<module>paimon-s3-impl</module>
<module>paimon-cosn</module>
<module>paimon-cosn-impl</module>
+ <module>paimon-jindo</module>
</modules>
<properties>