This is an automated email from the ASF dual-hosted git repository.
baodi pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 5b5b96d5d6d [improve][io] Support update subscription position for
sink connector (#23538)
5b5b96d5d6d is described below
commit 5b5b96d5d6d2f1ae81a421f03137278703761b53
Author: Baodi Shi <[email protected]>
AuthorDate: Tue Nov 5 11:40:39 2024 +0800
[improve][io] Support update subscription position for sink connector
(#23538)
(cherry picked from commit 3f12269d8d99e514cf48ef6d57fc3928d37b3646)
---
.../org/apache/pulsar/functions/utils/SinkConfigUtils.java | 4 +++-
.../apache/pulsar/functions/utils/SinkConfigUtilsTest.java | 14 ++++++++++++++
2 files changed, 17 insertions(+), 1 deletion(-)
diff --git a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
index 6631c053fac..65b6b97fc6e 100644
--- a/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
+++ b/pulsar-functions/utils/src/main/java/org/apache/pulsar/functions/utils/SinkConfigUtils.java
@@ -724,7 +724,9 @@ public class SinkConfigUtils {
if (newConfig.getTransformFunctionConfig() != null) {
mergedConfig.setTransformFunctionConfig(newConfig.getTransformFunctionConfig());
}
-
+ if (newConfig.getSourceSubscriptionPosition() != null) {
+ mergedConfig.setSourceSubscriptionPosition(newConfig.getSourceSubscriptionPosition());
+ }
return mergedConfig;
}
diff --git a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
index 5c2b6d92b93..c4c79a635ea 100644
--- a/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
+++ b/pulsar-functions/utils/src/test/java/org/apache/pulsar/functions/utils/SinkConfigUtilsTest.java
@@ -38,6 +38,7 @@ import java.util.Map;
import lombok.Data;
import lombok.NoArgsConstructor;
import lombok.experimental.Accessors;
+import org.apache.pulsar.client.api.SubscriptionInitialPosition;
import org.apache.pulsar.common.functions.ConsumerConfig;
import org.apache.pulsar.common.functions.FunctionConfig;
import org.apache.pulsar.common.functions.Resources;
@@ -224,6 +225,18 @@ public class SinkConfigUtilsTest {
}
}
+ @Test
+ public void testUpdateSubscriptionPosition() {
+ SinkConfig sinkConfig = createSinkConfig();
+ SinkConfig newSinkConfig = createSinkConfig();
+ newSinkConfig.setSourceSubscriptionPosition(SubscriptionInitialPosition.Earliest);
+ SinkConfig mergedConfig = SinkConfigUtils.validateUpdate(sinkConfig, newSinkConfig);
+ assertEquals(
+ new Gson().toJson(newSinkConfig),
+ new Gson().toJson(mergedConfig)
+ );
+ }
+
@Test
public void testMergeEqual() {
SinkConfig sinkConfig = createSinkConfig();
@@ -565,6 +578,7 @@ public class SinkConfigUtilsTest {
inputSpecs.put("test-input", ConsumerConfig.builder().isRegexPattern(true).serdeClassName("test-serde").build());
sinkConfig.setInputSpecs(inputSpecs);
sinkConfig.setProcessingGuarantees(FunctionConfig.ProcessingGuarantees.ATLEAST_ONCE);
+ sinkConfig.setSourceSubscriptionPosition(SubscriptionInitialPosition.Latest);
sinkConfig.setRetainOrdering(false);
sinkConfig.setRetainKeyOrdering(false);
sinkConfig.setConfigs(new HashMap<>());