This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new d9df926330 [INLONG-9622][Sort] Update all deserializationInfo in
sort-common module (#9623)
d9df926330 is described below
commit d9df926330ec22e78e4867b388a72ef491343361
Author: baomingyu <[email protected]>
AuthorDate: Mon Jan 29 14:21:25 2024 +0800
[INLONG-9622][Sort] Update all deserializationInfo in sort-common module
(#9623)
---
.../deserialization/CsvDeserializationInfo.java | 60 ++++++++++++++++++-
.../deserialization/DeserializationInfo.java | 1 +
.../InLongMsgCsv2DeserializationInfo.java | 44 +++++++++++++-
.../InLongMsgCsvDeserializationInfo.java | 42 +++++++++++++-
.../InLongMsgKvDeserializationInfo.java | 59 ++++++++++++++++++-
.../InLongMsgTlogCsvDeserializationInfo.java | 45 ++++++++++++++-
.../InLongMsgTlogKvDeserializationInfo.java | 48 +++++++++++++++-
.../deserialization/KvDeserializationInfo.java | 67 ++++++++++++++++++++--
.../org/apache/inlong/sort/formats/csv/Csv.java | 2 +-
.../apache/inlong/sort/formats/csv/CsvTest.java | 4 +-
.../formats/inlongmsgbinlog/InLongMsgBinlog.java | 2 +-
.../inlongmsgbinlog/InLongMsgBinlogTest.java | 2 +-
.../sort/formats/inlongmsgcsv/InLongMsgCsv.java | 2 +-
.../formats/inlongmsgcsv/InLongMsgCsvTest.java | 2 +-
.../sort/formats/inlongmsgkv/InLongMsgKv.java | 2 +-
.../InLongMsgKvMixedFormatConverter.java | 2 +-
.../sort/formats/inlongmsgkv/InLongMsgKvTest.java | 2 +-
.../InLongMsgBinlogFormatFactory.java | 2 +-
...nLongMsgBinlogRowDataDeserializationSchema.java | 2 +-
.../InLongMsgBinlogFormatFactoryTest.java | 10 ++--
20 files changed, 367 insertions(+), 33 deletions(-)
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
index 575c99fd01..abbd5c149b 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
@@ -17,28 +17,82 @@
package org.apache.inlong.sort.protocol.deserialization;
+import org.apache.commons.lang3.StringUtils;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
/**
* Csv deserialization info
*/
-public class CsvDeserializationInfo implements DeserializationInfo {
+public class CsvDeserializationInfo extends InLongMsgDeserializationInfo {
- private static final long serialVersionUID = -5035426390567887081L;
+ private static final long serialVersionUID = 7424482369272150638L;
private final char splitter;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ private final Character escapeChar;
+
+ private final String tid;
+
// TODO: support mapping index to field
+ public CsvDeserializationInfo(
+ @JsonProperty("splitter") char splitter) {
+ this(TID_DEFAULT_VALUE, splitter, null);
+ }
+
+ public CsvDeserializationInfo(
+ @JsonProperty("splitter") char splitter,
+ @JsonProperty("escape_char") @Nullable Character escapeChar) {
+ this(TID_DEFAULT_VALUE, splitter, escapeChar);
+ }
@JsonCreator
public CsvDeserializationInfo(
- @JsonProperty("splitter") char splitter) {
+ @JsonProperty("tid") String tid,
+ @JsonProperty("splitter") char splitter,
+ @JsonProperty("escape_char") @Nullable Character escapeChar) {
+ super(tid);
+ this.tid = (StringUtils.isEmpty(tid) ? TID_DEFAULT_VALUE : tid);
this.splitter = splitter;
+ this.escapeChar = escapeChar;
}
@JsonProperty("splitter")
public char getSplitter() {
return splitter;
}
+
+ @JsonProperty("escape_char")
+ @Nullable
+ public Character getEscapeChar() {
+ return escapeChar;
+ }
+
+ @JsonProperty("tid")
+ public String getTid() {
+ return tid;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ CsvDeserializationInfo other = (CsvDeserializationInfo) o;
+ return Objects.equals(tid, other.getTid()) && splitter ==
other.splitter
+ && Objects.equals(escapeChar, other.escapeChar);
+ }
+
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
index b598d975ea..dfe68472fd 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
@@ -41,4 +41,5 @@ import java.io.Serializable;
})
public interface DeserializationInfo extends Serializable {
+ String TID_DEFAULT_VALUE = "-";
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
index 0806717fbf..7b45ed0026 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
@@ -18,8 +18,13 @@
package org.apache.inlong.sort.protocol.deserialization;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
/**
* It represents CSV2 format of InLongMsg(m=9).
*/
@@ -29,16 +34,51 @@ public class InLongMsgCsv2DeserializationInfo extends
InLongMsgDeserializationIn
private final char delimiter;
- @JsonCreator
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ private final Character escapeChar;
+
public InLongMsgCsv2DeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("delimiter") char delimiter) {
+ this(tid, delimiter, null);
+ }
+
+ @JsonCreator
+ public InLongMsgCsv2DeserializationInfo(
+ @JsonProperty("tid") String tid,
+ @JsonProperty("delimiter") char delimiter,
+ @JsonProperty("escape_char") @Nullable Character escapeChar) {
super(tid);
this.delimiter = delimiter;
+ this.escapeChar = escapeChar;
}
@JsonProperty("delimiter")
public char getDelimiter() {
return delimiter;
}
-}
+
+ @JsonProperty("escape_char")
+ @Nullable
+ public Character getEscapeChar() {
+ return escapeChar;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ InLongMsgCsv2DeserializationInfo other =
(InLongMsgCsv2DeserializationInfo) o;
+ return super.equals(other)
+ && delimiter == other.delimiter
+ && Objects.equals(escapeChar, other.escapeChar);
+ }
+
+}
\ No newline at end of file
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
index 890cea3aac..79ec62f9f7 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
@@ -22,6 +22,10 @@ import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInc
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
/**
* It represents CSV format of InLongMsg(m=0).
*/
@@ -31,22 +35,35 @@ public class InLongMsgCsvDeserializationInfo extends
InLongMsgDeserializationInf
private final char delimiter;
+ @JsonInclude(Include.NON_NULL)
+ @Nullable
+ private final Character escapeChar;
+
@JsonInclude(Include.NON_NULL)
private final boolean deleteHeadDelimiter;
public InLongMsgCsvDeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("delimiter") char delimiter) {
- this(tid, delimiter, true);
+ this(tid, delimiter, null, false);
+ }
+
+ public InLongMsgCsvDeserializationInfo(
+ @JsonProperty("tid") String tid,
+ @JsonProperty("delimiter") char delimiter,
+ @JsonProperty("delete_head_delimiter") boolean
deleteHeadDelimiter) {
+ this(tid, delimiter, null, deleteHeadDelimiter);
}
@JsonCreator
public InLongMsgCsvDeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("delimiter") char delimiter,
+ @JsonProperty("escape_char") @Nullable Character escapeChar,
@JsonProperty("delete_head_delimiter") boolean
deleteHeadDelimiter) {
super(tid);
this.delimiter = delimiter;
+ this.escapeChar = escapeChar;
this.deleteHeadDelimiter = deleteHeadDelimiter;
}
@@ -55,8 +72,31 @@ public class InLongMsgCsvDeserializationInfo extends
InLongMsgDeserializationInf
return delimiter;
}
+ @JsonProperty("escape_char")
+ @Nullable
+ public Character getEscapeChar() {
+ return escapeChar;
+ }
+
@JsonProperty("delete_head_delimiter")
public boolean isDeleteHeadDelimiter() {
return deleteHeadDelimiter;
}
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ InLongMsgCsvDeserializationInfo other =
(InLongMsgCsvDeserializationInfo) o;
+ return super.equals(other)
+ && delimiter == other.delimiter
+ && Objects.equals(escapeChar, other.escapeChar)
+ && deleteHeadDelimiter == other.deleteHeadDelimiter;
+ }
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
index 47d74cbfd1..1a5c8ff67e 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
@@ -17,8 +17,15 @@
package org.apache.inlong.sort.protocol.deserialization;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
/**
* It represents KV format of InLongMsg(m=5).
*/
@@ -30,13 +37,33 @@ public class InLongMsgKvDeserializationInfo extends
InLongMsgDeserializationInfo
private final char kvDelimiter;
+ @JsonInclude(Include.NON_NULL)
+ @Nullable
+ private final Character escapeChar;
+
+ @JsonInclude(Include.NON_NULL)
+ @Nullable
+ private final Character lineDelimiter;
+
public InLongMsgKvDeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("entry_delimiter") char entryDelimiter,
@JsonProperty("kv_delimiter") char kvDelimiter) {
+ this(tid, entryDelimiter, kvDelimiter, null, null);
+ }
+
+ @JsonCreator
+ public InLongMsgKvDeserializationInfo(
+ @JsonProperty("tid") String tid,
+ @JsonProperty("entry_delimiter") char entryDelimiter,
+ @JsonProperty("kv_delimiter") char kvDelimiter,
+ @JsonProperty("escape_char") @Nullable Character escapeChar,
+ @JsonProperty("line_delimiter") @Nullable Character lineDelimiter)
{
super(tid);
this.entryDelimiter = entryDelimiter;
this.kvDelimiter = kvDelimiter;
+ this.escapeChar = escapeChar;
+ this.lineDelimiter = lineDelimiter == null ? '\n' : lineDelimiter;
}
@JsonProperty("entry_delimiter")
@@ -48,4 +75,34 @@ public class InLongMsgKvDeserializationInfo extends
InLongMsgDeserializationInfo
public char getKvDelimiter() {
return kvDelimiter;
}
-}
+
+ @JsonProperty("escape_char")
+ @Nullable
+ public Character getEscapeChar() {
+ return escapeChar;
+ }
+
+ @JsonProperty("line_delimiter")
+ @Nullable
+ public Character getLineDelimiter() {
+ return lineDelimiter;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ InLongMsgKvDeserializationInfo other =
(InLongMsgKvDeserializationInfo) o;
+ return super.equals(other)
+ && entryDelimiter == other.entryDelimiter
+ && kvDelimiter == other.kvDelimiter
+ && Objects.equals(escapeChar, other.escapeChar)
+ && Objects.equals(lineDelimiter, other.lineDelimiter);
+ }
+}
\ No newline at end of file
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
index 53426ff060..fbfb95c1fd 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
@@ -18,8 +18,14 @@
package org.apache.inlong.sort.protocol.deserialization;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
/**
* It represents TLog CSV format of InLongMsg(m=10).
*/
@@ -29,16 +35,51 @@ public class InLongMsgTlogCsvDeserializationInfo extends
InLongMsgDeserializatio
private final char delimiter;
- @JsonCreator
+ @JsonInclude(Include.NON_NULL)
+ @Nullable
+ private final Character escapeChar;
+
public InLongMsgTlogCsvDeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("delimiter") char delimiter) {
+ this(tid, delimiter, null);
+ }
+
+ @JsonCreator
+ public InLongMsgTlogCsvDeserializationInfo(
+ @JsonProperty("tid") String tid,
+ @JsonProperty("delimiter") char delimiter,
+ @JsonProperty("escape_char") @Nullable Character escapeChar) {
super(tid);
this.delimiter = delimiter;
+ this.escapeChar = escapeChar;
}
@JsonProperty("delimiter")
public char getDelimiter() {
return delimiter;
}
-}
+
+ @JsonProperty("escape_char")
+ @Nullable
+ public Character getEscapeChar() {
+ return escapeChar;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ InLongMsgTlogCsvDeserializationInfo other =
(InLongMsgTlogCsvDeserializationInfo) o;
+ return super.equals(other)
+ && delimiter == other.delimiter
+ && Objects.equals(escapeChar, other.escapeChar);
+ }
+
+}
\ No newline at end of file
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
index 4c9bd16ea4..f3b2985128 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
@@ -17,8 +17,15 @@
package org.apache.inlong.sort.protocol.deserialization;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude.Include;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
/**
* It represents TLog KV format of InLongMsg(m=15).
*/
@@ -32,15 +39,30 @@ public class InLongMsgTlogKvDeserializationInfo extends
InLongMsgDeserialization
private final char kvDelimiter;
+ @JsonInclude(Include.NON_NULL)
+ @Nullable
+ private final Character escapeChar;
+
public InLongMsgTlogKvDeserializationInfo(
@JsonProperty("tid") String tid,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("entry_delimiter") char entryDelimiter,
@JsonProperty("kv_delimiter") char kvDelimiter) {
+ this(tid, delimiter, entryDelimiter, kvDelimiter, null);
+ }
+
+ @JsonCreator
+ public InLongMsgTlogKvDeserializationInfo(
+ @JsonProperty("tid") String tid,
+ @JsonProperty("delimiter") char delimiter,
+ @JsonProperty("entry_delimiter") char entryDelimiter,
+ @JsonProperty("kv_delimiter") char kvDelimiter,
+ @JsonProperty("escape_char") @Nullable Character escapeChar) {
super(tid);
this.delimiter = delimiter;
this.entryDelimiter = entryDelimiter;
this.kvDelimiter = kvDelimiter;
+ this.escapeChar = escapeChar;
}
@JsonProperty("delimiter")
@@ -57,4 +79,28 @@ public class InLongMsgTlogKvDeserializationInfo extends
InLongMsgDeserialization
public char getKvDelimiter() {
return kvDelimiter;
}
-}
+
+ @JsonProperty("escape_char")
+ @Nullable
+ public Character getEscapeChar() {
+ return escapeChar;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ InLongMsgTlogKvDeserializationInfo other =
(InLongMsgTlogKvDeserializationInfo) o;
+ return super.equals(other)
+ && delimiter == other.delimiter
+ && entryDelimiter == other.entryDelimiter
+ && kvDelimiter == other.kvDelimiter
+ && Objects.equals(escapeChar, other.escapeChar);
+ }
+}
\ No newline at end of file
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
index 73529a14de..c15839f50b 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
@@ -17,28 +17,56 @@
package org.apache.inlong.sort.protocol.deserialization;
+import org.apache.commons.lang3.StringUtils;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
-import static com.google.common.base.Preconditions.checkNotNull;
+import javax.annotation.Nullable;
+
+import java.util.Objects;
/**
* Kv deserialization info
*/
-public class KvDeserializationInfo implements DeserializationInfo {
+public class KvDeserializationInfo extends InLongMsgDeserializationInfo {
- private static final long serialVersionUID = 1976031542480774581L;
+ private static final long serialVersionUID = -3182881360079888043L;
private final char entrySplitter;
private final char kvSplitter;
- @JsonCreator
+ private final String tid;
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ @Nullable
+ private final Character escapeChar;
+
public KvDeserializationInfo(
@JsonProperty("entry_splitter") char entrySplitter,
@JsonProperty("kv_splitter") char kvSplitter) {
- this.entrySplitter = checkNotNull(entrySplitter);
- this.kvSplitter = checkNotNull(kvSplitter);
+ this(TID_DEFAULT_VALUE, entrySplitter, kvSplitter, null);
+ }
+
+ public KvDeserializationInfo(
+ @JsonProperty("entry_splitter") char entrySplitter,
+ @JsonProperty("kv_splitter") char kvSplitter,
+ @JsonProperty("escape_char") @Nullable Character escapeChar) {
+ this(TID_DEFAULT_VALUE, entrySplitter, kvSplitter, escapeChar);
+ }
+
+ @JsonCreator
+ public KvDeserializationInfo(
+ @JsonProperty("tid") String tid,
+ @JsonProperty("entry_splitter") char entrySplitter,
+ @JsonProperty("kv_splitter") char kvSplitter,
+ @JsonProperty("escape_char") @Nullable Character escapeChar) {
+ super(tid);
+ this.tid = (StringUtils.isEmpty(tid) ? TID_DEFAULT_VALUE : tid);
+ this.entrySplitter = entrySplitter;
+ this.kvSplitter = kvSplitter;
+ this.escapeChar = escapeChar;
}
@JsonProperty("entry_splitter")
@@ -50,4 +78,31 @@ public class KvDeserializationInfo implements
DeserializationInfo {
public char getKvSplitter() {
return kvSplitter;
}
+
+ @JsonProperty("escape_char")
+ @Nullable
+ public Character getEscapeChar() {
+ return escapeChar;
+ }
+
+ @JsonProperty("tid")
+ public String getTid() {
+ return tid;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ KvDeserializationInfo other = (KvDeserializationInfo) o;
+ return Objects.equals(tid, other.getTid()) && entrySplitter ==
other.entrySplitter
+ && kvSplitter == other.kvSplitter
+ && Objects.equals(escapeChar, other.escapeChar);
+ }
}
diff --git
a/inlong-sort/sort-formats/format-row/format-csv/src/main/java/org/apache/inlong/sort/formats/csv/Csv.java
b/inlong-sort/sort-formats/format-row/format-csv/src/main/java/org/apache/inlong/sort/formats/csv/Csv.java
index 307cd7edfd..e84c8f9ea9 100644
---
a/inlong-sort/sort-formats/format-row/format-csv/src/main/java/org/apache/inlong/sort/formats/csv/Csv.java
+++
b/inlong-sort/sort-formats/format-row/format-csv/src/main/java/org/apache/inlong/sort/formats/csv/Csv.java
@@ -30,7 +30,7 @@ import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_DE
*/
public class Csv extends TextFormatDescriptor<Csv> {
- public static final String FORMAT_TYPE_VALUE = "InLong-CSV";
+ public static final String FORMAT_TYPE_VALUE = "inlong-csv";
public Csv() {
super(FORMAT_TYPE_VALUE, 1);
diff --git
a/inlong-sort/sort-formats/format-row/format-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvTest.java
b/inlong-sort/sort-formats/format-row/format-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvTest.java
index d3d90ad8d2..93726d4632 100644
---
a/inlong-sort/sort-formats/format-row/format-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvTest.java
+++
b/inlong-sort/sort-formats/format-row/format-csv/src/test/java/org/apache/inlong/sort/formats/csv/CsvTest.java
@@ -92,7 +92,7 @@ public class CsvTest extends DescriptorTestBase {
@Override
public List<Map<String, String>> properties() {
final Map<String, String> props1 = new HashMap<>();
- props1.put("format.type", "InLong-CSV");
+ props1.put("format.type", "inlong-csv");
props1.put("format.property-version", "1");
props1.put("format.schema", TEST_SCHEMA);
props1.put("format.delimiter", ";");
@@ -102,7 +102,7 @@ public class CsvTest extends DescriptorTestBase {
props1.put("format.null-literal", "n/a");
final Map<String, String> props2 = new HashMap<>();
- props2.put("format.type", "InLong-CSV");
+ props2.put("format.type", "inlong-csv");
props2.put("format.property-version", "1");
props2.put("format.derive-schema", "true");
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlog.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlog.java
index 7875f526ac..02fad5a8eb 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlog.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlog.java
@@ -41,7 +41,7 @@ import static
org.apache.inlong.sort.formats.inlongmsgbinlog.InLongMsgBinlogUtil
*/
public class InLongMsgBinlog extends FormatDescriptor {
- public static final String FORMAT_TYPE_VALUE = "InLongMsg-Binlog";
+ public static final String FORMAT_TYPE_VALUE = "inlong-msg-binlog";
private DescriptorProperties internalProperties = new
DescriptorProperties(true);
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogTest.java
index 7b970d3a20..29a27b7d13 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogTest.java
@@ -76,7 +76,7 @@ public class InLongMsgBinlogTest extends DescriptorTestBase {
@Override
public List<Map<String, String>> properties() {
final Map<String, String> props1 = new HashMap<>();
- props1.put("format.type", "InLongMsg-Binlog");
+ props1.put("format.type", "inlong-msg-binlog");
props1.put("format.property-version", "1");
props1.put("format.schema", marshall(TEST_SCHEMA));
props1.put("format.time-field-name", "time");
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
index 1567ea0d3e..96e807a327 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsv.java
@@ -36,7 +36,7 @@ import static
org.apache.inlong.sort.formats.inlongmsgcsv.InLongMsgCsvUtils.FORM
*/
public class InLongMsgCsv extends TextFormatDescriptor<InLongMsgCsv> {
- public static final String FORMAT_TYPE_VALUE = "InLongMsg-CSV";
+ public static final String FORMAT_TYPE_VALUE = "inlong-msg-csv";
public InLongMsgCsv() {
super(FORMAT_TYPE_VALUE, 1);
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvTest.java
index 72acf4486b..e23967dabb 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/test/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvTest.java
@@ -88,7 +88,7 @@ public class InLongMsgCsvTest extends DescriptorTestBase {
@Override
public List<Map<String, String>> properties() {
final Map<String, String> props1 = new HashMap<>();
- props1.put("format.type", "InLongMsg-CSV");
+ props1.put("format.type", "inlong-msg-csv");
props1.put("format.property-version", "1");
props1.put("format.schema", TEST_SCHEMA);
props1.put("format.time-field-name", "time");
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java
index 52eb340ae1..1a1b93cc16 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKv.java
@@ -32,7 +32,7 @@ import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.FORMAT_TIM
*/
public class InLongMsgKv extends TextFormatDescriptor<InLongMsgKv> {
- public static final String FORMAT_TYPE_VALUE = "InLongMsg-KV";
+ public static final String FORMAT_TYPE_VALUE = "inlong-msg-kv";
public InLongMsgKv() {
super(FORMAT_TYPE_VALUE, 1);
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
index 59195b87e1..a46fa78459 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
@@ -37,7 +37,7 @@ import java.util.Map;
import java.util.Objects;
/**
- * Converter used to deserialize a mixed row in InLongMsg-kv format.
+ * Converter used to deserialize a mixed row in inlong-msg-kv format.
*/
public class InLongMsgKvMixedFormatConverter extends
AbstractInLongMsgMixedFormatConverter {
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvTest.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvTest.java
index 8b84fa125c..d384ac306c 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvTest.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/test/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvTest.java
@@ -88,7 +88,7 @@ public class InLongMsgKvTest extends DescriptorTestBase {
@Override
public List<Map<String, String>> properties() {
final Map<String, String> props1 = new HashMap<>();
- props1.put("format.type", "InLongMsg-KV");
+ props1.put("format.type", "inlong-msg-kv");
props1.put("format.property-version", "1");
props1.put("format.schema", TEST_SCHEMA);
props1.put("format.time-field-name", "time");
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactory.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactory.java
index af3e7d320d..447bafcf57 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactory.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactory.java
@@ -44,7 +44,7 @@ import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgOptions.TIME_FIE
*/
public final class InLongMsgBinlogFormatFactory implements
DeserializationFormatFactory {
- public static final String IDENTIFIER = "InLongMsg-Binlog";
+ public static final String IDENTIFIER = "inlong-msg-binlog";
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogRowDataDeserializationSchema.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogRowDataDeserializationSchema.java
index ade0705f0b..3d0d88d401 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogRowDataDeserializationSchema.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogRowDataDeserializationSchema.java
@@ -29,7 +29,7 @@ import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_AT
import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgUtils.DEFAULT_TIME_FIELD_NAME;
/**
- * Deserialization schema from InLongMsg-Binlog to Flink Table & SQL internal
data structures.
+ * Deserialization schema from inlong-msg-binlog to Flink Table & SQL internal
data structures.
*/
public class InLongMsgBinlogRowDataDeserializationSchema extends
AbstractInLongMsgDeserializationSchema {
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactoryTest.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactoryTest.java
index 8b27c74137..ccb05d7f08 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactoryTest.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/test/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogFormatFactoryTest.java
@@ -109,11 +109,11 @@ public class InLongMsgBinlogFormatFactoryTest {
options.put("buffer-size", "1000");
options.put("format", InLongMsgBinlogFormatFactory.IDENTIFIER);
- options.put("InLongMsg-Binlog.row.format.info",
FormatUtils.marshall(testFormatInfo));
- options.put("InLongMsg-Binlog.format.time-field-name", "time");
- options.put("InLongMsg-Binlog.format.attribute-field-name",
"attributes");
- options.put("InLongMsg-Binlog.format.ignore-errors", "true");
- options.put("InLongMsg-Binlog.format.include-update-before", "false");
+ options.put("inlong-msg-binlog.row.format.info",
FormatUtils.marshall(testFormatInfo));
+ options.put("inlong-msg-binlog.format.time-field-name", "time");
+ options.put("inlong-msg-binlog.format.attribute-field-name",
"attributes");
+ options.put("inlong-msg-binlog.format.ignore-errors", "true");
+ options.put("inlong-msg-binlog.format.include-update-before", "false");
return options;
}
}