This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bc388f84d  Enhance the functionality of s3filesystem (#5208)
6bc388f84d is described below

commit 6bc388f84d53990e19497948852c21c1a790fb57
Author: LiuGuoHua <[email protected]>
AuthorDate: Wed Dec 4 19:35:10 2024 +0800

     Enhance the functionality of s3filesystem (#5208)
    
    * 1. Enhance the functionality of s3filesystem to support multipart uploads.
       2. Support the use of s3 storage for BML materials and workspaces.
    
    * format code
---
 .../linkis/storage/fs/impl/S3FileSystem.java       | 190 ++++++++++--------
 .../linkis/storage/fs/stream/S3OutputStream.java   | 216 +++++++++++++++++++++
 .../linkis/storage/utils/FileSystemUtils.scala     |   4 +-
 .../governance/common/utils/GovernanceUtils.scala  |   2 +-
 .../linkis/entrance/utils/CommonLogPathUtils.scala |   5 +-
 .../linkis/bml/common/ResourceHelperFactory.java   |  10 +-
 .../apache/linkis/bml/common/S3ResourceHelper.java | 183 +++++++++++++++++
 .../linkis/bml/conf/BmlServerConfiguration.scala   |  29 +++
 .../filesystem/restful/api/FsRestfulApi.java       |   4 +
 9 files changed, 558 insertions(+), 85 deletions(-)

diff --git 
a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java
 
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java
index 2aff3da7f5..a9f00b60d5 100644
--- 
a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java
+++ 
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java
@@ -21,13 +21,17 @@ import org.apache.linkis.common.io.FsPath;
 import org.apache.linkis.storage.domain.FsPathListWithError;
 import org.apache.linkis.storage.exception.StorageWarnException;
 import org.apache.linkis.storage.fs.FileSystem;
+import org.apache.linkis.storage.fs.stream.S3OutputStream;
 import org.apache.linkis.storage.utils.StorageConfiguration;
 import org.apache.linkis.storage.utils.StorageUtils;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
 
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -99,6 +103,7 @@ public class S3FileSystem extends FileSystem {
   public FsPath get(String dest) throws IOException {
     FsPath ret = new FsPath(dest);
     if (exists(ret)) {
+      ret.setIsdir(isDir(buildKey(ret.getPath())));
       return ret;
     } else {
       logger.warn("File or folder does not exist or file name is 
garbled(文件或者文件夹不存在或者文件名乱码)");
@@ -111,7 +116,7 @@ public class S3FileSystem extends FileSystem {
   @Override
   public InputStream read(FsPath dest) throws IOException {
     try {
-      return s3Client.getObject(bucket, buildPrefix(dest.getPath(), 
false)).getObjectContent();
+      return s3Client.getObject(bucket, 
buildKey(dest.getPath())).getObjectContent();
     } catch (AmazonS3Exception e) {
       throw new IOException("You have not permission to access path " + 
dest.getPath());
     }
@@ -119,13 +124,23 @@ public class S3FileSystem extends FileSystem {
 
   @Override
   public OutputStream write(FsPath dest, boolean overwrite) throws IOException 
{
-    try (InputStream inputStream = read(dest);
-        OutputStream outputStream =
-            new S3OutputStream(s3Client, bucket, buildPrefix(dest.getPath(), 
false))) {
+    InputStream inputStream = null;
+    try {
+      if (!exists(dest)) {
+        create(dest.getPath());
+      }
+
+      OutputStream outputStream = new S3OutputStream(s3Client, bucket, 
buildKey(dest.getPath()));
+
       if (!overwrite) {
+        inputStream = read(dest);
         IOUtils.copy(inputStream, outputStream);
       }
       return outputStream;
+    } catch (IOException e) {
+      throw new IOException("You have not permission to access path " + 
dest.getPath());
+    } finally {
+      IOUtils.closeQuietly(inputStream);
     }
   }
 
@@ -134,7 +149,7 @@ public class S3FileSystem extends FileSystem {
     if (exists(new FsPath(dest))) {
       return false;
     }
-    s3Client.putObject(bucket, dest, "");
+    s3Client.putObject(bucket, buildKey(dest), "");
     return true;
   }
 
@@ -142,16 +157,31 @@ public class S3FileSystem extends FileSystem {
   public List<FsPath> list(FsPath path) throws IOException {
     try {
       if (!StringUtils.isEmpty(path.getPath())) {
-        ListObjectsV2Result listObjectsV2Result = 
s3Client.listObjectsV2(bucket, path.getPath());
-        List<S3ObjectSummary> s3ObjectSummaries = 
listObjectsV2Result.getObjectSummaries();
-        return s3ObjectSummaries.stream()
-            .filter(summary -> !isInitFile(summary))
-            .map(
-                summary -> {
-                  FsPath newPath = new FsPath(buildPath(summary.getKey()));
-                  return fillStorageFile(newPath, summary);
-                })
-            .collect(Collectors.toList());
+        ListObjectsV2Request listObjectsV2Request =
+            new ListObjectsV2Request()
+                .withBucketName(bucket)
+                .withPrefix(buildKey(path.getPath()) + "/")
+                .withDelimiter("/");
+        ListObjectsV2Result dirResult = 
s3Client.listObjectsV2(listObjectsV2Request);
+        List<S3ObjectSummary> s3ObjectSummaries = 
dirResult.getObjectSummaries();
+        List<String> commonPrefixes = dirResult.getCommonPrefixes();
+        List<FsPath> fsPaths =
+            s3ObjectSummaries.stream()
+                .filter(summary -> !isInitFile(summary))
+                .map(
+                    summary -> {
+                      FsPath newPath = new FsPath(buildPath(summary.getKey()));
+                      return fillStorageFile(newPath, summary);
+                    })
+                .collect(Collectors.toList());
+        if (commonPrefixes != null) {
+          for (String dir : commonPrefixes) {
+            FsPath newPath = new FsPath(buildPath(dir));
+            newPath.setIsdir(true);
+            fsPaths.add(newPath);
+          }
+        }
+        return fsPaths;
       }
     } catch (AmazonS3Exception e) {
       throw new IOException("You have not permission to access path " + 
path.getPath());
@@ -173,7 +203,7 @@ public class S3FileSystem extends FileSystem {
         ListObjectsV2Request listObjectsV2Request =
             new ListObjectsV2Request()
                 .withBucketName(bucket)
-                .withPrefix(buildPrefix(path.getPath()))
+                .withPrefix(buildKey(path.getPath()) + "/")
                 .withDelimiter("/");
         ListObjectsV2Result dirResult = 
s3Client.listObjectsV2(listObjectsV2Request);
         List<S3ObjectSummary> s3ObjectSummaries = 
dirResult.getObjectSummaries();
@@ -204,25 +234,15 @@ public class S3FileSystem extends FileSystem {
   @Override
   public boolean exists(FsPath dest) throws IOException {
     try {
-      if (new File(dest.getPath()).getName().contains(".")) {
-        return existsFile(dest);
+      if (dest == null) {
+        return false;
       }
       ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
       listObjectsV2Request
           .withBucketName(bucket)
-          .withPrefix(buildPrefix(dest.getPath()))
-          .withDelimiter("/");
-      return 
s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().size()
-              + 
s3Client.listObjectsV2(listObjectsV2Request).getCommonPrefixes().size()
-          > 0;
-    } catch (AmazonS3Exception e) {
-      return false;
-    }
-  }
-
-  public boolean existsFile(FsPath dest) {
-    try {
-      return s3Client.doesObjectExist(bucket, buildPrefix(dest.getPath(), 
false));
+          .withPrefix(buildKey(dest.getPath()))
+          .withMaxKeys(1);
+      return 
!s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().isEmpty();
     } catch (AmazonS3Exception e) {
       return false;
     }
@@ -231,25 +251,41 @@ public class S3FileSystem extends FileSystem {
   @Override
   public boolean delete(FsPath dest) throws IOException {
     try {
-      ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
-      
listObjectsV2Request.withBucketName(bucket).withPrefix(buildPrefix(dest.getPath(),
 false));
-      ListObjectsV2Result result = 
s3Client.listObjectsV2(listObjectsV2Request);
-      String[] keyList =
-          
result.getObjectSummaries().stream().map(S3ObjectSummary::getKey).toArray(String[]::new);
-      DeleteObjectsRequest deleteObjectsRequest =
-          new DeleteObjectsRequest("test").withKeys(keyList);
-      s3Client.deleteObjects(deleteObjectsRequest);
+      List<String> deleteKeys = new ArrayList<>();
+      delete(dest, deleteKeys);
+      if (!deleteKeys.isEmpty()) {
+        DeleteObjectsRequest deleteObjectsRequest =
+            new DeleteObjectsRequest(bucket).withKeys(deleteKeys.toArray(new 
String[0]));
+        s3Client.deleteObjects(deleteObjectsRequest);
+      }
       return true;
     } catch (AmazonS3Exception e) {
       throw new IOException("You have not permission to access path " + 
dest.getPath());
     }
   }
 
+  public void delete(FsPath dest, List<String> keys) throws IOException {
+    if (isDir(buildKey(dest.getPath()))) {
+      FsPathListWithError fsPathListWithError = listPathWithError(dest, false);
+      List<FsPath> fsPaths = fsPathListWithError.getFsPaths();
+      fsPaths.forEach(
+          fsPath -> {
+            try {
+              delete(fsPath, keys);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          });
+    } else {
+      keys.add(buildKey(dest.getPath()));
+    }
+  }
+
   @Override
   public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException {
     try {
-      String newOriginPath = buildPrefix(oldDest.getPath(), false);
-      String newDestPath = buildPrefix(newDest.getPath(), false);
+      String newOriginPath = buildKey(oldDest.getPath());
+      String newDestPath = buildKey(newDest.getPath());
       ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
       listObjectsV2Request.withBucketName(bucket).withPrefix(newOriginPath);
       ListObjectsV2Result result = 
s3Client.listObjectsV2(listObjectsV2Request);
@@ -281,8 +317,8 @@ public class S3FileSystem extends FileSystem {
   @Override
   public boolean copy(String origin, String dest) throws IOException {
     try {
-      String newOrigin = buildPrefix(origin, false);
-      String newDest = buildPrefix(dest, false);
+      String newOrigin = buildKey(origin);
+      String newDest = buildKey(dest);
       ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
       listObjectsV2Request.withBucketName(bucket).withPrefix(newOrigin);
       ListObjectsV2Result result = 
s3Client.listObjectsV2(listObjectsV2Request);
@@ -305,8 +341,16 @@ public class S3FileSystem extends FileSystem {
     }
   }
 
-  private boolean isDir(S3ObjectSummary s3ObjectSummary, String prefix) {
-    return s3ObjectSummary.getKey().substring(prefix.length()).contains("/");
+  private boolean isDir(String key) {
+    ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
+    listObjectsV2Request
+        .withBucketName(bucket)
+        .withPrefix(key + "/")
+        .withDelimiter("/")
+        .withMaxKeys(1);
+
+    return 
!(s3Client.listObjectsV2(listObjectsV2Request).getCommonPrefixes().isEmpty()
+        && 
s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().isEmpty());
   }
 
   private boolean isInitFile(S3ObjectSummary s3ObjectSummary) {
@@ -318,6 +362,13 @@ public class S3FileSystem extends FileSystem {
     return "/";
   }
 
+  /**
+   * s3没有目录概念,无法直接创建目录 S3 lacks the concept of directories and cannot create 
directories directly.
+   *
+   * @param dest
+   * @return
+   * @throws IOException
+   */
   @Override
   public boolean mkdir(FsPath dest) throws IOException {
     String path = new File(dest.getPath(), INIT_FILE_NAME).getPath();
@@ -339,7 +390,7 @@ public class S3FileSystem extends FileSystem {
       fsPath.setOwner(owner.getDisplayName());
     }
     try {
-      fsPath.setIsdir(isDir(s3ObjectSummary, fsPath.getParent().getPath()));
+      fsPath.setIsdir(isDir(s3ObjectSummary.getKey()));
     } catch (Throwable e) {
       logger.warn("Failed to fill storage file:" + fsPath.getPath(), e);
     }
@@ -359,7 +410,7 @@ public class S3FileSystem extends FileSystem {
 
   @Override
   public boolean canRead(FsPath dest, String user) throws IOException {
-    return false;
+    return true;
   }
 
   @Override
@@ -384,7 +435,10 @@ public class S3FileSystem extends FileSystem {
 
   @Override
   public long getLength(FsPath dest) throws IOException {
-    return 0;
+    return s3Client
+        .getObject(bucket, buildKey(dest.getPath()))
+        .getObjectMetadata()
+        .getContentLength();
   }
 
   @Override
@@ -418,7 +472,9 @@ public class S3FileSystem extends FileSystem {
   }
 
   @Override
-  public void close() throws IOException {}
+  public void close() throws IOException {
+    s3Client.shutdown();
+  }
 
   public String getLabel() {
     return label;
@@ -429,46 +485,22 @@ public class S3FileSystem extends FileSystem {
   }
 
   public String buildPath(String path) {
-    if (path == null || "".equals(path)) return "";
+    if (path == null || path.isEmpty()) return "";
     if (path.startsWith("/")) {
       return StorageUtils.S3_SCHEMA() + path;
     }
     return StorageUtils.S3_SCHEMA() + "/" + path;
   }
 
-  public String buildPrefix(String path, boolean addTail) {
+  public String buildKey(String path) {
     String res = path;
-    if (path == null || "".equals(path)) return "";
+    if (path == null || path.isEmpty()) return "";
     if (path.startsWith("/")) {
       res = path.replaceFirst("/", "");
     }
-    if (!path.endsWith("/") && addTail) {
-      res = res + "/";
+    if (path.endsWith("/") && !res.isEmpty()) {
+      res = res.substring(0, res.length() - 1);
     }
     return res;
   }
-
-  public String buildPrefix(String path) {
-    return buildPrefix(path, true);
-  }
-}
-
-class S3OutputStream extends ByteArrayOutputStream {
-  private AmazonS3 s3Client;
-  private String bucket;
-  private String path;
-
-  public S3OutputStream(AmazonS3 s3Client, String bucket, String path) {
-    this.s3Client = s3Client;
-    this.bucket = bucket;
-    this.path = path;
-  }
-
-  @Override
-  public void close() throws IOException {
-    byte[] buffer = this.toByteArray();
-    try (InputStream in = new ByteArrayInputStream(buffer)) {
-      s3Client.putObject(bucket, path, in, new ObjectMetadata());
-    }
-  }
 }
diff --git 
a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/stream/S3OutputStream.java
 
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/stream/S3OutputStream.java
new file mode 100644
index 0000000000..b5b39f6d5b
--- /dev/null
+++ 
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/stream/S3OutputStream.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.storage.fs.stream;
+
+import java.io.ByteArrayInputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class S3OutputStream extends OutputStream {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(S3OutputStream.class);
+
+  /** The bucket-name on Amazon S3 */
+  private final String bucket;
+
+  /** The path (key) name within the bucket */
+  private final String path;
+
+  int BUFFER_SIZE = 5 * 1024 * 1024;
+
+  private final byte[] buf = new byte[BUFFER_SIZE];;
+
+  private byte[] flashBuffer;
+
+  /** The position in the buffer */
+  private int position = 0;
+
+  /** Amazon S3 client. */
+  private final AmazonS3 s3Client;
+
+  /** The unique id for this upload */
+  private String uploadId;
+
+  /** Collection of the etags for the parts that have been uploaded */
+  private final List<PartETag> etags = new ArrayList<>();
+
+  /**
+   * Creates a new S3 OutputStream
+   *
+   * @param s3Client the AmazonS3 client
+   * @param bucket name of the bucket
+   * @param path path within the bucket
+   */
+  public S3OutputStream(AmazonS3 s3Client, String bucket, String path) {
+    if (s3Client == null) {
+      throw new IllegalArgumentException("The s3Client cannot be null.");
+    }
+    if (bucket == null || bucket.isEmpty()) {
+      throw new IllegalArgumentException("The bucket cannot be null or an 
empty string.");
+    }
+    if (path == null || path.isEmpty()) {
+      throw new IllegalArgumentException("The path cannot be null or an empty 
string.");
+    }
+    this.s3Client = s3Client;
+    this.bucket = bucket;
+    this.path = path;
+  }
+
+  /**
+   * Write an array to the S3 output stream.
+   *
+   * @param b the byte-array to append
+   */
+  @Override
+  public void write(byte[] b) {
+    write(b, 0, b.length);
+  }
+
+  /**
+   * Writes an array to the S3 Output Stream
+   *
+   * @param byteArray the array to write
+   * @param o the offset into the array
+   * @param l the number of bytes to write
+   */
+  @Override
+  public void write(final byte[] byteArray, final int o, final int l) {
+    int ofs = o, len = l;
+    int size;
+    while (len > (size = this.buf.length - position)) {
+      System.arraycopy(byteArray, ofs, this.buf, this.position, size);
+      this.position += size;
+      flushBufferAndRewind();
+      ofs += size;
+      len -= size;
+    }
+    System.arraycopy(byteArray, ofs, this.buf, this.position, len);
+    this.position += len;
+  }
+
+  /** Flushes the buffer by uploading a part to S3. */
+  @Override
+  public synchronized void flush() {}
+
+  protected void flushBufferAndRewind() {
+    if (uploadId == null) {
+      LOG.info("Starting a multipart upload for {}/{}", this.bucket, 
this.path);
+      try {
+        final InitiateMultipartUploadRequest request =
+            new InitiateMultipartUploadRequest(this.bucket, this.path)
+                .withCannedACL(CannedAccessControlList.BucketOwnerFullControl);
+        InitiateMultipartUploadResult initResponse = 
s3Client.initiateMultipartUpload(request);
+        this.uploadId = initResponse.getUploadId();
+      } catch (AmazonS3Exception e) {
+        LOG.error("Failed to start multipart upload: {}", e.getMessage(), e);
+        throw new RuntimeException(e);
+      }
+    }
+    try {
+      uploadPart();
+    } catch (AmazonS3Exception e) {
+      LOG.error("Failed to upload part: {}", e.getMessage(), e);
+      this.s3Client.abortMultipartUpload(
+          new AbortMultipartUploadRequest(this.bucket, this.path, 
this.uploadId));
+      throw new RuntimeException(e);
+    }
+    this.position = 0;
+  }
+
+  protected void uploadPart() {
+    LOG.debug("Uploading part {}", this.etags.size());
+    try {
+      UploadPartResult uploadResult =
+          s3Client.uploadPart(
+              new UploadPartRequest()
+                  .withBucketName(this.bucket)
+                  .withKey(this.path)
+                  .withUploadId(this.uploadId)
+                  .withInputStream(new ByteArrayInputStream(buf, 0, 
this.position))
+                  .withPartNumber(this.etags.size() + 1)
+                  .withPartSize(this.position));
+      this.etags.add(uploadResult.getPartETag());
+    } catch (AmazonS3Exception e) {
+      LOG.error("Failed to upload part: {}", e.getMessage(), e);
+      this.s3Client.abortMultipartUpload(
+          new AbortMultipartUploadRequest(this.bucket, this.path, 
this.uploadId));
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() {
+    if (this.uploadId != null) {
+      if (this.position > 0) {
+        uploadPart();
+      }
+      LOG.debug("Completing multipart");
+      try {
+        this.s3Client.completeMultipartUpload(
+            new CompleteMultipartUploadRequest(bucket, path, uploadId, etags));
+      } catch (AmazonS3Exception e) {
+        LOG.error("Failed to complete multipart upload: {}", e.getMessage(), 
e);
+        throw new RuntimeException(e);
+      }
+    } else {
+      LOG.debug("Uploading object at once to {}/{}", this.bucket, this.path);
+      try {
+        final ObjectMetadata metadata = new ObjectMetadata();
+        metadata.setContentLength(this.position);
+        final PutObjectRequest request =
+            new PutObjectRequest(
+                    this.bucket,
+                    this.path,
+                    new ByteArrayInputStream(this.buf, 0, this.position),
+                    metadata)
+                .withCannedAcl(CannedAccessControlList.BucketOwnerFullControl);
+        this.s3Client.putObject(request);
+      } catch (AmazonS3Exception e) {
+        LOG.error("Failed to upload object: {}", e.getMessage(), e);
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  public void cancel() {
+    if (this.uploadId != null) {
+      try {
+        LOG.debug("Aborting multipart upload");
+        this.s3Client.abortMultipartUpload(
+            new AbortMultipartUploadRequest(this.bucket, this.path, 
this.uploadId));
+      } catch (AmazonS3Exception e) {
+        LOG.error("Failed to abort multipart upload: {}", e.getMessage(), e);
+      }
+    }
+  }
+
+  @Override
+  public void write(int b) {
+
+    if (position >= this.buf.length) {
+      flushBufferAndRewind();
+    }
+    this.buf[position++] = (byte) b;
+  }
+}
diff --git 
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/FileSystemUtils.scala
 
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/FileSystemUtils.scala
index 9c344fa802..0e382128ca 100644
--- 
a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/FileSystemUtils.scala
+++ 
b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/FileSystemUtils.scala
@@ -122,7 +122,7 @@ object FileSystemUtils extends Logging {
     var parentPath = dest.getParent
     val dirsToMake = new util.Stack[FsPath]()
     dirsToMake.push(dest)
-    while (!fileSystem.exists(parentPath)) {
+    while (parentPath != null && !fileSystem.exists(parentPath)) {
       dirsToMake.push(parentPath)
       parentPath = parentPath.getParent
     }
@@ -153,7 +153,7 @@ object FileSystemUtils extends Logging {
     var parentPath = dest.getParent
     val dirsToMake = new util.Stack[FsPath]()
     dirsToMake.push(dest)
-    while (!fileSystem.exists(parentPath)) {
+    while (parentPath != null && !fileSystem.exists(parentPath)) {
       dirsToMake.push(parentPath)
       parentPath = parentPath.getParent
     }
diff --git 
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala
 
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala
index ae83749ecb..63c963d2d3 100644
--- 
a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala
+++ 
b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala
@@ -132,7 +132,7 @@ object GovernanceUtils extends Logging {
    * @return
    */
   def getResultParentPath(creator: String): String = {
-    val resPrefix = GovernanceCommonConf.DEFAULT_LOGPATH_PREFIX
+    val resPrefix = GovernanceCommonConf.RESULT_SET_STORE_PATH.getValue
     val resStb = new StringBuilder()
     if (resStb.endsWith("/")) {
       resStb.append(resPrefix)
diff --git 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/CommonLogPathUtils.scala
 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/CommonLogPathUtils.scala
index 746774633e..3430c1809b 100644
--- 
a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/CommonLogPathUtils.scala
+++ 
b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/CommonLogPathUtils.scala
@@ -20,6 +20,7 @@ package org.apache.linkis.entrance.utils
 import org.apache.linkis.common.io.FsPath
 import org.apache.linkis.common.utils.Utils
 import org.apache.linkis.entrance.conf.EntranceConfiguration
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf
 import org.apache.linkis.governance.common.entity.job.JobRequest
 import org.apache.linkis.manager.label.utils.LabelUtil
 import org.apache.linkis.storage.FSFactory
@@ -51,6 +52,8 @@ object CommonLogPathUtils {
     val fsPath = new FsPath(commonPath)
     if (StorageUtils.HDFS.equals(fsPath.getFsType)) {
       FSFactory.getFs(StorageUtils.HDFS).asInstanceOf[FileSystem]
+    } else if (StorageUtils.S3.equals(fsPath.getFsType)) {
+      FSFactory.getFs(StorageUtils.S3).asInstanceOf[FileSystem]
     } else {
       FSFactory
         .getFs(StorageUtils.FILE, 
StorageConfiguration.LOCAL_ROOT_USER.getValue)
@@ -58,7 +61,7 @@ object CommonLogPathUtils {
     }
   }
 
-  private val resPrefix = EntranceConfiguration.DEFAULT_LOGPATH_PREFIX.getValue
+  private val resPrefix = GovernanceCommonConf.RESULT_SET_STORE_PATH.getValue
 
   /**
    * get result path parentPath: resPrefix + dateStr + result + creator 
subPath: parentPath +
diff --git 
a/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/ResourceHelperFactory.java
 
b/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/ResourceHelperFactory.java
index 83acf03a07..a6138fe510 100644
--- 
a/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/ResourceHelperFactory.java
+++ 
b/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/ResourceHelperFactory.java
@@ -26,16 +26,22 @@ public class ResourceHelperFactory {
 
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ResourceHelperFactory.class);
 
-  private static final boolean IS_HDFS = (Boolean) 
BmlServerConfiguration.BML_IS_HDFS().getValue();
+  private static final String FILESYSTEM_TYPE =
+      BmlServerConfiguration.BML_FILESYSTEM_TYPE().getValue();
 
   private static final ResourceHelper HDFS_RESOURCE_HELPER = new 
HdfsResourceHelper();
 
   private static final ResourceHelper LOCAL_RESOURCE_HELPER = new 
LocalResourceHelper();
 
+  private static final ResourceHelper S3_RESOURCE_HELPER = new 
S3ResourceHelper();
+
   public static ResourceHelper getResourceHelper() {
-    if (IS_HDFS) {
+    if (FILESYSTEM_TYPE.equals("hdfs")) {
       LOGGER.info("will store resource in hdfs");
       return HDFS_RESOURCE_HELPER;
+    } else if (FILESYSTEM_TYPE.equals("s3")) {
+      LOGGER.info("will store resource in s3");
+      return S3_RESOURCE_HELPER;
     } else {
       LOGGER.info("will store resource in local");
       return LOCAL_RESOURCE_HELPER;
diff --git 
a/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/S3ResourceHelper.java
 
b/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/S3ResourceHelper.java
new file mode 100644
index 0000000000..5370da9351
--- /dev/null
+++ 
b/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/S3ResourceHelper.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.bml.common;
+
+import org.apache.linkis.bml.conf.BmlServerConfiguration;
+import org.apache.linkis.common.io.Fs;
+import org.apache.linkis.common.io.FsPath;
+import org.apache.linkis.storage.FSFactory;
+import org.apache.linkis.storage.utils.FileSystemUtils;
+
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.MessageDigest;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class S3ResourceHelper implements ResourceHelper {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(S3ResourceHelper.class);
+
+  private static final String SCHEMA = "s3://";
+
+  @Override
+  public long upload(
+      String path,
+      String user,
+      InputStream inputStream,
+      StringBuilder stringBuilder,
+      boolean overwrite)
+      throws UploadResourceException {
+    OutputStream outputStream = null;
+    InputStream is0 = null;
+    InputStream is1 = null;
+    long size = 0;
+    Fs fileSystem = null;
+    try {
+      FsPath fsPath = new FsPath(path);
+      fileSystem = FSFactory.getFsByProxyUser(fsPath, user);
+      fileSystem.init(null);
+      if (!fileSystem.exists(fsPath)) {
+        FileSystemUtils.createNewFile(fsPath, user, true);
+      }
+      byte[] buffer = new byte[1024];
+      long beforeSize = -1;
+      is0 = fileSystem.read(fsPath);
+      int ch0 = 0;
+      while ((ch0 = is0.read(buffer)) != -1) {
+        beforeSize += ch0;
+      }
+      outputStream = fileSystem.write(fsPath, overwrite);
+      int ch = 0;
+      MessageDigest md5Digest = DigestUtils.getMd5Digest();
+      while ((ch = inputStream.read(buffer)) != -1) {
+        md5Digest.update(buffer, 0, ch);
+        outputStream.write(buffer, 0, ch);
+        size += ch;
+      }
+      if (stringBuilder != null) {
+        stringBuilder.append(Hex.encodeHexString(md5Digest.digest()));
+      }
+      // 通过文件名获取的文件所有的字节,这样就避免了错误更新后的更新都是错的
+      long afterSize = -1;
+      is1 = fileSystem.read(fsPath);
+      int ch1 = 0;
+      while ((ch1 = is1.read(buffer)) != -1) {
+        afterSize += ch1;
+      }
+      size = Math.max(size, afterSize - beforeSize);
+    } catch (final IOException e) {
+      logger.error("{} write to {} failed, reason is, IOException:", user, 
path, e);
+      UploadResourceException uploadResourceException = new 
UploadResourceException();
+      uploadResourceException.initCause(e);
+      throw uploadResourceException;
+    } catch (final Throwable t) {
+      logger.error("{} write to {} failed, reason is", user, path, t);
+      UploadResourceException uploadResourceException = new 
UploadResourceException();
+      uploadResourceException.initCause(t);
+      throw uploadResourceException;
+    } finally {
+      IOUtils.closeQuietly(outputStream);
+      IOUtils.closeQuietly(inputStream);
+      IOUtils.closeQuietly(is0);
+      IOUtils.closeQuietly(is1);
+      if (fileSystem != null) {
+        try {
+          fileSystem.close();
+        } catch (Exception e) {
+          logger.error("close filesystem failed", e);
+        }
+      }
+    }
+    return size;
+  }
+
+  @Override
+  public void update(String path) {}
+
+  @Override
+  public void getResource(String path, int start, int end) {}
+
+  @Override
+  public String getSchema() {
+    return SCHEMA;
+  }
+
+  /**
+   * Motivation to modify this path: This path is under /apps-data/hadoop/user 
on hdfs, which is
+   * mixed with the result set file. Bml files cannot be synchronized 
separately. When the user
+   * verifies the workflow, it is necessary to synchronize the full amount of 
personal and full hdfs
+   * data each time, which is very inconvenient.
+   */
+  @Override
+  public String generatePath(String user, String fileName, Map<String, Object> 
properties) {
+    String resourceHeader = (String) properties.get("resourceHeader");
+    SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");
+    String dateStr = format.format(new Date());
+    if (StringUtils.isNotEmpty(resourceHeader)) {
+      return getSchema()
+          + BmlServerConfiguration.BML_PREFIX().getValue()
+          + "/"
+          + user
+          + "/bml"
+          + "/"
+          + dateStr
+          + "/"
+          + resourceHeader
+          + "/"
+          + fileName;
+    } else {
+      return getSchema()
+          + BmlServerConfiguration.BML_PREFIX().getValue()
+          + "/"
+          + user
+          + "/bml"
+          + "/"
+          + dateStr
+          + "/"
+          + fileName;
+    }
+  }
+
+  @Override
+  public boolean checkIfExists(String path, String user) throws IOException {
+    Fs fileSystem = FSFactory.getFsByProxyUser(new FsPath(path), user);
+    fileSystem.init(null);
+    try {
+      return fileSystem.exists(new FsPath(path));
+    } finally {
+      fileSystem.close();
+    }
+  }
+
+  @Override
+  public boolean checkBmlResourceStoragePrefixPathIfChanged(String path) {
+    String prefixPath = getSchema() + 
BmlServerConfiguration.BML_PREFIX().getValue();
+    return !path.startsWith(prefixPath);
+  }
+}
diff --git 
a/linkis-public-enhancements/linkis-bml-server/src/main/scala/org/apache/linkis/bml/conf/BmlServerConfiguration.scala
 
b/linkis-public-enhancements/linkis-bml-server/src/main/scala/org/apache/linkis/bml/conf/BmlServerConfiguration.scala
index 7fde694ef6..3b2a0d112a 100644
--- 
a/linkis-public-enhancements/linkis-bml-server/src/main/scala/org/apache/linkis/bml/conf/BmlServerConfiguration.scala
+++ 
b/linkis-public-enhancements/linkis-bml-server/src/main/scala/org/apache/linkis/bml/conf/BmlServerConfiguration.scala
@@ -29,6 +29,35 @@ object BmlServerConfiguration {
 
   val BML_IS_HDFS: CommonVars[Boolean] = 
CommonVars[Boolean]("wds.linkis.bml.is.hdfs", true)
 
  /**
   * BML_FILESYSTEM_TYPE: distinguishes the file system type used by BML. Currently three types are
   * supported: hdfs, file, and s3. The default value is derived from BML_IS_HDFS so that older
   * deployments keep their previous behaviour (backward compatibility).
   */
  val BML_FILESYSTEM_TYPE = CommonVars(
    "linkis.bml.filesystem.type",
    if (BML_IS_HDFS.getValue) {
      "hdfs"
    } else {
      "file"
    }
  )

  /**
   * BML_PREFIX: the file system path prefix under which BML stores its resources. The default
   * value is derived from BML_IS_HDFS so that older deployments keep their previous behaviour
   * (backward compatibility).
   */
  val BML_PREFIX = CommonVars(
    "linkis.bml.prefix",
    if (BML_IS_HDFS.getValue) {
      BML_HDFS_PREFIX.getValue
    } else {
      BML_LOCAL_PREFIX.getValue
    }
  )
+
   val BML_CLEAN_EXPIRED_TIME: CommonVars[Int] =
     CommonVars[Int]("wds.linkis.bml.cleanExpired.time", 100)
 
diff --git 
a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java
 
b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java
index 28dc65dcc5..1e3ce7599c 100644
--- 
a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java
+++ 
b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java
@@ -148,6 +148,9 @@ public class FsRestfulApi {
     if (StorageUtils.HDFS().equalsIgnoreCase(pathType)) {
       path = hdfsUserRootPathPrefix + userName + hdfsUserRootPathSuffix;
       returnType = StorageUtils.HDFS().toUpperCase();
+    } else if (StorageUtils.S3().equalsIgnoreCase(pathType)) {
+      path = localUserRootPath + userName;
+      returnType = StorageUtils.S3().toUpperCase();
     } else {
       path = localUserRootPath + userName;
       returnType = LOCAL_RETURN_TYPE;
@@ -1490,6 +1493,7 @@ public class FsRestfulApi {
    // Return a success message that includes the file path
     return Message.ok().data("filePath", newPath);
   }
+
   /**
    * *
    *


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to