This is an automated email from the ASF dual-hosted git repository.
zirui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new e9cbc3671f [INLONG-10370][Manager] Support configuration of KV data
format (#10371)
e9cbc3671f is described below
commit e9cbc3671fc3ecad4b6ca70e628015d4fbb1e03b
Author: fuweng11 <[email protected]>
AuthorDate: Fri Jun 7 10:08:23 2024 +0800
[INLONG-10370][Manager] Support configuration of KV data format (#10371)
---
.../org/apache/inlong/common/enums/DataTypeEnum.java | 1 +
.../org/apache/inlong/common/util/StringUtil.java | 9 +++++++++
.../pojo/sort/node/base/ExtractNodeProvider.java | 20 ++++++++++++++------
.../pojo/sort/node/provider/KafkaProvider.java | 2 ++
.../pojo/sort/node/provider/PulsarProvider.java | 2 ++
.../pojo/sort/node/provider/TubeMqProvider.java | 2 ++
.../manager/pojo/source/kafka/KafkaSource.java | 3 +++
.../manager/pojo/source/kafka/KafkaSourceDTO.java | 8 ++++++--
.../pojo/source/kafka/KafkaSourceRequest.java | 9 +++++----
.../manager/pojo/source/pulsar/PulsarSource.java | 3 +++
.../manager/pojo/source/pulsar/PulsarSourceDTO.java | 8 ++++++--
.../pojo/source/pulsar/PulsarSourceRequest.java | 9 +++++----
.../manager/pojo/source/tubemq/TubeMQSource.java | 9 +++++++++
.../manager/pojo/source/tubemq/TubeMQSourceDTO.java | 12 +++++++++++-
.../pojo/source/tubemq/TubeMQSourceRequest.java | 9 +++++++++
15 files changed, 87 insertions(+), 19 deletions(-)
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
index 385c5ed71d..e0dc5c1251 100644
---
a/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
+++
b/inlong-common/src/main/java/org/apache/inlong/common/enums/DataTypeEnum.java
@@ -25,6 +25,7 @@ import java.util.Locale;
public enum DataTypeEnum {
CSV("csv"),
+ KV("kv"),
AVRO("avro"),
JSON("json"),
CANAL("canal"),
diff --git
a/inlong-common/src/main/java/org/apache/inlong/common/util/StringUtil.java
b/inlong-common/src/main/java/org/apache/inlong/common/util/StringUtil.java
index cddef9dc8f..7df1eb6975 100644
--- a/inlong-common/src/main/java/org/apache/inlong/common/util/StringUtil.java
+++ b/inlong-common/src/main/java/org/apache/inlong/common/util/StringUtil.java
@@ -17,6 +17,8 @@
package org.apache.inlong.common.util;
+import org.apache.commons.lang3.StringUtils;
+
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
@@ -274,4 +276,11 @@ public class StringUtil {
}
}
+    /**
+     * Parse a character setting that may be written either as the character itself or as its
+     * decimal character code, e.g. {@code "124"} and {@code "|"} both yield {@code "|"}.
+     *
+     * <p>Non-numeric input (including {@code null} and the empty string) is returned unchanged.
+     * Because any all-digit value is treated as a code, a literal digit such as {@code "5"}
+     * cannot be configured directly; it is decoded as the character with code 5.
+     *
+     * @param charStr the configured value, either a literal string or a decimal char code
+     * @return the single-character string for numeric input, otherwise {@code charStr} as-is
+     * @throws IllegalArgumentException if {@code charStr} is numeric but exceeds
+     *         {@link Character#MAX_VALUE}, instead of silently wrapping to another character
+     */
+    public static String parseChar(String charStr) {
+        if (!StringUtils.isNumeric(charStr)) {
+            return charStr;
+        }
+        int code;
+        try {
+            code = Integer.parseInt(charStr);
+        } catch (NumberFormatException e) {
+            // isNumeric accepts digit strings of any length; anything beyond int range is invalid
+            throw new IllegalArgumentException("char code out of range: " + charStr, e);
+        }
+        if (code > Character.MAX_VALUE) {
+            // a (char) cast would silently truncate to an unrelated character
+            throw new IllegalArgumentException("char code out of range: " + charStr);
+        }
+        return Character.toString((char) code);
+    }
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
index 81b9763d79..52c09062d7 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/base/ExtractNodeProvider.java
@@ -19,6 +19,7 @@ package org.apache.inlong.manager.pojo.sort.node.base;
import org.apache.inlong.common.enums.DataTypeEnum;
import org.apache.inlong.common.enums.MessageWrapType;
+import org.apache.inlong.common.util.StringUtil;
import
org.apache.inlong.manager.common.fieldtype.strategy.FieldTypeMappingStrategy;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
import org.apache.inlong.manager.pojo.stream.StreamField;
@@ -32,10 +33,10 @@ import
org.apache.inlong.sort.protocol.node.format.DebeziumJsonFormat;
import org.apache.inlong.sort.protocol.node.format.Format;
import org.apache.inlong.sort.protocol.node.format.InLongMsgFormat;
import org.apache.inlong.sort.protocol.node.format.JsonFormat;
+import org.apache.inlong.sort.protocol.node.format.KvFormat;
import org.apache.inlong.sort.protocol.node.format.RawFormat;
import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.lang3.StringUtils;
import java.util.List;
import java.util.Objects;
@@ -90,8 +91,10 @@ public interface ExtractNodeProvider extends NodeProvider {
* Parse format
*
* @param serializationType data serialization, support: csv, json, canal,
avro, etc
- * @param wrapWithInlongMsg whether wrap content with {@link
InLongMsgFormat}
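+ * @param wrapType the message body wrap type (e.g. RAW, INLONG_MSG_V0, INLONG_MSG_V1), which decides whether content is wrapped with {@link InLongMsgFormat}
* @param separatorStr the separator of data content
+ * @param kvSeparatorStr the separator between a key and its value, used by the KV format; given as the character itself or its decimal code
+ * @param escapeCharStr the escape character, used by the KV format; given as the character itself or its decimal code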
* @param ignoreParseErrors whether ignore deserialization error data
* @return the format for serialized content
*/
@@ -99,15 +102,14 @@ public interface ExtractNodeProvider extends NodeProvider {
String serializationType,
String wrapType,
String separatorStr,
+ String kvSeparatorStr,
+ String escapeCharStr,
Boolean ignoreParseErrors) {
Format format;
DataTypeEnum dataType = DataTypeEnum.forType(serializationType);
switch (dataType) {
case CSV:
- if (StringUtils.isNumeric(separatorStr)) {
- char dataSeparator = (char) Integer.parseInt(separatorStr);
- separatorStr = Character.toString(dataSeparator);
- }
+ separatorStr = StringUtil.parseChar(separatorStr);
CsvFormat csvFormat = new CsvFormat(separatorStr);
csvFormat.setIgnoreParseErrors(ignoreParseErrors);
format = csvFormat;
@@ -131,6 +133,12 @@ public interface ExtractNodeProvider extends NodeProvider {
case RAW:
format = new RawFormat();
break;
+ case KV:
+ separatorStr = StringUtil.parseChar(separatorStr);
+ kvSeparatorStr = StringUtil.parseChar(kvSeparatorStr);
+ escapeCharStr = StringUtil.parseChar(escapeCharStr);
+ format = new KvFormat(separatorStr, kvSeparatorStr,
escapeCharStr, ignoreParseErrors, null, null, null);
+ break;
default:
throw new IllegalArgumentException(String.format("Unsupported
dataType=%s", dataType));
}
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
index 9e602c0293..e7ca76241e 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/KafkaProvider.java
@@ -68,6 +68,8 @@ public class KafkaProvider implements ExtractNodeProvider,
LoadNodeProvider {
kafkaSource.getSerializationType(),
kafkaSource.getWrapType(),
kafkaSource.getDataSeparator(),
+ kafkaSource.getKvSeparator(),
+ kafkaSource.getDataEscapeChar(),
kafkaSource.getIgnoreParseError());
KafkaScanStartupMode startupMode =
parseStartupMode(kafkaSource.getAutoOffsetReset());
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
index 1d8ace1480..9493f78f41 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/PulsarProvider.java
@@ -59,6 +59,8 @@ public class PulsarProvider implements ExtractNodeProvider {
Format format = parsingFormat(pulsarSource.getSerializationType(),
pulsarSource.getWrapType(),
pulsarSource.getDataSeparator(),
+ pulsarSource.getKvSeparator(),
+ pulsarSource.getDataEscapeChar(),
pulsarSource.getIgnoreParseError());
PulsarScanStartupMode startupMode =
PulsarScanStartupMode.forName(pulsarSource.getScanStartupMode());
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
index 6b80d4735e..d2553a76ab 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/sort/node/provider/TubeMqProvider.java
@@ -55,6 +55,8 @@ public class TubeMqProvider implements ExtractNodeProvider {
source.getSerializationType(),
source.getWrapType(),
source.getDataSeparator(),
+ source.getKvSeparator(),
+ source.getDataEscapeChar(),
source.getIgnoreParseError());
Map<String, String> properties =
parseProperties(source.getProperties());
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
index 061e9176af..5c8afdbf5c 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSource.java
@@ -90,6 +90,9 @@ public class KafkaSource extends StreamSource {
@ApiModelProperty(value = "Data separator")
private String dataSeparator;
+ @ApiModelProperty(value = "KV separator")
+ private String kvSeparator;
+
@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java
index 082b1adf7c..730f63f7b3 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceDTO.java
@@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils;
import javax.validation.constraints.NotNull;
+import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
@@ -91,10 +92,13 @@ public class KafkaSourceDTO {
private String primaryKey;
@ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
- private String dataEncoding;
+ private String dataEncoding = StandardCharsets.UTF_8.toString();
@ApiModelProperty(value = "Data separator")
- private String dataSeparator;
+ private String dataSeparator = String.valueOf((int) '|');
+
+ @ApiModelProperty(value = "KV separator")
+ private String kvSeparator;
@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
index e9343253df..89da06d277 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/kafka/KafkaSourceRequest.java
@@ -27,8 +27,6 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
-import java.nio.charset.StandardCharsets;
-
/**
* Kafka source request
*/
@@ -80,10 +78,13 @@ public class KafkaSourceRequest extends SourceRequest {
private String primaryKey;
@ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
- private String dataEncoding = StandardCharsets.UTF_8.toString();
+ private String dataEncoding;
@ApiModelProperty(value = "Data separator")
- private String dataSeparator = String.valueOf((int) '|');
+ private String dataSeparator;
+
+ @ApiModelProperty(value = "KV separator")
+ private String kvSeparator;
@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
index 884100c988..a000e58147 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSource.java
@@ -73,6 +73,9 @@ public class PulsarSource extends StreamSource {
@ApiModelProperty(value = "Data separator")
private String dataSeparator;
+ @ApiModelProperty(value = "KV separator")
+ private String kvSeparator;
+
@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
index 2f9c9b1ab1..5fb40984bc 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceDTO.java
@@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils;
import javax.validation.constraints.NotNull;
+import java.nio.charset.StandardCharsets;
import java.util.Map;
/**
@@ -64,10 +65,13 @@ public class PulsarSourceDTO {
private String primaryKey;
@ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
- private String dataEncoding;
+ private String dataEncoding = StandardCharsets.UTF_8.toString();
@ApiModelProperty(value = "Data separator")
- private String dataSeparator;
+ private String dataSeparator = String.valueOf((int) '|');
+
+ @ApiModelProperty(value = "KV separator")
+ private String kvSeparator;
@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
index 0c6946bc6e..6e7be5125b 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/pulsar/PulsarSourceRequest.java
@@ -27,8 +27,6 @@ import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.ToString;
-import java.nio.charset.StandardCharsets;
-
/**
* Pulsar source request
*/
@@ -61,10 +59,13 @@ public class PulsarSourceRequest extends SourceRequest {
private String primaryKey;
@ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
- private String dataEncoding = StandardCharsets.UTF_8.toString();
+ private String dataEncoding;
@ApiModelProperty(value = "Data separator")
- private String dataSeparator = String.valueOf((int) '|');
+ private String dataSeparator;
+
+ @ApiModelProperty(value = "KV separator")
+ private String kvSeparator;
@ApiModelProperty(value = "Data field escape symbol")
private String dataEscapeChar;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
index a2539b4c98..786aef0259 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
@@ -59,9 +59,18 @@ public class TubeMQSource extends StreamSource {
@ApiModelProperty("Session key of the TubeMQ")
private String sessionKey;
+ @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+ private String dataEncoding;
+
@ApiModelProperty(value = "Data separator")
private String dataSeparator;
+ @ApiModelProperty(value = "KV separator")
+ private String kvSeparator;
+
+ @ApiModelProperty(value = "Data field escape symbol")
+ private String dataEscapeChar;
+
/**
* The TubeMQ consumers use this streamId set to filter records reading
from server.
*/
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java
index bae3b951bd..dbae1cbb0b 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java
@@ -31,6 +31,7 @@ import org.apache.commons.lang3.StringUtils;
import javax.validation.constraints.NotNull;
+import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeSet;
@@ -55,8 +56,17 @@ public class TubeMQSourceDTO {
@ApiModelProperty("Session key of the TubeMQ")
private String sessionKey;
+ @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+ private String dataEncoding = StandardCharsets.UTF_8.toString();
+
@ApiModelProperty(value = "Data separator")
- private String dataSeparator;
+ private String dataSeparator = String.valueOf((int) '|');
+
+ @ApiModelProperty(value = "KV separator")
+ private String kvSeparator;
+
+ @ApiModelProperty(value = "Data field escape symbol")
+ private String dataEscapeChar;
@ApiModelProperty(value = "The message body wrap wrap type, including:
RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc")
private String wrapType;
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceRequest.java
index 7ef7ca4c34..85ea5c10da 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceRequest.java
@@ -51,9 +51,18 @@ public class TubeMQSourceRequest extends SourceRequest {
@ApiModelProperty("Session key of the TubeMQ")
private String sessionKey;
+ @ApiModelProperty(value = "Data encoding format: UTF-8, GBK")
+ private String dataEncoding;
+
@ApiModelProperty(value = "Data separator")
private String dataSeparator;
+ @ApiModelProperty(value = "KV separator")
+ private String kvSeparator;
+
+ @ApiModelProperty(value = "Data field escape symbol")
+ private String dataEscapeChar;
+
@ApiModelProperty(value = "The message body wrap wrap type, including:
RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc")
private String wrapType;