This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 92a6b272a1 [fs] Add JindoFileIO access to OSS without dependency 
(#5303)
92a6b272a1 is described below

commit 92a6b272a1b850e60bb2550906c5a3624ae11460
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 19 10:46:59 2025 +0800

    [fs] Add JindoFileIO access to OSS without dependency (#5303)
---
 docs/content/maintenance/filesystems.md            |   9 +
 docs/content/project/download.md                   |  18 +-
 paimon-filesystems/paimon-jindo/pom.xml            |  92 ++++++
 .../apache/paimon/jindo/HadoopCompliantFileIO.java | 312 +++++++++++++++++++++
 .../java/org/apache/paimon/jindo/JindoFileIO.java  | 203 ++++++++++++++
 .../java/org/apache/paimon/jindo/JindoLoader.java  |  49 ++++
 .../services/org.apache.paimon.fs.FileIOLoader     |  16 ++
 paimon-filesystems/pom.xml                         |   1 +
 8 files changed, 692 insertions(+), 8 deletions(-)

diff --git a/docs/content/maintenance/filesystems.md 
b/docs/content/maintenance/filesystems.md
index f987747fd8..cdc27657af 100644
--- a/docs/content/maintenance/filesystems.md
+++ b/docs/content/maintenance/filesystems.md
@@ -256,6 +256,15 @@ Please note that:
 {{< /tab >}}
 {{< /tabs >}}
 
+If your environment has the Jindo SDK dependencies, you can use Jindo FS to 
connect to OSS. Jindo has better read and write efficiency.
+
+{{< stable >}}
+Download [paimon-jindo-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-jindo/{{<
 version >}}/paimon-jindo-{{< version >}}.jar).
+{{< /stable >}}
+{{< unstable >}}
+Download [paimon-jindo-{{< version 
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-jindo/{{<
 version >}}/).
+{{< /unstable >}}
+
 ## S3
 
 {{< stable >}}
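For illustration, a minimal sketch of how the options that JindoLoader requires
(fs.oss.endpoint, fs.oss.accessKeyId, fs.oss.accessKeySecret) could be passed
through a CatalogContext so that an oss:// path resolves to the new JindoFileIO.
FileIO.get(Path, CatalogContext) and CatalogContext.create(Options) are assumed
from Paimon's existing API; the endpoint, keys and bucket are placeholders.

    // Sketch only: resolve a FileIO for an oss:// path with the Jindo options.
    import org.apache.paimon.catalog.CatalogContext;
    import org.apache.paimon.fs.FileIO;
    import org.apache.paimon.fs.Path;
    import org.apache.paimon.options.Options;

    public class JindoOssQuickStart {
        public static void main(String[] args) throws Exception {
            Options options = new Options();
            options.set("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
            options.set("fs.oss.accessKeyId", "<access-key-id>");
            options.set("fs.oss.accessKeySecret", "<access-key-secret>");

            Path warehouse = new Path("oss://my-bucket/paimon/warehouse");
            FileIO fileIO =
                    FileIO.get(warehouse, CatalogContext.create(options));
            System.out.println("warehouse exists: " + fileIO.exists(warehouse));
        }
    }
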
diff --git a/docs/content/project/download.md b/docs/content/project/download.md
index 23d0112b09..d0b92107f5 100644
--- a/docs/content/project/download.md
+++ b/docs/content/project/download.md
@@ -82,19 +82,21 @@ This documentation is a guide for downloading Paimon Jars.
 
 {{< unstable >}}
 
-| Version    | Jar                                                             
                                                        |
-|------------|-------------------------------------------------------------------------------------------------------------------------|
-| paimon-oss | [paimon-oss-{{< version 
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-oss/{{<
 version >}}/) |
-| paimon-s3  | [paimon-s3-{{< version 
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-s3/{{<
 version >}}/)   |
+| Version      | Jar                                                           
                                                              |
+|--------------|-----------------------------------------------------------------------------------------------------------------------------|
+| paimon-oss   | [paimon-oss-{{< version 
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-oss/{{<
 version >}}/)     |
+| paimon-jindo | [paimon-jindo-{{< version 
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-jindo/{{<
 version >}}/) |
+| paimon-s3    | [paimon-s3-{{< version 
>}}.jar](https://repository.apache.org/snapshots/org/apache/paimon/paimon-s3/{{<
 version >}}/)       |
 
 {{< /unstable >}}
 
 {{< stable >}}
 
-| Version    | Jar                                                             
                                                                                
   |
-|------------|----------------------------------------------------------------------------------------------------------------------------------------------------|
-| paimon-oss | [paimon-oss-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-oss/{{< 
version >}}/paimon-oss-{{< version >}}.jar) |
-| paimon-s3  | [paimon-s3-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-s3/{{< 
version >}}/paimon-s3-{{< version >}}.jar)    |
+| Version      | Jar                                                           
                                                                                
           |
+|--------------|----------------------------------------------------------------------------------------------------------------------------------------------------------|
+| paimon-oss   | [paimon-oss-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-oss/{{< 
version >}}/paimon-oss-{{< version >}}.jar)       |
+| paimon-jindo | [paimon-jindo-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-jindo/{{<
 version >}}/paimon-jindo-{{< version >}}.jar) |
+| paimon-s3    | [paimon-s3-{{< version 
>}}.jar](https://repo.maven.apache.org/maven2/org/apache/paimon/paimon-s3/{{< 
version >}}/paimon-s3-{{< version >}}.jar)          |
 
 {{< /stable >}}
 
diff --git a/paimon-filesystems/paimon-jindo/pom.xml 
b/paimon-filesystems/paimon-jindo/pom.xml
new file mode 100644
index 0000000000..14320c1622
--- /dev/null
+++ b/paimon-filesystems/paimon-jindo/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one
+  ~ or more contributor license agreements.  See the NOTICE file
+  ~ distributed with this work for additional information
+  ~ regarding copyright ownership.  The ASF licenses this file
+  ~ to you under the Apache License, Version 2.0 (the
+  ~ "License"); you may not use this file except in compliance
+  ~ with the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>paimon-filesystems</artifactId>
+        <groupId>org.apache.paimon</groupId>
+        <version>1.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>paimon-jindo</artifactId>
+    <name>Paimon : FileSystems : Jindo</name>
+    <packaging>jar</packaging>
+
+    <properties>
+        <jindodata.version>6.5.6</jindodata.version>
+    </properties>
+
+    <repositories>
+        <repository>
+            <id>jindodata</id>
+            
<url>https://jindodata-binary.oss-cn-shanghai.aliyuncs.com/mvn-repo/</url>
+        </repository>
+    </repositories>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-common</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-common</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.aliyun.jindodata</groupId>
+            <artifactId>jindo-core</artifactId>
+            <version>${jindodata.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>com.aliyun.jindodata</groupId>
+            <artifactId>jindo-sdk</artifactId>
+            <version>${jindodata.version}</version>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>*</groupId>
+                    <artifactId>*</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+    </dependencies>
+</project>
\ No newline at end of file
diff --git 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
new file mode 100644
index 0000000000..030d845b3b
--- /dev/null
+++ 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/HadoopCompliantFileIO.java
@@ -0,0 +1,312 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jindo;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileStatus;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.SeekableInputStream;
+import org.apache.paimon.fs.VectoredReadable;
+import org.apache.paimon.utils.Pair;
+
+import com.aliyun.jindodata.common.JindoHadoopSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Hadoop {@link FileIO}.
+ *
+ * <p>Important: this class is copied from HadoopFileIO to avoid class loader
+ * conflicts.
+ */
+public abstract class HadoopCompliantFileIO implements FileIO {
+
+    private static final long serialVersionUID = 1L;
+
+    protected transient volatile Map<String, Pair<JindoHadoopSystem, String>> 
fsMap;
+
+    @Override
+    public SeekableInputStream newInputStream(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        Pair<JindoHadoopSystem, String> pair = getFileSystemPair(hadoopPath);
+        JindoHadoopSystem fs = pair.getKey();
+        String sysType = pair.getValue();
+        FSDataInputStream fsInput = fs.open(hadoopPath);
+        return "jobj".equalsIgnoreCase(sysType)
+                ? new VectoredReadableInputStream(fsInput)
+                : new HadoopSeekableInputStream(fsInput);
+    }
+
+    @Override
+    public PositionOutputStream newOutputStream(Path path, boolean overwrite) 
throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        return new HadoopPositionOutputStream(
+                getFileSystem(hadoopPath).create(hadoopPath, overwrite));
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        return new 
HadoopFileStatus(getFileSystem(hadoopPath).getFileStatus(hadoopPath));
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        FileStatus[] statuses = new FileStatus[0];
+        org.apache.hadoop.fs.FileStatus[] hadoopStatuses =
+                getFileSystem(hadoopPath).listStatus(hadoopPath);
+        if (hadoopStatuses != null) {
+            statuses = new FileStatus[hadoopStatuses.length];
+            for (int i = 0; i < hadoopStatuses.length; i++) {
+                statuses[i] = new HadoopFileStatus(hadoopStatuses[i]);
+            }
+        }
+        return statuses;
+    }
+
+    @Override
+    public boolean exists(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        return getFileSystem(hadoopPath).exists(hadoopPath);
+    }
+
+    @Override
+    public boolean delete(Path path, boolean recursive) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        return getFileSystem(hadoopPath).delete(hadoopPath, recursive);
+    }
+
+    @Override
+    public boolean mkdirs(Path path) throws IOException {
+        org.apache.hadoop.fs.Path hadoopPath = path(path);
+        return getFileSystem(hadoopPath).mkdirs(hadoopPath);
+    }
+
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+        org.apache.hadoop.fs.Path hadoopSrc = path(src);
+        org.apache.hadoop.fs.Path hadoopDst = path(dst);
+        return getFileSystem(hadoopSrc).rename(hadoopSrc, hadoopDst);
+    }
+
+    private org.apache.hadoop.fs.Path path(Path path) {
+        URI uri = path.toUri();
+        if (uri.getScheme().equals("oss") && uri.getUserInfo() != null) {
+            path = new Path("oss:/" + uri.getPath());
+        }
+        return new org.apache.hadoop.fs.Path(path.toUri());
+    }
+
+    private JindoHadoopSystem getFileSystem(org.apache.hadoop.fs.Path path) 
throws IOException {
+        return getFileSystemPair(path).getKey();
+    }
+
+    private Pair<JindoHadoopSystem, String> 
getFileSystemPair(org.apache.hadoop.fs.Path path)
+            throws IOException {
+        if (fsMap == null) {
+            synchronized (this) {
+                if (fsMap == null) {
+                    fsMap = new ConcurrentHashMap<>();
+                }
+            }
+        }
+
+        Map<String, Pair<JindoHadoopSystem, String>> map = fsMap;
+
+        String authority = path.toUri().getAuthority();
+        if (authority == null) {
+            authority = "DEFAULT";
+        }
+        try {
+            return map.computeIfAbsent(
+                    authority,
+                    k -> {
+                        try {
+                            return createFileSystem(path);
+                        } catch (IOException e) {
+                            throw new UncheckedIOException(e);
+                        }
+                    });
+        } catch (UncheckedIOException e) {
+            throw e.getCause();
+        }
+    }
+
+    protected abstract Pair<JindoHadoopSystem, String> createFileSystem(
+            org.apache.hadoop.fs.Path path) throws IOException;
+
+    private static class HadoopSeekableInputStream extends SeekableInputStream 
{
+
+        private static final int MIN_SKIP_BYTES = 1024 * 1024;
+
+        protected final FSDataInputStream in;
+
+        private HadoopSeekableInputStream(FSDataInputStream in) {
+            this.in = in;
+        }
+
+        @Override
+        public void seek(long seekPos) throws IOException {
+            // We do some optimizations so that some distributed FS
+            // implementations do not perform expensive seeks when they are
+            // actually not needed.
+            long delta = seekPos - getPos();
+
+            if (delta > 0L && delta <= MIN_SKIP_BYTES) {
+                // Instead of a small forward seek, we skip over the gap
+                skipFully(delta);
+            } else if (delta != 0L) {
+                // For larger gaps and backward seeks, we do a real seek
+                forceSeek(seekPos);
+            } // Do nothing if delta is zero.
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return in.getPos();
+        }
+
+        @Override
+        public int read() throws IOException {
+            return in.read();
+        }
+
+        @Override
+        public int read(byte[] b, int off, int len) throws IOException {
+            return in.read(b, off, len);
+        }
+
+        @Override
+        public void close() throws IOException {
+            in.close();
+        }
+
+        /**
+         * Positions the stream to the given location. In contrast to {@link 
#seek(long)}, this
+         * method will always issue a "seek" command to the dfs and may not 
replace it by {@link
+         * #skip(long)} for small seeks.
+         *
+         * <p>Notice that the underlying DFS implementation can still decide 
to do skip instead of
+         * seek.
+         *
+         * @param seekPos the position to seek to.
+         */
+        public void forceSeek(long seekPos) throws IOException {
+            in.seek(seekPos);
+        }
+
+        /**
+         * Skips over a given amount of bytes in the stream.
+         *
+         * @param bytes the number of bytes to skip.
+         */
+        public void skipFully(long bytes) throws IOException {
+            while (bytes > 0) {
+                bytes -= in.skip(bytes);
+            }
+        }
+    }
+
+    private static class VectoredReadableInputStream extends 
HadoopSeekableInputStream
+            implements VectoredReadable {
+
+        private VectoredReadableInputStream(FSDataInputStream in) {
+            super(in);
+        }
+
+        @Override
+        public int pread(long position, byte[] bytes, int off, int len) throws 
IOException {
+            return in.read(position, bytes, off, len);
+        }
+    }
+
+    private static class HadoopPositionOutputStream extends 
PositionOutputStream {
+
+        private final FSDataOutputStream out;
+
+        private HadoopPositionOutputStream(FSDataOutputStream out) {
+            this.out = out;
+        }
+
+        @Override
+        public long getPos() throws IOException {
+            return out.getPos();
+        }
+
+        @Override
+        public void write(int b) throws IOException {
+            out.write(b);
+        }
+
+        @Override
+        public void write(byte[] b) throws IOException {
+            out.write(b);
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            out.write(b, off, len);
+        }
+
+        @Override
+        public void flush() throws IOException {
+            out.hflush();
+        }
+
+        @Override
+        public void close() throws IOException {
+            out.close();
+        }
+    }
+
+    private static class HadoopFileStatus implements FileStatus {
+
+        private final org.apache.hadoop.fs.FileStatus status;
+
+        private HadoopFileStatus(org.apache.hadoop.fs.FileStatus status) {
+            this.status = status;
+        }
+
+        @Override
+        public long getLen() {
+            return status.getLen();
+        }
+
+        @Override
+        public boolean isDir() {
+            return status.isDirectory();
+        }
+
+        @Override
+        public Path getPath() {
+            return new Path(status.getPath().toUri());
+        }
+
+        @Override
+        public long getModificationTime() {
+            return status.getModificationTime();
+        }
+    }
+}
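The seek logic in HadoopSeekableInputStream above turns forward seeks of at most
MIN_SKIP_BYTES (1 MiB) into skipFully calls and only issues a real seek for
larger or backward jumps. A minimal sketch of reading a byte range through that
stream, assuming a FileIO configured as in the earlier sketch; ReadRangeSketch
and readRange are hypothetical helper names.

    import org.apache.paimon.fs.FileIO;
    import org.apache.paimon.fs.Path;
    import org.apache.paimon.fs.SeekableInputStream;

    import java.io.EOFException;
    import java.io.IOException;

    /** Sketch only: fully read a byte range via the seek-optimized stream. */
    class ReadRangeSketch {
        static byte[] readRange(FileIO fileIO, Path file, long offset, int len)
                throws IOException {
            try (SeekableInputStream in = fileIO.newInputStream(file)) {
                // forward deltas of up to 1 MiB become skipFully(delta)
                in.seek(offset);
                byte[] buffer = new byte[len];
                int read = 0;
                while (read < len) {
                    int n = in.read(buffer, read, len - read);
                    if (n < 0) {
                        throw new EOFException("Unexpected end of " + file);
                    }
                    read += n;
                }
                return buffer;
            }
        }
    }
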
diff --git 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
new file mode 100644
index 0000000000..423f6fb7ec
--- /dev/null
+++ 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoFileIO.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jindo;
+
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.utils.Pair;
+
+import com.aliyun.jindodata.common.JindoHadoopSystem;
+import com.aliyun.jindodata.dls.JindoDlsFileSystem;
+import com.aliyun.jindodata.oss.JindoOssFileSystem;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Supplier;
+
+import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE;
+
+/** Jindo {@link FileIO}. */
+public class JindoFileIO extends HadoopCompliantFileIO {
+
+    private static final long serialVersionUID = 2L;
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(JindoFileIO.class);
+
+    /**
+     * To keep things simple, Paimon OSS configuration keys are the same as in
+     * the Hadoop OSS module, so every configuration key with the prefix `fs.`
+     * in the Paimon conf is added to the Hadoop conf.
+     *
+     * <p>We do not limit this to fs.oss because this FileIO also accesses the
+     * dlf/dls schemes.
+     */
+    private static final String[] CONFIG_PREFIXES = {"fs."};
+
+    private static final String OSS_ACCESS_KEY_ID = "fs.oss.accessKeyId";
+    private static final String OSS_ACCESS_KEY_SECRET = 
"fs.oss.accessKeySecret";
+    private static final String OSS_SECURITY_TOKEN = "fs.oss.securityToken";
+
+    private static final Map<String, String> CASE_SENSITIVE_KEYS =
+            new HashMap<String, String>() {
+                {
+                    put(OSS_ACCESS_KEY_ID.toLowerCase(), OSS_ACCESS_KEY_ID);
+                    put(OSS_ACCESS_KEY_SECRET.toLowerCase(), 
OSS_ACCESS_KEY_SECRET);
+                    put(OSS_SECURITY_TOKEN.toLowerCase(), OSS_SECURITY_TOKEN);
+                }
+            };
+
+    /**
+     * Cache for JindoOssFileSystem instances. At present there is no good
+     * mechanism to ensure that a file system will be shut down, so this cache
+     * is used here to avoid resource leakage.
+     */
+    private static final Map<CacheKey, Pair<JindoHadoopSystem, String>> CACHE =
+            new ConcurrentHashMap<>();
+
+    private Options hadoopOptions;
+    private boolean allowCache = true;
+
+    @Override
+    public boolean isObjectStore() {
+        return true;
+    }
+
+    @Override
+    public void configure(CatalogContext context) {
+        allowCache = context.options().get(FILE_IO_ALLOW_CACHE);
+        hadoopOptions = new Options();
+        // 
https://github.com/aliyun/alibabacloud-jindodata/blob/master/docs/user/4.x/4.6.x/4.6.1/oss/hadoop/jindosdk_ide_hadoop.md
+        hadoopOptions.set("fs.oss.impl", 
"com.aliyun.jindodata.oss.JindoOssFileSystem");
+        hadoopOptions.set("fs.AbstractFileSystem.oss.impl", 
"com.aliyun.jindodata.oss.OSS");
+
+        // Misalignment can greatly affect performance, so the maximum buffer 
is set here
+        hadoopOptions.set("fs.oss.read.position.buffer.size", "8388608");
+
+        // read all configuration keys with a prefix in CONFIG_PREFIXES
+        for (String key : context.options().keySet()) {
+            for (String prefix : CONFIG_PREFIXES) {
+                if (key.startsWith(prefix)) {
+                    String value = context.options().get(key);
+                    if (CASE_SENSITIVE_KEYS.containsKey(key.toLowerCase())) {
+                        key = CASE_SENSITIVE_KEYS.get(key.toLowerCase());
+                    }
+                    hadoopOptions.set(key, value);
+
+                    LOG.debug(
+                            "Adding config entry for {} as {} to Hadoop 
config",
+                            key,
+                            hadoopOptions.get(key));
+                }
+            }
+        }
+    }
+
+    @Override
+    protected Pair<JindoHadoopSystem, String> 
createFileSystem(org.apache.hadoop.fs.Path path) {
+        final String scheme = path.toUri().getScheme();
+        final String authority = path.toUri().getAuthority();
+        Supplier<Pair<JindoHadoopSystem, String>> supplier =
+                () -> {
+                    Configuration hadoopConf = new Configuration(false);
+                    hadoopOptions.toMap().forEach(hadoopConf::set);
+                    URI fsUri = path.toUri();
+                    if (scheme == null && authority == null) {
+                        fsUri = FileSystem.getDefaultUri(hadoopConf);
+                    } else if (scheme != null && authority == null) {
+                        URI defaultUri = FileSystem.getDefaultUri(hadoopConf);
+                        if (scheme.equals(defaultUri.getScheme())
+                                && defaultUri.getAuthority() != null) {
+                            fsUri = defaultUri;
+                        }
+                    }
+
+                    JindoHadoopSystem fs;
+                    if ("oss".equals(scheme)) {
+                        fs = new JindoOssFileSystem();
+                    } else if ("dls".equals(scheme)) {
+                        fs = new JindoDlsFileSystem();
+                    } else {
+                        throw new RuntimeException(
+                                "Unsupported scheme for Jindo FileSystem: " + 
scheme);
+                    }
+
+                    try {
+                        fs.initialize(fsUri, hadoopConf);
+                    } catch (IOException e) {
+                        throw new UncheckedIOException(e);
+                    }
+                    return Pair.of(fs, fs.getSysType(path).getSysType());
+                };
+
+        if (allowCache) {
+            return CACHE.computeIfAbsent(
+                    new CacheKey(hadoopOptions, scheme, authority), key -> 
supplier.get());
+        } else {
+            return supplier.get();
+        }
+    }
+
+    @Override
+    public void close() {
+        if (!allowCache) {
+            
fsMap.values().stream().map(Pair::getKey).forEach(IOUtils::closeQuietly);
+            fsMap.clear();
+        }
+    }
+
+    private static class CacheKey {
+
+        private final Options options;
+        private final String scheme;
+        private final String authority;
+
+        private CacheKey(Options options, String scheme, String authority) {
+            this.options = options;
+            this.scheme = scheme;
+            this.authority = authority;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            CacheKey cacheKey = (CacheKey) o;
+            return Objects.equals(options, cacheKey.options)
+                    && Objects.equals(scheme, cacheKey.scheme)
+                    && Objects.equals(authority, cacheKey.authority);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(options, scheme, authority);
+        }
+    }
+}
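A minimal sketch of the configure() behaviour described in the Javadoc above:
every option prefixed with `fs.` is copied into the Hadoop conf, and lower-cased
credential keys are mapped back to their canonical casing via
CASE_SENSITIVE_KEYS. CatalogContext.create(Options) and Options#set(ConfigOption,
value) are assumed from Paimon's existing API; the credential values are
placeholders.

    import org.apache.paimon.catalog.CatalogContext;
    import org.apache.paimon.jindo.JindoFileIO;
    import org.apache.paimon.options.Options;

    import static org.apache.paimon.options.CatalogOptions.FILE_IO_ALLOW_CACHE;

    public class JindoConfigureSketch {
        public static void main(String[] args) {
            Options options = new Options();
            options.set("fs.oss.endpoint", "oss-cn-hangzhou.aliyuncs.com");
            // lower-cased on purpose: configure() restores fs.oss.accessKeyId
            options.set("fs.oss.accesskeyid", "<access-key-id>");
            options.set("fs.oss.accesskeysecret", "<access-key-secret>");
            // disable the static cache; close() then shuts down the file
            // systems tracked in fsMap once they have been created
            options.set(FILE_IO_ALLOW_CACHE, false);

            JindoFileIO fileIO = new JindoFileIO();
            fileIO.configure(CatalogContext.create(options));
        }
    }
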
diff --git 
a/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoLoader.java
 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoLoader.java
new file mode 100644
index 0000000000..04a5b430f5
--- /dev/null
+++ 
b/paimon-filesystems/paimon-jindo/src/main/java/org/apache/paimon/jindo/JindoLoader.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.jindo;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.FileIOLoader;
+import org.apache.paimon.fs.Path;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** File loader for {@link JindoFileIO}. */
+public class JindoLoader implements FileIOLoader {
+
+    @Override
+    public List<String[]> requiredOptions() {
+        List<String[]> options = new ArrayList<>();
+        options.add(new String[] {"fs.oss.endpoint"});
+        options.add(new String[] {"fs.oss.accessKeyId"});
+        options.add(new String[] {"fs.oss.accessKeySecret"});
+        return options;
+    }
+
+    @Override
+    public String getScheme() {
+        return "oss";
+    }
+
+    @Override
+    public FileIO load(Path path) {
+        return new JindoFileIO();
+    }
+}
diff --git 
a/paimon-filesystems/paimon-jindo/src/main/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
 
b/paimon-filesystems/paimon-jindo/src/main/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
new file mode 100644
index 0000000000..ce136e2263
--- /dev/null
+++ 
b/paimon-filesystems/paimon-jindo/src/main/resources/META-INF/services/org.apache.paimon.fs.FileIOLoader
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.paimon.jindo.JindoLoader
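The service file above registers JindoLoader for the standard Java ServiceLoader
mechanism. A minimal sketch of discovering the registered FileIOLoader for the
"oss" scheme and listing its required options; Paimon's own discovery code may
differ in detail, and the class name below is only illustrative.

    import org.apache.paimon.fs.FileIOLoader;

    import java.util.ServiceLoader;

    public class FindOssLoaderSketch {
        public static void main(String[] args) {
            for (FileIOLoader loader : ServiceLoader.load(FileIOLoader.class)) {
                if ("oss".equalsIgnoreCase(loader.getScheme())) {
                    System.out.println("loader: " + loader.getClass().getName());
                    for (String[] keys : loader.requiredOptions()) {
                        System.out.println("required: " + String.join(", ", keys));
                    }
                }
            }
        }
    }
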
diff --git a/paimon-filesystems/pom.xml b/paimon-filesystems/pom.xml
index a6c4c310aa..7e431c4919 100644
--- a/paimon-filesystems/pom.xml
+++ b/paimon-filesystems/pom.xml
@@ -39,6 +39,7 @@
         <module>paimon-s3-impl</module>
         <module>paimon-cosn</module>
         <module>paimon-cosn-impl</module>
+        <module>paimon-jindo</module>
     </modules>
 
     <properties>
