This is an automated email from the ASF dual-hosted git repository.

zixuan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 3f12269d8d9 [improve][io] Support update subscription position for sink connector (#23538)
3f12269d8d9 is described below

commit 3f12269d8d99e514cf48ef6d57fc3928d37b3646
Author: Baodi Shi <[email protected]>
AuthorDate: Tue Nov 5 11:40:39 2024 +0800

    [improve][io] Support update subscription position for sink connector (#23538)
---
 .../org/apache/pulsar/functions/utils/SinkConfigUtils.java |  4 +++-
 .../apache/pulsar/functions/utils/SinkConfigUtilsTest.java | 14 ++++++++++++++
 2 files changed, 17 insertions(+), 1 deletion(-)

diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 6631c053fac..65b6b97fc6e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -724,7 +724,9 @@ public class SinkConfigUtils {
         if (newConfig.getTransformFunctionConfig() != null) {
             mergedConfig.setTransformFunctionConfig(newConfig.getTransformFunctionConfig());
         }
-
+        if (newConfig.getSourceSubscriptionPosition() != null) {
+            mergedConfig.setSourceSubscriptionPosition(newConfig.getSourceSubscriptionPosition());
+        }
         return mergedConfig;
     }
 
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 5c2b6d92b93..c4c79a635ea 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -38,6 +38,7 @@ import java.util.Map;
 import lombok.Data;
 import lombok.NoArgsConstructor;
 import lombok.experimental.Accessors;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
 import org.apache.pulsar.common.functions.ConsumerConfig;
 import org.apache.pulsar.common.functions.FunctionConfig;
 import org.apache.pulsar.common.functions.Resources;
@@ -224,6 +225,18 @@ public class SinkConfigUtilsTest {
         }
     }
 
+    @Test
+    public void testUpdateSubscriptionPosition() {
+        SinkConfig sinkConfig = createSinkConfig();
+        SinkConfig newSinkConfig = createSinkConfig();
+        newSinkConfig.setSourceSubscriptionPosition(SubscriptionInitialPosition.Earliest);
+        SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
+        assertEquals(
+                new Gson().toJson(newSinkConfig),
+                new Gson().toJson(mergedConfig)
+        );
+    }
+
     @Test
     public void testMergeEqual() {
         SinkConfig sinkConfig = createSinkConfig();
@@ -565,6 +578,7 @@ public class SinkConfigUtilsTest {
         inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build());
         sinkConfig.setInputSpecs(inputSpecs);
         sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+        sinkConfig.setSourceSubscriptionPosition(SubscriptionInitialPosition.Latest);
         sinkConfig.setRetainOrdering(false);
         sinkConfig.setRetainKeyOrdering(false);
         sinkConfig.setConfigs(new HashMap<>());

Reply via email to