This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git


The following commit(s) were added to refs/heads/master by this push:
     new 64cdd4d1f [Feature] Enhanced FileSource to support more parameters feature (#4838)
64cdd4d1f is described below

commit 64cdd4d1fa0a6f22c76a7c7ec8d9c4902438e486
Author: 人生有如两个橘子 <[email protected]>
AuthorDate: Wed Aug 9 21:58:07 2023 +0800

    [Feature] Enhanced FileSource to support more parameters feature (#4838)
    
    * Enhance FileSource to support more parameters (`limitBytes`, `limitColumnLength`)
    
    * Delete the `limitTotalLine` parameter; it can be replaced by the `fileInfo` method
    
    * Remove redundant references
---
 .../linkis/storage/source/AbstractFileSource.java  | 13 +++++++
 .../apache/linkis/storage/source/FileSource.java   |  4 ++
 .../apache/linkis/storage/source/FileSplit.java    | 45 +++++++++++++++++++++-
 .../filesystem/conf/WorkSpaceConfiguration.java    |  7 ++++
 .../filesystem/restful/api/FsRestfulApi.java       | 12 +++++-
 5 files changed, 79 insertions(+), 2 deletions(-)

diff --git 
a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/AbstractFileSource.java
 
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/AbstractFileSource.java
index f2eb65ed1..fc4e615b3 100644
--- 
a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/AbstractFileSource.java
+++ 
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/AbstractFileSource.java
@@ -108,4 +108,17 @@ public abstract class AbstractFileSource implements 
FileSource {
         .map(fileSplit -> fileSplit.getFileInfo(needToCountRowNumber))
         .toArray(Pair[]::new);
   }
+
+  @Override
+  public FileSource limitBytes(Long limitBytes) {
+    Arrays.stream(fileSplits).forEach(fileSplit -> 
fileSplit.setLimitBytes(limitBytes));
+    return this;
+  }
+
+  @Override
+  public FileSource limitColumnLength(int limitColumnLength) {
+    Arrays.stream(fileSplits)
+        .forEach(fileSplit -> 
fileSplit.setLimitColumnLength(limitColumnLength));
+    return this;
+  }
 }
diff --git 
a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java
 
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java
index b7bcc8c84..cee72dfcd 100644
--- 
a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java
+++ 
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java
@@ -80,6 +80,10 @@ public interface FileSource extends Closeable {
     return isResultSet(fsPath.getPath());
   }
 
+  FileSource limitBytes(Long limitBytes);
+
+  FileSource limitColumnLength(int limitColumnLength);
+
   /**
    * Currently only supports table multi-result sets
    *
diff --git 
a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSplit.java
 
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSplit.java
index a43b7feb0..3a6c05a54 100644
--- 
a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSplit.java
+++ 
b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSplit.java
@@ -41,6 +41,8 @@ import org.apache.commons.math3.util.Pair;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.function.Consumer;
 import java.util.function.Function;
 import java.util.stream.Collectors;
@@ -60,6 +62,8 @@ public class FileSplit implements Closeable {
   protected Function<Record, Record> shuffler;
   private boolean pageTrigger = false;
   protected Map<String, String> params = new HashMap<>();
+  private long limitBytes = 0L;
+  private int limitColumnLength = 0;
 
   public FileSplit(FsReader<? extends MetaData, ? extends Record> fsReader) {
     this.fsReader = fsReader;
@@ -98,6 +102,14 @@ public class FileSplit implements Closeable {
     return totalLine;
   }
 
+  public void setLimitBytes(long limitBytes) {
+    this.limitBytes = limitBytes;
+  }
+
+  public void setLimitColumnLength(int limitColumnLength) {
+    this.limitColumnLength = limitColumnLength;
+  }
+
   public <M> M whileLoop(Function<MetaData, M> metaDataFunction, 
Consumer<Record> recordConsumer) {
     M m = null;
     try {
@@ -222,16 +234,47 @@ public class FileSplit implements Closeable {
 
   public Pair<Object, List<String[]>> collect() {
     List<String[]> recordList = new ArrayList<>();
+    final AtomicLong tmpBytes = new AtomicLong(0L);
+    final AtomicBoolean overFlag = new AtomicBoolean(false);
     Object metaData =
         whileLoop(
             collectMetaData -> collectMetaData(collectMetaData),
-            r -> recordList.add(collectRecord(r)));
+            r -> {
+              if (!overFlag.get()) {
+                String[] arr = collectRecord(r);
+                if (limitBytes > 0) {
+                  for (int i = 0; i < arr.length; i++) {
+                    tmpBytes.addAndGet(arr[i].getBytes().length);
+                    if (overFlag.get() || tmpBytes.get() > limitBytes) {
+                      overFlag.set(true);
+                      arr[i] = "";
+                    }
+                  }
+                  recordList.add(arr);
+                } else {
+                  recordList.add(arr);
+                }
+              }
+            });
     return new Pair<>(metaData, recordList);
   }
 
   public String[] collectRecord(Record record) {
     if (record instanceof TableRecord) {
       TableRecord tableRecord = (TableRecord) record;
+      if (limitColumnLength > 0) {
+        return Arrays.stream(tableRecord.row)
+            .map(
+                obj -> {
+                  String col = DataType.valueToString(obj);
+                  if (col.length() > limitColumnLength) {
+                    return col.substring(0, limitColumnLength);
+                  } else {
+                    return col;
+                  }
+                })
+            .toArray(String[]::new);
+      }
       return 
Arrays.stream(tableRecord.row).map(DataType::valueToString).toArray(String[]::new);
     } else if (record instanceof LineRecord) {
       LineRecord lineRecord = (LineRecord) record;
diff --git 
a/linkis-public-enhancements/linkis-script-dev/linkis-storage-script-dev-server/src/main/java/org/apache/linkis/filesystem/conf/WorkSpaceConfiguration.java
 
b/linkis-public-enhancements/linkis-script-dev/linkis-storage-script-dev-server/src/main/java/org/apache/linkis/filesystem/conf/WorkSpaceConfiguration.java
index b3eeba135..f7ea5086c 100644
--- 
a/linkis-public-enhancements/linkis-script-dev/linkis-storage-script-dev-server/src/main/java/org/apache/linkis/filesystem/conf/WorkSpaceConfiguration.java
+++ 
b/linkis-public-enhancements/linkis-script-dev/linkis-storage-script-dev-server/src/main/java/org/apache/linkis/filesystem/conf/WorkSpaceConfiguration.java
@@ -61,6 +61,13 @@ public class WorkSpaceConfiguration {
   public static final CommonVars<Boolean> ENABLE_USER_GROUP =
       CommonVars$.MODULE$.apply("linkis.os.user.group.enabled", true);
 
+  // default 63M
+  public static final CommonVars<Long> FILESYSTEM_LIMIT_BYTES =
+      CommonVars$.MODULE$.apply("linkis.filesystem.limit.bytes", 66060288L);
+
+  public static final CommonVars<Integer> FILESYSTEM_LIMIT_COLUMN_LENGTH =
+      CommonVars$.MODULE$.apply("linkis.filesystem.limit.column.length", 2000);
+
   public static final ExecutorService executorService =
       new ThreadPoolExecutor(
           FILESYSTEM_FS_THREAD_NUM.getValue(),
diff --git 
a/linkis-public-enhancements/linkis-script-dev/linkis-storage-script-dev-server/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java
 
b/linkis-public-enhancements/linkis-script-dev/linkis-storage-script-dev-server/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java
index 5a7316d15..ae7e19106 100644
--- 
a/linkis-public-enhancements/linkis-script-dev/linkis-storage-script-dev-server/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java
+++ 
b/linkis-public-enhancements/linkis-script-dev/linkis-storage-script-dev-server/src/main/java/org/apache/linkis/filesystem/restful/api/FsRestfulApi.java
@@ -560,7 +560,9 @@ public class FsRestfulApi {
       @RequestParam(value = "path", required = false) String path,
       @RequestParam(value = "page", defaultValue = "1") Integer page,
       @RequestParam(value = "pageSize", defaultValue = "5000") Integer 
pageSize,
-      @RequestParam(value = "charset", defaultValue = "utf-8") String charset)
+      @RequestParam(value = "charset", defaultValue = "utf-8") String charset,
+      @RequestParam(value = "limitBytes", defaultValue = "0") Long limitBytes,
+      @RequestParam(value = "limitColumnLength", defaultValue = "0") Integer 
limitColumnLength)
       throws IOException, WorkSpaceException {
 
     Message message = Message.ok();
@@ -583,6 +585,14 @@ public class FsRestfulApi {
       if (FileSource.isResultSet(fsPath.getPath())) {
         fileSource = fileSource.page(page, pageSize);
       }
+      if (limitBytes > 0) {
+        fileSource = fileSource.limitBytes(Math.min(limitBytes, 
FILESYSTEM_LIMIT_BYTES.getValue()));
+      }
+      if (limitColumnLength > 0) {
+        fileSource =
+            fileSource.limitColumnLength(
+                Math.min(limitColumnLength, 
FILESYSTEM_LIMIT_COLUMN_LENGTH.getValue()));
+      }
       Pair<Object, List<String[]>> result = fileSource.collect()[0];
       IOUtils.closeQuietly(fileSource);
       message.data("metadata", result.getFirst()).data("fileContent", 
result.getSecond());


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to