This is an automated email from the ASF dual-hosted git repository.
peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git
The following commit(s) were added to refs/heads/master by this push:
new 6bc388f84d Enhance the functionality of s3filesystem (#5208)
6bc388f84d is described below
commit 6bc388f84d53990e19497948852c21c1a790fb57
Author: LiuGuoHua <[email protected]>
AuthorDate: Wed Dec 4 19:35:10 2024 +0800
Enhance the functionality of s3filesystem (#5208)
* 1. Enhance the functionality of s3filesystem to support multipart uploads.
2. Support the use of s3 storage for BML materials and workspaces.
* format code
---
.../linkis/storage/fs/impl/S3FileSystem.java | 190 ++++++++++--------
.../linkis/storage/fs/stream/S3OutputStream.java | 216 +++++++++++++++++++++
.../linkis/storage/utils/FileSystemUtils.scala | 4 +-
.../governance/common/utils/GovernanceUtils.scala | 2 +-
.../linkis/entrance/utils/CommonLogPathUtils.scala | 5 +-
.../linkis/bml/common/ResourceHelperFactory.java | 10 +-
.../apache/linkis/bml/common/S3ResourceHelper.java | 183 +++++++++++++++++
.../linkis/bml/conf/BmlServerConfiguration.scala | 29 +++
.../filesystem/restful/api/FsRestfulApi.java | 4 +
9 files changed, 558 insertions(+), 85 deletions(-)
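
[Editor's note] For context, the new S3OutputStream below builds on the standard S3 multipart-upload lifecycle of the AWS SDK v1 that this patch uses. A minimal sketch of that lifecycle (bucket and key names are hypothetical; credentials and region come from the SDK's default provider chain):

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.s3.model.*;

    import java.io.ByteArrayInputStream;
    import java.util.ArrayList;
    import java.util.List;

    public class MultipartUploadSketch {
      public static void main(String[] args) {
        AmazonS3 s3 = AmazonS3ClientBuilder.standard().build();
        String bucket = "my-bucket"; // hypothetical
        String key = "user/hadoop/result.dolphin"; // hypothetical
        // 1. initiate the upload and remember its id
        InitiateMultipartUploadResult init =
            s3.initiateMultipartUpload(new InitiateMultipartUploadRequest(bucket, key));
        String uploadId = init.getUploadId();
        List<PartETag> etags = new ArrayList<>();
        try {
          // 2. upload parts; every part except the last must be at least 5 MB
          byte[] part = new byte[5 * 1024 * 1024];
          UploadPartResult result =
              s3.uploadPart(
                  new UploadPartRequest()
                      .withBucketName(bucket)
                      .withKey(key)
                      .withUploadId(uploadId)
                      .withPartNumber(1)
                      .withInputStream(new ByteArrayInputStream(part))
                      .withPartSize(part.length));
          etags.add(result.getPartETag());
          // 3. complete with the collected ETags, or abort on failure
          s3.completeMultipartUpload(new CompleteMultipartUploadRequest(bucket, key, uploadId, etags));
        } catch (AmazonS3Exception e) {
          s3.abortMultipartUpload(new AbortMultipartUploadRequest(bucket, key, uploadId));
          throw e;
        }
      }
    }
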
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java
index 2aff3da7f5..a9f00b60d5 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/impl/S3FileSystem.java
@@ -21,13 +21,17 @@ import org.apache.linkis.common.io.FsPath;
import org.apache.linkis.storage.domain.FsPathListWithError;
import org.apache.linkis.storage.exception.StorageWarnException;
import org.apache.linkis.storage.fs.FileSystem;
+import org.apache.linkis.storage.fs.stream.S3OutputStream;
import org.apache.linkis.storage.utils.StorageConfiguration;
import org.apache.linkis.storage.utils.StorageUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
-import java.io.*;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -99,6 +103,7 @@ public class S3FileSystem extends FileSystem {
public FsPath get(String dest) throws IOException {
FsPath ret = new FsPath(dest);
if (exists(ret)) {
+ ret.setIsdir(isDir(buildKey(ret.getPath())));
return ret;
} else {
logger.warn("File or folder does not exist or file name is
garbled(文件或者文件夹不存在或者文件名乱码)");
@@ -111,7 +116,7 @@ public class S3FileSystem extends FileSystem {
@Override
public InputStream read(FsPath dest) throws IOException {
try {
-      return s3Client.getObject(bucket, buildPrefix(dest.getPath(), false)).getObjectContent();
+      return s3Client.getObject(bucket, buildKey(dest.getPath())).getObjectContent();
} catch (AmazonS3Exception e) {
      throw new IOException("You have not permission to access path " + dest.getPath());
}
@@ -119,13 +124,23 @@ public class S3FileSystem extends FileSystem {
@Override
  public OutputStream write(FsPath dest, boolean overwrite) throws IOException {
- try (InputStream inputStream = read(dest);
- OutputStream outputStream =
-            new S3OutputStream(s3Client, bucket, buildPrefix(dest.getPath(), false))) {
+ InputStream inputStream = null;
+ try {
+ if (!exists(dest)) {
+ create(dest.getPath());
+ }
+
+      OutputStream outputStream = new S3OutputStream(s3Client, bucket, buildKey(dest.getPath()));
+
if (!overwrite) {
+ inputStream = read(dest);
IOUtils.copy(inputStream, outputStream);
}
return outputStream;
+ } catch (IOException e) {
+      throw new IOException("You have not permission to access path " + dest.getPath());
+ } finally {
+ IOUtils.closeQuietly(inputStream);
}
}
@@ -134,7 +149,7 @@ public class S3FileSystem extends FileSystem {
if (exists(new FsPath(dest))) {
return false;
}
- s3Client.putObject(bucket, dest, "");
+ s3Client.putObject(bucket, buildKey(dest), "");
return true;
}
@@ -142,16 +157,31 @@ public class S3FileSystem extends FileSystem {
public List<FsPath> list(FsPath path) throws IOException {
try {
if (!StringUtils.isEmpty(path.getPath())) {
-        ListObjectsV2Result listObjectsV2Result = s3Client.listObjectsV2(bucket, path.getPath());
-        List<S3ObjectSummary> s3ObjectSummaries = listObjectsV2Result.getObjectSummaries();
- return s3ObjectSummaries.stream()
- .filter(summary -> !isInitFile(summary))
- .map(
- summary -> {
- FsPath newPath = new FsPath(buildPath(summary.getKey()));
- return fillStorageFile(newPath, summary);
- })
- .collect(Collectors.toList());
+ ListObjectsV2Request listObjectsV2Request =
+ new ListObjectsV2Request()
+ .withBucketName(bucket)
+ .withPrefix(buildKey(path.getPath()) + "/")
+ .withDelimiter("/");
+        ListObjectsV2Result dirResult = s3Client.listObjectsV2(listObjectsV2Request);
+        List<S3ObjectSummary> s3ObjectSummaries = dirResult.getObjectSummaries();
+ List<String> commonPrefixes = dirResult.getCommonPrefixes();
+ List<FsPath> fsPaths =
+ s3ObjectSummaries.stream()
+ .filter(summary -> !isInitFile(summary))
+ .map(
+ summary -> {
+ FsPath newPath = new FsPath(buildPath(summary.getKey()));
+ return fillStorageFile(newPath, summary);
+ })
+ .collect(Collectors.toList());
+ if (commonPrefixes != null) {
+ for (String dir : commonPrefixes) {
+ FsPath newPath = new FsPath(buildPath(dir));
+ newPath.setIsdir(true);
+ fsPaths.add(newPath);
+ }
+ }
+ return fsPaths;
}
} catch (AmazonS3Exception e) {
      throw new IOException("You have not permission to access path " + path.getPath());
@@ -173,7 +203,7 @@ public class S3FileSystem extends FileSystem {
ListObjectsV2Request listObjectsV2Request =
new ListObjectsV2Request()
.withBucketName(bucket)
- .withPrefix(buildPrefix(path.getPath()))
+ .withPrefix(buildKey(path.getPath()) + "/")
.withDelimiter("/");
    ListObjectsV2Result dirResult = s3Client.listObjectsV2(listObjectsV2Request);
    List<S3ObjectSummary> s3ObjectSummaries = dirResult.getObjectSummaries();
@@ -204,25 +234,15 @@ public class S3FileSystem extends FileSystem {
@Override
public boolean exists(FsPath dest) throws IOException {
try {
- if (new File(dest.getPath()).getName().contains(".")) {
- return existsFile(dest);
+ if (dest == null) {
+ return false;
}
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
listObjectsV2Request
.withBucketName(bucket)
- .withPrefix(buildPrefix(dest.getPath()))
- .withDelimiter("/");
-      return s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().size()
-              + s3Client.listObjectsV2(listObjectsV2Request).getCommonPrefixes().size()
-          > 0;
- } catch (AmazonS3Exception e) {
- return false;
- }
- }
-
- public boolean existsFile(FsPath dest) {
- try {
-      return s3Client.doesObjectExist(bucket, buildPrefix(dest.getPath(), false));
+ .withPrefix(buildKey(dest.getPath()))
+ .withMaxKeys(1);
+      return !s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().isEmpty();
} catch (AmazonS3Exception e) {
return false;
}
@@ -231,25 +251,41 @@ public class S3FileSystem extends FileSystem {
@Override
public boolean delete(FsPath dest) throws IOException {
try {
- ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
-      listObjectsV2Request.withBucketName(bucket).withPrefix(buildPrefix(dest.getPath(), false));
-      ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request);
-      String[] keyList =
-          result.getObjectSummaries().stream().map(S3ObjectSummary::getKey).toArray(String[]::new);
- DeleteObjectsRequest deleteObjectsRequest =
- new DeleteObjectsRequest("test").withKeys(keyList);
- s3Client.deleteObjects(deleteObjectsRequest);
+ List<String> deleteKeys = new ArrayList<>();
+ delete(dest, deleteKeys);
+ if (!deleteKeys.isEmpty()) {
+ DeleteObjectsRequest deleteObjectsRequest =
+            new DeleteObjectsRequest(bucket).withKeys(deleteKeys.toArray(new String[0]));
+ s3Client.deleteObjects(deleteObjectsRequest);
+ }
return true;
} catch (AmazonS3Exception e) {
      throw new IOException("You have not permission to access path " + dest.getPath());
}
}
+ public void delete(FsPath dest, List<String> keys) throws IOException {
+ if (isDir(buildKey(dest.getPath()))) {
+ FsPathListWithError fsPathListWithError = listPathWithError(dest, false);
+ List<FsPath> fsPaths = fsPathListWithError.getFsPaths();
+ fsPaths.forEach(
+ fsPath -> {
+ try {
+ delete(fsPath, keys);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ } else {
+ keys.add(buildKey(dest.getPath()));
+ }
+ }
+
@Override
public boolean renameTo(FsPath oldDest, FsPath newDest) throws IOException {
try {
- String newOriginPath = buildPrefix(oldDest.getPath(), false);
- String newDestPath = buildPrefix(newDest.getPath(), false);
+ String newOriginPath = buildKey(oldDest.getPath());
+ String newDestPath = buildKey(newDest.getPath());
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
listObjectsV2Request.withBucketName(bucket).withPrefix(newOriginPath);
      ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request);
@@ -281,8 +317,8 @@ public class S3FileSystem extends FileSystem {
@Override
public boolean copy(String origin, String dest) throws IOException {
try {
- String newOrigin = buildPrefix(origin, false);
- String newDest = buildPrefix(dest, false);
+ String newOrigin = buildKey(origin);
+ String newDest = buildKey(dest);
ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
listObjectsV2Request.withBucketName(bucket).withPrefix(newOrigin);
      ListObjectsV2Result result = s3Client.listObjectsV2(listObjectsV2Request);
@@ -305,8 +341,16 @@ public class S3FileSystem extends FileSystem {
}
}
- private boolean isDir(S3ObjectSummary s3ObjectSummary, String prefix) {
- return s3ObjectSummary.getKey().substring(prefix.length()).contains("/");
+ private boolean isDir(String key) {
+ ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request();
+ listObjectsV2Request
+ .withBucketName(bucket)
+ .withPrefix(key + "/")
+ .withDelimiter("/")
+ .withMaxKeys(1);
+
+    return !(s3Client.listObjectsV2(listObjectsV2Request).getCommonPrefixes().isEmpty()
+        && s3Client.listObjectsV2(listObjectsV2Request).getObjectSummaries().isEmpty());
}
private boolean isInitFile(S3ObjectSummary s3ObjectSummary) {
@@ -318,6 +362,13 @@ public class S3FileSystem extends FileSystem {
return "/";
}
+ /**
+   * S3 has no concept of directories, so a directory cannot be created directly; an empty init file marks one instead.
+ *
+ * @param dest
+ * @return
+ * @throws IOException
+ */
@Override
public boolean mkdir(FsPath dest) throws IOException {
String path = new File(dest.getPath(), INIT_FILE_NAME).getPath();
@@ -339,7 +390,7 @@ public class S3FileSystem extends FileSystem {
fsPath.setOwner(owner.getDisplayName());
}
try {
- fsPath.setIsdir(isDir(s3ObjectSummary, fsPath.getParent().getPath()));
+ fsPath.setIsdir(isDir(s3ObjectSummary.getKey()));
} catch (Throwable e) {
logger.warn("Failed to fill storage file:" + fsPath.getPath(), e);
}
@@ -359,7 +410,7 @@ public class S3FileSystem extends FileSystem {
@Override
public boolean canRead(FsPath dest, String user) throws IOException {
- return false;
+ return true;
}
@Override
@@ -384,7 +435,10 @@ public class S3FileSystem extends FileSystem {
@Override
public long getLength(FsPath dest) throws IOException {
- return 0;
+ return s3Client
+ .getObject(bucket, buildKey(dest.getPath()))
+ .getObjectMetadata()
+ .getContentLength();
}
@Override
@@ -418,7 +472,9 @@ public class S3FileSystem extends FileSystem {
}
@Override
- public void close() throws IOException {}
+ public void close() throws IOException {
+ s3Client.shutdown();
+ }
public String getLabel() {
return label;
@@ -429,46 +485,22 @@ public class S3FileSystem extends FileSystem {
}
public String buildPath(String path) {
- if (path == null || "".equals(path)) return "";
+ if (path == null || path.isEmpty()) return "";
if (path.startsWith("/")) {
return StorageUtils.S3_SCHEMA() + path;
}
return StorageUtils.S3_SCHEMA() + "/" + path;
}
- public String buildPrefix(String path, boolean addTail) {
+ public String buildKey(String path) {
String res = path;
- if (path == null || "".equals(path)) return "";
+ if (path == null || path.isEmpty()) return "";
if (path.startsWith("/")) {
res = path.replaceFirst("/", "");
}
- if (!path.endsWith("/") && addTail) {
- res = res + "/";
+ if (path.endsWith("/") && !res.isEmpty()) {
+ res = res.substring(0, res.length() - 1);
}
return res;
}
-
- public String buildPrefix(String path) {
- return buildPrefix(path, true);
- }
-}
-
-class S3OutputStream extends ByteArrayOutputStream {
- private AmazonS3 s3Client;
- private String bucket;
- private String path;
-
- public S3OutputStream(AmazonS3 s3Client, String bucket, String path) {
- this.s3Client = s3Client;
- this.bucket = bucket;
- this.path = path;
- }
-
- @Override
- public void close() throws IOException {
- byte[] buffer = this.toByteArray();
- try (InputStream in = new ByteArrayInputStream(buffer)) {
- s3Client.putObject(bucket, path, in, new ObjectMetadata());
- }
- }
}
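
[Editor's note] The renamed helper buildKey(path) replaces buildPrefix(path, addTail): it normalizes an FsPath into an S3 object key by stripping the leading slash and any trailing slash instead of appending one. A few worked examples (a sketch; assumes an initialized S3FileSystem instance fs):

    fs.buildKey("/user/hadoop/a.txt");  // -> "user/hadoop/a.txt" (leading slash removed)
    fs.buildKey("/user/hadoop/");       // -> "user/hadoop"       (trailing slash removed)
    fs.buildKey("");                    // -> ""                  (empty input passed through)

Listing code that needs a prefix now appends the "/" explicitly, e.g. withPrefix(buildKey(path.getPath()) + "/").
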
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/stream/S3OutputStream.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/stream/S3OutputStream.java
new file mode 100644
index 0000000000..b5b39f6d5b
--- /dev/null
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/fs/stream/S3OutputStream.java
@@ -0,0 +1,216 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.storage.fs.stream;
+
+import java.io.ByteArrayInputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.model.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class S3OutputStream extends OutputStream {
+
+  private static final Logger LOG = LoggerFactory.getLogger(S3OutputStream.class);
+
+ /** The bucket-name on Amazon S3 */
+ private final String bucket;
+
+ /** The path (key) name within the bucket */
+ private final String path;
+
+ int BUFFER_SIZE = 5 * 1024 * 1024;
+
+  private final byte[] buf = new byte[BUFFER_SIZE];
+
+ private byte[] flashBuffer;
+
+ /** The position in the buffer */
+ private int position = 0;
+
+ /** Amazon S3 client. */
+ private final AmazonS3 s3Client;
+
+ /** The unique id for this upload */
+ private String uploadId;
+
+ /** Collection of the etags for the parts that have been uploaded */
+ private final List<PartETag> etags = new ArrayList<>();
+
+ /**
+ * Creates a new S3 OutputStream
+ *
+ * @param s3Client the AmazonS3 client
+ * @param bucket name of the bucket
+ * @param path path within the bucket
+ */
+ public S3OutputStream(AmazonS3 s3Client, String bucket, String path) {
+ if (s3Client == null) {
+ throw new IllegalArgumentException("The s3Client cannot be null.");
+ }
+ if (bucket == null || bucket.isEmpty()) {
+ throw new IllegalArgumentException("The bucket cannot be null or an
empty string.");
+ }
+ if (path == null || path.isEmpty()) {
+ throw new IllegalArgumentException("The path cannot be null or an empty
string.");
+ }
+ this.s3Client = s3Client;
+ this.bucket = bucket;
+ this.path = path;
+ }
+
+ /**
+ * Write an array to the S3 output stream.
+ *
+ * @param b the byte-array to append
+ */
+ @Override
+ public void write(byte[] b) {
+ write(b, 0, b.length);
+ }
+
+ /**
+ * Writes an array to the S3 Output Stream
+ *
+ * @param byteArray the array to write
+ * @param o the offset into the array
+ * @param l the number of bytes to write
+ */
+ @Override
+ public void write(final byte[] byteArray, final int o, final int l) {
+ int ofs = o, len = l;
+ int size;
+ while (len > (size = this.buf.length - position)) {
+ System.arraycopy(byteArray, ofs, this.buf, this.position, size);
+ this.position += size;
+ flushBufferAndRewind();
+ ofs += size;
+ len -= size;
+ }
+ System.arraycopy(byteArray, ofs, this.buf, this.position, len);
+ this.position += len;
+ }
+
+  /** Flush is a no-op here; parts are uploaded when the internal buffer fills or on close(). */
+ @Override
+ public synchronized void flush() {}
+
+ protected void flushBufferAndRewind() {
+ if (uploadId == null) {
+ LOG.info("Starting a multipart upload for {}/{}", this.bucket,
this.path);
+ try {
+ final InitiateMultipartUploadRequest request =
+ new InitiateMultipartUploadRequest(this.bucket, this.path)
+ .withCannedACL(CannedAccessControlList.BucketOwnerFullControl);
+        InitiateMultipartUploadResult initResponse = s3Client.initiateMultipartUpload(request);
+ this.uploadId = initResponse.getUploadId();
+ } catch (AmazonS3Exception e) {
+ LOG.error("Failed to start multipart upload: {}", e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
+ }
+ try {
+ uploadPart();
+ } catch (AmazonS3Exception e) {
+ LOG.error("Failed to upload part: {}", e.getMessage(), e);
+ this.s3Client.abortMultipartUpload(
+          new AbortMultipartUploadRequest(this.bucket, this.path, this.uploadId));
+ throw new RuntimeException(e);
+ }
+ this.position = 0;
+ }
+
+ protected void uploadPart() {
+ LOG.debug("Uploading part {}", this.etags.size());
+ try {
+ UploadPartResult uploadResult =
+ s3Client.uploadPart(
+ new UploadPartRequest()
+ .withBucketName(this.bucket)
+ .withKey(this.path)
+ .withUploadId(this.uploadId)
+                  .withInputStream(new ByteArrayInputStream(buf, 0, this.position))
+ .withPartNumber(this.etags.size() + 1)
+ .withPartSize(this.position));
+ this.etags.add(uploadResult.getPartETag());
+ } catch (AmazonS3Exception e) {
+ LOG.error("Failed to upload part: {}", e.getMessage(), e);
+ this.s3Client.abortMultipartUpload(
+          new AbortMultipartUploadRequest(this.bucket, this.path, this.uploadId));
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void close() {
+ if (this.uploadId != null) {
+ if (this.position > 0) {
+ uploadPart();
+ }
+ LOG.debug("Completing multipart");
+ try {
+ this.s3Client.completeMultipartUpload(
+ new CompleteMultipartUploadRequest(bucket, path, uploadId, etags));
+ } catch (AmazonS3Exception e) {
+ LOG.error("Failed to complete multipart upload: {}", e.getMessage(),
e);
+ throw new RuntimeException(e);
+ }
+ } else {
+ LOG.debug("Uploading object at once to {}/{}", this.bucket, this.path);
+ try {
+ final ObjectMetadata metadata = new ObjectMetadata();
+ metadata.setContentLength(this.position);
+ final PutObjectRequest request =
+ new PutObjectRequest(
+ this.bucket,
+ this.path,
+ new ByteArrayInputStream(this.buf, 0, this.position),
+ metadata)
+ .withCannedAcl(CannedAccessControlList.BucketOwnerFullControl);
+ this.s3Client.putObject(request);
+ } catch (AmazonS3Exception e) {
+ LOG.error("Failed to upload object: {}", e.getMessage(), e);
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ public void cancel() {
+ if (this.uploadId != null) {
+ try {
+ LOG.debug("Aborting multipart upload");
+ this.s3Client.abortMultipartUpload(
+            new AbortMultipartUploadRequest(this.bucket, this.path, this.uploadId));
+ } catch (AmazonS3Exception e) {
+ LOG.error("Failed to abort multipart upload: {}", e.getMessage(), e);
+ }
+ }
+ }
+
+ @Override
+ public void write(int b) {
+
+ if (position >= this.buf.length) {
+ flushBufferAndRewind();
+ }
+ this.buf[position++] = (byte) b;
+ }
+}
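
[Editor's note] A minimal usage sketch for the new stream (client construction, bucket, and key are hypothetical). Writes are buffered in 5 MB chunks; a multipart upload is started only once the first buffer fills, otherwise close() falls back to a single putObject:

    import org.apache.linkis.storage.fs.stream.S3OutputStream;

    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;

    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    public class S3OutputStreamDemo {
      public static void main(String[] args) throws Exception {
        AmazonS3 s3 = AmazonS3ClientBuilder.standard().build(); // hypothetical client setup
        try (OutputStream out = new S3OutputStream(s3, "my-bucket", "user/hadoop/log.txt")) {
          out.write("hello linkis".getBytes(StandardCharsets.UTF_8));
        } // close() uploads the buffered bytes (a single PUT here, since < 5 MB was written)
      }
    }
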
diff --git a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/FileSystemUtils.scala b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/FileSystemUtils.scala
index 9c344fa802..0e382128ca 100644
--- a/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/FileSystemUtils.scala
+++ b/linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/FileSystemUtils.scala
@@ -122,7 +122,7 @@ object FileSystemUtils extends Logging {
var parentPath = dest.getParent
val dirsToMake = new util.Stack[FsPath]()
dirsToMake.push(dest)
- while (!fileSystem.exists(parentPath)) {
+ while (parentPath != null && !fileSystem.exists(parentPath)) {
dirsToMake.push(parentPath)
parentPath = parentPath.getParent
}
@@ -153,7 +153,7 @@ object FileSystemUtils extends Logging {
var parentPath = dest.getParent
val dirsToMake = new util.Stack[FsPath]()
dirsToMake.push(dest)
- while (!fileSystem.exists(parentPath)) {
+ while (parentPath != null && !fileSystem.exists(parentPath)) {
dirsToMake.push(parentPath)
parentPath = parentPath.getParent
}
diff --git a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala
index ae83749ecb..63c963d2d3 100644
--- a/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala
+++ b/linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/GovernanceUtils.scala
@@ -132,7 +132,7 @@ object GovernanceUtils extends Logging {
* @return
*/
def getResultParentPath(creator: String): String = {
- val resPrefix = GovernanceCommonConf.DEFAULT_LOGPATH_PREFIX
+ val resPrefix = GovernanceCommonConf.RESULT_SET_STORE_PATH.getValue
val resStb = new StringBuilder()
if (resStb.endsWith("/")) {
resStb.append(resPrefix)
diff --git a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/CommonLogPathUtils.scala b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/CommonLogPathUtils.scala
index 746774633e..3430c1809b 100644
--- a/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/CommonLogPathUtils.scala
+++ b/linkis-computation-governance/linkis-entrance/src/main/scala/org/apache/linkis/entrance/utils/CommonLogPathUtils.scala
@@ -20,6 +20,7 @@ package org.apache.linkis.entrance.utils
import org.apache.linkis.common.io.FsPath
import org.apache.linkis.common.utils.Utils
import org.apache.linkis.entrance.conf.EntranceConfiguration
+import org.apache.linkis.governance.common.conf.GovernanceCommonConf
import org.apache.linkis.governance.common.entity.job.JobRequest
import org.apache.linkis.manager.label.utils.LabelUtil
import org.apache.linkis.storage.FSFactory
@@ -51,6 +52,8 @@ object CommonLogPathUtils {
val fsPath = new FsPath(commonPath)
if (StorageUtils.HDFS.equals(fsPath.getFsType)) {
FSFactory.getFs(StorageUtils.HDFS).asInstanceOf[FileSystem]
+ } else if (StorageUtils.S3.equals(fsPath.getFsType)) {
+ FSFactory.getFs(StorageUtils.S3).asInstanceOf[FileSystem]
} else {
FSFactory
        .getFs(StorageUtils.FILE, StorageConfiguration.LOCAL_ROOT_USER.getValue)
@@ -58,7 +61,7 @@ object CommonLogPathUtils {
}
}
- private val resPrefix = EntranceConfiguration.DEFAULT_LOGPATH_PREFIX.getValue
+ private val resPrefix = GovernanceCommonConf.RESULT_SET_STORE_PATH.getValue
/**
   * get result path parentPath: resPrefix + dateStr + result + creator subPath: parentPath +
diff --git a/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/ResourceHelperFactory.java b/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/ResourceHelperFactory.java
index 83acf03a07..a6138fe510 100644
--- a/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/ResourceHelperFactory.java
+++ b/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/ResourceHelperFactory.java
@@ -26,16 +26,22 @@ public class ResourceHelperFactory {
  private static final Logger LOGGER = LoggerFactory.getLogger(ResourceHelperFactory.class);
-  private static final boolean IS_HDFS = (Boolean) BmlServerConfiguration.BML_IS_HDFS().getValue();
+  private static final String FILESYSTEM_TYPE =
+      BmlServerConfiguration.BML_FILESYSTEM_TYPE().getValue();
  private static final ResourceHelper HDFS_RESOURCE_HELPER = new HdfsResourceHelper();
  private static final ResourceHelper LOCAL_RESOURCE_HELPER = new LocalResourceHelper();
+  private static final ResourceHelper S3_RESOURCE_HELPER = new S3ResourceHelper();
+
public static ResourceHelper getResourceHelper() {
- if (IS_HDFS) {
+ if (FILESYSTEM_TYPE.equals("hdfs")) {
LOGGER.info("will store resource in hdfs");
return HDFS_RESOURCE_HELPER;
+ } else if (FILESYSTEM_TYPE.equals("s3")) {
+ LOGGER.info("will store resource in s3");
+ return S3_RESOURCE_HELPER;
} else {
LOGGER.info("will store resource in local");
return LOCAL_RESOURCE_HELPER;
diff --git a/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/S3ResourceHelper.java b/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/S3ResourceHelper.java
new file mode 100644
index 0000000000..5370da9351
--- /dev/null
+++ b/linkis-public-enhancements/linkis-bml-server/src/main/java/org/apache/linkis/bml/common/S3ResourceHelper.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.linkis.bml.common;
+
+import org.apache.linkis.bml.conf.BmlServerConfiguration;
+import org.apache.linkis.common.io.Fs;
+import org.apache.linkis.common.io.FsPath;
+import org.apache.linkis.storage.FSFactory;
+import org.apache.linkis.storage.utils.FileSystemUtils;
+
+import org.apache.commons.codec.binary.Hex;
+import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.security.MessageDigest;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.Map;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class S3ResourceHelper implements ResourceHelper {
+
+  private static final Logger logger = LoggerFactory.getLogger(S3ResourceHelper.class);
+
+ private static final String SCHEMA = "s3://";
+
+ @Override
+ public long upload(
+ String path,
+ String user,
+ InputStream inputStream,
+ StringBuilder stringBuilder,
+ boolean overwrite)
+ throws UploadResourceException {
+ OutputStream outputStream = null;
+ InputStream is0 = null;
+ InputStream is1 = null;
+ long size = 0;
+ Fs fileSystem = null;
+ try {
+ FsPath fsPath = new FsPath(path);
+ fileSystem = FSFactory.getFsByProxyUser(fsPath, user);
+ fileSystem.init(null);
+ if (!fileSystem.exists(fsPath)) {
+ FileSystemUtils.createNewFile(fsPath, user, true);
+ }
+ byte[] buffer = new byte[1024];
+ long beforeSize = -1;
+ is0 = fileSystem.read(fsPath);
+ int ch0 = 0;
+ while ((ch0 = is0.read(buffer)) != -1) {
+ beforeSize += ch0;
+ }
+ outputStream = fileSystem.write(fsPath, overwrite);
+ int ch = 0;
+ MessageDigest md5Digest = DigestUtils.getMd5Digest();
+ while ((ch = inputStream.read(buffer)) != -1) {
+ md5Digest.update(buffer, 0, ch);
+ outputStream.write(buffer, 0, ch);
+ size += ch;
+ }
+ if (stringBuilder != null) {
+ stringBuilder.append(Hex.encodeHexString(md5Digest.digest()));
+ }
+      // Re-read all bytes of the file by name, so that updates made after a bad update are not all wrong
+ long afterSize = -1;
+ is1 = fileSystem.read(fsPath);
+ int ch1 = 0;
+ while ((ch1 = is1.read(buffer)) != -1) {
+ afterSize += ch1;
+ }
+ size = Math.max(size, afterSize - beforeSize);
+ } catch (final IOException e) {
+ logger.error("{} write to {} failed, reason is, IOException:", user,
path, e);
+ UploadResourceException uploadResourceException = new
UploadResourceException();
+ uploadResourceException.initCause(e);
+ throw uploadResourceException;
+ } catch (final Throwable t) {
+ logger.error("{} write to {} failed, reason is", user, path, t);
+      UploadResourceException uploadResourceException = new UploadResourceException();
+ uploadResourceException.initCause(t);
+ throw uploadResourceException;
+ } finally {
+ IOUtils.closeQuietly(outputStream);
+ IOUtils.closeQuietly(inputStream);
+ IOUtils.closeQuietly(is0);
+ IOUtils.closeQuietly(is1);
+ if (fileSystem != null) {
+ try {
+ fileSystem.close();
+ } catch (Exception e) {
+ logger.error("close filesystem failed", e);
+ }
+ }
+ }
+ return size;
+ }
+
+ @Override
+ public void update(String path) {}
+
+ @Override
+ public void getResource(String path, int start, int end) {}
+
+ @Override
+ public String getSchema() {
+ return SCHEMA;
+ }
+
+ /**
+   * Motivation to modify this path: the old path lives under /apps-data/hadoop/user on HDFS, where
+   * it is mixed with result-set files, so BML files cannot be synchronized separately. When users
+   * verify a workflow, they must synchronize the full personal and HDFS data every time, which is
+   * very inconvenient.
+ @Override
+  public String generatePath(String user, String fileName, Map<String, Object> properties) {
+ String resourceHeader = (String) properties.get("resourceHeader");
+ SimpleDateFormat format = new SimpleDateFormat("yyyyMMdd");
+ String dateStr = format.format(new Date());
+ if (StringUtils.isNotEmpty(resourceHeader)) {
+ return getSchema()
+ + BmlServerConfiguration.BML_PREFIX().getValue()
+ + "/"
+ + user
+ + "/bml"
+ + "/"
+ + dateStr
+ + "/"
+ + resourceHeader
+ + "/"
+ + fileName;
+ } else {
+ return getSchema()
+ + BmlServerConfiguration.BML_PREFIX().getValue()
+ + "/"
+ + user
+ + "/bml"
+ + "/"
+ + dateStr
+ + "/"
+ + fileName;
+ }
+ }
+
+ @Override
+ public boolean checkIfExists(String path, String user) throws IOException {
+ Fs fileSystem = FSFactory.getFsByProxyUser(new FsPath(path), user);
+ fileSystem.init(null);
+ try {
+ return fileSystem.exists(new FsPath(path));
+ } finally {
+ fileSystem.close();
+ }
+ }
+
+ @Override
+ public boolean checkBmlResourceStoragePrefixPathIfChanged(String path) {
+    String prefixPath = getSchema() + BmlServerConfiguration.BML_PREFIX().getValue();
+ return !path.startsWith(prefixPath);
+ }
+}
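
[Editor's note] To illustrate the layout generatePath produces, a sketch (assumes linkis.bml.prefix resolves to "apps-data" and a run date of 20241204, both hypothetical):

    import org.apache.linkis.bml.common.S3ResourceHelper;

    import java.util.HashMap;
    import java.util.Map;

    public class GeneratePathDemo {
      public static void main(String[] args) {
        S3ResourceHelper helper = new S3ResourceHelper();
        Map<String, Object> props = new HashMap<>();
        System.out.println(helper.generatePath("alice", "udf.jar", props));
        // s3://apps-data/alice/bml/20241204/udf.jar
        props.put("resourceHeader", "shared");
        System.out.println(helper.generatePath("alice", "udf.jar", props));
        // s3://apps-data/alice/bml/20241204/shared/udf.jar
      }
    }

Keeping BML material under its own /bml segment is what allows it to be synchronized independently of result-set files, per the motivation note above.
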
diff --git a/linkis-public-enhancements/linkis-bml-server/src/main/scala/org/apache/linkis/bml/conf/BmlServerConfiguration.scala b/linkis-public-enhancements/linkis-bml-server/src/main/scala/org/apache/linkis/bml/conf/BmlServerConfiguration.scala
index 7fde694ef6..3b2a0d112a 100644
--- a/linkis-public-enhancements/linkis-bml-server/src/main/scala/org/apache/linkis/bml/conf/BmlServerConfiguration.scala
+++ b/linkis-public-enhancements/linkis-bml-server/src/main/scala/org/apache/linkis/bml/conf/BmlServerConfiguration.scala
@@ -29,6 +29,35 @@ object BmlServerConfiguration {
  val BML_IS_HDFS: CommonVars[Boolean] = CommonVars[Boolean]("wds.linkis.bml.is.hdfs", true)
+  /**
+   * BML_FILESYSTEM_TYPE: distinguishes the file system type used by BML. Currently three types are
+   * supported: hdfs, file, and s3. The default is derived from BML_IS_HDFS to stay backward
+   * compatible with earlier versions.
+   */
+ val BML_FILESYSTEM_TYPE = CommonVars(
+ "linkis.bml.filesystem.type",
+ if (BML_IS_HDFS.getValue) {
+ "hdfs"
+ } else {
+ "file"
+ }
+ )
+
+  /**
+   * BML_PREFIX: the file system prefix for BML. The default is derived from BML_IS_HDFS to stay
+   * backward compatible with earlier versions.
+   */
+ val BML_PREFIX = CommonVars(
+ "linkis.bml.prefix",
+ if (BML_IS_HDFS.getValue) {
+ BML_HDFS_PREFIX.getValue
+ } else {
+ BML_LOCAL_PREFIX.getValue
+ }
+ )
+
  val BML_CLEAN_EXPIRED_TIME: CommonVars[Int] = CommonVars[Int]("wds.linkis.bml.cleanExpired.time", 100)
diff --git a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java
index 28dc65dcc5..1e3ce7599c 100644
--- a/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java
+++ b/linkis-public-enhancements/linkis-pes-publicservice/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java
@@ -148,6 +148,9 @@ public class FsRestfulApi {
if (StorageUtils.HDFS().equalsIgnoreCase(pathType)) {
path = hdfsUserRootPathPrefix + userName + hdfsUserRootPathSuffix;
returnType = StorageUtils.HDFS().toUpperCase();
+ } else if (StorageUtils.S3().equalsIgnoreCase(pathType)) {
+ path = localUserRootPath + userName;
+ returnType = StorageUtils.S3().toUpperCase();
} else {
path = localUserRootPath + userName;
returnType = LOCAL_RETURN_TYPE;
@@ -1490,6 +1493,7 @@ public class FsRestfulApi {
    // Return a success message containing the file path
return Message.ok().data("filePath", newPath);
}
+
/**
* *
*