This is an automated email from the ASF dual-hosted git repository.

luchunliang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new 9bed15f82a [INLONG-12007][Sort] SortPulsar supports topic concatenation (#12008)
9bed15f82a is described below

commit 9bed15f82acd6babb207fbf9dc7d056ad6b524ad
Author: ChunLiang Lu <[email protected]>
AuthorDate: Wed Sep 24 20:04:57 2025 +0800

    [INLONG-12007][Sort] SortPulsar supports topic concatenation (#12008)
---
 .../inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java    | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
index dcdb2c8aa7..6dc48d2511 100644
--- a/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
+++ b/inlong-sort-standalone/sort-standalone-source/src/main/java/org/apache/inlong/sort/standalone/sink/pulsar/PulsarIdConfig.java
@@ -46,6 +46,7 @@ public class PulsarIdConfig extends IdConfig {
     public static final String KEY_DATA_TYPE = "dataType";
     public static final String KEY_SEPARATOR = "separator";
     public static final String DEFAULT_SEPARATOR = "|";
+    public static final String PERSISTENT_KEY = "persistent";
 
     private static final String DEFAULT_INLONG_STREAM = "1";
 
@@ -80,11 +81,19 @@ public class PulsarIdConfig extends IdConfig {
             dataType = DataTypeEnum.PB;
         }
 
+        String rawTopic = sinkConfig.getTopic();
+        if (rawTopic != null) {
+            if (!rawTopic.startsWith(PERSISTENT_KEY)) {
+                String pulsarTenant = sinkConfig.getPulsarTenant();
+                String namespace = sinkConfig.getNamespace();
+                rawTopic = String.format("%s://%s/%s/%s", PERSISTENT_KEY, pulsarTenant, namespace, rawTopic);
+            }
+        }
         return PulsarIdConfig.builder()
                 .inlongGroupId(dataFlowConfig.getInlongGroupId())
                 .inlongStreamId(dataFlowConfig.getInlongStreamId())
                 .uid(InlongId.generateUid(dataFlowConfig.getInlongGroupId(), dataFlowConfig.getInlongStreamId()))
-                .topic(sinkConfig.getTopic())
+                .topic(rawTopic)
                 .dataType(dataType)
                 .separator(separator)
                 .dataFlowConfig(dataFlowConfig)

Reply via email to