This is an automated email from the ASF dual-hosted git repository.

casion pushed a commit to branch dev-1.8.0
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/dev-1.8.0 by this push:
     new 76975786f4 support azure (#5214)
76975786f4 is described below

commit 76975786f4ff4e8d7bb9276c595e939fb93c405b
Author: v-kkhuang <[email protected]>
AuthorDate: Mon Sep 22 17:07:53 2025 +0800

    support azure (#5214)
    
    * support azure
    
    * remove file
    
    * add azure conf
    
    ---------
    
    Co-authored-by: “v_kkhuang” <“[email protected]”>
---
 linkis-commons/linkis-storage/pom.xml              |  12 +
 .../factory/impl/BuildAzureBlobFileSystem.java     |  59 +++
 .../storage/fs/impl/AzureBlobFileSystem.java       | 401 +++++++++++++++++++++
 .../storage/utils/StorageConfiguration.scala       |   6 +-
 .../apache/linkis/storage/utils/StorageUtils.scala |   4 +-
 .../storage/utils/StorageConfigurationTest.scala   |   5 +
 linkis-dist/package/conf/linkis.properties         |   6 +-
 pom.xml                                            |   8 +
 8 files changed, 498 insertions(+), 3 deletions(-)

diff --git a/linkis-commons/linkis-storage/pom.xml 
b/linkis-commons/linkis-storage/pom.xml
index 6e04016fa7..8715b97c7a 100644
--- a/linkis-commons/linkis-storage/pom.xml
+++ b/linkis-commons/linkis-storage/pom.xml
@@ -105,6 +105,18 @@
       <version>1.12.261</version>
     </dependency>
 
+    <dependency>
+       <groupId>com.azure</groupId>
+       <artifactId>azure-storage-blob</artifactId>
+    </dependency>
+    <dependency>
+       <groupId>com.azure</groupId>
+       <artifactId>azure-storage-common</artifactId>
+    </dependency>
+    <dependency>
+       <groupId>com.azure</groupId>
+       <artifactId>azure-identity</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.parquet</groupId>
       <artifactId>parquet-avro</artifactId>
diff --git 
a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java
 
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java
new file mode 100644
index 0000000000..292bb952ed
--- /dev/null
+++ 
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.linkis.storage.factory.impl;
+
+import org.apache.linkis.common.io.Fs;
+import org.apache.linkis.storage.factory.BuildFactory;
+import org.apache.linkis.storage.fs.impl.AzureBlobFileSystem;
+import org.apache.linkis.storage.utils.StorageUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class BuildAzureBlobFileSystem implements BuildFactory {
+    private static final Logger LOG = 
LoggerFactory.getLogger(BuildAzureBlobFileSystem.class);
+
+    @Override
+    public Fs getFs(String user, String proxyUser) {
+        AzureBlobFileSystem fs = new AzureBlobFileSystem();
+        try {
+            fs.init(null);
+        } catch (IOException e) {
+            LOG.warn("get file system failed", e);
+        }
+        fs.setUser(user);
+        return fs;
+    }
+
+    @Override
+    public Fs getFs(String user, String proxyUser, String label) {
+        AzureBlobFileSystem fs = new AzureBlobFileSystem();
+        try {
+            fs.init(null);
+        } catch (IOException e) {
+            LOG.warn("get file system failed", e);
+        }
+        fs.setUser(user);
+        return fs;
+    }
+
+    @Override
+    public String fsName() {
+        return StorageUtils.BLOB;
+    }
+}
diff --git 
a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java
 
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java
new file mode 100644
index 0000000000..67475aecf2
--- /dev/null
+++ 
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java
@@ -0,0 +1,401 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.storage.fs.impl;
+
+import com.azure.core.util.polling.SyncPoller;
+import com.azure.storage.blob.BlobClient;
+import com.azure.storage.blob.BlobContainerClient;
+import com.azure.storage.blob.BlobServiceClient;
+import com.azure.storage.blob.BlobServiceClientBuilder;
+import com.azure.storage.blob.models.BlobCopyInfo;
+import com.azure.storage.blob.models.BlobStorageException;
+import com.azure.storage.blob.specialized.BlobOutputStream;
+import com.azure.storage.blob.specialized.BlockBlobClient;
+import org.apache.linkis.common.io.FsPath;
+import org.apache.linkis.storage.exception.StorageWarnException;
+import org.apache.linkis.storage.fs.FileSystem;
+import org.apache.linkis.storage.utils.StorageConfiguration;
+import org.apache.linkis.storage.utils.StorageUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.time.Duration;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static 
org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary.TO_BE_UNKNOW;
+import static org.apache.linkis.storage.utils.StorageUtils.BLOB_SCHEMA;
+
+public class AzureBlobFileSystem extends FileSystem {
+
+    private static final String SLASH = "/";
+
+    public static class PahtInfo {
+        private String schema = "http://";; // http
+        private String domain; //
+        private String container; // container name
+        private String blobName; // blob name
+        private String tail;
+
+        public PahtInfo(String domain, String container, String blobName) {
+            this.domain = domain;
+            this.container = container;
+            this.blobName = blobName;
+            if (blobName != null) {
+                String[] names = blobName.split(SLASH, -1);
+                tail = names[names.length - 1];
+            }
+        }
+
+        public String toFullName() {
+            return schema + domain + SLASH + container + SLASH + blobName;
+        }
+
+        public String getSchema() {
+            return schema;
+        }
+
+        public String getDomain() {
+            return domain;
+        }
+
+        public String getContainer() {
+            return container;
+        }
+
+        public String getBlobName() {
+            return blobName;
+        }
+
+        public String getTail() {
+            return tail;
+        }
+
+        @Override
+        public String toString() {
+            return "PahtInfo{" +
+                    "schema='" + schema + '\'' +
+                    ", domain='" + domain + '\'' +
+                    ", container='" + container + '\'' +
+                    ", blobName='" + blobName + '\'' +
+                    ", tail='" + tail + '\'' +
+                    '}';
+        }
+    }
+
+    /**
+     * manipulate Azure storage resources and Blob container 
管理命名空间下的存储资源和Blob容器
+     */
+    private BlobServiceClient serviceClient;
+
+    /**
+     * getBlobContainerClient
+     *
+     * @param containerName
+     * @return client which can manipulate Azure Storage containers and their 
blobs.<br>
+     * 操作一个容器和其blobs的客户端
+     */
+    private BlobContainerClient getBlobContainerClient(String containerName) {
+        return serviceClient.getBlobContainerClient(containerName);
+    }
+
+    private PahtInfo azureLocation(String path) {
+        return this.azureLocation(new FsPath(path));
+    }
+
+    /**
+     * @param dest
+     * @return domain name,container name,blob name
+     */
+    private PahtInfo azureLocation(FsPath dest) {
+        //https://myaccount.blob.core.windows.net/mycontainer/dir/blobname
+        // returns myaccount.blob.core.windows.net/mycontainer/dir/blobname
+        String path = dest.getPath();
+        // myaccount.blob.core.windows.net/mycontainer/dir/blobname
+        // will split to myaccount.blob.core.windows.net
+        // and mycontainer/dir/blobname
+        String[] paths = path.split(SLASH, 2);
+        if (paths.length < 2) {
+            throw new IllegalArgumentException("file path error,with out 
container:" + path);
+        }
+        // split to container and blob object,
+        // container/dir/blobname will split to container and dir/blobname
+        String[] names = paths[1].split(SLASH, 2);
+        if (names.length < 2) {
+            return new PahtInfo(paths[0], names[0], null);
+        } else {
+            return new PahtInfo(paths[0], names[0], names[1]);
+        }
+    }
+
+    /**
+     * init serviceClient
+     *
+     * @param properties
+     * @throws IOException
+     */
+    @Override
+    public void init(Map<String, String> properties) throws IOException {
+
+        /**
+         * The storage account provides the top-level namespace for the Blob 
service. 每个账户提供了一个顶级的命名空间
+         */
+        String acctName = 
StorageConfiguration.AZURE_ACCT_NAME.getValue(properties);
+        String connectStr = 
StorageConfiguration.AZURE_ACCT_CONNECT_STR.getValue(properties);
+        // Azure SDK client builders accept the credential as a parameter
+        serviceClient =
+                new BlobServiceClientBuilder()
+                        .endpoint(BLOB_SCHEMA + acctName + 
".blob.core.windows.net/")
+                        .connectionString(connectStr)
+                        .buildClient();
+    }
+
+    /**
+     * name of the fileSystem
+     *
+     * @return
+     */
+    @Override
+    public String fsName() {
+        return StorageUtils.BLOB;
+    }
+
+    @Override
+    public String rootUserName() {
+        return "";
+    }
+
+    /**
+     * @param dest
+     * @return
+     * @throws IOException
+     */
+    @Override
+    public FsPath get(String dest) throws IOException {
+        FsPath path = new FsPath(dest);
+        if (exists(path)) {
+            return path;
+        } else {
+            throw new StorageWarnException(
+                    TO_BE_UNKNOW.getErrorCode(),
+                    "File or folder does not exist or file name is 
garbled(文件或者文件夹不存在或者文件名乱码)");
+        }
+    }
+
+    /**
+     * Opens a blob input stream to download the blob.
+     *
+     * @param dest
+     * @return
+     * @throws BlobStorageException – If a storage service error occurred.
+     */
+    @Override
+    public InputStream read(FsPath dest) {
+        PahtInfo result = azureLocation(dest);
+        BlobClient blobclient =
+                
getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName());
+        return blobclient.openInputStream();
+    }
+
+    /**
+     * @param dest
+     * @param overwrite
+     * @return
+     * @throws BlobStorageException – If a storage service error occurred.
+     * @see BlockBlobClient #getBlobOutputStream
+     */
+    @Override
+    public OutputStream write(FsPath dest, boolean overwrite) {
+
+        PahtInfo result = azureLocation(dest);
+        BlobClient blobclient =
+                
getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName());
+        return blobclient.getBlockBlobClient().getBlobOutputStream(overwrite);
+    }
+
+    /**
+     * create a blob<br>
+     * 创建一个对象("文件")
+     *
+     * @param dest
+     * @return
+     * @throws IOException
+     */
+    @Override
+    public boolean create(String dest) throws IOException {
+        FsPath path = new FsPath(dest);
+        if (exists(path)) {
+            return false;
+        }
+        PahtInfo names = this.azureLocation(dest);
+        // TODO 如果是路径的话后面补一个文件.
+        if (!names.getTail().contains(".")) {
+            String tmp = names.toFullName() + SLASH + "_tmp.txt";
+            names = this.azureLocation(tmp);
+        }
+        BlobContainerClient client = 
serviceClient.createBlobContainerIfNotExists(names.getContainer());
+        try (BlobOutputStream bos =
+                     
client.getBlobClient(names.getBlobName()).getBlockBlobClient().getBlobOutputStream())
 {
+            bos.write(1);
+            bos.flush();
+        }
+
+        return true;
+    }
+
+    /**
+     * Flat listing 5000 results at a time,without deleted.<br>
+     * 扁平化展示未删除的blob对象,最多5000条 TODO 分页接口,迭代器接口?
+     *
+     * @param path
+     * @return
+     * @throws IOException
+     */
+    @Override
+    public List<FsPath> list(FsPath path) throws IOException {
+        final PahtInfo result = azureLocation(path);
+        return 
getBlobContainerClient(result.getContainer()).listBlobs().stream()
+                // Azure不会返回已删除对象
+                .filter(item -> !item.isDeleted())
+                .map(item -> {
+                    FsPath tmp = new FsPath(result.toFullName() + SLASH + 
item.getName());
+                    // TODO 根据观察使用contentType来区别"对象"和"路径",但文档中没有具体的说明
+                    if (item.getProperties().getContentType() == null) {
+                        tmp.setIsdir(true);
+                    }
+                    return tmp;
+                })
+                .collect(Collectors.toList());
+    }
+
+    @Override
+    public boolean canRead(FsPath dest) throws IOException {
+        if (this.exists(dest)) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public boolean canWrite(FsPath dest) throws IOException {
+        if (this.exists(dest)) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public boolean exists(FsPath dest) throws IOException {
+        PahtInfo file = this.azureLocation(dest);
+        return 
getBlobContainerClient(file.getContainer()).getBlobClient(file.getBlobName()).exists();
+    }
+
+    @Override
+    public boolean delete(FsPath dest) throws IOException {
+        PahtInfo file = this.azureLocation(dest);
+        return 
getBlobContainerClient(file.getContainer()).getBlobClient(file.getBlobName()).deleteIfExists();
+    }
+
+    @Override
+    public boolean copy(String origin, String dest) throws IOException {
+        PahtInfo oriNames = this.azureLocation(origin);
+        PahtInfo destNames = this.azureLocation(dest);
+
+        BlobClient oriClient =
+                
getBlobContainerClient(oriNames.getContainer()).getBlobClient(oriNames.getBlobName());
+        BlockBlobClient destClient =
+                getBlobContainerClient(destNames.getContainer())
+                        .getBlobClient(destNames.getBlobName())
+                        .getBlockBlobClient();
+        SyncPoller<BlobCopyInfo, Void> poller = 
destClient.beginCopy(oriClient.getBlobUrl(), Duration.ofSeconds(2));
+        poller.waitForCompletion();
+        return true;
+    }
+
+    @Override
+    public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException 
{
+        // 没有事务性保证
+        this.copy(oldDest.getPath(), newDest.getPath());
+        this.delete(oldDest);
+        return true;
+    }
+
+    @Override
+    public boolean mkdir(FsPath dest) throws IOException {
+        return this.create(dest.getPath());
+    }
+
+    @Override
+    public boolean mkdirs(FsPath dest) throws IOException {
+        return this.mkdir(dest);
+    }
+
+    // 下面这些方法可能都无法支持
+    @Override
+    public String listRoot() throws IOException {
+        return "";
+    }
+
+    @Override
+    public long getTotalSpace(FsPath dest) throws IOException {
+        return 0;
+    }
+
+    @Override
+    public long getFreeSpace(FsPath dest) throws IOException {
+        return 0;
+    }
+
+    @Override
+    public long getUsableSpace(FsPath dest) throws IOException {
+        return 0;
+    }
+
+    @Override
+    public boolean canExecute(FsPath dest) throws IOException {
+        return false;
+    }
+
+    @Override
+    public boolean setOwner(FsPath dest, String user, String group) throws 
IOException {
+        return false;
+    }
+
+    @Override
+    public boolean setOwner(FsPath dest, String user) throws IOException {
+        return false;
+    }
+
+    @Override
+    public boolean setGroup(FsPath dest, String group) throws IOException {
+        return false;
+    }
+
+    @Override
+    public boolean setPermission(FsPath dest, String permission) throws 
IOException {
+        return false;
+    }
+
+    @Override
+    public void close() throws IOException {
+    }
+}
diff --git 
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala
 
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala
index c73b00743d..17345c050a 100644
--- 
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala
+++ 
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala
@@ -50,7 +50,8 @@ object StorageConfiguration {
   val STORAGE_BUILD_FS_CLASSES = CommonVars(
     "wds.linkis.storage.build.fs.classes",
     
"org.apache.linkis.storage.factory.impl.BuildHDFSFileSystem,org.apache.linkis.storage.factory.impl.BuildLocalFileSystem,"
 +
-      
"org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem"
+    
"org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem,"
 +
+    "org.apache.linkis.storage.factory.impl.BuildAzureBlobFileSystem"
   )
 
   val IS_SHARE_NODE = CommonVars("wds.linkis.storage.is.share.node", true)
@@ -117,4 +118,7 @@ object StorageConfiguration {
 
   val S3_BUCKET = CommonVars[String]("linkis.storage.s3.bucket", "", null, 
null)
 
+  val AZURE_ACCT_NAME = CommonVars[String]("linkis.storage.azure.acctName", 
"", null, null)
+
+  val AZURE_ACCT_CONNECT_STR = 
CommonVars[String]("linkis.storage.azure.connectstr", "", null, null)
 }
diff --git 
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala
 
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala
index dd5d8c37ef..a38b0edc4c 100644
--- 
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala
+++ 
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala
@@ -39,11 +39,13 @@ object StorageUtils extends Logging {
   val FILE = "file"
   val OSS = "oss"
   val S3 = "s3"
+  val BLOB = "https"
 
   val FILE_SCHEMA = "file://"
   val HDFS_SCHEMA = "hdfs://"
   val OSS_SCHEMA = "oss://"
   val S3_SCHEMA = "s3://"
+  val BLOB_SCHEMA = "https://";
 
   private val nf = NumberFormat.getInstance()
   nf.setGroupingUsed(false)
@@ -202,7 +204,7 @@ object StorageUtils extends Logging {
    * @return
    */
   def getFsPath(path: String): FsPath = {
-    if (path.startsWith(FILE_SCHEMA) || path.startsWith(HDFS_SCHEMA)) new 
FsPath(path)
+    if (path.startsWith(FILE_SCHEMA) || path.startsWith(HDFS_SCHEMA) || 
path.startsWith(BLOB_SCHEMA)) new FsPath(path)
     else {
       new FsPath(FILE_SCHEMA + path)
     }
diff --git 
a/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala
 
b/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala
index 4d21655ebd..a821038005 100644
--- 
a/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala
+++ 
b/linkis-commons/linkis-storage/src/test/scala/org/apache/linkis/storage/utils/StorageConfigurationTest.scala
@@ -60,6 +60,11 @@ class StorageConfigurationTest {
       
"txt.TextResultSet,table.TableResultSet,io.IOResultSet,html.HtmlResultSet,picture.PictureResultSet",
       storageresultsetclasses
     )
+    Assertions.assertEquals(
+      
"org.apache.linkis.storage.factory.impl.BuildHDFSFileSystem,org.apache.linkis.storage.factory.impl.BuildLocalFileSystem,"
 +
+        
"org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem",
+      storagebuildfsclasses
+    )
     Assertions.assertTrue(issharenode)
     Assertions.assertFalse(enableioproxy)
     Assertions.assertEquals("root", ioUser)
diff --git a/linkis-dist/package/conf/linkis.properties 
b/linkis-dist/package/conf/linkis.properties
index ae30dce4a6..7b0a9e7d56 100644
--- a/linkis-dist/package/conf/linkis.properties
+++ b/linkis-dist/package/conf/linkis.properties
@@ -120,4 +120,8 @@ linkis.storage.s3.access.key=
 linkis.storage.s3.secret.key=
 linkis.storage.s3.endpoint=
 linkis.storage.s3.region=
-linkis.storage.s3.bucket=
\ No newline at end of file
+linkis.storage.s3.bucket=
+
+# azure file system
+linkis.storage.azure.acctName=
+linkis.storage.azure.connectstr=
diff --git a/pom.xml b/pom.xml
index e70cf06919..0d824dde84 100644
--- a/pom.xml
+++ b/pom.xml
@@ -227,6 +227,7 @@
     <spring-cloud.version>2021.0.8</spring-cloud.version>
     <spring-cloud-alibaba.version>2021.0.6.0</spring-cloud-alibaba.version>
     <spring-cloud-common.version>3.1.7</spring-cloud-common.version>
+    <azure.blob.bom>1.2.30</azure.blob.bom>
 
     <!-- platform encoding override -->
     <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
@@ -1378,6 +1379,13 @@
         <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
         <version>${spring-cloud-alibaba.version}</version>
       </dependency>
+      <dependency>
+        <groupId>com.azure</groupId>
+        <artifactId>azure-sdk-bom</artifactId>
+        <version>${azure.blob.bom}</version>
+        <type>pom</type>
+        <scope>import</scope>
+      </dependency>
     </dependencies>
   </dependencyManagement>
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to