This is an automated email from the ASF dual-hosted git repository.

peacewong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/linkis.git

commit dc8a7ad3c32c032ddbba0dacd48e9f5cdaaff6e0
Author: peacewong <[email protected]>
AuthorDate: Sun Oct 1 23:55:08 2023 +0800

    optimize storage module
---
 .../java/org/apache/linkis/common/io/FsPath.java   |  1 +
 .../linkis/storage/conf/LinkisStorageConf.java     |  2 +-
 .../linkis/storage/csv/StorageCSVWriter.java       | 15 +++++++-----
 .../org/apache/linkis/storage/domain/DataType.java | 15 ++++++++++--
 .../org/apache/linkis/storage/domain/Dolphin.java  | 11 +++++++++
 .../linkis/storage/exception/StorageErrorCode.java |  5 +++-
 .../storage/resultset/ResultSetReaderFactory.java  |  1 +
 .../storage/resultset/StorageResultSetReader.java  | 27 +++++++++-------------
 .../storage/resultset/table/TableRecord.java       |  9 --------
 .../resultset/table/TableResultDeserializer.java   |  6 ++---
 .../resultset/table/TableResultSerializer.java     |  2 +-
 .../apache/linkis/storage/source/FileSource.java   |  1 +
 .../linkis/storage/source/ResultsetFileSource.java | 14 +++++++++--
 13 files changed, 68 insertions(+), 41 deletions(-)

diff --git a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/io/FsPath.java b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/io/FsPath.java
index b5e9af86f..a1c0839b2 100644
--- a/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/io/FsPath.java
+++ b/linkis-commons/linkis-common/src/main/java/org/apache/linkis/common/io/FsPath.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.file.FileSystems;
+import java.nio.file.Path;
 import java.nio.file.Paths;
 import java.nio.file.attribute.PosixFilePermission;
 import java.nio.file.attribute.PosixFilePermissions;
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java
index 66fafeacd..74950c15f 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/conf/LinkisStorageConf.java
@@ -39,7 +39,7 @@ public class LinkisStorageConf {
   public static final String FILE_TYPE =
       CommonVars.apply(
               "wds.linkis.storage.file.type",
-              
"dolphin,sql,scala,py,hql,python,out,log,text,sh,jdbc,ngql,psql,fql,tsql")
+              
"dolphin,sql,scala,py,hql,python,out,log,text,txt,sh,jdbc,ngql,psql,fql,tsql")
           .getValue();
 
   private static volatile String[] fileTypeArr = null;
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/csv/StorageCSVWriter.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/csv/StorageCSVWriter.java
index ba4d877f3..d98be4033 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/csv/StorageCSVWriter.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/csv/StorageCSVWriter.java
@@ -91,14 +91,17 @@ public class StorageCSVWriter extends CSVFsWriter {
 
   private String compact(String[] row) {
     String quotationMarks = "\"";
+    String dealNewlineSymbolMarks = "\n";
     StringBuilder rowBuilder = new StringBuilder();
     for (String value : row) {
-      String decoratedValue =
-          StringUtils.isBlank(value)
-              ? value
-              : quoteRetouchEnable
-                  ? quotationMarks + value.replaceAll(quotationMarks, "") + 
quotationMarks
-                  : value;
+      String decoratedValue = value;
+      if (StringUtils.isNotBlank(value)) {
+        if (quoteRetouchEnable) {
+          decoratedValue = quotationMarks + value.replaceAll(quotationMarks, 
"") + quotationMarks;
+        }
+        decoratedValue = decoratedValue.replaceAll(dealNewlineSymbolMarks, " 
");
+        logger.debug("decorateValue with input: {} output: {} ", value, 
decoratedValue);
+      }
       rowBuilder.append(decoratedValue).append(delimiter);
     }
     if (rowBuilder.length() > 0 && rowBuilder.toString().endsWith(delimiter)) {
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/DataType.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/DataType.java
index ad9e0ee88..6808f693e 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/DataType.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/DataType.java
@@ -26,6 +26,8 @@ import java.util.regex.Pattern;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import static org.apache.linkis.storage.domain.Dolphin.LINKIS_NULL;
+
 public enum DataType {
   NullType("void", 0),
   StringType("string", 12),
@@ -63,7 +65,8 @@ public enum DataType {
   public static final String LOWCASE_NULL_VALUE = "null";
 
   // TODO Change to fine-grained regular expressions(改为精细化正则表达式)
-  public static final Pattern DECIMAL_REGEX = 
Pattern.compile("^decimal\\(\\d*\\,\\d*\\)");
+  public static final Pattern DECIMAL_REGEX =
+      Pattern.compile("^decimal\\(\\s*\\d*\\s*,\\s*\\d*\\s*\\)");
 
   public static final Pattern SHORT_REGEX = Pattern.compile("^short.*");
   public static final Pattern INT_REGEX = Pattern.compile("^int.*");
@@ -130,7 +133,11 @@ public enum DataType {
   }
 
   public static Object toValue(DataType dataType, String value) {
+
     Object result = null;
+    if (isLinkisNull(value)) {
+      return result;
+    }
     try {
       switch (dataType) {
         case NullType:
@@ -187,12 +194,16 @@ public enum DataType {
           result = value;
       }
     } catch (Exception e) {
-      logger.debug("Failed to " + value + " switch to dataType:", e);
+      logger.debug("Failed to {} switch to dataType:", value, e);
       result = value;
     }
     return result;
   }
 
+  public static boolean isLinkisNull(String value) {
+    return value == null || value.equals(LINKIS_NULL);
+  }
+
   public static boolean isNull(String value) {
     return value == null || value.equals(NULL_VALUE) || 
value.trim().equals("");
   }
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/Dolphin.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/Dolphin.java
index b6badd284..35c71295e 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/Dolphin.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/domain/Dolphin.java
@@ -59,6 +59,9 @@ public class Dolphin {
   public static final String NULL = "NULL";
   public static final byte[] NULL_BYTES = 
"NULL".getBytes(Charset.forName("utf-8"));
 
+  public static final String LINKIS_NULL = "LINKIS_NULL";
+  public static final byte[] LINKIS_NULL_BYTES = 
LINKIS_NULL.getBytes(Charset.forName("utf-8"));
+
   public static final int INT_LEN = 10;
 
   public static final int FILE_EMPTY = 31;
@@ -79,6 +82,14 @@ public class Dolphin {
     return new String(bytes, start, len, Dolphin.CHAR_SET);
   }
 
+  public static String toStringValue(String value) {
+    if (LINKIS_NULL.equals(value)) {
+      return NULL;
+    } else {
+      return value;
+    }
+  }
+
   /**
    * Read an integer value that converts the array to a byte of length 10 bytes
    * 读取整数值,该值为将数组转换为10字节长度的byte
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java
index 3c82ceb52..fad0d83a1 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/exception/StorageErrorCode.java
@@ -20,7 +20,10 @@ package org.apache.linkis.storage.exception;
 public enum StorageErrorCode {
 
   /** */
-  FS_NOT_INIT(53001, "please init first(请先初始化)");
+  FS_NOT_INIT(53001, "please init first"),
+
+  INCONSISTENT_DATA(53001, "Inconsistent row data read,read %s,need rowLen 
%s"),
+  FS_OOM(53002, "OOM occurred while reading the file");
 
   StorageErrorCode(int errorCode, String message) {
     this.code = errorCode;
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java
index 749ec9a24..3047b715a 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/ResultSetReaderFactory.java
@@ -99,6 +99,7 @@ public class ResultSetReaderFactory {
       }
 
       Fs fs = FSFactory.getFs(resPath);
+      logger.info("Try to init Fs with path:{}", resPath.getPath());
       try {
         fs.init(null);
         InputStream read = fs.read(resPath);
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java
index 35f7483c8..23d65e061 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/StorageResultSetReader.java
@@ -33,7 +33,6 @@ import org.apache.commons.lang3.StringUtils;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.Arrays;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -53,7 +52,6 @@ public class StorageResultSetReader<K extends MetaData, V extends Record>
   private Fs fs;
 
   private final int READ_CACHE = 1024;
-  private final byte[] bytes = new byte[READ_CACHE];
 
   public StorageResultSetReader(ResultSet<K, V> resultSet, InputStream 
inputStream) {
     super(resultSet, inputStream);
@@ -84,21 +82,18 @@ public class StorageResultSetReader<K extends MetaData, V extends Record>
       return null;
     }
 
-    byte[] rowBuffer = new byte[0];
     int len = 0;
-
-    while (rowLen > 0 && len >= 0) {
-      if (rowLen > READ_CACHE) {
-        len = StorageUtils.readBytes(inputStream, bytes, READ_CACHE);
-      } else {
-        len = StorageUtils.readBytes(inputStream, bytes, rowLen);
-      }
-
-      if (len > 0) {
-        rowLen -= len;
-        rowBuffer = Arrays.copyOf(rowBuffer, rowBuffer.length + len);
-        System.arraycopy(bytes, 0, rowBuffer, rowBuffer.length - len, len);
-      }
+    byte[] rowBuffer = null;
+    try {
+      rowBuffer = new byte[rowLen];
+      len = StorageUtils.readBytes(inputStream, rowBuffer, READ_CACHE);
+    } catch (OutOfMemoryError error) {
+      logger.error("Result set read oom, read size {} Byte", rowLen);
+      throw new RuntimeException(error);
+    }
+    if (len != rowLen) {
+      throw new RuntimeException(
+          "Can't get the value of the field, maybe the IO stream has been read 
or has been closed!(拿不到字段的值,也许IO流已读取完毕或已被关闭!)");
     }
     rowCount++;
     return rowBuffer;
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableRecord.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableRecord.java
index f13d42b0b..c8270714b 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableRecord.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableRecord.java
@@ -19,9 +19,6 @@ package org.apache.linkis.storage.resultset.table;
 
 import org.apache.linkis.common.io.Record;
 import org.apache.linkis.storage.resultset.ResultRecord;
-import org.apache.linkis.storage.utils.StorageUtils;
-
-import java.util.Arrays;
 
 public class TableRecord implements ResultRecord {
 
@@ -35,10 +32,4 @@ public class TableRecord implements ResultRecord {
   public Record cloneRecord() {
     return new TableRecord(row.clone());
   }
-
-  public String[] tableRecordToString(String nullValue) {
-    return Arrays.stream(row)
-        .map(col -> StorageUtils.colToString(col, nullValue))
-        .toArray(String[]::new);
-  }
 }
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultDeserializer.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultDeserializer.java
index 7b7838fd2..7e1d6c35f 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultDeserializer.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultDeserializer.java
@@ -52,13 +52,13 @@ public class TableResultDeserializer extends ResultDeserializer<TableMetaData, T
     List<Column> columns = new ArrayList<>();
     for (int i = 0; i < colArray.length; i += 3) {
       int len = Integer.parseInt(colArray[i]);
-      String colName = Dolphin.getString(bytes, index, len);
+      String colName = Dolphin.toStringValue(Dolphin.getString(bytes, index, 
len));
       index += len;
       len = Integer.parseInt(colArray[i + 1]);
-      String colType = Dolphin.getString(bytes, index, len);
+      String colType = Dolphin.toStringValue(Dolphin.getString(bytes, index, 
len));
       index += len;
       len = Integer.parseInt(colArray[i + 2]);
-      String colComment = Dolphin.getString(bytes, index, len);
+      String colComment = Dolphin.toStringValue(Dolphin.getString(bytes, 
index, len));
       index += len;
       columns.add(new Column(colName, DataType.toDataType(colType), 
colComment));
     }
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultSerializer.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultSerializer.java
index 6abe4c56d..5f40aa33f 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultSerializer.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/resultset/table/TableResultSerializer.java
@@ -61,7 +61,7 @@ public class TableResultSerializer extends ResultSerializer {
     int colByteLen = 0;
     int length = 0;
     for (Object data : line) {
-      byte[] bytes = data == null ? Dolphin.NULL_BYTES : 
Dolphin.getBytes(data);
+      byte[] bytes = data == null ? Dolphin.LINKIS_NULL_BYTES : 
Dolphin.getBytes(data);
       dataBytes.add(bytes);
       byte[] colBytes = Dolphin.getBytes(bytes.length);
       colIndex.add(colBytes);
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java
index cee72dfcd..0ed650186 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/FileSource.java
@@ -135,6 +135,7 @@ public interface FileSource extends Closeable {
   }
 
   static FileSplit createResultSetFileSplit(FsPath fsPath, InputStream is) {
+    logger.info("try create result set file split with path:{}", 
fsPath.getPath());
     ResultSet resultset = 
ResultSetFactory.getInstance().getResultSetByPath(fsPath);
     ResultSetReader resultsetReader = 
ResultSetReaderFactory.getResultSetReader(resultset, is);
     return new FileSplit(resultsetReader, resultset.resultSetType());
diff --git a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/ResultsetFileSource.java b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/ResultsetFileSource.java
index d8562b7c5..fb064a8f4 100644
--- a/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/ResultsetFileSource.java
+++ b/linkis-commons/linkis-storage/src/main/java/org/apache/linkis/storage/source/ResultsetFileSource.java
@@ -17,6 +17,7 @@
 
 package org.apache.linkis.storage.source;
 
+import org.apache.linkis.storage.domain.Dolphin;
 import org.apache.linkis.storage.resultset.table.TableRecord;
 import org.apache.linkis.storage.utils.StorageUtils;
 
@@ -36,9 +37,18 @@ public class ResultsetFileSource extends AbstractFileSource {
                     .map(
                         r -> {
                           if (r == null || r.equals("NULL")) {
-                            return nullValue;
+                            if (nullValue.equals(Dolphin.LINKIS_NULL)) {
+                              return r;
+                            } else {
+                              return nullValue;
+                            }
                           } else if (r.equals("")) {
-                            return getParams().getOrDefault("nullValue", "");
+                            String emptyValue = 
getParams().getOrDefault("nullValue", "");
+                            if (emptyValue.equals(Dolphin.LINKIS_NULL)) {
+                              return "";
+                            } else {
+                              return nullValue;
+                            }
                           } else if (r instanceof Double) {
                             return StorageUtils.doubleToString((Double) r);
                           } else {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to