This is an automated email from the ASF dual-hosted git repository.
luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 9bed15f82a [INLONG-12007][Sort] SortPulsar supports topic
concatenation (#12008)
9bed15f82a is described below
commit 9bed15f82acd6babb207fbf9dc7d056ad6b524ad
Author: ChunLiang Lu <[email protected]>
AuthorDate: Wed Sep 24 20:04:57 2025 +0800
[INLONG-12007][Sort] SortPulsar supports topic concatenation (#12008)
---
.../inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java | 11 ++++++++++-
1 file changed, 10 insertions(+), 1 deletion(-)
diff --git
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
index dcdb2c8aa7..6dc48d2511 100644
---
a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
+++
b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
@@ -46,6 +46,7 @@ public class PulsarIdConfig extends IdConfig {
public static final String KEY_DATA_TYPE = "dataType";
public static final String KEY_SEPARATOR = "separator";
public static final String DEFAULT_SEPARATOR = "|";
+ public static final String PERSISTENT_KEY = "persistent";
private static final String DEFAULT_INLONG_STREAM = "1";
@@ -80,11 +81,19 @@ public class PulsarIdConfig extends IdConfig {
dataType = DataTypeEnum.PB;
}
+ String rawTopic = sinkConfig.getTopic();
+ if (rawTopic != null) {
+ if (!rawTopic.startsWith(PERSISTENT_KEY)) {
+ String pulsarTenant = sinkConfig.getPulsarTenant();
+ String namespace = sinkConfig.getNamespace();
+ rawTopic = String.format("%s://%s/%s/%s", PERSISTENT_KEY,
pulsarTenant, namespace, rawTopic);
+ }
+ }
return PulsarIdConfig.builder()
.inlongGroupId(dataFlowConfig.getInlongGroupId())
.inlongStreamId(dataFlowConfig.getInlongStreamId())
.uid(InlongId.generateUid(dataFlowConfig.getInlongGroupId(),
dataFlowConfig.getInlongStreamId()))
- .topic(sinkConfig.getTopic())
+ .topic(rawTopic)
.dataType(dataType)
.separator(separator)
.dataFlowConfig(dataFlowConfig)