This is an automated email from the ASF dual-hosted git repository.

cbqiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 851b39ee fix data loss after failover (#633)
851b39ee is described below

commit 851b39ee28f8dafc26427108421dc5b4d4c269f5
Author: chzhoo <[email protected]>
AuthorDate: Wed Nov 12 19:48:59 2025 +0800

    fix data loss after failover (#633)
---
 .../apache/geaflow/dsl/connector/api/function/OffsetStore.java   | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/org/apache/geaflow/dsl/connector/api/function/OffsetStore.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/org/apache/geaflow/dsl/connector/api/function/OffsetStore.java
index 413d2b11..31c20c14 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/org/apache/geaflow/dsl/connector/api/function/OffsetStore.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/org/apache/geaflow/dsl/connector/api/function/OffsetStore.java
@@ -85,9 +85,14 @@ public class OffsetStore {
         storeContext.withKeySerializer(new DefaultKVSerializer(String.class, String.class));
         jsonOffsetStore.init(storeContext);
 
-        this.bucketNum = 2 * runtimeContext.getConfiguration().getLong(FrameworkConfigKeys.BATCH_NUMBER_PER_CHECKPOINT);
+        long bucketNum = 2 * runtimeContext.getConfiguration().getLong(FrameworkConfigKeys.BATCH_NUMBER_PER_CHECKPOINT);
+        long streamFlyingNum = runtimeContext.getConfiguration().getInteger(FrameworkConfigKeys.STREAMING_FLYING_BATCH_NUM) + 1;
+        if (bucketNum < streamFlyingNum) {
+            bucketNum = streamFlyingNum;
+        }
+        this.bucketNum = bucketNum;
         this.kvStoreCache = new HashMap<>();
-        LOGGER.info("init offset store, store type is: {}", backendType);
+        LOGGER.info("init offset store, store type is: {}, bucket num is: {}", backendType, this.bucketNum);
     }
 
     public Offset readOffset(String partitionName, long batchId) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to