This is an automated email from the ASF dual-hosted git repository. kinghao pushed a commit to branch release-1.8.0-rc1 in repository https://gitbox.apache.org/repos/asf/linkis.git
commit fee4f7fdeb5e5f44c3276265200c51a06c47708c Author: aiceflower <[email protected]> AuthorDate: Mon Sep 29 11:51:46 2025 +0800 fix azure compile error --- linkis-commons/linkis-storage/pom.xml | 352 ++++----- .../factory/impl/BuildAzureBlobFileSystem.java | 120 +-- .../storage/fs/impl/AzureBlobFileSystem.java | 828 +++++++++++---------- 3 files changed, 664 insertions(+), 636 deletions(-) diff --git a/linkis-commons/linkis-storage/pom.xml b/linkis-commons/linkis-storage/pom.xml index 8715b97c7a..72ce14950c 100644 --- a/linkis-commons/linkis-storage/pom.xml +++ b/linkis-commons/linkis-storage/pom.xml @@ -1,176 +1,176 @@ -<?xml version="1.0" encoding="UTF-8"?> -<!-- - ~ Licensed to the Apache Software Foundation (ASF) under one or more - ~ contributor license agreements. See the NOTICE file distributed with - ~ this work for additional information regarding copyright ownership. - ~ The ASF licenses this file to You under the Apache License, Version 2.0 - ~ (the "License"); you may not use this file except in compliance with - ~ the License. You may obtain a copy of the License at - ~ - ~ http://www.apache.org/licenses/LICENSE-2.0 - ~ - ~ Unless required by applicable law or agreed to in writing, software - ~ distributed under the License is distributed on an "AS IS" BASIS, - ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - ~ See the License for the specific language governing permissions and - ~ limitations under the License. - --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> - <modelVersion>4.0.0</modelVersion> - - <parent> - <groupId>org.apache.linkis</groupId> - <artifactId>linkis</artifactId> - <version>${revision}</version> - <relativePath>../../pom.xml</relativePath> - </parent> - <artifactId>linkis-storage</artifactId> - - <packaging>jar</packaging> - - <dependencies> - <dependency> - <groupId>org.apache.linkis</groupId> - <artifactId>linkis-common</artifactId> - <version>${project.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.linkis</groupId> - <artifactId>linkis-hadoop-common</artifactId> - <version>${project.version}</version> - <exclusions> - <exclusion> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - </exclusion> - <exclusion> - <groupId>io.netty</groupId> - <artifactId>netty</artifactId> - </exclusion> - </exclusions> - </dependency> - - <dependency> - <groupId>com.google.protobuf</groupId> - <artifactId>protobuf-java</artifactId> - <version>${protobuf.version}</version> - </dependency> - <dependency> - <groupId>org.springframework</groupId> - <artifactId>spring-core</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.poi</groupId> - <artifactId>poi</artifactId> - <version>${poi.version}</version> - </dependency> - - <dependency> - <groupId>org.apache.poi</groupId> - <artifactId>poi-ooxml</artifactId> - <version>${poi.version}</version> - </dependency> - - <dependency> - <groupId>com.github.pjfanning</groupId> - <artifactId>excel-streaming-reader</artifactId> - <version>5.0.2</version> - </dependency> - - <dependency> - <groupId>org.apache.commons</groupId> - <artifactId>commons-compress</artifactId> - </dependency> - - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-aliyun</artifactId> - <version>3.3.4</version> - </dependency> - <dependency> - <groupId>com.aliyun.oss</groupId> - <artifactId>aliyun-sdk-oss</artifactId> - <version>3.16.0</version> - </dependency> - <dependency> - <groupId>org.jdom</groupId> - <artifactId>jdom2</artifactId> - </dependency> - - <dependency> - <groupId>com.amazonaws</groupId> - <artifactId>aws-java-sdk-s3</artifactId> - <version>1.12.261</version> - </dependency> - - <dependency> - <groupId>com.azure</groupId> - <artifactId>azure-storage-blob</artifactId> - </dependency> - <dependency> - <groupId>com.azure</groupId> - <artifactId>azure-storage-common</artifactId> - </dependency> - <dependency> - <groupId>com.azure</groupId> - <artifactId>azure-identity</artifactId> - </dependency> - <dependency> - <groupId>org.apache.parquet</groupId> - <artifactId>parquet-avro</artifactId> - <version>${parquet-avro.version}</version> - <scope>${storage.parquet.scope}</scope> - </dependency> - <dependency> - <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-mapreduce-client-core</artifactId> - <version>${hadoop.version}</version> - <scope>${storage.parquet.scope}</scope> - <exclusions> - <exclusion> - <groupId>log4j</groupId> - <artifactId>log4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-log4j12</artifactId> - </exclusion> - <!-- for hadoop 3.3.3 --> - <exclusion> - <groupId>ch.qos.reload4j</groupId> - <artifactId>reload4j</artifactId> - </exclusion> - <exclusion> - <groupId>org.slf4j</groupId> - <artifactId>slf4j-reload4j</artifactId> - </exclusion> - </exclusions> - </dependency> - <dependency> - <groupId>org.apache.orc</groupId> - <artifactId>orc-core</artifactId> - <version>${orc-core.version}</version> - <classifier>nohive</classifier> - <scope>${storage.orc.scope}</scope> - <exclusions> - <exclusion> - <groupId>org.apache.hive</groupId> - <artifactId>hive-storage-api</artifactId> - </exclusion> - </exclusions> - </dependency> - - </dependencies> - - <build> - <plugins> - <plugin> - <groupId>net.alchim31.maven</groupId> - <artifactId>scala-maven-plugin</artifactId> - </plugin> - </plugins> - </build> - -</project> +<?xml version="1.0" encoding="UTF-8"?> +<!-- + ~ Licensed to the Apache Software Foundation (ASF) under one or more + ~ contributor license agreements. See the NOTICE file distributed with + ~ this work for additional information regarding copyright ownership. + ~ The ASF licenses this file to You under the Apache License, Version 2.0 + ~ (the "License"); you may not use this file except in compliance with + ~ the License. You may obtain a copy of the License at + ~ + ~ http://www.apache.org/licenses/LICENSE-2.0 + ~ + ~ Unless required by applicable law or agreed to in writing, software + ~ distributed under the License is distributed on an "AS IS" BASIS, + ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + ~ See the License for the specific language governing permissions and + ~ limitations under the License. + --> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>org.apache.linkis</groupId> + <artifactId>linkis</artifactId> + <version>${revision}</version> + <relativePath>../../pom.xml</relativePath> + </parent> + <artifactId>linkis-storage</artifactId> + + <packaging>jar</packaging> + + <dependencies> + <dependency> + <groupId>org.apache.linkis</groupId> + <artifactId>linkis-common</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.linkis</groupId> + <artifactId>linkis-hadoop-common</artifactId> + <version>${project.version}</version> + <exclusions> + <exclusion> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + </exclusion> + <exclusion> + <groupId>io.netty</groupId> + <artifactId>netty</artifactId> + </exclusion> + </exclusions> + </dependency> + + <dependency> + <groupId>com.google.protobuf</groupId> + <artifactId>protobuf-java</artifactId> + <version>${protobuf.version}</version> + </dependency> + <dependency> + <groupId>org.springframework</groupId> + <artifactId>spring-core</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.poi</groupId> + <artifactId>poi</artifactId> + <version>${poi.version}</version> + </dependency> + + <dependency> + <groupId>org.apache.poi</groupId> + <artifactId>poi-ooxml</artifactId> + <version>${poi.version}</version> + </dependency> + + <dependency> + <groupId>com.github.pjfanning</groupId> + <artifactId>excel-streaming-reader</artifactId> + <version>5.0.2</version> + </dependency> + + <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-compress</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-aliyun</artifactId> + <version>3.3.4</version> + </dependency> + <dependency> + <groupId>com.aliyun.oss</groupId> + <artifactId>aliyun-sdk-oss</artifactId> + <version>3.16.0</version> + </dependency> + <dependency> + <groupId>org.jdom</groupId> + <artifactId>jdom2</artifactId> + </dependency> + + <dependency> + <groupId>com.amazonaws</groupId> + <artifactId>aws-java-sdk-s3</artifactId> + <version>1.12.261</version> + </dependency> + + <dependency> + <groupId>com.azure</groupId> + <artifactId>azure-storage-blob</artifactId> + </dependency> + <dependency> + <groupId>com.azure</groupId> + <artifactId>azure-storage-common</artifactId> + </dependency> + <dependency> + <groupId>com.azure</groupId> + <artifactId>azure-identity</artifactId> + </dependency> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-avro</artifactId> + <version>${parquet-avro.version}</version> + <scope>${storage.parquet.scope}</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-mapreduce-client-core</artifactId> + <version>${hadoop.version}</version> + <scope>${storage.parquet.scope}</scope> + <exclusions> + <exclusion> + <groupId>log4j</groupId> + <artifactId>log4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-log4j12</artifactId> + </exclusion> + <!-- for hadoop 3.3.3 --> + <exclusion> + <groupId>ch.qos.reload4j</groupId> + <artifactId>reload4j</artifactId> + </exclusion> + <exclusion> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-reload4j</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.apache.orc</groupId> + <artifactId>orc-core</artifactId> + <version>${orc-core.version}</version> + <classifier>nohive</classifier> + <scope>${storage.orc.scope}</scope> + <exclusions> + <exclusion> + <groupId>org.apache.hive</groupId> + <artifactId>hive-storage-api</artifactId> + </exclusion> + </exclusions> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>net.alchim31.maven</groupId> + <artifactId>scala-maven-plugin</artifactId> + </plugin> + </plugins> + </build> + +</project> diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java index 292bb952ed..8cd263facb 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/factory/impl/BuildAzureBlobFileSystem.java @@ -1,59 +1,61 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.linkis.storage.factory.impl; - -import org.apache.linkis.common.io.Fs; -import org.apache.linkis.storage.factory.BuildFactory; -import org.apache.linkis.storage.fs.impl.AzureBlobFileSystem; -import org.apache.linkis.storage.utils.StorageUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -public class BuildAzureBlobFileSystem implements BuildFactory { - private static final Logger LOG = LoggerFactory.getLogger(BuildAzureBlobFileSystem.class); - - @Override - public Fs getFs(String user, String proxyUser) { - AzureBlobFileSystem fs = new AzureBlobFileSystem(); - try { - fs.init(null); - } catch (IOException e) { - LOG.warn("get file system failed", e); - } - fs.setUser(user); - return fs; - } - - @Override - public Fs getFs(String user, String proxyUser, String label) { - AzureBlobFileSystem fs = new AzureBlobFileSystem(); - try { - fs.init(null); - } catch (IOException e) { - LOG.warn("get file system failed", e); - } - fs.setUser(user); - return fs; - } - - @Override - public String fsName() { - return StorageUtils.BLOB; - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.factory.impl; + +import org.apache.linkis.common.io.Fs; +import org.apache.linkis.storage.factory.BuildFactory; +import org.apache.linkis.storage.fs.impl.AzureBlobFileSystem; +import org.apache.linkis.storage.utils.StorageUtils; + +import java.io.IOException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class BuildAzureBlobFileSystem implements BuildFactory { + private static final Logger LOG = LoggerFactory.getLogger(BuildAzureBlobFileSystem.class); + + @Override + public Fs getFs(String user, String proxyUser) { + AzureBlobFileSystem fs = new AzureBlobFileSystem(); + try { + fs.init(null); + } catch (IOException e) { + LOG.warn("get file system failed", e); + } + fs.setUser(user); + return fs; + } + + @Override + public Fs getFs(String user, String proxyUser, String label) { + AzureBlobFileSystem fs = new AzureBlobFileSystem(); + try { + fs.init(null); + } catch (IOException e) { + LOG.warn("get file system failed", e); + } + fs.setUser(user); + return fs; + } + + @Override + public String fsName() { + return StorageUtils.BLOB(); + } +} diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java index 67475aecf2..2c02fdfc43 100644 --- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java +++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/AzureBlobFileSystem.java @@ -1,401 +1,427 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.linkis.storage.fs.impl; - -import com.azure.core.util.polling.SyncPoller; -import com.azure.storage.blob.BlobClient; -import com.azure.storage.blob.BlobContainerClient; -import com.azure.storage.blob.BlobServiceClient; -import com.azure.storage.blob.BlobServiceClientBuilder; -import com.azure.storage.blob.models.BlobCopyInfo; -import com.azure.storage.blob.models.BlobStorageException; -import com.azure.storage.blob.specialized.BlobOutputStream; -import com.azure.storage.blob.specialized.BlockBlobClient; -import org.apache.linkis.common.io.FsPath; -import org.apache.linkis.storage.exception.StorageWarnException; -import org.apache.linkis.storage.fs.FileSystem; -import org.apache.linkis.storage.utils.StorageConfiguration; -import org.apache.linkis.storage.utils.StorageUtils; - -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.time.Duration; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -import static org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary.TO_BE_UNKNOW; -import static org.apache.linkis.storage.utils.StorageUtils.BLOB_SCHEMA; - -public class AzureBlobFileSystem extends FileSystem { - - private static final String SLASH = "/"; - - public static class PahtInfo { - private String schema = "http://"; // http - private String domain; // - private String container; // container name - private String blobName; // blob name - private String tail; - - public PahtInfo(String domain, String container, String blobName) { - this.domain = domain; - this.container = container; - this.blobName = blobName; - if (blobName != null) { - String[] names = blobName.split(SLASH, -1); - tail = names[names.length - 1]; - } - } - - public String toFullName() { - return schema + domain + SLASH + container + SLASH + blobName; - } - - public String getSchema() { - return schema; - } - - public String getDomain() { - return domain; - } - - public String getContainer() { - return container; - } - - public String getBlobName() { - return blobName; - } - - public String getTail() { - return tail; - } - - @Override - public String toString() { - return "PahtInfo{" + - "schema='" + schema + '\'' + - ", domain='" + domain + '\'' + - ", container='" + container + '\'' + - ", blobName='" + blobName + '\'' + - ", tail='" + tail + '\'' + - '}'; - } - } - - /** - * manipulate Azure storage resources and Blob container 管理命名空间下的存储资源和Blob容器 - */ - private BlobServiceClient serviceClient; - - /** - * getBlobContainerClient - * - * @param containerName - * @return client which can manipulate Azure Storage containers and their blobs.<br> - * 操作一个容器和其blobs的客户端 - */ - private BlobContainerClient getBlobContainerClient(String containerName) { - return serviceClient.getBlobContainerClient(containerName); - } - - private PahtInfo azureLocation(String path) { - return this.azureLocation(new FsPath(path)); - } - - /** - * @param dest - * @return domain name,container name,blob name - */ - private PahtInfo azureLocation(FsPath dest) { - //https://myaccount.blob.core.windows.net/mycontainer/dir/blobname - // returns myaccount.blob.core.windows.net/mycontainer/dir/blobname - String path = dest.getPath(); - // myaccount.blob.core.windows.net/mycontainer/dir/blobname - // will split to myaccount.blob.core.windows.net - // and mycontainer/dir/blobname - String[] paths = path.split(SLASH, 2); - if (paths.length < 2) { - throw new IllegalArgumentException("file path error,with out container:" + path); - } - // split to container and blob object, - // container/dir/blobname will split to container and dir/blobname - String[] names = paths[1].split(SLASH, 2); - if (names.length < 2) { - return new PahtInfo(paths[0], names[0], null); - } else { - return new PahtInfo(paths[0], names[0], names[1]); - } - } - - /** - * init serviceClient - * - * @param properties - * @throws IOException - */ - @Override - public void init(Map<String, String> properties) throws IOException { - - /** - * The storage account provides the top-level namespace for the Blob service. 每个账户提供了一个顶级的命名空间 - */ - String acctName = StorageConfiguration.AZURE_ACCT_NAME.getValue(properties); - String connectStr = StorageConfiguration.AZURE_ACCT_CONNECT_STR.getValue(properties); - // Azure SDK client builders accept the credential as a parameter - serviceClient = - new BlobServiceClientBuilder() - .endpoint(BLOB_SCHEMA + acctName + ".blob.core.windows.net/") - .connectionString(connectStr) - .buildClient(); - } - - /** - * name of the fileSystem - * - * @return - */ - @Override - public String fsName() { - return StorageUtils.BLOB; - } - - @Override - public String rootUserName() { - return ""; - } - - /** - * @param dest - * @return - * @throws IOException - */ - @Override - public FsPath get(String dest) throws IOException { - FsPath path = new FsPath(dest); - if (exists(path)) { - return path; - } else { - throw new StorageWarnException( - TO_BE_UNKNOW.getErrorCode(), - "File or folder does not exist or file name is garbled(文件或者文件夹不存在或者文件名乱码)"); - } - } - - /** - * Opens a blob input stream to download the blob. - * - * @param dest - * @return - * @throws BlobStorageException – If a storage service error occurred. - */ - @Override - public InputStream read(FsPath dest) { - PahtInfo result = azureLocation(dest); - BlobClient blobclient = - getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName()); - return blobclient.openInputStream(); - } - - /** - * @param dest - * @param overwrite - * @return - * @throws BlobStorageException – If a storage service error occurred. - * @see BlockBlobClient #getBlobOutputStream - */ - @Override - public OutputStream write(FsPath dest, boolean overwrite) { - - PahtInfo result = azureLocation(dest); - BlobClient blobclient = - getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName()); - return blobclient.getBlockBlobClient().getBlobOutputStream(overwrite); - } - - /** - * create a blob<br> - * 创建一个对象("文件") - * - * @param dest - * @return - * @throws IOException - */ - @Override - public boolean create(String dest) throws IOException { - FsPath path = new FsPath(dest); - if (exists(path)) { - return false; - } - PahtInfo names = this.azureLocation(dest); - // TODO 如果是路径的话后面补一个文件. - if (!names.getTail().contains(".")) { - String tmp = names.toFullName() + SLASH + "_tmp.txt"; - names = this.azureLocation(tmp); - } - BlobContainerClient client = serviceClient.createBlobContainerIfNotExists(names.getContainer()); - try (BlobOutputStream bos = - client.getBlobClient(names.getBlobName()).getBlockBlobClient().getBlobOutputStream()) { - bos.write(1); - bos.flush(); - } - - return true; - } - - /** - * Flat listing 5000 results at a time,without deleted.<br> - * 扁平化展示未删除的blob对象,最多5000条 TODO 分页接口,迭代器接口? - * - * @param path - * @return - * @throws IOException - */ - @Override - public List<FsPath> list(FsPath path) throws IOException { - final PahtInfo result = azureLocation(path); - return getBlobContainerClient(result.getContainer()).listBlobs().stream() - // Azure不会返回已删除对象 - .filter(item -> !item.isDeleted()) - .map(item -> { - FsPath tmp = new FsPath(result.toFullName() + SLASH + item.getName()); - // TODO 根据观察使用contentType来区别"对象"和"路径",但文档中没有具体的说明 - if (item.getProperties().getContentType() == null) { - tmp.setIsdir(true); - } - return tmp; - }) - .collect(Collectors.toList()); - } - - @Override - public boolean canRead(FsPath dest) throws IOException { - if (this.exists(dest)) { - return true; - } else { - return false; - } - } - - @Override - public boolean canWrite(FsPath dest) throws IOException { - if (this.exists(dest)) { - return true; - } else { - return false; - } - } - - @Override - public boolean exists(FsPath dest) throws IOException { - PahtInfo file = this.azureLocation(dest); - return getBlobContainerClient(file.getContainer()).getBlobClient(file.getBlobName()).exists(); - } - - @Override - public boolean delete(FsPath dest) throws IOException { - PahtInfo file = this.azureLocation(dest); - return getBlobContainerClient(file.getContainer()).getBlobClient(file.getBlobName()).deleteIfExists(); - } - - @Override - public boolean copy(String origin, String dest) throws IOException { - PahtInfo oriNames = this.azureLocation(origin); - PahtInfo destNames = this.azureLocation(dest); - - BlobClient oriClient = - getBlobContainerClient(oriNames.getContainer()).getBlobClient(oriNames.getBlobName()); - BlockBlobClient destClient = - getBlobContainerClient(destNames.getContainer()) - .getBlobClient(destNames.getBlobName()) - .getBlockBlobClient(); - SyncPoller<BlobCopyInfo, Void> poller = destClient.beginCopy(oriClient.getBlobUrl(), Duration.ofSeconds(2)); - poller.waitForCompletion(); - return true; - } - - @Override - public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException { - // 没有事务性保证 - this.copy(oldDest.getPath(), newDest.getPath()); - this.delete(oldDest); - return true; - } - - @Override - public boolean mkdir(FsPath dest) throws IOException { - return this.create(dest.getPath()); - } - - @Override - public boolean mkdirs(FsPath dest) throws IOException { - return this.mkdir(dest); - } - - // 下面这些方法可能都无法支持 - @Override - public String listRoot() throws IOException { - return ""; - } - - @Override - public long getTotalSpace(FsPath dest) throws IOException { - return 0; - } - - @Override - public long getFreeSpace(FsPath dest) throws IOException { - return 0; - } - - @Override - public long getUsableSpace(FsPath dest) throws IOException { - return 0; - } - - @Override - public boolean canExecute(FsPath dest) throws IOException { - return false; - } - - @Override - public boolean setOwner(FsPath dest, String user, String group) throws IOException { - return false; - } - - @Override - public boolean setOwner(FsPath dest, String user) throws IOException { - return false; - } - - @Override - public boolean setGroup(FsPath dest, String group) throws IOException { - return false; - } - - @Override - public boolean setPermission(FsPath dest, String permission) throws IOException { - return false; - } - - @Override - public void close() throws IOException { - } -} +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.linkis.storage.fs.impl; + +import org.apache.linkis.common.io.FsPath; +import org.apache.linkis.storage.exception.StorageWarnException; +import org.apache.linkis.storage.fs.FileSystem; +import org.apache.linkis.storage.utils.StorageConfiguration; +import org.apache.linkis.storage.utils.StorageUtils; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import com.azure.core.util.polling.SyncPoller; +import com.azure.storage.blob.BlobClient; +import com.azure.storage.blob.BlobContainerClient; +import com.azure.storage.blob.BlobServiceClient; +import com.azure.storage.blob.BlobServiceClientBuilder; +import com.azure.storage.blob.models.BlobCopyInfo; +import com.azure.storage.blob.models.BlobStorageException; +import com.azure.storage.blob.specialized.BlobOutputStream; +import com.azure.storage.blob.specialized.BlockBlobClient; + +import static org.apache.linkis.storage.errorcode.LinkisStorageErrorCodeSummary.TO_BE_UNKNOW; + +public class AzureBlobFileSystem extends FileSystem { + + private static final String SLASH = "/"; + + public static class PahtInfo { + private String schema = "http://"; // http + private String domain; // + private String container; // container name + private String blobName; // blob name + private String tail; + + public PahtInfo(String domain, String container, String blobName) { + this.domain = domain; + this.container = container; + this.blobName = blobName; + if (blobName != null) { + String[] names = blobName.split(SLASH, -1); + tail = names[names.length - 1]; + } + } + + public String toFullName() { + return schema + domain + SLASH + container + SLASH + blobName; + } + + public String getSchema() { + return schema; + } + + public String getDomain() { + return domain; + } + + public String getContainer() { + return container; + } + + public String getBlobName() { + return blobName; + } + + public String getTail() { + return tail; + } + + @Override + public String toString() { + return "PahtInfo{" + + "schema='" + + schema + + '\'' + + ", domain='" + + domain + + '\'' + + ", container='" + + container + + '\'' + + ", blobName='" + + blobName + + '\'' + + ", tail='" + + tail + + '\'' + + '}'; + } + } + + /** manipulate Azure storage resources and Blob container 管理命名空间下的存储资源和Blob容器 */ + private BlobServiceClient serviceClient; + + /** + * getBlobContainerClient + * + * @param containerName + * @return client which can manipulate Azure Storage containers and their blobs.<br> + * 操作一个容器和其blobs的客户端 + */ + private BlobContainerClient getBlobContainerClient(String containerName) { + return serviceClient.getBlobContainerClient(containerName); + } + + private PahtInfo azureLocation(String path) { + return this.azureLocation(new FsPath(path)); + } + + /** + * @param dest + * @return domain name,container name,blob name + */ + private PahtInfo azureLocation(FsPath dest) { + // https://myaccount.blob.core.windows.net/mycontainer/dir/blobname + // returns myaccount.blob.core.windows.net/mycontainer/dir/blobname + String path = dest.getPath(); + // myaccount.blob.core.windows.net/mycontainer/dir/blobname + // will split to myaccount.blob.core.windows.net + // and mycontainer/dir/blobname + String[] paths = path.split(SLASH, 2); + if (paths.length < 2) { + throw new IllegalArgumentException("file path error,with out container:" + path); + } + // split to container and blob object, + // container/dir/blobname will split to container and dir/blobname + String[] names = paths[1].split(SLASH, 2); + if (names.length < 2) { + return new PahtInfo(paths[0], names[0], null); + } else { + return new PahtInfo(paths[0], names[0], names[1]); + } + } + + /** + * init serviceClient + * + * @param properties + * @throws IOException + */ + @Override + public void init(Map<String, String> properties) throws IOException { + + /** + * The storage account provides the top-level namespace for the Blob service. 每个账户提供了一个顶级的命名空间 + */ + String acctName = StorageConfiguration.AZURE_ACCT_NAME().getValue(properties); + String connectStr = StorageConfiguration.AZURE_ACCT_CONNECT_STR().getValue(properties); + // Azure SDK client builders accept the credential as a parameter + serviceClient = + new BlobServiceClientBuilder() + .endpoint(StorageUtils.BLOB_SCHEMA() + acctName + ".blob.core.windows.net/") + .connectionString(connectStr) + .buildClient(); + } + + /** + * name of the fileSystem + * + * @return + */ + @Override + public String fsName() { + return StorageUtils.BLOB(); + } + + @Override + public String rootUserName() { + return ""; + } + + /** + * @param dest + * @return + * @throws IOException + */ + @Override + public FsPath get(String dest) throws IOException { + FsPath path = new FsPath(dest); + if (exists(path)) { + return path; + } else { + throw new StorageWarnException( + TO_BE_UNKNOW.getErrorCode(), + "File or folder does not exist or file name is garbled(文件或者文件夹不存在或者文件名乱码)"); + } + } + + /** + * Opens a blob input stream to download the blob. + * + * @param dest + * @return + * @throws BlobStorageException – If a storage service error occurred. + */ + @Override + public InputStream read(FsPath dest) { + PahtInfo result = azureLocation(dest); + BlobClient blobclient = + getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName()); + return blobclient.openInputStream(); + } + + /** + * @param dest + * @param overwrite + * @return + * @throws BlobStorageException – If a storage service error occurred. + * @see BlockBlobClient #getBlobOutputStream + */ + @Override + public OutputStream write(FsPath dest, boolean overwrite) { + + PahtInfo result = azureLocation(dest); + BlobClient blobclient = + getBlobContainerClient(result.getContainer()).getBlobClient(result.getBlobName()); + return blobclient.getBlockBlobClient().getBlobOutputStream(overwrite); + } + + /** + * create a blob<br> + * 创建一个对象("文件") + * + * @param dest + * @return + * @throws IOException + */ + @Override + public boolean create(String dest) throws IOException { + FsPath path = new FsPath(dest); + if (exists(path)) { + return false; + } + PahtInfo names = this.azureLocation(dest); + // TODO 如果是路径的话后面补一个文件. + if (!names.getTail().contains(".")) { + String tmp = names.toFullName() + SLASH + "_tmp.txt"; + names = this.azureLocation(tmp); + } + BlobContainerClient client = serviceClient.createBlobContainerIfNotExists(names.getContainer()); + try (BlobOutputStream bos = + client.getBlobClient(names.getBlobName()).getBlockBlobClient().getBlobOutputStream()) { + bos.write(1); + bos.flush(); + } + + return true; + } + + /** + * Flat listing 5000 results at a time,without deleted.<br> + * 扁平化展示未删除的blob对象,最多5000条 TODO 分页接口,迭代器接口? + * + * @param path + * @return + * @throws IOException + */ + @Override + public List<FsPath> list(FsPath path) throws IOException { + final PahtInfo result = azureLocation(path); + return getBlobContainerClient(result.getContainer()).listBlobs().stream() + // Azure不会返回已删除对象 + .filter(item -> !item.isDeleted()) + .map( + item -> { + FsPath tmp = new FsPath(result.toFullName() + SLASH + item.getName()); + // TODO 根据观察使用contentType来区别"对象"和"路径",但文档中没有具体的说明 + if (item.getProperties().getContentType() == null) { + tmp.setIsdir(true); + } + return tmp; + }) + .collect(Collectors.toList()); + } + + @Override + public boolean canRead(FsPath dest) throws IOException { + if (this.exists(dest)) { + return true; + } else { + return false; + } + } + + @Override + public boolean canRead(FsPath dest, String user) throws IOException { + return false; + } + + @Override + public boolean canWrite(FsPath dest) throws IOException { + if (this.exists(dest)) { + return true; + } else { + return false; + } + } + + @Override + public boolean exists(FsPath dest) throws IOException { + PahtInfo file = this.azureLocation(dest); + return getBlobContainerClient(file.getContainer()).getBlobClient(file.getBlobName()).exists(); + } + + @Override + public boolean delete(FsPath dest) throws IOException { + PahtInfo file = this.azureLocation(dest); + return getBlobContainerClient(file.getContainer()) + .getBlobClient(file.getBlobName()) + .deleteIfExists(); + } + + @Override + public boolean copy(String origin, String dest) throws IOException { + PahtInfo oriNames = this.azureLocation(origin); + PahtInfo destNames = this.azureLocation(dest); + + BlobClient oriClient = + getBlobContainerClient(oriNames.getContainer()).getBlobClient(oriNames.getBlobName()); + BlockBlobClient destClient = + getBlobContainerClient(destNames.getContainer()) + .getBlobClient(destNames.getBlobName()) + .getBlockBlobClient(); + SyncPoller<BlobCopyInfo, Void> poller = + destClient.beginCopy(oriClient.getBlobUrl(), Duration.ofSeconds(2)); + poller.waitForCompletion(); + return true; + } + + @Override + public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException { + // 没有事务性保证 + this.copy(oldDest.getPath(), newDest.getPath()); + this.delete(oldDest); + return true; + } + + @Override + public boolean mkdir(FsPath dest) throws IOException { + return this.create(dest.getPath()); + } + + @Override + public boolean mkdirs(FsPath dest) throws IOException { + return this.mkdir(dest); + } + + // 下面这些方法可能都无法支持 + @Override + public String listRoot() throws IOException { + return ""; + } + + @Override + public long getTotalSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public long getFreeSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public long getUsableSpace(FsPath dest) throws IOException { + return 0; + } + + @Override + public long getLength(FsPath dest) throws IOException { + return 0; + } + + @Override + public String checkSum(FsPath dest) throws IOException { + return null; + } + + @Override + public boolean canExecute(FsPath dest) throws IOException { + return false; + } + + @Override + public boolean setOwner(FsPath dest, String user, String group) throws IOException { + return false; + } + + @Override + public boolean setOwner(FsPath dest, String user) throws IOException { + return false; + } + + @Override + public boolean setGroup(FsPath dest, String group) throws IOException { + return false; + } + + @Override + public boolean setPermission(FsPath dest, String permission) throws IOException { + return false; + } + + @Override + public void close() throws IOException {} +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
