This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e1e2c3ba85c [DebeziumIO] Implement startOffset & offset persistence 
for Kafka (#28248) (#37750)
e1e2c3ba85c is described below

commit e1e2c3ba85c05d71dddf3d3dabbe8d2b0f8565c7
Author: Tobias Kaymak <[email protected]>
AuthorDate: Mon Mar 16 15:48:41 2026 +0100

    [DebeziumIO] Implement startOffset & offset persistence for Kafka (#28248) 
(#37750)
    
    * [DebeziumIO] Implement offsetRetainer for Kafka (#28248)
    
    * Adjust to review comments and add unit tests
    
    * Address Gemini review comment & update CHANGES.md
---
 CHANGES.md                                         |   1 +
 sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go   |  40 +++++
 .../org/apache/beam/io/debezium/DebeziumIO.java    |  76 ++++++++++
 .../io/debezium/DebeziumTransformRegistrar.java    |  38 +++++
 .../beam/io/debezium/FileSystemOffsetRetainer.java | 166 +++++++++++++++++++++
 .../beam/io/debezium/KafkaSourceConsumerFn.java    |  26 +++-
 .../apache/beam/io/debezium/OffsetRetainer.java    |  69 +++++++++
 .../apache/beam/io/debezium/DebeziumIOTest.java    | 152 +++++++++++++++++++
 .../io/debezium/FileSystemOffsetRetainerTest.java  | 131 ++++++++++++++++
 sdks/python/apache_beam/io/debezium.py             |  44 +++++-
 10 files changed, 738 insertions(+), 5 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index 5499cb06647..4072a66e07c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -65,6 +65,7 @@
 ## I/Os
 
 * Support for X source added (Java/Python) 
([#X](https://github.com/apache/beam/issues/X)).
+* DebeziumIO (Java): added `OffsetRetainer` interface and 
`FileSystemOffsetRetainer` implementation to persist and restore CDC offsets 
across pipeline restarts, and exposed `withStartOffset` / `withOffsetRetainer` 
on `DebeziumIO.Read` and the cross-language `ReadBuilder` 
([#28248](https://github.com/apache/beam/issues/28248)).
 
 ## New Features / Improvements
 
diff --git a/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go 
b/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go
index 547aba0ceb9..f31a71fbc9d 100644
--- a/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go
+++ b/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go
@@ -77,6 +77,8 @@ type readFromDebeziumSchema struct {
        MaxNumberOfRecords   *int64
        MaxTimeToRun         *int64
        ConnectionProperties []string
+       StartOffset          []string
+       OffsetStoragePath    *string
 }
 
 type debeziumConfig struct {
@@ -149,6 +151,44 @@ func ConnectionProperties(cp []string) readOption {
        }
 }
 
+// StartOffset specifies the offset from which the connector should resume 
consuming
+// changes. Each entry must be a "key=value" string, where numeric values are 
encoded
+// as their decimal string representation.
+//
+// Example for PostgreSQL:
+//
+//     debeziumio.StartOffset([]string{"lsn=28160840"})
+//
+// Example for MySQL:
+//
+//     debeziumio.StartOffset([]string{"file=binlog.000001", "pos=156"})
+//
+// Obtain the offset from the output of a previous pipeline run. Numeric 
values such
+// as LSN or binlog position are automatically parsed to Long on the Java side.
+func StartOffset(offset []string) readOption {
+       return func(cfg *debeziumConfig) {
+               cfg.readSchema.StartOffset = offset
+       }
+}
+
+// OffsetStoragePath sets a path where the connector offset is automatically 
saved after each
+// checkpoint and loaded on pipeline startup, allowing the pipeline to resume 
from where it
+// left off without any manual offset management.
+//
+// The path can be on any filesystem supported by the active Beam runner
+// (local disk, GCS, S3, etc.).
+//
+// Example:
+//
+//     
debeziumio.OffsetStoragePath("gs://my-bucket/debezium/orders-offset.json")
+//
+// When set, an offset previously saved at the path takes precedence over
+// StartOffset; if no offset has been saved yet, StartOffset (when provided)
+// is used as the fallback.
+func OffsetStoragePath(path string) readOption {
+       return func(cfg *debeziumConfig) {
+               cfg.readSchema.OffsetStoragePath = &path
+       }
+}
+
 // ExpansionAddr sets the expansion service address to use for DebeziumIO 
cross-langauage transform.
 func ExpansionAddr(expansionAddr string) readOption {
        return func(cfg *debeziumConfig) {
diff --git 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
index ebf91a4a095..6c31d5a0234 100644
--- 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
+++ 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
@@ -144,6 +144,10 @@ public class DebeziumIO {
 
     abstract @Nullable Long getPollingTimeout();
 
+    abstract @Nullable Map<String, Object> getStartOffset();
+
+    abstract @Nullable OffsetRetainer getOffsetRetainer();
+
     abstract @Nullable Coder<T> getCoder();
 
     abstract Builder<T> toBuilder();
@@ -162,6 +166,10 @@ public class DebeziumIO {
 
       abstract Builder<T> setPollingTimeout(Long miliseconds);
 
+      abstract Builder<T> setStartOffset(Map<String, Object> startOffset);
+
+      abstract Builder<T> setOffsetRetainer(OffsetRetainer retainer);
+
       abstract Read<T> build();
     }
 
@@ -230,6 +238,74 @@ public class DebeziumIO {
       return toBuilder().setPollingTimeout(miliseconds).build();
     }
 
+    /**
+     * Sets a starting offset so the connector resumes consuming changes from 
a previously seen
+     * position rather than from the beginning of the change stream.
+     *
+     * <p>The offset format is connector-specific. You can capture the current 
offset for each
+     * processed record inside your {@link SourceRecordMapper} via {@link
+     * org.apache.kafka.connect.source.SourceRecord#sourceOffset()} and 
persist it externally (for
+     * example in Cloud Storage, a database, or a local file). On the next 
pipeline run, pass the
+     * last saved offset here.
+     *
+     * <p>Example (PostgreSQL):
+     *
+     * <pre>{@code
+     * // Capture the offset inside the SourceRecordMapper:
+     * Map<String, Object> offset = sourceRecord.sourceOffset();
+     * // Persist 'offset' externally, then on restart:
+     * DebeziumIO.read()
+     *     .withConnectorConfiguration(config)
+     *     .withStartOffset(savedOffset)
+     *     .withFormatFunction(myMapper);
+     * }</pre>
+     *
+     * @param startOffset A map representing the resumption point, as returned 
by {@code
+     *     SourceRecord#sourceOffset()}.
+     * @return PTransform {@link #read}
+     */
+    public Read<T> withStartOffset(Map<String, Object> startOffset) {
+      checkArgument(startOffset != null, "startOffset can not be null");
+      return toBuilder().setStartOffset(startOffset).build();
+    }
+
+    /**
+     * Sets an {@link OffsetRetainer} that automatically saves and restores 
the connector offset,
+     * allowing the pipeline to resume from where it left off after a restart 
without any manual
+     * offset management.
+     *
+     * <p>When a retainer is configured:
+     *
+     * <ol>
+     *   <li>At pipeline startup, {@link OffsetRetainer#loadOffset()} is 
called. If a saved offset
+     *       is found, the connector resumes from that position; otherwise it 
starts from the
+     *       beginning of the change stream.
+     *   <li>After each successful checkpoint ({@code task.commit()}), {@link
+     *       OffsetRetainer#saveOffset(Map)} is called with the latest 
committed offset.
+     * </ol>
+     *
+     * <p>The built-in {@link FileSystemOffsetRetainer} persists the offset as 
a JSON file on any
+     * Beam-compatible filesystem (local, GCS, S3, etc.):
+     *
+     * <pre>{@code
+     * DebeziumIO.read()
+     *     .withConnectorConfiguration(config)
+     *     .withOffsetRetainer(
+     *         new 
FileSystemOffsetRetainer("gs://my-bucket/debezium/orders-offset.json"))
+     *     .withFormatFunction(myMapper);
+     * }</pre>
+     *
+     * <p>When both a retainer and {@link #withStartOffset(Map)} are set, an offset loaded by the
+     * retainer takes precedence; if the retainer has no saved offset yet, the explicit start
+     * offset is used as the fallback. Use {@link #withStartOffset(Map)} alone for a one-time
+     * manual override.
+     *
+     * @param retainer The {@link OffsetRetainer} to use for loading and 
saving offsets.
+     * @return PTransform {@link #read}
+     */
+    public Read<T> withOffsetRetainer(OffsetRetainer retainer) {
+      checkArgument(retainer != null, "retainer can not be null");
+      return toBuilder().setOffsetRetainer(retainer).build();
+    }
+
     protected Schema getRecordSchema() {
       KafkaSourceConsumerFn<T> fn =
           new 
KafkaSourceConsumerFn<>(getConnectorConfiguration().getConnectorClass().get(), 
this);
diff --git 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
index 22a34ae2654..4b431ac38ba 100644
--- 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
+++ 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
@@ -18,6 +18,7 @@
 package org.apache.beam.io.debezium;
 
 import com.google.auto.service.AutoService;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
@@ -78,6 +79,8 @@ public class DebeziumTransformRegistrar implements 
ExternalTransformRegistrar {
       private @Nullable List<String> connectionProperties;
       private @Nullable Long maxNumberOfRecords;
       private @Nullable Long maxTimeToRun;
+      private @Nullable List<String> startOffset;
+      private @Nullable String offsetStoragePath;
 
       public void setConnectionProperties(@Nullable List<String> 
connectionProperties) {
         this.connectionProperties = connectionProperties;
@@ -90,6 +93,14 @@ public class DebeziumTransformRegistrar implements 
ExternalTransformRegistrar {
       public void setMaxTimeToRun(@Nullable Long maxTimeToRun) {
         this.maxTimeToRun = maxTimeToRun;
       }
+
+      public void setStartOffset(@Nullable List<String> startOffset) {
+        this.startOffset = startOffset;
+      }
+
+      public void setOffsetStoragePath(@Nullable String offsetStoragePath) {
+        this.offsetStoragePath = offsetStoragePath;
+      }
     }
 
     @Override
@@ -123,6 +134,33 @@ public class DebeziumTransformRegistrar implements 
ExternalTransformRegistrar {
         readTransform = 
readTransform.withMaxTimeToRun(configuration.maxTimeToRun);
       }
 
+      if (configuration.startOffset != null) {
+        Map<String, Object> startOffsetMap = new HashMap<>();
+        for (String property : configuration.startOffset) {
+          String[] parts = property.split("=", 2);
+          if (parts.length != 2) {
+            throw new IllegalArgumentException(
+                "Invalid startOffset entry: \""
+                    + property
+                    + "\". Expected format is \"key=value\".");
+          }
+          String key = parts[0];
+          String value = parts[1];
+          try {
+            startOffsetMap.put(key, Long.parseLong(value));
+          } catch (NumberFormatException e) {
+            startOffsetMap.put(key, value);
+          }
+        }
+        readTransform = readTransform.withStartOffset(startOffsetMap);
+      }
+
+      if (configuration.offsetStoragePath != null) {
+        readTransform =
+            readTransform.withOffsetRetainer(
+                FileSystemOffsetRetainer.of(configuration.offsetStoragePath));
+      }
+
       return readTransform;
     }
   }
diff --git 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/FileSystemOffsetRetainer.java
 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/FileSystemOffsetRetainer.java
new file mode 100644
index 00000000000..552eccafe3a
--- /dev/null
+++ 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/FileSystemOffsetRetainer.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link OffsetRetainer} that persists the Debezium connector offset as a 
JSON file using Beam's
+ * {@link FileSystems} abstraction.
+ *
+ * <p>The {@code path} argument can point to any filesystem supported by the 
active Beam runner,
+ * including local disk, Google Cloud Storage, Amazon S3, and others.
+ *
+ * <p>On every {@code task.commit()}, the latest offset is serialized to JSON 
and written to the
+ * given path (overwriting the previous file). On pipeline startup the file is 
read back and the
+ * connector resumes from the stored position. If the file does not yet exist 
the connector starts
+ * from the beginning of the change stream.
+ *
+ * <p>Example — resume from GCS:
+ *
+ * <pre>{@code
+ * DebeziumIO.read()
+ *     .withConnectorConfiguration(config)
+ *     .withOffsetRetainer(
+ *         
FileSystemOffsetRetainer.of("gs://my-bucket/debezium/orders-offset.json"))
+ *     .withFormatFunction(myMapper);
+ * }</pre>
+ *
+ * <p>Example — local filesystem (useful for testing):
+ *
+ * <pre>{@code
+ * DebeziumIO.read()
+ *     .withConnectorConfiguration(config)
+ *     
.withOffsetRetainer(FileSystemOffsetRetainer.of("/tmp/debezium-offset.json"))
+ *     .withFormatFunction(myMapper);
+ * }</pre>
+ *
+ * <p><b>Note:</b> the offset is first written to a {@code .tmp} sibling file and then renamed to
+ * the final path, so a crash in the middle of a write leaves the previously stored offset intact.
+ * The rename is atomic on filesystems with native rename support (e.g. local POSIX filesystems);
+ * on object stores such as GCS or S3 it is implemented as copy-then-delete, which is still safe
+ * here because the destination object only becomes visible once it has been fully written.
+ */
+public class FileSystemOffsetRetainer implements OffsetRetainer {
+
+  private static final Logger LOG = 
LoggerFactory.getLogger(FileSystemOffsetRetainer.class);
+  private static final TypeReference<Map<String, Object>> MAP_TYPE = new 
TypeReference<>() {};
+
+  private final String path;
+
+  // ObjectMapper is thread-safe after configuration and does not need to be serialized;
+  // it is lazily re-created on each worker via mapper().
+  private transient @Nullable ObjectMapper objectMapper;
+
+  // Tracks the last successfully saved offset so repeated identical saves are 
skipped.
+  private transient @Nullable Map<String, Object> lastSavedOffset;
+
+  private FileSystemOffsetRetainer(String path) {
+    this.path = path;
+  }
+
+  /** Creates a new {@code FileSystemOffsetRetainer} that stores the offset at 
{@code path}. */
+  public static FileSystemOffsetRetainer of(String path) {
+    return new FileSystemOffsetRetainer(path);
+  }
+
+  private ObjectMapper mapper() {
+    if (objectMapper == null) {
+      objectMapper = new ObjectMapper();
+    }
+    return objectMapper;
+  }
+
+  /**
+   * Reads the offset JSON file and returns its contents, or {@code null} if 
the file does not yet
+   * exist (first run). Throws {@link RuntimeException} if the file exists but 
cannot be read, to
+   * prevent silently reprocessing data from the beginning.
+   */
+  @Override
+  public @Nullable Map<String, Object> loadOffset() {
+    try {
+      ResourceId resourceId = FileSystems.matchNewResource(path, /* 
isDirectory= */ false);
+      try (ReadableByteChannel channel = FileSystems.open(resourceId);
+          InputStream stream = Channels.newInputStream(channel)) {
+        Map<String, Object> offset = mapper().readValue(stream, MAP_TYPE);
+        LOG.info("OffsetRetainer: loaded offset from {}: {}", path, offset);
+        return offset;
+      }
+    } catch (FileNotFoundException e) {
+      LOG.info("OffsetRetainer: no offset file found at {}; starting from the 
beginning.", path);
+      return null;
+    } catch (IOException e) {
+      throw new RuntimeException(
+          "OffsetRetainer: failed to read offset from "
+              + path
+              + ". "
+              + "Delete the file to restart from the beginning.",
+          e);
+    }
+  }
+
+  /**
+   * Serializes {@code offset} to JSON and writes it to the configured path using a
+   * temp-file-and-rename sequence.
+   *
+   * <p>If the offset is identical to the last successfully written one, the 
write is skipped to
+   * avoid unnecessary I/O on every checkpoint.
+   *
+   * <p>Otherwise the data is first written to a {@code .tmp} sibling file and 
then renamed to the
+   * final path, so a mid-write crash leaves the previous offset intact.
+   *
+   * <p>Errors are logged as warnings and swallowed so the pipeline continues.
+   */
+  @Override
+  public void saveOffset(Map<String, Object> offset) {
+    if (offset.equals(lastSavedOffset)) {
+      LOG.debug("OffsetRetainer: offset unchanged, skipping write to {}", 
path);
+      return;
+    }
+    String tmpPath = path + ".tmp";
+    try {
+      ResourceId tmpResourceId = FileSystems.matchNewResource(tmpPath, /* 
isDirectory= */ false);
+      try (WritableByteChannel channel = FileSystems.create(tmpResourceId, 
"application/json");
+          OutputStream stream = Channels.newOutputStream(channel)) {
+        mapper().writeValue(stream, offset);
+      }
+      ResourceId finalResourceId = FileSystems.matchNewResource(path, /* 
isDirectory= */ false);
+      FileSystems.rename(
+          Collections.singletonList(tmpResourceId), 
Collections.singletonList(finalResourceId));
+      lastSavedOffset = offset;
+      LOG.debug("OffsetRetainer: saved offset to {}: {}", path, offset);
+    } catch (IOException e) {
+      LOG.warn(
+          "OffsetRetainer: failed to save offset to {}."
+              + " The offset will be lost if the pipeline restarts.",
+          path,
+          e);
+    }
+  }
+}
diff --git 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
index fb4c2f21458..d298ddd9caf 100644
--- 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
+++ 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
@@ -122,7 +122,21 @@ public class KafkaSourceConsumerFn<T> extends 
DoFn<Map<String, String>, T> {
   @GetInitialRestriction
   public OffsetHolder getInitialRestriction(@Element Map<String, String> 
unused)
       throws IOException {
-    return new OffsetHolder(null, null, null, spec.getMaxNumberOfRecords(), 
spec.getMaxTimeToRun());
+    Map<String, Object> initialOffset = null;
+
+    // Retainer takes precedence: it reflects the most recently committed 
position.
+    OffsetRetainer retainer = spec.getOffsetRetainer();
+    if (retainer != null) {
+      initialOffset = retainer.loadOffset();
+    }
+
+    // Fall back to the explicit one-time override when the retainer has no 
saved offset.
+    if (initialOffset == null) {
+      initialOffset = spec.getStartOffset();
+    }
+
+    return new OffsetHolder(
+        initialOffset, null, null, spec.getMaxNumberOfRecords(), 
spec.getMaxTimeToRun());
   }
 
   @NewTracker
@@ -284,6 +298,16 @@ public class KafkaSourceConsumerFn<T> extends 
DoFn<Map<String, String>, T> {
           receiver.outputWithTimestamp(json, recordInstant);
         }
         task.commit();
+
+        // Persist the offset after every successful commit so the pipeline 
can resume
+        // from this position on restart.
+        OffsetRetainer retainer = spec.getOffsetRetainer();
+        @SuppressWarnings("unchecked")
+        Map<String, Object> committedOffset =
+            (Map<String, Object>) tracker.currentRestriction().offset;
+        if (retainer != null && committedOffset != null) {
+          retainer.saveOffset(committedOffset);
+        }
       }
     } catch (Exception ex) {
       throw new RuntimeException("Error occurred when consuming changes from 
Database. ", ex);
diff --git 
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/OffsetRetainer.java
 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/OffsetRetainer.java
new file mode 100644
index 00000000000..b1fe5a58bbe
--- /dev/null
+++ 
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/OffsetRetainer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Strategy interface for persisting and restoring Debezium connector offsets 
across pipeline
+ * restarts.
+ *
+ * <p>When configured via {@link 
DebeziumIO.Read#withOffsetRetainer(OffsetRetainer)}, the pipeline
+ * behaves as follows:
+ *
+ * <ol>
+ *   <li>On startup, {@link #loadOffset()} is called once. If a non-null 
offset is returned, the
+ *       Debezium connector resumes from that position; otherwise it starts 
from the beginning of
+ *       the change stream.
+ *   <li>After each successful {@code task.commit()}, {@link #saveOffset(Map)} 
is called with the
+ *       latest committed offset.
+ * </ol>
+ *
+ * <p>A ready-to-use filesystem-based implementation is provided by {@link
+ * FileSystemOffsetRetainer}, which supports any Beam-compatible filesystem (local, GCS, S3,
+ * etc.).
+ *
+ * <p>Implementations must be {@link Serializable} because they are embedded 
inside {@link
+ * DebeziumIO.Read}, which is a {@link 
org.apache.beam.sdk.transforms.PTransform} that gets
+ * serialized and shipped to workers.
+ */
+public interface OffsetRetainer extends Serializable {
+
+  /**
+   * Returns the most recently saved offset, or {@code null} if no offset has 
been saved yet.
+   *
+   * <p>A {@code null} return causes the connector to start from the beginning 
of the change stream.
+   * Implementations should throw if a saved offset exists but cannot be read, rather than
+   * returning {@code null}, so the pipeline does not silently reprocess the change stream from
+   * the beginning; {@link FileSystemOffsetRetainer} follows this convention.
+   */
+  @Nullable
+  Map<String, Object> loadOffset();
+
+  /**
+   * Persists the given offset so it can be recovered after a pipeline restart.
+   *
+   * <p>Called after each successful {@code task.commit()} with the latest 
committed offset.
+   * Implementations should swallow transient errors rather than throwing, so 
that a failed save
+   * does not terminate the pipeline.
+   *
+   * @param offset The current connector offset, as returned by {@link
+   *     org.apache.kafka.connect.source.SourceRecord#sourceOffset()}.
+   */
+  void saveOffset(Map<String, Object> offset);
+}
diff --git 
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
 
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
index 88ecc4fdd90..869fa5d34a5 100644
--- 
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
+++ 
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
@@ -17,6 +17,9 @@
  */
 package org.apache.beam.io.debezium;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 
@@ -24,9 +27,13 @@ import io.debezium.config.Configuration;
 import io.debezium.connector.mysql.MySqlConnector;
 import io.debezium.connector.mysql.MySqlConnectorConfig;
 import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.Map;
 import org.apache.beam.io.debezium.DebeziumIO.ConnectorConfiguration;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
 import org.apache.kafka.common.config.ConfigValue;
+import org.checkerframework.checker.nullness.qual.Nullable;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -95,4 +102,149 @@ public class DebeziumIOTest implements Serializable {
         IllegalArgumentException.class,
         () -> 
MYSQL_CONNECTOR_CONFIGURATION.withUsername(username).withPassword(password));
   }
+
+  @Test
+  public void testReadWithStartOffsetStoresOffset() {
+    Map<String, Object> offset = ImmutableMap.of("file", "mysql-bin.000003", 
"pos", 156L);
+    DebeziumIO.Read<String> read =
+        DebeziumIO.<String>read()
+            .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION)
+            .withStartOffset(offset);
+    assertEquals(offset, read.getStartOffset());
+  }
+
+  @Test
+  public void testReadWithoutStartOffsetIsNull() {
+    DebeziumIO.Read<String> read =
+        
DebeziumIO.<String>read().withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION);
+    assertNull(read.getStartOffset());
+  }
+
+  @Test
+  public void testReadWithNullStartOffsetThrows() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            DebeziumIO.<String>read()
+                .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION)
+                .withStartOffset(null));
+  }
+
+  @Test
+  public void testGetInitialRestrictionUsesStartOffset() throws Exception {
+    Map<String, Object> offset = ImmutableMap.of("file", "mysql-bin.000003", 
"pos", 156L);
+    DebeziumIO.Read<String> spec =
+        DebeziumIO.<String>read()
+            .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION)
+            .withStartOffset(offset);
+    KafkaSourceConsumerFn<String> fn = new 
KafkaSourceConsumerFn<>(MySqlConnector.class, spec);
+    KafkaSourceConsumerFn.OffsetHolder restriction = 
fn.getInitialRestriction(null);
+    assertEquals(offset, restriction.offset);
+  }
+
+  @Test
+  public void testGetInitialRestrictionWithoutStartOffsetIsNull() throws 
Exception {
+    DebeziumIO.Read<String> spec =
+        
DebeziumIO.<String>read().withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION);
+    KafkaSourceConsumerFn<String> fn = new 
KafkaSourceConsumerFn<>(MySqlConnector.class, spec);
+    KafkaSourceConsumerFn.OffsetHolder restriction = 
fn.getInitialRestriction(null);
+    assertNull(restriction.offset);
+  }
+
+  // ---- OffsetRetainer tests -----------------------------------------------
+
+  /** Minimal in-memory retainer used only in tests. */
+  private static class InMemoryOffsetRetainer implements OffsetRetainer {
+    private final @Nullable Map<String, Object> loadResult;
+    private @Nullable Map<String, Object> lastSaved;
+
+    InMemoryOffsetRetainer(@Nullable Map<String, Object> loadResult) {
+      this.loadResult = loadResult;
+    }
+
+    @Override
+    public @Nullable Map<String, Object> loadOffset() {
+      return loadResult;
+    }
+
+    @Override
+    public void saveOffset(Map<String, Object> offset) {
+      lastSaved = new HashMap<>(offset);
+    }
+  }
+
+  @Test
+  public void testWithOffsetRetainerStoresRetainer() {
+    InMemoryOffsetRetainer retainer = new InMemoryOffsetRetainer(null);
+    DebeziumIO.Read<String> read =
+        DebeziumIO.<String>read()
+            .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION)
+            .withOffsetRetainer(retainer);
+    assertNotNull(read.getOffsetRetainer());
+  }
+
+  @Test
+  public void testWithNullOffsetRetainerThrows() {
+    assertThrows(
+        IllegalArgumentException.class,
+        () ->
+            DebeziumIO.<String>read()
+                .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION)
+                .withOffsetRetainer(null));
+  }
+
+  @Test
+  public void testGetInitialRestrictionUsesRetainerOffset() throws Exception {
+    Map<String, Object> savedOffset = ImmutableMap.of("lsn", 28160840L);
+    DebeziumIO.Read<String> spec =
+        DebeziumIO.<String>read()
+            .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION)
+            .withOffsetRetainer(new InMemoryOffsetRetainer(savedOffset));
+    KafkaSourceConsumerFn<String> fn = new 
KafkaSourceConsumerFn<>(MySqlConnector.class, spec);
+    KafkaSourceConsumerFn.OffsetHolder restriction = 
fn.getInitialRestriction(null);
+    assertEquals(savedOffset, restriction.offset);
+  }
+
+  @Test
+  public void testRetainerTakesPriorityOverWithStartOffset() throws Exception {
+    Map<String, Object> retainerOffset = ImmutableMap.of("lsn", 99L);
+    Map<String, Object> explicitOffset = ImmutableMap.of("lsn", 1L);
+    DebeziumIO.Read<String> spec =
+        DebeziumIO.<String>read()
+            .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION)
+            .withStartOffset(explicitOffset)
+            .withOffsetRetainer(new InMemoryOffsetRetainer(retainerOffset));
+    KafkaSourceConsumerFn<String> fn = new 
KafkaSourceConsumerFn<>(MySqlConnector.class, spec);
+    KafkaSourceConsumerFn.OffsetHolder restriction = 
fn.getInitialRestriction(null);
+    assertEquals(retainerOffset, restriction.offset);
+  }
+
+  @Test
+  public void testRetainerFallsBackToWithStartOffsetWhenLoadReturnsNull() 
throws Exception {
+    Map<String, Object> explicitOffset = ImmutableMap.of("lsn", 1L);
+    DebeziumIO.Read<String> spec =
+        DebeziumIO.<String>read()
+            .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION)
+            .withStartOffset(explicitOffset)
+            // Retainer has no saved offset yet (returns null).
+            .withOffsetRetainer(new InMemoryOffsetRetainer(null));
+    KafkaSourceConsumerFn<String> fn = new 
KafkaSourceConsumerFn<>(MySqlConnector.class, spec);
+    KafkaSourceConsumerFn.OffsetHolder restriction = 
fn.getInitialRestriction(null);
+    assertEquals(explicitOffset, restriction.offset);
+  }
+
+  @Test
+  public void testBuildExternalThrowsOnMalformedStartOffsetEntry() {
+    DebeziumTransformRegistrar.ReadBuilder.Configuration config =
+        new DebeziumTransformRegistrar.ReadBuilder.Configuration();
+    config.setUsername("user");
+    config.setPassword("pass");
+    config.setHost("localhost");
+    config.setPort("3306");
+    config.setConnectorClass("MySQL");
+    config.setStartOffset(Arrays.asList("lsn=100", "no-equals-sign"));
+    assertThrows(
+        IllegalArgumentException.class,
+        () -> new 
DebeziumTransformRegistrar.ReadBuilder().buildExternal(config));
+  }
 }
diff --git 
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/FileSystemOffsetRetainerTest.java
 
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/FileSystemOffsetRetainerTest.java
new file mode 100644
index 00000000000..e3ac0fff5da
--- /dev/null
+++ 
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/FileSystemOffsetRetainerTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link FileSystemOffsetRetainer}. */
+@RunWith(JUnit4.class)
+public class FileSystemOffsetRetainerTest {
+
+  @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+  @Before
+  public void setUp() {
+    FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
+  }
+
+  @Test
+  public void testLoadOffsetReturnsNullWhenFileIsMissing() {
+    String path = tmpFolder.getRoot().getAbsolutePath() + "/nonexistent.json";
+    assertNull(FileSystemOffsetRetainer.of(path).loadOffset());
+  }
+
+  @Test
+  public void testLoadOffsetThrowsWhenFileIsUnreadable() throws Exception {
+    String path = tmpFolder.getRoot().getAbsolutePath() + "/offset.json";
+    FileSystemOffsetRetainer retainer = FileSystemOffsetRetainer.of(path);
+    retainer.saveOffset(ImmutableMap.of("lsn", "100"));
+
+    // Truncate the file to zero bytes so that JSON parsing of its contents fails.
+    Files.newBufferedWriter(Paths.get(path), StandardCharsets.UTF_8).close(); 
// truncate to empty
+    assertThrows(RuntimeException.class, retainer::loadOffset);
+  }
+
+  @Test
+  public void testSaveAndLoadOffsetRoundTrip() {
+    String path = tmpFolder.getRoot().getAbsolutePath() + "/offset.json";
+    FileSystemOffsetRetainer retainer = FileSystemOffsetRetainer.of(path);
+
+    retainer.saveOffset(ImmutableMap.of("file", "binlog.000001", "pos", 
"156"));
+
+    Map<String, Object> loaded = retainer.loadOffset();
+    assertNotNull(loaded);
+    assertEquals("binlog.000001", loaded.get("file"));
+    assertEquals("156", loaded.get("pos"));
+  }
+
+  @Test
+  public void testSaveOffsetSkipsWriteWhenOffsetUnchanged() throws Exception {
+    String path = tmpFolder.getRoot().getAbsolutePath() + "/offset.json";
+    FileSystemOffsetRetainer retainer = FileSystemOffsetRetainer.of(path);
+    Map<String, Object> offset = ImmutableMap.of("lsn", "100");
+
+    retainer.saveOffset(offset);
+    long modifiedAfterFirstSave = new File(path).lastModified();
+
+    // Second call with the same offset should not touch the file.
+    Thread.sleep(10); // NOTE(review): assumes sub-10ms mtime resolution; some filesystems are coarser — confirm or use a larger delay
+    retainer.saveOffset(offset);
+    assertEquals(modifiedAfterFirstSave, new File(path).lastModified());
+  }
+
+  @Test
+  public void testSaveOffsetLeavesNoTmpFile() {
+    String path = tmpFolder.getRoot().getAbsolutePath() + "/offset.json";
+    FileSystemOffsetRetainer.of(path).saveOffset(ImmutableMap.of("lsn", 
"28160840"));
+
+    assertTrue("Final offset file should exist", new File(path).exists());
+    assertFalse("Temp file should not remain after rename", new File(path + 
".tmp").exists());
+  }
+
+  @Test
+  public void testSerializedRetainerCanLoadAfterDeserialization() throws 
Exception {
+    String path = tmpFolder.getRoot().getAbsolutePath() + "/offset.json";
+    FileSystemOffsetRetainer original = FileSystemOffsetRetainer.of(path);
+    original.saveOffset(ImmutableMap.of("lsn", "12345"));
+
+    ByteArrayOutputStream bos = new ByteArrayOutputStream();
+    try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+      oos.writeObject(original);
+    }
+    FileSystemOffsetRetainer deserialized;
+    try (ObjectInputStream ois =
+        new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
+      deserialized = (FileSystemOffsetRetainer) ois.readObject();
+    }
+
+    Map<String, Object> loaded = deserialized.loadOffset();
+    assertNotNull(loaded);
+    assertEquals("12345", loaded.get("lsn"));
+  }
+}
diff --git a/sdks/python/apache_beam/io/debezium.py 
b/sdks/python/apache_beam/io/debezium.py
index 26516fa4e4b..d1ca02aa68d 100644
--- a/sdks/python/apache_beam/io/debezium.py
+++ b/sdks/python/apache_beam/io/debezium.py
@@ -109,7 +109,9 @@ ReadFromDebeziumSchema = NamedTuple(
     'ReadFromDebeziumSchema',
     [('connector_class', str), ('username', str), ('password', str),
      ('host', str), ('port', str), ('max_number_of_records', Optional[int]),
-     ('connection_properties', List[str])])
+     ('connection_properties', List[str]),
+     ('start_offset', Optional[List[str]]),
+     ('offset_storage_path', Optional[str])])
 
 
 class ReadFromDebezium(PTransform):
@@ -131,6 +133,8 @@ class ReadFromDebezium(PTransform):
       port,
       max_number_of_records=None,
       connection_properties=None,
+      start_offset=None,
+      offset_storage_path=None,
       expansion_service=None):
     """
         Initializes a read operation from Debezium.
@@ -144,8 +148,38 @@ class ReadFromDebezium(PTransform):
                                       to be fetched before stop.
         :param connection_properties: properties of the debezium
                                       connection passed as string
-                                      with with format
-                                      [propertyName=property;]*
+                                      with format [propertyName=property;]*
+        :param start_offset: starting offset to resume the connector from
+                             a previously seen position. Provided as a list
+                             of "key=value" strings, where numeric values are
+                             encoded as their decimal string representation.
+                             Example for PostgreSQL::
+
+                               start_offset=["lsn=28160840"]
+
+                             Example for MySQL::
+
+                               start_offset=["file=binlog.000001", "pos=156"]
+
+                             Obtain the offset from the JSON output of a
+                             previous pipeline run (the "metadata" field
+                             contains connector-specific position info) or
+                             via ``SourceRecord.sourceOffset()`` in a custom
+                             Java SourceRecordMapper.
+        :param offset_storage_path: path to a file where the connector offset
+                                    is automatically saved after each 
checkpoint
+                                    and loaded on pipeline startup, allowing 
the
+                                    pipeline to resume from where it left off.
+                                    Supports any filesystem available to the
+                                    Beam runner (local, GCS, S3, etc.).
+                                    Example::
+
+                                      offset_storage_path=(
+                                        "gs://my-bucket/debezium/offset.json"
+                                      )
+
+                                    When set, takes precedence over
+                                    ``start_offset``.
         :param expansion_service: The address (host:port)
                                   of the ExpansionService.
     """
@@ -157,7 +191,9 @@ class ReadFromDebezium(PTransform):
         host=host,
         port=str(port),
         max_number_of_records=max_number_of_records,
-        connection_properties=connection_properties)
+        connection_properties=connection_properties,
+        start_offset=start_offset,
+        offset_storage_path=offset_storage_path)
     self.expansion_service = expansion_service or 
default_io_expansion_service()
 
   def expand(self, pbegin):


Reply via email to