This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 3d1b611dc5 [INLONG-9626][Sort] Change the variable name from tid to
streamId (#9629)
3d1b611dc5 is described below
commit 3d1b611dc510676323b02bfe7c28247759ed2569
Author: baomingyu <[email protected]>
AuthorDate: Wed Jan 31 17:22:07 2024 +0800
[INLONG-9626][Sort] Change the variable name from tid to streamId (#9629)
---
.../manager/pojo/source/tubemq/TubeMQSource.java | 2 +-
.../manager/pojo/source/tubemq/TubeMQSourceDTO.java | 4 ++--
.../pojo/source/tubemq/TubeMQSourceRequest.java | 4 ++--
.../sdk/dataproxy/example/UdpClientExample.java | 2 +-
.../deserialization/CsvDeserializationInfo.java | 20 ++++++++++----------
.../deserialization/DeserializationInfo.java | 2 +-
.../InLongMsgCsv2DeserializationInfo.java | 8 ++++----
.../InLongMsgCsvDeserializationInfo.java | 12 ++++++------
.../InLongMsgDeserializationInfo.java | 12 ++++++------
.../InLongMsgKvDeserializationInfo.java | 8 ++++----
.../InLongMsgTlogCsvDeserializationInfo.java | 8 ++++----
.../InLongMsgTlogKvDeserializationInfo.java | 8 ++++----
.../deserialization/KvDeserializationInfo.java | 20 ++++++++++----------
.../inlong/sort/tubemq/table/TubeMQOptions.java | 2 +-
.../inlong/sort/formats/inlongmsg/InLongMsgBody.java | 12 ++++++------
.../inlong/sort/formats/inlongmsg/InLongMsgHead.java | 16 ++++++++--------
.../sort/formats/inlongmsg/InLongMsgMetadata.java | 2 +-
.../AbstractInLongMsgMixedFormatConverter.java | 6 +++---
.../formats/inlongmsg/InLongMsgDecodingFormat.java | 2 +-
.../sort/formats/inlongmsg/InLongMsgUtils.java | 10 ++++++----
.../InLongMsgBinlogMixedFormatConverter.java | 2 +-
.../InLongMsgBinlogMixedFormatDeserializer.java | 2 +-
.../inlongmsgbinlog/InLongMsgBinlogUtils.java | 8 ++++----
.../InLongMsgCsvMixedFormatConverter.java | 2 +-
.../InLongMsgCsvMixedFormatDeserializer.java | 2 +-
.../sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java | 8 ++++----
.../inlongmsgkv/InLongMsgKvMixedFormatConverter.java | 2 +-
.../InLongMsgKvMixedFormatDeserializer.java | 2 +-
.../sort/formats/inlongmsgkv/InLongMsgKvUtils.java | 8 ++++----
.../InLongMsgTlogCsvMixedFormatConverter.java | 2 +-
.../InLongMsgTlogCsvMixedFormatDeserializer.java | 2 +-
.../inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java | 4 ++--
.../InLongMsgTlogKvMixedFormatConverter.java | 2 +-
.../InLongMsgTlogKvMixedFormatDeserializer.java | 2 +-
.../inlongmsgtlogkv/InLongMsgTlogKvUtils.java | 4 ++--
.../AbstractInLongMsgMixedFormatConverter.java | 6 +++---
.../sort/formats/inlongmsg/InLongMsgUtils.java | 11 +++++++----
.../inlongmsgbinlog/InLongMsgBinlogUtils.java | 10 +++++-----
.../org/apache/flink/connectors/tubemq/Tubemq.java | 16 ++++++++--------
.../flink/connectors/tubemq/TubemqOptions.java | 6 +++---
.../flink/connectors/tubemq/TubemqSinkFunction.java | 10 +++++-----
.../connectors/tubemq/TubemqSourceFunction.java | 16 ++++++++--------
.../flink/connectors/tubemq/TubemqTableSource.java | 16 ++++++++--------
.../tubemq/TubemqTableSourceSinkFactory.java | 14 +++++++-------
.../flink/connectors/tubemq/TubemqValidator.java | 8 ++++----
.../apache/flink/connectors/tubemq/TubemqTest.java | 4 ++--
46 files changed, 167 insertions(+), 162 deletions(-)
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
index 7714ba6229..a2539b4c98 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSource.java
@@ -65,7 +65,7 @@ public class TubeMQSource extends StreamSource {
/**
* The TubeMQ consumers use this streamId set to filter records reading
from server.
*/
- @ApiModelProperty("Tid of the TubeMQ")
+ @ApiModelProperty("StreamId of the TubeMQ")
private TreeSet<String> streamId;
@ApiModelProperty(value = "The message body wrap wrap type, including:
RAW, INLONG_MSG_V0, INLONG_MSG_V1, etc")
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java
index 717e06c631..bae3b951bd 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceDTO.java
@@ -62,9 +62,9 @@ public class TubeMQSourceDTO {
private String wrapType;
/**
- * The tubemq consumers use this tid set to filter records reading from
server.
+ * The tubemq consumers use this streamId set to filter records reading
from server.
*/
- @ApiModelProperty("Tid of the TubeMQ")
+ @ApiModelProperty("streamId of the TubeMQ")
private TreeSet<String> streamId;
@ApiModelProperty("Properties for TubeMQ")
diff --git
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceRequest.java
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceRequest.java
index a584a9d16e..7ef7ca4c34 100644
---
a/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceRequest.java
+++
b/inlong-manager/manager-pojo/src/main/java/org/apache/inlong/manager/pojo/source/tubemq/TubeMQSourceRequest.java
@@ -58,9 +58,9 @@ public class TubeMQSourceRequest extends SourceRequest {
private String wrapType;
/**
- * The TubeMQ consumers use this tid set to filter records reading from
server.
+ * The TubeMQ consumers use this streamId set to filter records reading
from server.
*/
- @ApiModelProperty("Tid of the TubeMQ")
+ @ApiModelProperty("streamId of the TubeMQ")
private TreeSet<String> streamId;
public TubeMQSourceRequest() {
diff --git
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
index 500b7f133f..26e490fc37 100644
---
a/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
+++
b/inlong-sdk/dataproxy-sdk/src/main/java/org/apache/inlong/sdk/dataproxy/example/UdpClientExample.java
@@ -206,7 +206,7 @@ public class UdpClientExample {
if (Utils.isNotBlank(endAttr)) {
endAttr = endAttr + "&";
}
- endAttr = (endAttr + "bid=" + object.getGroupId() + "&tid="
+ endAttr = (endAttr + "groupId=" + object.getGroupId() +
"&streamId="
+ object.getStreamId());
}
if (Utils.isNotBlank(object.getMsgUUID())) {
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
index abbd5c149b..e43a2f129f 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/CsvDeserializationInfo.java
@@ -39,27 +39,27 @@ public class CsvDeserializationInfo extends
InLongMsgDeserializationInfo {
@Nullable
private final Character escapeChar;
- private final String tid;
+ private final String streamId;
// TODO: support mapping index to field
public CsvDeserializationInfo(
@JsonProperty("splitter") char splitter) {
- this(TID_DEFAULT_VALUE, splitter, null);
+ this(STREAM_ID_DEFAULT_VALUE, splitter, null);
}
public CsvDeserializationInfo(
@JsonProperty("splitter") char splitter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
- this(TID_DEFAULT_VALUE, splitter, escapeChar);
+ this(STREAM_ID_DEFAULT_VALUE, splitter, escapeChar);
}
@JsonCreator
public CsvDeserializationInfo(
- @JsonProperty("tid") String tid,
+ @JsonProperty("streamId") String streamId,
@JsonProperty("splitter") char splitter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
- super(tid);
- this.tid = (StringUtils.isEmpty(tid) ? TID_DEFAULT_VALUE : tid);
+ super(streamId);
+ this.streamId = (StringUtils.isEmpty(streamId) ?
STREAM_ID_DEFAULT_VALUE : streamId);
this.splitter = splitter;
this.escapeChar = escapeChar;
}
@@ -75,9 +75,9 @@ public class CsvDeserializationInfo extends
InLongMsgDeserializationInfo {
return escapeChar;
}
- @JsonProperty("tid")
- public String getTid() {
- return tid;
+ @JsonProperty("streamId")
+ public String getStreamId() {
+ return streamId;
}
@Override
@@ -91,7 +91,7 @@ public class CsvDeserializationInfo extends
InLongMsgDeserializationInfo {
}
CsvDeserializationInfo other = (CsvDeserializationInfo) o;
- return Objects.equals(tid, other.getTid()) && splitter ==
other.splitter
+ return Objects.equals(streamId, other.getStreamId()) && splitter ==
other.splitter
&& Objects.equals(escapeChar, other.escapeChar);
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
index dfe68472fd..56fa49312c 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/DeserializationInfo.java
@@ -41,5 +41,5 @@ import java.io.Serializable;
})
public interface DeserializationInfo extends Serializable {
- String TID_DEFAULT_VALUE = "-";
+ String STREAM_ID_DEFAULT_VALUE = "-";
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
index 7b45ed0026..0b8e45d241 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsv2DeserializationInfo.java
@@ -39,17 +39,17 @@ public class InLongMsgCsv2DeserializationInfo extends
InLongMsgDeserializationIn
private final Character escapeChar;
public InLongMsgCsv2DeserializationInfo(
- @JsonProperty("tid") String tid,
+ @JsonProperty("streamId") String streamId,
@JsonProperty("delimiter") char delimiter) {
- this(tid, delimiter, null);
+ this(streamId, delimiter, null);
}
@JsonCreator
public InLongMsgCsv2DeserializationInfo(
- @JsonProperty("tid") String tid,
+ @JsonProperty("streamId") String streamId,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
- super(tid);
+ super(streamId);
this.delimiter = delimiter;
this.escapeChar = escapeChar;
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
index 79ec62f9f7..d5817b4503 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgCsvDeserializationInfo.java
@@ -43,25 +43,25 @@ public class InLongMsgCsvDeserializationInfo extends
InLongMsgDeserializationInf
private final boolean deleteHeadDelimiter;
public InLongMsgCsvDeserializationInfo(
- @JsonProperty("tid") String tid,
+ @JsonProperty("streamId") String streamId,
@JsonProperty("delimiter") char delimiter) {
- this(tid, delimiter, null, false);
+ this(streamId, delimiter, null, false);
}
public InLongMsgCsvDeserializationInfo(
- @JsonProperty("tid") String tid,
+ @JsonProperty("streamId") String streamId,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("delete_head_delimiter") boolean
deleteHeadDelimiter) {
- this(tid, delimiter, null, deleteHeadDelimiter);
+ this(streamId, delimiter, null, deleteHeadDelimiter);
}
@JsonCreator
public InLongMsgCsvDeserializationInfo(
- @JsonProperty("tid") String tid,
+ @JsonProperty("streamId") String streamId,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar,
@JsonProperty("delete_head_delimiter") boolean
deleteHeadDelimiter) {
- super(tid);
+ super(streamId);
this.delimiter = delimiter;
this.escapeChar = escapeChar;
this.deleteHeadDelimiter = deleteHeadDelimiter;
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
index 6b86577e81..357a4fb886 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgDeserializationInfo.java
@@ -28,14 +28,14 @@ public abstract class InLongMsgDeserializationInfo
implements DeserializationInf
private static final long serialVersionUID = 3707412713264864315L;
- private final String tid;
+ private final String streamId;
- public InLongMsgDeserializationInfo(@JsonProperty("tid") String tid) {
- this.tid = checkNotNull(tid);
+ public InLongMsgDeserializationInfo(@JsonProperty("streamId") String
streamId) {
+ this.streamId = checkNotNull(streamId);
}
- @JsonProperty("tid")
- public String getTid() {
- return tid;
+ @JsonProperty("streamId")
+ public String getStreamId() {
+ return streamId;
}
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
index 1a5c8ff67e..99ff27f8ba 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgKvDeserializationInfo.java
@@ -46,20 +46,20 @@ public class InLongMsgKvDeserializationInfo extends
InLongMsgDeserializationInfo
private final Character lineDelimiter;
public InLongMsgKvDeserializationInfo(
- @JsonProperty("tid") String tid,
+ @JsonProperty("streamId") String streamId,
@JsonProperty("entry_delimiter") char entryDelimiter,
@JsonProperty("kv_delimiter") char kvDelimiter) {
- this(tid, entryDelimiter, kvDelimiter, null, null);
+ this(streamId, entryDelimiter, kvDelimiter, null, null);
}
@JsonCreator
public InLongMsgKvDeserializationInfo(
- @JsonProperty("tid") String tid,
+ @JsonProperty("streamId") String streamId,
@JsonProperty("entry_delimiter") char entryDelimiter,
@JsonProperty("kv_delimiter") char kvDelimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar,
@JsonProperty("line_delimiter") @Nullable Character lineDelimiter)
{
- super(tid);
+ super(streamId);
this.entryDelimiter = entryDelimiter;
this.kvDelimiter = kvDelimiter;
this.escapeChar = escapeChar;
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
index fbfb95c1fd..223a12d2b5 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogCsvDeserializationInfo.java
@@ -40,17 +40,17 @@ public class InLongMsgTlogCsvDeserializationInfo extends
InLongMsgDeserializatio
private final Character escapeChar;
public InLongMsgTlogCsvDeserializationInfo(
- @JsonProperty("tid") String tid,
+ @JsonProperty("streamId") String streamId,
@JsonProperty("delimiter") char delimiter) {
- this(tid, delimiter, null);
+ this(streamId, delimiter, null);
}
@JsonCreator
public InLongMsgTlogCsvDeserializationInfo(
- @JsonProperty("tid") String tid,
+ @JsonProperty("streamId") String streamId,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
- super(tid);
+ super(streamId);
this.delimiter = delimiter;
this.escapeChar = escapeChar;
}
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
index f3b2985128..77ad77ac82 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/InLongMsgTlogKvDeserializationInfo.java
@@ -44,21 +44,21 @@ public class InLongMsgTlogKvDeserializationInfo extends
InLongMsgDeserialization
private final Character escapeChar;
public InLongMsgTlogKvDeserializationInfo(
- @JsonProperty("tid") String tid,
+ @JsonProperty("streamId") String streamId,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("entry_delimiter") char entryDelimiter,
@JsonProperty("kv_delimiter") char kvDelimiter) {
- this(tid, delimiter, entryDelimiter, kvDelimiter, null);
+ this(streamId, delimiter, entryDelimiter, kvDelimiter, null);
}
@JsonCreator
public InLongMsgTlogKvDeserializationInfo(
- @JsonProperty("tid") String tid,
+ @JsonProperty("streamId") String streamId,
@JsonProperty("delimiter") char delimiter,
@JsonProperty("entry_delimiter") char entryDelimiter,
@JsonProperty("kv_delimiter") char kvDelimiter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
- super(tid);
+ super(streamId);
this.delimiter = delimiter;
this.entryDelimiter = entryDelimiter;
this.kvDelimiter = kvDelimiter;
diff --git
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
index c15839f50b..583316c783 100644
---
a/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
+++
b/inlong-sort/sort-common/src/main/java/org/apache/inlong/sort/protocol/deserialization/KvDeserializationInfo.java
@@ -37,7 +37,7 @@ public class KvDeserializationInfo extends
InLongMsgDeserializationInfo {
private final char kvSplitter;
- private final String tid;
+ private final String streamId;
@JsonInclude(JsonInclude.Include.NON_NULL)
@Nullable
@@ -46,24 +46,24 @@ public class KvDeserializationInfo extends
InLongMsgDeserializationInfo {
public KvDeserializationInfo(
@JsonProperty("entry_splitter") char entrySplitter,
@JsonProperty("kv_splitter") char kvSplitter) {
- this(TID_DEFAULT_VALUE, entrySplitter, kvSplitter, null);
+ this(STREAM_ID_DEFAULT_VALUE, entrySplitter, kvSplitter, null);
}
public KvDeserializationInfo(
@JsonProperty("entry_splitter") char entrySplitter,
@JsonProperty("kv_splitter") char kvSplitter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
- this(TID_DEFAULT_VALUE, entrySplitter, kvSplitter, escapeChar);
+ this(STREAM_ID_DEFAULT_VALUE, entrySplitter, kvSplitter, escapeChar);
}
@JsonCreator
public KvDeserializationInfo(
- @JsonProperty("tid") String tid,
+ @JsonProperty("streamId") String streamId,
@JsonProperty("entry_splitter") char entrySplitter,
@JsonProperty("kv_splitter") char kvSplitter,
@JsonProperty("escape_char") @Nullable Character escapeChar) {
- super(tid);
- this.tid = (StringUtils.isEmpty(tid) ? TID_DEFAULT_VALUE : tid);
+ super(streamId);
+ this.streamId = (StringUtils.isEmpty(streamId) ?
STREAM_ID_DEFAULT_VALUE : streamId);
this.entrySplitter = entrySplitter;
this.kvSplitter = kvSplitter;
this.escapeChar = escapeChar;
@@ -85,9 +85,9 @@ public class KvDeserializationInfo extends
InLongMsgDeserializationInfo {
return escapeChar;
}
- @JsonProperty("tid")
- public String getTid() {
- return tid;
+ @JsonProperty("streamId")
+ public String getStreamId() {
+ return streamId;
}
@Override
@@ -101,7 +101,7 @@ public class KvDeserializationInfo extends
InLongMsgDeserializationInfo {
}
KvDeserializationInfo other = (KvDeserializationInfo) o;
- return Objects.equals(tid, other.getTid()) && entrySplitter ==
other.entrySplitter
+ return Objects.equals(streamId, other.getStreamId()) && entrySplitter
== other.entrySplitter
&& kvSplitter == other.kvSplitter
&& Objects.equals(escapeChar, other.escapeChar);
}
diff --git
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
index 357da2dd09..dd4ef5b6f2 100644
---
a/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
+++
b/inlong-sort/sort-flink/sort-flink-v1.13/sort-connectors/tubemq/src/main/java/org/apache/inlong/sort/tubemq/table/TubeMQOptions.java
@@ -152,7 +152,7 @@ public class TubeMQOptions {
.stringType()
.asList()
.noDefaultValue()
- .withDescription("The tid owned this topic.");
+ .withDescription("The streamId owned this topic.");
public static final ConfigOption<Integer> MAX_RETRIES =
ConfigOptions.key("max.retries")
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
index f0860b1b9c..77a7812b22 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgBody.java
@@ -37,7 +37,7 @@ public class InLongMsgBody implements Serializable {
/**
* The interface of the record.
*/
- private final String tid;
+ private final String streamId;
/**
* The fields extracted from the body.
@@ -51,11 +51,11 @@ public class InLongMsgBody implements Serializable {
public InLongMsgBody(
byte[] data,
- String tid,
+ String streamId,
List<String> fields,
Map<String, String> entries) {
this.data = data;
- this.tid = tid;
+ this.streamId = streamId;
this.fields = fields;
this.entries = entries;
}
@@ -64,8 +64,8 @@ public class InLongMsgBody implements Serializable {
return data;
}
- public String getTid() {
- return tid;
+ public String getStreamId() {
+ return streamId;
}
public List<String> getFields() {
@@ -97,7 +97,7 @@ public class InLongMsgBody implements Serializable {
@Override
public String toString() {
- return "InLongMsgBody{" + "data=" + Arrays.toString(data) + ", tid='"
+ tid + '\''
+ return "InLongMsgBody{" + "data=" + Arrays.toString(data) + ",
streamId='" + streamId + '\''
+ ", fields=" + fields + ", entries=" + entries + '}';
}
}
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgHead.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgHead.java
index 61e6f6234f..e651c023dd 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgHead.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgHead.java
@@ -40,7 +40,7 @@ public class InLongMsgHead implements Serializable {
/**
* The interface of the record.
*/
- private final String tid;
+ private final String streamId;
/**
* The time of the record.
@@ -54,11 +54,11 @@ public class InLongMsgHead implements Serializable {
public InLongMsgHead(
Map<String, String> attributes,
- String tid,
+ String streamId,
Timestamp time,
List<String> predefinedFields) {
this.attributes = attributes;
- this.tid = tid;
+ this.streamId = streamId;
this.time = time;
this.predefinedFields = predefinedFields;
}
@@ -67,8 +67,8 @@ public class InLongMsgHead implements Serializable {
return attributes;
}
- public String getTid() {
- return tid;
+ public String getStreamId() {
+ return streamId;
}
public Timestamp getTime() {
@@ -91,21 +91,21 @@ public class InLongMsgHead implements Serializable {
InLongMsgHead that = (InLongMsgHead) o;
return Objects.equals(attributes, that.attributes)
- && Objects.equals(tid, that.tid)
+ && Objects.equals(streamId, that.streamId)
&& Objects.equals(time, that.time)
&& Objects.equals(predefinedFields, that.predefinedFields);
}
@Override
public int hashCode() {
- return Objects.hash(attributes, tid, time, predefinedFields);
+ return Objects.hash(attributes, streamId, time, predefinedFields);
}
@Override
public String toString() {
return "InLongMsgHead{"
+ "attributes=" + attributes
- + ", tid='" + tid + '\''
+ + ", streamId='" + streamId + '\''
+ ", time=" + time
+ ", predefinedFields=" + predefinedFields
+ '}';
diff --git
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMetadata.java
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMetadata.java
index f5a1eca427..6e31220e12 100644
---
a/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMetadata.java
+++
b/inlong-sort/sort-formats/format-common/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgMetadata.java
@@ -30,7 +30,7 @@ public class InLongMsgMetadata {
*/
public enum ReadableMetadata {
- TID("metadata-tid", DataTypes.STRING());
+ STREAMID("metadata-streamId", DataTypes.STRING());
final String key;
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
index fa15b20127..165f6532fb 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
@@ -52,7 +52,7 @@ public abstract class AbstractInLongMsgMixedFormatConverter
public abstract List<Row> convertRows(
Map<String, String> attributes,
byte[] data,
- String tid,
+ String streamId,
Timestamp time,
List<String> predefinedFields,
List<String> fields,
@@ -66,14 +66,14 @@ public abstract class AbstractInLongMsgMixedFormatConverter
try {
Map<String, String> attributes =
InLongMsgUtils.getAttributesFromMixedRow(row);
byte[] data = InLongMsgUtils.getDataFromMixedRow(row);
- String tid = InLongMsgUtils.getTidFromMixedRow(row);
+ String streamId = InLongMsgUtils.getStreamIdFromMixedRow(row);
Timestamp time = InLongMsgUtils.getTimeFromMixedRow(row);
List<String> predefinedFields =
InLongMsgUtils.getPredefinedFieldsFromMixedRow(row);
List<String> fields = InLongMsgUtils.getFieldsFromMixedRow(row);
Map<String, String> entries =
InLongMsgUtils.getEntriesFromMixedRow(row);
convertedRows =
- convertRows(attributes, data, tid, time, predefinedFields,
fields, entries);
+ convertRows(attributes, data, streamId, time,
predefinedFields, fields, entries);
} catch (Throwable t) {
String errorMessage =
String.format("Could not properly convert the mixed row.
Row=[%s].", row);
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
index 93e970aa85..779ff6b8e8 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgDecodingFormat.java
@@ -206,7 +206,7 @@ public class InLongMsgDecodingFormat implements
DecodingFormat<DeserializationSc
@Override
public Object read(InLongMsgHead head) {
- return StringData.fromString(head.getTid());
+ return StringData.fromString(head.getStreamId());
}
});
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
index 969b7fc19e..131c8099ec 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
@@ -62,6 +62,8 @@ public class InLongMsgUtils {
// keys in attributes
public static final String INLONGMSG_ATTR_STREAM_ID = "streamId";
+
+ @Deprecated
public static final String INLONGMSG_ATTR_TID = "tid";
public static final String INLONGMSG_ATTR_TIME_T = "t";
public static final String INLONGMSG_ATTR_TIME_DT = "dt";
@@ -80,7 +82,7 @@ public class InLongMsgUtils {
new String[]{
"attributes",
"data",
- "tid",
+ "streamId",
"time",
"predefinedFields",
"fields",
@@ -254,11 +256,11 @@ public class InLongMsgUtils {
public static Row buildMixedRow(
InLongMsgHead head,
InLongMsgBody body,
- String tid) {
+ String streamId) {
Row row = new Row(7);
row.setField(0, head.getAttributes());
row.setField(1, body.getData());
- row.setField(2, tid);
+ row.setField(2, streamId);
row.setField(3, head.getTime());
row.setField(4, head.getPredefinedFields());
row.setField(5, body.getFields());
@@ -276,7 +278,7 @@ public class InLongMsgUtils {
return (byte[]) row.getField(1);
}
- public static String getTidFromMixedRow(Row row) {
+ public static String getStreamIdFromMixedRow(Row row) {
return (String) row.getField(2);
}
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatConverter.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatConverter.java
index a5d5f8c5d1..e92b2b1217 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatConverter.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatConverter.java
@@ -111,7 +111,7 @@ public class InLongMsgBinlogMixedFormatConverter extends
AbstractInLongMsgMixedF
public List<Row> convertRows(
Map<String, String> attributes,
byte[] data,
- String tid,
+ String streamId,
Timestamp time,
List<String> predefinedFields,
List<String> fields,
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatDeserializer.java
index 9e09b6c037..59d4959991 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogMixedFormatDeserializer.java
@@ -68,7 +68,7 @@ public class InLongMsgBinlogMixedFormatDeserializer extends
AbstractInLongMsgMix
@Override
protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body)
throws IOException {
- Row row = InLongMsgUtils.buildMixedRow(head, body, head.getTid());
+ Row row = InLongMsgUtils.buildMixedRow(head, body, head.getStreamId());
return Collections.singletonList(row);
}
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
index 76ad558ea8..e9bb0a5a5d 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
@@ -112,11 +112,11 @@ public class InLongMsgBinlogUtils {
Map<String, String> attributes = parseAttr(attr);
// Extracts interface from the attributes.
- String tid;
+ String streamId;
if (attributes.containsKey(INLONGMSG_ATTR_STREAM_ID)) {
- tid = attributes.get(INLONGMSG_ATTR_STREAM_ID);
+ streamId = attributes.get(INLONGMSG_ATTR_STREAM_ID);
} else if (attributes.containsKey(INLONGMSG_ATTR_TID)) {
- tid = attributes.get(INLONGMSG_ATTR_TID);
+ streamId = attributes.get(INLONGMSG_ATTR_TID);
} else {
throw new IllegalArgumentException(
"Could not find " + INLONGMSG_ATTR_STREAM_ID
@@ -127,7 +127,7 @@ public class InLongMsgBinlogUtils {
Timestamp time =
parseEpochTime(Long.toString(System.currentTimeMillis()));
List<String> predefinedFields = getPredefinedFields(attributes);
- return new InLongMsgHead(attributes, tid, time, predefinedFields);
+ return new InLongMsgHead(attributes, streamId, time, predefinedFields);
}
public static InLongMsgBody parseBody(byte[] bytes) {
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
index f7add9121a..c0d4d1de62 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatConverter.java
@@ -92,7 +92,7 @@ public class InLongMsgCsvMixedFormatConverter extends
AbstractInLongMsgMixedForm
public List<Row> convertRows(
Map<String, String> attributes,
byte[] data,
- String tid,
+ String streamId,
Timestamp time,
List<String> predefinedFields,
List<String> fields,
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
index a0f1522ab9..194ca2fa85 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvMixedFormatDeserializer.java
@@ -138,7 +138,7 @@ public final class InLongMsgCsvMixedFormatDeserializer
extends AbstractInLongMsg
@Override
protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) {
- Row row = InLongMsgUtils.buildMixedRow(head, body, head.getTid());
+ Row row = InLongMsgUtils.buildMixedRow(head, body, head.getStreamId());
return Collections.singletonList(row);
}
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
index ac17f3a444..cad6ae9dc0 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-csv/src/main/java/org/apache/inlong/sort/formats/inlongmsgcsv/InLongMsgCsvUtils.java
@@ -59,12 +59,12 @@ public class InLongMsgCsvUtils {
Map<String, String> attributes = parseAttr(attr);
// Extracts interface from the attributes.
- String tid;
+ String streamId;
if (attributes.containsKey(INLONGMSG_ATTR_STREAM_ID)) {
- tid = attributes.get(INLONGMSG_ATTR_STREAM_ID);
+ streamId = attributes.get(INLONGMSG_ATTR_STREAM_ID);
} else if (attributes.containsKey(INLONGMSG_ATTR_TID)) {
- tid = attributes.get(INLONGMSG_ATTR_TID);
+ streamId = attributes.get(INLONGMSG_ATTR_TID);
} else {
throw new IllegalArgumentException(
"Could not find " + INLONGMSG_ATTR_STREAM_ID
@@ -90,7 +90,7 @@ public class InLongMsgCsvUtils {
// Extracts predefined fields from the attributes
List<String> predefinedFields = getPredefinedFields(attributes);
- return new InLongMsgHead(attributes, tid, time, predefinedFields);
+ return new InLongMsgHead(attributes, streamId, time, predefinedFields);
}
public static List<InLongMsgBody> parseBodyList(
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
index a46fa78459..efc1c7642c 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatConverter.java
@@ -91,7 +91,7 @@ public class InLongMsgKvMixedFormatConverter extends
AbstractInLongMsgMixedForma
public List<Row> convertRows(
Map<String, String> attributes,
byte[] data,
- String tid,
+ String streamId,
Timestamp time,
List<String> predefinedFields,
List<String> fields,
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java
index 34e67780a3..cceefd517e 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvMixedFormatDeserializer.java
@@ -148,7 +148,7 @@ public final class InLongMsgKvMixedFormatDeserializer
@Override
protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body) {
- Row row = InLongMsgUtils.buildMixedRow(head, body, head.getTid());
+ Row row = InLongMsgUtils.buildMixedRow(head, body, head.getStreamId());
return Collections.singletonList(row);
}
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
index 9964429c2e..b25115fdfe 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-kv/src/main/java/org/apache/inlong/sort/formats/inlongmsgkv/InLongMsgKvUtils.java
@@ -52,11 +52,11 @@ public class InLongMsgKvUtils {
public static InLongMsgHead parseHead(String attr) {
Map<String, String> attributes = parseAttr(attr);
- String tid;
+ String streamId;
if (attributes.containsKey(INLONGMSG_ATTR_STREAM_ID)) {
- tid = attributes.get(INLONGMSG_ATTR_STREAM_ID);
+ streamId = attributes.get(INLONGMSG_ATTR_STREAM_ID);
} else if (attributes.containsKey(INLONGMSG_ATTR_TID)) {
- tid = attributes.get(INLONGMSG_ATTR_TID);
+ streamId = attributes.get(INLONGMSG_ATTR_TID);
} else {
throw new IllegalArgumentException(
"Could not find " + INLONGMSG_ATTR_STREAM_ID +
@@ -78,7 +78,7 @@ public class InLongMsgKvUtils {
List<String> predefinedFields = getPredefinedFields(attributes);
- return new InLongMsgHead(attributes, tid, time, predefinedFields);
+ return new InLongMsgHead(attributes, streamId, time, predefinedFields);
}
public static List<InLongMsgBody> parseBodyList(
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java
index da210ed2ce..d97bb341da 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatConverter.java
@@ -93,7 +93,7 @@ public class InLongMsgTlogCsvMixedFormatConverter extends
AbstractInLongMsgMixed
public List<Row> convertRows(
Map<String, String> attributes,
byte[] data,
- String tid,
+ String streamId,
Timestamp time,
List<String> predefinedFields,
List<String> fields,
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java
index ca0f0b9f86..46723f058c 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvMixedFormatDeserializer.java
@@ -110,7 +110,7 @@ public final class InLongMsgTlogCsvMixedFormatDeserializer
@Override
protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body)
throws Exception {
- Row row = InLongMsgUtils.buildMixedRow(head, body, body.getTid());
+ Row row = InLongMsgUtils.buildMixedRow(head, body, body.getStreamId());
return Collections.singletonList(row);
}
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
index 8a5ef167a2..6683e08f5e 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogcsv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogcsv/InLongMsgTlogCsvUtils.java
@@ -89,11 +89,11 @@ public class InLongMsgTlogCsvUtils {
String[] segments = splitCsv(text, delimiter, escapeChar, quoteChar);
- String tid = segments[0];
+ String streamId = segments[0];
List<String> fields =
Arrays.stream(segments, 1,
segments.length).collect(Collectors.toList());
- return new InLongMsgBody(bytes, tid, fields, Collections.emptyMap());
+ return new InLongMsgBody(bytes, streamId, fields,
Collections.emptyMap());
}
/**
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatConverter.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatConverter.java
index 2892dfc755..560d571685 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatConverter.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatConverter.java
@@ -93,7 +93,7 @@ public class InLongMsgTlogKvMixedFormatConverter extends
AbstractInLongMsgMixedF
public List<Row> convertRows(
Map<String, String> attributes,
byte[] data,
- String tid,
+ String streamId,
Timestamp time,
List<String> predefinedFields,
List<String> fields,
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatDeserializer.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatDeserializer.java
index bccba0276f..84a6823e35 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatDeserializer.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvMixedFormatDeserializer.java
@@ -143,7 +143,7 @@ public final class InLongMsgTlogKvMixedFormatDeserializer
@Override
protected List<Row> convertRows(InLongMsgHead head, InLongMsgBody body)
throws Exception {
- Row row = InLongMsgUtils.buildMixedRow(head, body, body.getTid());
+ Row row = InLongMsgUtils.buildMixedRow(head, body, body.getStreamId());
return Collections.singletonList(row);
}
diff --git
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
index a039079304..8628d6cb79 100644
---
a/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
+++
b/inlong-sort/sort-formats/format-row/format-inlongmsg-tlogkv/src/main/java/org/apache/inlong/sort/formats/inlongmsgtlogkv/InLongMsgTlogKvUtils.java
@@ -85,7 +85,7 @@ public class InLongMsgTlogKvUtils {
String[] segments = splitCsv(text, delimiter, escapeChar, quoteChar);
- String tid = segments[0];
+ String streamId = segments[0];
Map<String, String> entries;
if (segments.length > 1) {
@@ -94,7 +94,7 @@ public class InLongMsgTlogKvUtils {
entries = Collections.emptyMap();
}
- return new InLongMsgBody(bytes, tid, Collections.emptyList(), entries);
+ return new InLongMsgBody(bytes, streamId, Collections.emptyList(),
entries);
}
/**
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
index fb56e6956a..87168cb15b 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/AbstractInLongMsgMixedFormatConverter.java
@@ -52,7 +52,7 @@ public abstract class AbstractInLongMsgMixedFormatConverter
public abstract List<RowData> convertRowDatas(
Map<String, String> attributes,
byte[] data,
- String tid,
+ String streamId,
Timestamp time,
List<String> predefinedFields,
List<String> fields,
@@ -66,14 +66,14 @@ public abstract class AbstractInLongMsgMixedFormatConverter
try {
Map<String, String> attributes =
InLongMsgUtils.getAttributesFromMixedRowData(rowData);
byte[] data = InLongMsgUtils.getDataFromMixedRowData(rowData);
- String tid = InLongMsgUtils.getTidFromMixedRowData(rowData);
+ String streamId = InLongMsgUtils.getTidFromMixedRowData(rowData);
Timestamp time = InLongMsgUtils.getTimeFromMixedRowData(rowData);
List<String> predefinedFields =
InLongMsgUtils.getPredefinedFieldsFromMixedRowData(rowData);
List<String> fields =
InLongMsgUtils.getFieldsFromMixedRowData(rowData);
Map<String, String> entries =
InLongMsgUtils.getEntriesFromMixedRowData(rowData);
convertedRowDatas =
- convertRowDatas(attributes, data, tid, time,
predefinedFields, fields, entries);
+ convertRowDatas(attributes, data, streamId, time,
predefinedFields, fields, entries);
} catch (Throwable t) {
String errorMessage =
String.format("Could not properly convert the mixed row.
Row=[%s].", rowData);
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
index 50b0c0e0b0..ce0b5c9dbe 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-base/src/main/java/org/apache/inlong/sort/formats/inlongmsg/InLongMsgUtils.java
@@ -51,7 +51,7 @@ import java.util.stream.Stream;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_ATTRIBUTE_FIELD_NAME;
import static
org.apache.inlong.sort.formats.base.TableFormatConstants.FORMAT_TIME_FIELD_NAME;
import static
org.apache.inlong.sort.formats.base.TableFormatUtils.validateSchema;
-import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMetadata.ReadableMetadata.TID;
+import static
org.apache.inlong.sort.formats.inlongmsg.InLongMsgMetadata.ReadableMetadata.STREAMID;
import static org.apache.inlong.sort.formats.util.StringUtils.splitKv;
/**
@@ -63,8 +63,11 @@ public class InLongMsgUtils {
public static final char INLONGMSG_ATTR_KV_DELIMITER = '=';
// keys in attributes
+ @Deprecated
public static final String INLONGMSG_ATTR_INTERFACE_NAME = "iname";
+ @Deprecated
public static final String INLONGMSG_ATTR_INTERFACE_ID = "id";
+ @Deprecated
public static final String INLONGMSG_ATTR_INTERFACE_TID = "tid";
public static final String INLONGMSG_ATTR_STREAMID = "streamId";
public static final String INLONGMSG_ATTR_TIME_T = "t";
@@ -85,7 +88,7 @@ public class InLongMsgUtils {
String[] fieldNames = new String[]{
"attributes",
"data",
- "tid",
+ "streamId",
"time",
"predefinedFields",
"fields",
@@ -357,8 +360,8 @@ public class InLongMsgUtils {
}
for (int j = 0; j < metadataKeys.size(); j++) {
- if (metadataKeys.get(j).equals(TID.getKey())) {
- producedRow.setField(physicalArity + j,
StringData.fromString(head.getTid()));
+ if (metadataKeys.get(j).equals(STREAMID.getKey())) {
+ producedRow.setField(physicalArity + j,
StringData.fromString(head.getStreamId()));
}
}
diff --git
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
index 50b82493b8..6357482946 100644
---
a/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
+++
b/inlong-sort/sort-formats/format-rowdata/format-inlongmsg-rowdata-binlog/src/main/java/org/apache/inlong/sort/formats/inlongmsgbinlog/InLongMsgBinlogUtils.java
@@ -72,13 +72,13 @@ public class InLongMsgBinlogUtils {
Map<String, String> attributes = parseAttr(attr);
// Extracts interface from the attributes.
- String tid;
+ String streamId;
if (attributes.containsKey(INLONGMSG_ATTR_INTERFACE_NAME)) {
- tid = attributes.get(INLONGMSG_ATTR_INTERFACE_NAME);
+ streamId = attributes.get(INLONGMSG_ATTR_INTERFACE_NAME);
} else if (attributes.containsKey(INLONGMSG_ATTR_INTERFACE_ID)) {
- tid = attributes.get(INLONGMSG_ATTR_INTERFACE_ID);
+ streamId = attributes.get(INLONGMSG_ATTR_INTERFACE_ID);
} else if (attributes.containsKey(INLONGMSG_ATTR_INTERFACE_TID)) {
- tid = attributes.get(INLONGMSG_ATTR_INTERFACE_TID);
+ streamId = attributes.get(INLONGMSG_ATTR_INTERFACE_TID);
} else {
throw new IllegalArgumentException(
"Could not find " + INLONGMSG_ATTR_INTERFACE_NAME +
@@ -90,7 +90,7 @@ public class InLongMsgBinlogUtils {
Timestamp time =
parseEpochTime(Long.toString(System.currentTimeMillis()));
List<String> predefinedFields = getPredefinedFields(attributes);
- return new InLongMsgHead(attributes, tid, time, predefinedFields);
+ return new InLongMsgHead(attributes, streamId, time, predefinedFields);
}
public static InLongMsgBody parseBody(byte[] bytes) {
diff --git
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java
index 5a5fc39ad8..2e34c13330 100644
---
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java
+++
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/Tubemq.java
@@ -29,7 +29,7 @@ import java.util.Map;
import static
org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_GROUP;
import static
org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_MASTER;
import static
org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_PROPERTIES;
-import static
org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_TIDS;
+import static
org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_STREAMIDS;
import static
org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_TOPIC;
import static
org.apache.flink.connectors.tubemq.TubemqValidator.CONNECTOR_TYPE_VALUE_TUBEMQ;
import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -52,7 +52,7 @@ public class Tubemq extends ConnectorDescriptor {
private String group;
@Nullable
- private String tids;
+ private String streamIds;
@Nonnull
private Map<String, String> properties;
@@ -110,13 +110,13 @@ public class Tubemq extends ConnectorDescriptor {
}
/**
- * The tubemq consumers use these tids to filter records reading from
server.
+ * The tubemq consumers use these streamIds to filter records reading from
server.
*
- * @param tids The filter for consume record from server.
+ * @param streamIds The filter for consume record from server.
*/
- public Tubemq tids(String tids) {
+ public Tubemq streamIds(String streamIds) {
- this.tids = tids;
+ this.streamIds = streamIds;
return this;
}
@@ -150,8 +150,8 @@ public class Tubemq extends ConnectorDescriptor {
descriptorProperties.putString(CONNECTOR_GROUP, group);
}
- if (tids != null) {
- descriptorProperties.putString(CONNECTOR_TIDS, tids);
+ if (streamIds != null) {
+ descriptorProperties.putString(CONNECTOR_STREAMIDS, streamIds);
}
}
diff --git
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java
index a827ae9444..e94fbf86c5 100644
---
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java
+++
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqOptions.java
@@ -30,10 +30,10 @@ public class TubemqOptions {
.noDefaultValue()
.withDescription("The session key for this consumer group
at startup.");
- public static final ConfigOption<String> TID =
- ConfigOptions.key("topic.tid")
+ public static final ConfigOption<String> STREAM_ID =
+ ConfigOptions.key("topic.streamId")
.noDefaultValue()
- .withDescription("The tid owned this topic.");
+ .withDescription("The streamId owned this topic.");
public static final ConfigOption<Integer> MAX_RETRIES =
ConfigOptions.key("max.retries")
diff --git
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java
index cf8c042649..fabd43af6e 100644
---
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java
+++
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSinkFunction.java
@@ -60,9 +60,9 @@ public class TubemqSinkFunction<T> extends
RichSinkFunction<T> implements Checkp
private final String topic;
/**
- * The tid of this topic
+ * The streamId of this topic
*/
- private final String tid;
+ private final String streamId;
/**
* The serializer for the records sent to pulsar.
*/
@@ -99,7 +99,7 @@ public class TubemqSinkFunction<T> extends
RichSinkFunction<T> implements Checkp
this.topic = topic;
this.masterAddress = masterAddress;
this.serializationSchema = serializationSchema;
- this.tid = configuration.getString(TubemqOptions.TID);
+ this.streamId = configuration.getString(TubemqOptions.STREAM_ID);
this.maxRetries = configuration.getInteger(MAX_RETRIES);
}
@@ -136,10 +136,10 @@ public class TubemqSinkFunction<T> extends
RichSinkFunction<T> implements Checkp
try {
byte[] body = serializationSchema.serialize(in);
Message message = new Message(topic, body);
- if (StringUtils.isNotBlank(tid)) {
+ if (StringUtils.isNotBlank(streamId)) {
SimpleDateFormat sdf = new
SimpleDateFormat(SYSTEM_HEADER_TIME_FORMAT);
long currTimeMillis = System.currentTimeMillis();
- message.putSystemHeader(tid, sdf.format(new
Date(currTimeMillis)));
+ message.putSystemHeader(streamId, sdf.format(new
Date(currTimeMillis)));
}
MessageSentResult sendResult = producer.sendMessage(message);
diff --git
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
index ab15a835c5..4f9fc050a9 100644
---
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
+++
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqSourceFunction.java
@@ -82,9 +82,9 @@ public class TubemqSourceFunction<T>
private final String topic;
/**
- * The tubemq consumers use this tid set to filter records reading from
server.
+ * The tubemq consumers use this streamId set to filter records reading
from server.
*/
- private final TreeSet<String> tidSet;
+ private final TreeSet<String> streamIdSet;
/**
* The consumer group name.
@@ -150,7 +150,7 @@ public class TubemqSourceFunction<T>
*
* @param masterAddress the master address of TubeMQ
* @param topic the topic name
- * @param tidSet the topic's filter condition items
+ * @param streamIdSet the topic's filter condition items
* @param consumerGroup the consumer group name
* @param deserializationSchema the deserialize schema
* @param configuration the configure
@@ -158,7 +158,7 @@ public class TubemqSourceFunction<T>
public TubemqSourceFunction(
String masterAddress,
String topic,
- TreeSet<String> tidSet,
+ TreeSet<String> streamIdSet,
String consumerGroup,
DeserializationSchema<T> deserializationSchema,
Configuration configuration) {
@@ -166,8 +166,8 @@ public class TubemqSourceFunction<T>
"The master address must not be null.");
checkNotNull(topic,
"The topic must not be null.");
- checkNotNull(tidSet,
- "The tid set must not be null.");
+ checkNotNull(streamIdSet,
+ "The streamId set must not be null.");
checkNotNull(consumerGroup,
"The consumer group must not be null.");
checkNotNull(deserializationSchema,
@@ -177,7 +177,7 @@ public class TubemqSourceFunction<T>
this.masterAddress = masterAddress;
this.topic = topic;
- this.tidSet = tidSet;
+ this.streamIdSet = streamIdSet;
this.consumerGroup = consumerGroup;
this.deserializationSchema = deserializationSchema;
@@ -235,7 +235,7 @@ public class TubemqSourceFunction<T>
messagePullConsumer =
messageSessionFactory.createPullConsumer(consumerConfig);
messagePullConsumer
- .subscribe(topic, tidSet);
+ .subscribe(topic, streamIdSet);
messagePullConsumer
.completeSubscribe(sessionKey, numTasks, true, currentOffsets);
diff --git
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java
index 79520df593..934a441687 100644
---
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java
+++
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSource.java
@@ -90,9 +90,9 @@ public class TubemqTableSource
private final String topic;
/**
- * The TubeMQ tid filter collection.
+ * The TubeMQ streamId filter collection.
*/
- private final TreeSet<String> tidSet;
+ private final TreeSet<String> streamIdSet;
/**
* The TubeMQ consumer group name.
@@ -114,7 +114,7 @@ public class TubemqTableSource
* @param fieldMapping the field map information
* @param masterAddress the master address
* @param topic the topic name
- * @param tidSet the topic's filter condition items
+ * @param streamIdSet the topic's filter condition items
* @param consumerGroup the consumer group
* @param configuration the configure
*/
@@ -126,7 +126,7 @@ public class TubemqTableSource
Map<String, String> fieldMapping,
String masterAddress,
String topic,
- TreeSet<String> tidSet,
+ TreeSet<String> streamIdSet,
String consumerGroup,
Configuration configuration) {
checkNotNull(deserializationSchema,
@@ -139,8 +139,8 @@ public class TubemqTableSource
"The master address must not be null.");
checkNotNull(topic,
"The topic must not be null.");
- checkNotNull(tidSet,
- "The tid set must not be null.");
+ checkNotNull(streamIdSet,
+ "The streamId set must not be null.");
checkNotNull(consumerGroup,
"The consumer group must not be null.");
checkNotNull(configuration,
@@ -151,7 +151,7 @@ public class TubemqTableSource
this.fieldMapping = fieldMapping;
this.masterAddress = masterAddress;
this.topic = topic;
- this.tidSet = tidSet;
+ this.streamIdSet = streamIdSet;
this.consumerGroup = consumerGroup;
this.configuration = configuration;
@@ -189,7 +189,7 @@ public class TubemqTableSource
new TubemqSourceFunction<>(
masterAddress,
topic,
- tidSet,
+ streamIdSet,
consumerGroup,
deserializationSchema,
configuration);
diff --git
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java
index 60efb5a117..e28e85c1b7 100644
---
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java
+++
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqTableSourceSinkFactory.java
@@ -92,7 +92,7 @@ public class TubemqTableSourceSinkFactory
properties.add(TubemqValidator.CONNECTOR_TOPIC);
properties.add(TubemqValidator.CONNECTOR_MASTER);
properties.add(TubemqValidator.CONNECTOR_GROUP);
- properties.add(TubemqValidator.CONNECTOR_TIDS);
+ properties.add(TubemqValidator.CONNECTOR_STREAMIDS);
properties.add(TubemqValidator.CONNECTOR_PROPERTIES + ".*");
// schema
@@ -145,16 +145,16 @@ public class TubemqTableSourceSinkFactory
descriptorProperties.getString(TubemqValidator.CONNECTOR_MASTER);
final String consumerGroup =
descriptorProperties.getString(TubemqValidator.CONNECTOR_GROUP);
- final String tids =
+ final String streamIds =
descriptorProperties
- .getOptionalString(TubemqValidator.CONNECTOR_TIDS)
+ .getOptionalString(TubemqValidator.CONNECTOR_STREAMIDS)
.orElse(null);
final Configuration configuration =
getConfiguration(descriptorProperties);
- TreeSet<String> tidSet = new TreeSet<>();
- if (tids != null) {
- tidSet.addAll(Arrays.asList(tids.split(SPLIT_COMMA)));
+ TreeSet<String> streamIdSet = new TreeSet<>();
+ if (streamIds != null) {
+ streamIdSet.addAll(Arrays.asList(streamIds.split(SPLIT_COMMA)));
}
return new TubemqTableSource(
@@ -165,7 +165,7 @@ public class TubemqTableSourceSinkFactory
fieldMapping,
masterAddress,
topic,
- tidSet,
+ streamIdSet,
consumerGroup,
configuration);
}
diff --git
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java
index 1373c99d37..bddaec4eea 100644
---
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java
+++
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/main/java/org/apache/flink/connectors/tubemq/TubemqValidator.java
@@ -46,9 +46,9 @@ public class TubemqValidator extends
ConnectorDescriptorValidator {
public static final String CONNECTOR_GROUP = "connector.group";
/**
- * The tubemq consumers use these tids to filter records reading from
server.
+ * The tubemq consumers use these streamIds to filter records reading from
server.
*/
- public static final String CONNECTOR_TIDS = "connector.tids";
+ public static final String CONNECTOR_STREAMIDS = "connector.stream-ids";
/**
* The prefix of tubemq properties (optional).
@@ -71,7 +71,7 @@ public class TubemqValidator extends
ConnectorDescriptorValidator {
// Validate that the group name is set.
properties.validateString(CONNECTOR_GROUP, false, 1,
Integer.MAX_VALUE);
- // Validate that the tids is set.
- properties.validateString(CONNECTOR_TIDS, true, 1, Integer.MAX_VALUE);
+ // Validate that the streamIds is set.
+ properties.validateString(CONNECTOR_STREAMIDS, true, 1,
Integer.MAX_VALUE);
}
}
diff --git
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java
index dc046e171a..7f0e68def3 100644
---
a/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java
+++
b/inlong-tubemq/tubemq-connectors/tubemq-connector-flink/src/test/java/org/apache/flink/connectors/tubemq/TubemqTest.java
@@ -53,7 +53,7 @@ public class TubemqTest extends DescriptorTestBase {
.topic("test-topic-3")
.master("localhost:9001")
.group("test-group-3")
- .tids("test-tid-1,test-tid-2");
+ .streamIds("test-streamId-1,test-streamId-2");
return Arrays.asList(descriptor1, descriptor2, descriptor3);
}
@@ -80,7 +80,7 @@ public class TubemqTest extends DescriptorTestBase {
props3.put("connector.type", "tubemq");
props3.put("connector.master", "localhost:9001");
props3.put("connector.topic", "test-topic-3");
- props3.put("connector.tids", "test-tid-1,test-tid-2");
+ props3.put("connector.stream-ids", "test-streamId-1,test-streamId-2");
props3.put("connector.group", "test-group-3");
return Arrays.asList(props1, props2, props3);