This is an automated email from the ASF dual-hosted git repository.
aloyszhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 8044a40b83 [INLONG-11382][SDK] Optimize all columns select of Transform SDK (#11383)
8044a40b83 is described below
commit 8044a40b836f5fa0a23099a212ad8a1a447d2b31
Author: vernedeng <[email protected]>
AuthorDate: Tue Oct 22 09:37:16 2024 +0800
[INLONG-11382][SDK] Optimize all columns select of Transform SDK (#11383)
---
.../sdk/transform/decode/AvroSourceDecoder.java | 2 +-
.../sdk/transform/decode/BsonSourceDecoder.java | 2 +-
.../sdk/transform/decode/CsvSourceDecoder.java | 6 ++----
.../sdk/transform/decode/JsonSourceDecoder.java | 2 +-
.../sdk/transform/decode/KvSourceDecoder.java | 6 ++----
.../sdk/transform/decode/ParquetSourceDecoder.java | 2 +-
.../sdk/transform/decode/PbSourceDecoder.java | 2 +-
.../inlong/sdk/transform/decode/SourceDecoder.java | 23 +++++++++++++++++++---
.../sdk/transform/decode/XmlSourceDecoder.java | 2 +-
.../sdk/transform/decode/YamlSourceDecoder.java | 2 +-
.../sdk/transform/encode/CsvSinkEncoder.java | 13 ++----------
.../inlong/sdk/transform/encode/KvSinkEncoder.java | 14 ++-----------
.../sdk/transform/encode/MapSinkEncoder.java | 11 +++--------
.../sdk/transform/encode/ParquetSinkEncoder.java | 12 ++---------
.../inlong/sdk/transform/encode/PbSinkEncoder.java | 10 +++-------
.../inlong/sdk/transform/encode/SinkEncoder.java | 18 ++++++++++++++---
.../inlong/sdk/transform/pojo/FieldInfo.java | 4 ++++
.../sdk/transform/process/TransformProcessor.java | 13 ++++++------
.../process/processor/TestCsv2StarProcessor.java | 2 +-
19 files changed, 69 insertions(+), 77 deletions(-)
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java
index 0f71f28209..992dea88c6 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/AvroSourceDecoder.java
@@ -36,7 +36,7 @@ import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
-public class AvroSourceDecoder implements SourceDecoder<byte[]> {
+public class AvroSourceDecoder extends SourceDecoder<byte[]> {
private static final Logger LOG =
LoggerFactory.getLogger(AvroSourceDecoder.class);
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceDecoder.java
index 880467ea43..6dcde13ade 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/BsonSourceDecoder.java
@@ -32,7 +32,7 @@ import java.math.BigDecimal;
* BsonSourceDecoder
*/
@Slf4j
-public class BsonSourceDecoder implements SourceDecoder<byte[]> {
+public class BsonSourceDecoder extends SourceDecoder<byte[]> {
private final JsonSourceDecoder decoder;
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
index 2ff65b26d9..830d5c2c01 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/CsvSourceDecoder.java
@@ -25,21 +25,20 @@ import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import java.nio.charset.Charset;
-import java.util.List;
/**
* CsvSourceDecoder
*
*/
-public class CsvSourceDecoder implements SourceDecoder<String> {
+public class CsvSourceDecoder extends SourceDecoder<String> {
protected CsvSourceInfo sourceInfo;
private Charset srcCharset = Charset.defaultCharset();
private Character delimiter = '|';
private Character escapeChar = null;
- private List<FieldInfo> fields;
public CsvSourceDecoder(CsvSourceInfo sourceInfo) {
+ super(sourceInfo.getFields());
this.sourceInfo = sourceInfo;
if (sourceInfo.getDelimiter() != null) {
this.delimiter = sourceInfo.getDelimiter();
@@ -50,7 +49,6 @@ public class CsvSourceDecoder implements
SourceDecoder<String> {
if (!StringUtils.isBlank(sourceInfo.getCharset())) {
this.srcCharset = Charset.forName(sourceInfo.getCharset());
}
- this.fields = sourceInfo.getFields();
}
@Override
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
index ec67d95d9e..0332094090 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/JsonSourceDecoder.java
@@ -35,7 +35,7 @@ import java.util.List;
* JsonSourceDecoder
*
*/
-public class JsonSourceDecoder implements SourceDecoder<String> {
+public class JsonSourceDecoder extends SourceDecoder<String> {
protected JsonSourceInfo sourceInfo;
private Charset srcCharset = Charset.defaultCharset();
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
index 2bb1c366b0..e62fcccc37 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/KvSourceDecoder.java
@@ -32,7 +32,7 @@ import java.util.Map;
* KvSourceDecoder
*
*/
-public class KvSourceDecoder implements SourceDecoder<String> {
+public class KvSourceDecoder extends SourceDecoder<String> {
protected KvSourceInfo sourceInfo;
private Character entryDelimiter = '&';
@@ -41,9 +41,9 @@ public class KvSourceDecoder implements SourceDecoder<String>
{
private Character quoteChar = '\"';
private Character lineDelimiter = '\n';
private Charset srcCharset = Charset.defaultCharset();
- private List<FieldInfo> fields;
public KvSourceDecoder(KvSourceInfo sourceInfo) {
+ super(sourceInfo.getFields());
this.sourceInfo = sourceInfo;
if (!StringUtils.isBlank(sourceInfo.getCharset())) {
this.srcCharset = Charset.forName(sourceInfo.getCharset());
@@ -63,8 +63,6 @@ public class KvSourceDecoder implements SourceDecoder<String>
{
if (sourceInfo.getLineDelimiter() != null) {
this.lineDelimiter = sourceInfo.getLineDelimiter();
}
-
- this.fields = sourceInfo.getFields();
}
@Override
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
index 11312370eb..85e3ba319d 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/ParquetSourceDecoder.java
@@ -41,7 +41,7 @@ import java.nio.charset.Charset;
/**
* PbSourceDecoder
*/
-public class ParquetSourceDecoder implements SourceDecoder<byte[]> {
+public class ParquetSourceDecoder extends SourceDecoder<byte[]> {
private static final Logger LOG =
LoggerFactory.getLogger(ParquetSourceDecoder.class);
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
index 48f3749c45..4ee704bd0e 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/PbSourceDecoder.java
@@ -39,7 +39,7 @@ import java.util.concurrent.ConcurrentHashMap;
* PbSourceDecoder
*
*/
-public class PbSourceDecoder implements SourceDecoder<byte[]> {
+public class PbSourceDecoder extends SourceDecoder<byte[]> {
private static final Logger LOG =
LoggerFactory.getLogger(PbSourceDecoder.class);
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
index 2e410d24c3..26bbbbbaac 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/SourceDecoder.java
@@ -17,15 +17,32 @@
package org.apache.inlong.sdk.transform.decode;
+import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.process.Context;
+import com.google.common.collect.ImmutableList;
+import lombok.Getter;
+
+import java.util.List;
+
/**
* SourceDecoder
*/
-public interface SourceDecoder<Input> {
+@Getter
+public abstract class SourceDecoder<Input> {
+
+ protected final List<FieldInfo> fields;
+
+ public SourceDecoder() {
+ this(ImmutableList.of());
+ }
+
+ public SourceDecoder(List<FieldInfo> fields) {
+ this.fields = fields;
+ }
- SourceData decode(byte[] srcBytes, Context context);
+ public abstract SourceData decode(byte[] srcBytes, Context context);
- SourceData decode(Input input, Context context);
+ public abstract SourceData decode(Input input, Context context);
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceDecoder.java
index 9f86c16eff..a9a6bb9a66 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/XmlSourceDecoder.java
@@ -37,7 +37,7 @@ import java.util.Map;
* XmlSourceDecoder
*/
@Slf4j
-public class XmlSourceDecoder implements SourceDecoder<String> {
+public class XmlSourceDecoder extends SourceDecoder<String> {
protected XmlSourceInfo sourceInfo;
private Charset srcCharset = Charset.defaultCharset();
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java
index b7a2ba915f..5be1a0a93f 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/decode/YamlSourceDecoder.java
@@ -32,7 +32,7 @@ import java.util.List;
import java.util.Map;
@Slf4j
-public class YamlSourceDecoder implements SourceDecoder<String> {
+public class YamlSourceDecoder extends SourceDecoder<String> {
protected YamlSourceInfo sourceInfo;
private Charset srcCharset = Charset.defaultCharset();
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
index 89f6f364a0..97b43c02b9 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/CsvSinkEncoder.java
@@ -24,21 +24,20 @@ import org.apache.inlong.sdk.transform.process.Context;
import org.apache.commons.lang3.StringUtils;
import java.nio.charset.Charset;
-import java.util.List;
/**
* CsvSinkEncoder
*/
-public class CsvSinkEncoder implements SinkEncoder<String> {
+public class CsvSinkEncoder extends SinkEncoder<String> {
protected CsvSinkInfo sinkInfo;
protected Charset sinkCharset = Charset.defaultCharset();
private Character delimiter = '|';
private Character escapeChar = null;
- private List<FieldInfo> fields;
private StringBuilder builder = new StringBuilder();
public CsvSinkEncoder(CsvSinkInfo sinkInfo) {
+ super(sinkInfo.getFields());
this.sinkInfo = sinkInfo;
if (sinkInfo.getDelimiter() != null) {
this.delimiter = sinkInfo.getDelimiter();
@@ -49,7 +48,6 @@ public class CsvSinkEncoder implements SinkEncoder<String> {
if (!StringUtils.isBlank(sinkInfo.getCharset())) {
this.sinkCharset = Charset.forName(sinkInfo.getCharset());
}
- this.fields = sinkInfo.getFields();
}
/**
@@ -89,11 +87,4 @@ public class CsvSinkEncoder implements SinkEncoder<String> {
return builder.substring(0, builder.length() - 1);
}
- /**
- * get fields
- * @return the fields
- */
- public List<FieldInfo> getFields() {
- return fields;
- }
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
index 2822374c41..094f4d6884 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/KvSinkEncoder.java
@@ -24,21 +24,20 @@ import org.apache.inlong.sdk.transform.process.Context;
import org.apache.commons.lang3.StringUtils;
import java.nio.charset.Charset;
-import java.util.List;
/**
* KvSinkEncoder
*/
-public class KvSinkEncoder implements SinkEncoder<String> {
+public class KvSinkEncoder extends SinkEncoder<String> {
protected KvSinkInfo sinkInfo;
protected Charset sinkCharset = Charset.defaultCharset();
private Character entryDelimiter = '&';
private Character kvDelimiter = '=';
- private List<FieldInfo> fields;
private StringBuilder builder = new StringBuilder();
public KvSinkEncoder(KvSinkInfo sinkInfo) {
+ super(sinkInfo.getFields());
this.sinkInfo = sinkInfo;
if (!StringUtils.isBlank(sinkInfo.getCharset())) {
this.sinkCharset = Charset.forName(sinkInfo.getCharset());
@@ -49,7 +48,6 @@ public class KvSinkEncoder implements SinkEncoder<String> {
if (sinkInfo.getKvDelimiter() != null) {
this.kvDelimiter = sinkInfo.getKvDelimiter();
}
- this.fields = sinkInfo.getFields();
}
/**
@@ -78,12 +76,4 @@ public class KvSinkEncoder implements SinkEncoder<String> {
}
return builder.substring(0, builder.length() - 1);
}
-
- /**
- * get fields
- * @return the fields
- */
- public List<FieldInfo> getFields() {
- return fields;
- }
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
index 0b05ce6076..c76c4e80ff 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/MapSinkEncoder.java
@@ -25,17 +25,17 @@ import
org.apache.inlong.sdk.transform.process.converter.TypeConverter;
import lombok.extern.slf4j.Slf4j;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
@Slf4j
-public class MapSinkEncoder implements SinkEncoder<Map<String, Object>> {
+public class MapSinkEncoder extends SinkEncoder<Map<String, Object>> {
private final MapSinkInfo sinkInfo;
private final Map<String, TypeConverter> converters;
public MapSinkEncoder(MapSinkInfo sinkInfo) {
+ super(sinkInfo.getFields());
this.sinkInfo = sinkInfo;
this.converters = sinkInfo.getFields()
.stream()
@@ -47,7 +47,7 @@ public class MapSinkEncoder implements
SinkEncoder<Map<String, Object>> {
@Override
public Map<String, Object> encode(SinkData sinkData, Context context) {
Map<String, Object> esMap = new HashMap<>();
- for (FieldInfo fieldInfo : sinkInfo.getFields()) {
+ for (FieldInfo fieldInfo : fields) {
String fieldName = fieldInfo.getName();
String strValue = sinkData.getField(fieldName);
TypeConverter converter = converters.get(fieldName);
@@ -65,9 +65,4 @@ public class MapSinkEncoder implements
SinkEncoder<Map<String, Object>> {
return esMap;
}
-
- @Override
- public List<FieldInfo> getFields() {
- return sinkInfo.getFields();
- }
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
index 168d7d0c44..6d377b061c 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/ParquetSinkEncoder.java
@@ -28,7 +28,6 @@ import org.apache.parquet.schema.Types;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
@@ -38,17 +37,14 @@ import static
org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY;
/**
* ParquetSinkEncoder
*/
-public class ParquetSinkEncoder implements SinkEncoder<ByteArrayOutputStream> {
+public class ParquetSinkEncoder extends SinkEncoder<ByteArrayOutputStream> {
protected ParquetSinkInfo sinkInfo;
- protected Charset sinkCharset = Charset.defaultCharset();
-
- private final List<FieldInfo> fields;
private ParquetByteArrayWriter<Object[]> writer;
public ParquetSinkEncoder(ParquetSinkInfo sinkInfo) {
+ super(sinkInfo.getFields());
this.sinkInfo = sinkInfo;
- this.fields = sinkInfo.getFields();
ArrayList<Type> typesList = new ArrayList<>();
for (FieldInfo fieldInfo : this.fields) {
typesList.add(Types.required(BINARY)
@@ -88,10 +84,6 @@ public class ParquetSinkEncoder implements
SinkEncoder<ByteArrayOutputStream> {
return writer.getByteArrayOutputStream();
}
- @Override
- public List<FieldInfo> getFields() {
- return this.fields;
- }
public byte[] mergeByteArray(List<ByteArrayOutputStream> list) {
if (list.isEmpty()) {
return null;
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/PbSinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/PbSinkEncoder.java
index 367152a3ae..226405c515 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/PbSinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/PbSinkEncoder.java
@@ -28,10 +28,9 @@ import com.google.protobuf.DynamicMessage;
import java.util.Base64;
import java.util.HashMap;
-import java.util.List;
import java.util.Map;
-public class PbSinkEncoder implements SinkEncoder<byte[]> {
+public class PbSinkEncoder extends SinkEncoder<byte[]> {
protected PbSinkInfo sinkInfo;
@@ -40,9 +39,10 @@ public class PbSinkEncoder implements SinkEncoder<byte[]> {
private final Map<String, Descriptors.FieldDescriptor.Type> fieldTypes;
public PbSinkEncoder(PbSinkInfo pbSinkInfo) {
+ super(pbSinkInfo.getFields());
this.sinkInfo = pbSinkInfo;
this.fieldTypes = new HashMap<>();
- for (FieldInfo field : pbSinkInfo.getFields()) {
+ for (FieldInfo field : this.fields) {
fieldTypes.put(field.getName(),
Descriptors.FieldDescriptor.Type.STRING);
}
// decode protoDescription
@@ -108,8 +108,4 @@ public class PbSinkEncoder implements SinkEncoder<byte[]> {
}
}
- @Override
- public List<FieldInfo> getFields() {
- return sinkInfo.getFields();
- }
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
index a63f970295..bd804cd771 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/encode/SinkEncoder.java
@@ -20,16 +20,28 @@ package org.apache.inlong.sdk.transform.encode;
import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.process.Context;
+import com.google.common.collect.ImmutableList;
+import lombok.Getter;
+
import java.util.List;
/**
* SinkEncoder
*/
-public interface SinkEncoder<Output> {
+@Getter
+public abstract class SinkEncoder<Output> {
public static final String ALL_SOURCE_FIELD_SIGN = "*";
- Output encode(SinkData sinkData, Context context);
+ protected final List<FieldInfo> fields;
+
+ public SinkEncoder() {
+ this(ImmutableList.of());
+ }
+
+ public SinkEncoder(List<FieldInfo> fields) {
+ this.fields = fields;
+ }
- List<FieldInfo> getFields();
+ public abstract Output encode(SinkData sinkData, Context context);
}
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
index 2a7834112a..fe08d00bb1 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/pojo/FieldInfo.java
@@ -34,6 +34,10 @@ public class FieldInfo {
}
+ public FieldInfo(String name) {
+ this(name, TypeConverter.DefaultTypeConverter());
+ }
+
public FieldInfo(String name, TypeConverter converter) {
this.name = name;
this.converter = converter;
diff --git
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
index 6d650cfd4a..6cfd3cd5f7 100644
---
a/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
+++
b/inlong-sdk/transform-sdk/src/main/java/org/apache/inlong/sdk/transform/process/TransformProcessor.java
@@ -25,19 +25,19 @@ import org.apache.inlong.sdk.transform.pojo.FieldInfo;
import org.apache.inlong.sdk.transform.pojo.TransformConfig;
import org.apache.inlong.sdk.transform.process.operator.ExpressionOperator;
import org.apache.inlong.sdk.transform.process.operator.OperatorTools;
+import org.apache.inlong.sdk.transform.process.parser.ColumnParser;
import org.apache.inlong.sdk.transform.process.parser.ValueParser;
import com.google.common.collect.ImmutableMap;
import net.sf.jsqlparser.JSQLParserException;
import net.sf.jsqlparser.parser.CCJSqlParserManager;
+import net.sf.jsqlparser.schema.Column;
import net.sf.jsqlparser.statement.select.AllColumns;
import net.sf.jsqlparser.statement.select.PlainSelect;
import net.sf.jsqlparser.statement.select.Select;
import net.sf.jsqlparser.statement.select.SelectExpressionItem;
import net.sf.jsqlparser.statement.select.SelectItem;
import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.StringReader;
import java.util.ArrayList;
@@ -50,8 +50,6 @@ import java.util.Map;
*/
public class TransformProcessor<I, O> {
- private static final Logger LOG = LoggerFactory.getLogger(TransformProcessor.class);
-
private static final Map<String, Object> EMPTY_EXT_PARAMS =
ImmutableMap.of();
private static final String DUMMY_SELECT = "select *";
@@ -127,9 +125,10 @@ public class TransformProcessor<I, O> {
this.selectItems
.add(new ValueParserNode(fieldName,
OperatorTools.buildParser(exprItem.getExpression())));
} else if (item instanceof AllColumns) {
- fieldName = item.toString();
- this.encoder.getFields().clear();
- this.selectItems.add(new ValueParserNode(fieldName, null));
+ for (FieldInfo fieldInfo : decoder.getFields()) {
+ String name = fieldInfo.getName();
+ this.selectItems.add(new ValueParserNode(name, new ColumnParser(new Column(name))));
+ }
}
}
}
diff --git
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2StarProcessor.java
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2StarProcessor.java
index 9fdbf0b5d0..bcd1cfe11a 100644
---
a/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2StarProcessor.java
+++
b/inlong-sdk/transform-sdk/src/test/java/org/apache/inlong/sdk/transform/process/processor/TestCsv2StarProcessor.java
@@ -76,6 +76,6 @@ public class TestCsv2StarProcessor extends
AbstractProcessorTestBase {
List<String> output4 = processor4.transform("2024-04-28 00:00:00|nok",
new HashMap<>());
Assert.assertEquals(1, output4.size());
- Assert.assertEquals(output4.get(0), "2024-04-28 00:00:00|nok|nok|2024-04-28 00:00:00");
+ Assert.assertEquals(output4.get(0), "nok|2024-04-28 00:00:00");
}
}