This is an automated email from the ASF dual-hosted git repository.
yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new e1e2c3ba85c [DebeziumIO] Implement startOffset & offset persistence
for Kafka (#28248) (#37750)
e1e2c3ba85c is described below
commit e1e2c3ba85c05d71dddf3d3dabbe8d2b0f8565c7
Author: Tobias Kaymak <[email protected]>
AuthorDate: Mon Mar 16 15:48:41 2026 +0100
[DebeziumIO] Implement startOffset & offset persistence for Kafka (#28248)
(#37750)
* [DebeziumIO] Implement offsetRetainer for Kafka (#28248)
* Adjust to review comments and add unit tests
* Address Gemini review comment & update CHANGES.md
---
CHANGES.md | 1 +
sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go | 40 +++++
.../org/apache/beam/io/debezium/DebeziumIO.java | 76 ++++++++++
.../io/debezium/DebeziumTransformRegistrar.java | 38 +++++
.../beam/io/debezium/FileSystemOffsetRetainer.java | 166 +++++++++++++++++++++
.../beam/io/debezium/KafkaSourceConsumerFn.java | 26 +++-
.../apache/beam/io/debezium/OffsetRetainer.java | 69 +++++++++
.../apache/beam/io/debezium/DebeziumIOTest.java | 152 +++++++++++++++++++
.../io/debezium/FileSystemOffsetRetainerTest.java | 131 ++++++++++++++++
sdks/python/apache_beam/io/debezium.py | 44 +++++-
10 files changed, 738 insertions(+), 5 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 5499cb06647..4072a66e07c 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -65,6 +65,7 @@
## I/Os
* Support for X source added (Java/Python)
([#X](https://github.com/apache/beam/issues/X)).
+* DebeziumIO (Java): added `OffsetRetainer` interface and
`FileSystemOffsetRetainer` implementation to persist and restore CDC offsets
across pipeline restarts, and exposed `withStartOffset` / `withOffsetRetainer`
on `DebeziumIO.Read` and the cross-language `ReadBuilder`
([#28248](https://github.com/apache/beam/issues/28248)).
## New Features / Improvements
diff --git a/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go
b/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go
index 547aba0ceb9..f31a71fbc9d 100644
--- a/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go
+++ b/sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go
@@ -77,6 +77,8 @@ type readFromDebeziumSchema struct {
MaxNumberOfRecords *int64
MaxTimeToRun *int64
ConnectionProperties []string
+ StartOffset []string
+ OffsetStoragePath *string
}
type debeziumConfig struct {
@@ -149,6 +151,44 @@ func ConnectionProperties(cp []string) readOption {
}
}
+// StartOffset specifies the offset from which the connector should resume
consuming
+// changes. Each entry must be a "key=value" string, where numeric values are
encoded
+// as their decimal string representation.
+//
+// Example for PostgreSQL:
+//
+// debeziumio.StartOffset([]string{"lsn=28160840"})
+//
+// Example for MySQL:
+//
+// debeziumio.StartOffset([]string{"file=binlog.000001", "pos=156"})
+//
+// Obtain the offset from the output of a previous pipeline run. Numeric
values such
+// as LSN or binlog position are automatically parsed to Long on the Java side.
+func StartOffset(offset []string) readOption {
+ return func(cfg *debeziumConfig) {
+ cfg.readSchema.StartOffset = offset
+ }
+}
+
+// OffsetStoragePath sets a path where the connector offset is automatically
saved after each
+// checkpoint and loaded on pipeline startup, allowing the pipeline to resume
from where it
+// left off without any manual offset management.
+//
+// The path can be on any filesystem supported by the active Beam runner
+// (local disk, GCS, S3, etc.).
+//
+// Example:
+//
+//
debeziumio.OffsetStoragePath("gs://my-bucket/debezium/orders-offset.json")
+//
+// When set, takes precedence over StartOffset.
+func OffsetStoragePath(path string) readOption {
+ return func(cfg *debeziumConfig) {
+ cfg.readSchema.OffsetStoragePath = &path
+ }
+}
+
// ExpansionAddr sets the expansion service address to use for DebeziumIO
cross-language transform.
func ExpansionAddr(expansionAddr string) readOption {
return func(cfg *debeziumConfig) {
diff --git
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
index ebf91a4a095..6c31d5a0234 100644
---
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
+++
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java
@@ -144,6 +144,10 @@ public class DebeziumIO {
abstract @Nullable Long getPollingTimeout();
+ abstract @Nullable Map<String, Object> getStartOffset();
+
+ abstract @Nullable OffsetRetainer getOffsetRetainer();
+
abstract @Nullable Coder<T> getCoder();
abstract Builder<T> toBuilder();
@@ -162,6 +166,10 @@ public class DebeziumIO {
abstract Builder<T> setPollingTimeout(Long miliseconds);
+ abstract Builder<T> setStartOffset(Map<String, Object> startOffset);
+
+ abstract Builder<T> setOffsetRetainer(OffsetRetainer retainer);
+
abstract Read<T> build();
}
@@ -230,6 +238,74 @@ public class DebeziumIO {
return toBuilder().setPollingTimeout(miliseconds).build();
}
+ /**
+ * Sets a starting offset so the connector resumes consuming changes from
a previously seen
+ * position rather than from the beginning of the change stream.
+ *
+ * <p>The offset format is connector-specific. You can capture the current
offset for each
+ * processed record inside your {@link SourceRecordMapper} via {@link
+ * org.apache.kafka.connect.source.SourceRecord#sourceOffset()} and
persist it externally (for
+ * example in Cloud Storage, a database, or a local file). On the next
pipeline run, pass the
+ * last saved offset here.
+ *
+ * <p>Example (PostgreSQL):
+ *
+ * <pre>{@code
+ * // Capture the offset inside the SourceRecordMapper:
+ * Map<String, Object> offset = sourceRecord.sourceOffset();
+ * // Persist 'offset' externally, then on restart:
+ * DebeziumIO.read()
+ * .withConnectorConfiguration(config)
+ * .withStartOffset(savedOffset)
+ * .withFormatFunction(myMapper);
+ * }</pre>
+ *
+ * @param startOffset A map representing the resumption point, as returned
by {@code
+ * SourceRecord#sourceOffset()}.
+ * @return PTransform {@link #read}
+ */
+ public Read<T> withStartOffset(Map<String, Object> startOffset) {
+ checkArgument(startOffset != null, "startOffset can not be null");
+ return toBuilder().setStartOffset(startOffset).build();
+ }
+
+ /**
+ * Sets an {@link OffsetRetainer} that automatically saves and restores
the connector offset,
+ * allowing the pipeline to resume from where it left off after a restart
without any manual
+ * offset management.
+ *
+ * <p>When a retainer is configured:
+ *
+ * <ol>
+ * <li>At pipeline startup, {@link OffsetRetainer#loadOffset()} is
called. If a saved offset
+ * is found, the connector resumes from that position; otherwise it
starts from the
+ * beginning of the change stream.
+ * <li>After each successful checkpoint ({@code task.commit()}), {@link
+ * OffsetRetainer#saveOffset(Map)} is called with the latest
committed offset.
+ * </ol>
+ *
+ * <p>The built-in {@link FileSystemOffsetRetainer} persists the offset as
a JSON file on any
+ * Beam-compatible filesystem (local, GCS, S3, etc.):
+ *
+ * <pre>{@code
+ * DebeziumIO.read()
+ * .withConnectorConfiguration(config)
+ * .withOffsetRetainer(
+ * new
FileSystemOffsetRetainer("gs://my-bucket/debezium/orders-offset.json"))
+ * .withFormatFunction(myMapper);
+ * }</pre>
+ *
+ * <p>When both a retainer and {@link #withStartOffset(Map)} are set, the
retainer takes
+ * precedence. Use {@link #withStartOffset(Map)} alone for a one-time
manual override.
+ *
+ * @param retainer The {@link OffsetRetainer} to use for loading and
saving offsets.
+ * @return PTransform {@link #read}
+ */
+ public Read<T> withOffsetRetainer(OffsetRetainer retainer) {
+ checkArgument(retainer != null, "retainer can not be null");
+ return toBuilder().setOffsetRetainer(retainer).build();
+ }
+
protected Schema getRecordSchema() {
KafkaSourceConsumerFn<T> fn =
new
KafkaSourceConsumerFn<>(getConnectorConfiguration().getConnectorClass().get(),
this);
diff --git
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
index 22a34ae2654..4b431ac38ba 100644
---
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
+++
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java
@@ -18,6 +18,7 @@
package org.apache.beam.io.debezium;
import com.google.auto.service.AutoService;
+import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.expansion.ExternalTransformRegistrar;
@@ -78,6 +79,8 @@ public class DebeziumTransformRegistrar implements
ExternalTransformRegistrar {
private @Nullable List<String> connectionProperties;
private @Nullable Long maxNumberOfRecords;
private @Nullable Long maxTimeToRun;
+ private @Nullable List<String> startOffset;
+ private @Nullable String offsetStoragePath;
public void setConnectionProperties(@Nullable List<String>
connectionProperties) {
this.connectionProperties = connectionProperties;
@@ -90,6 +93,14 @@ public class DebeziumTransformRegistrar implements
ExternalTransformRegistrar {
public void setMaxTimeToRun(@Nullable Long maxTimeToRun) {
this.maxTimeToRun = maxTimeToRun;
}
+
+ public void setStartOffset(@Nullable List<String> startOffset) {
+ this.startOffset = startOffset;
+ }
+
+ public void setOffsetStoragePath(@Nullable String offsetStoragePath) {
+ this.offsetStoragePath = offsetStoragePath;
+ }
}
@Override
@@ -123,6 +134,33 @@ public class DebeziumTransformRegistrar implements
ExternalTransformRegistrar {
readTransform =
readTransform.withMaxTimeToRun(configuration.maxTimeToRun);
}
+ if (configuration.startOffset != null) {
+ Map<String, Object> startOffsetMap = new HashMap<>();
+ for (String property : configuration.startOffset) {
+ String[] parts = property.split("=", 2);
+ if (parts.length != 2) {
+ throw new IllegalArgumentException(
+ "Invalid startOffset entry: \""
+ + property
+ + "\". Expected format is \"key=value\".");
+ }
+ String key = parts[0];
+ String value = parts[1];
+ try {
+ startOffsetMap.put(key, Long.parseLong(value));
+ } catch (NumberFormatException e) {
+ startOffsetMap.put(key, value);
+ }
+ }
+ readTransform = readTransform.withStartOffset(startOffsetMap);
+ }
+
+ if (configuration.offsetStoragePath != null) {
+ readTransform =
+ readTransform.withOffsetRetainer(
+ FileSystemOffsetRetainer.of(configuration.offsetStoragePath));
+ }
+
return readTransform;
}
}
diff --git
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/FileSystemOffsetRetainer.java
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/FileSystemOffsetRetainer.java
new file mode 100644
index 00000000000..552eccafe3a
--- /dev/null
+++
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/FileSystemOffsetRetainer.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.Collections;
+import java.util.Map;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.checkerframework.checker.nullness.qual.Nullable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link OffsetRetainer} that persists the Debezium connector offset as a
JSON file using Beam's
+ * {@link FileSystems} abstraction.
+ *
+ * <p>The {@code path} argument can point to any filesystem supported by the
active Beam runner,
+ * including local disk, Google Cloud Storage, Amazon S3, and others.
+ *
+ * <p>On every {@code task.commit()}, the latest offset is serialised to JSON
and written to the
+ * given path (overwriting the previous file). On pipeline startup the file is
read back and the
+ * connector resumes from the stored position. If the file does not yet exist
the connector starts
+ * from the beginning of the change stream.
+ *
+ * <p>Example — resume from GCS:
+ *
+ * <pre>{@code
+ * DebeziumIO.read()
+ * .withConnectorConfiguration(config)
+ * .withOffsetRetainer(
+ *
FileSystemOffsetRetainer.of("gs://my-bucket/debezium/orders-offset.json"))
+ * .withFormatFunction(myMapper);
+ * }</pre>
+ *
+ * <p>Example — local filesystem (useful for testing):
+ *
+ * <pre>{@code
+ * DebeziumIO.read()
+ * .withConnectorConfiguration(config)
+ *
.withOffsetRetainer(FileSystemOffsetRetainer.of("/tmp/debezium-offset.json"))
+ * .withFormatFunction(myMapper);
+ * }</pre>
+ *
+ * <p><b>Note:</b> writes are performed atomically: the offset is first
written to a {@code .tmp}
+ * sibling file and then renamed to the final path, so a mid-write crash
leaves the previous offset
+ * intact.
+ */
+public class FileSystemOffsetRetainer implements OffsetRetainer {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FileSystemOffsetRetainer.class);
+ private static final TypeReference<Map<String, Object>> MAP_TYPE = new
TypeReference<>() {};
+
+ private final String path;
+
+ // ObjectMapper is thread-safe after configuration and does not need to be
serialised.
+ private transient @Nullable ObjectMapper objectMapper;
+
+ // Tracks the last successfully saved offset so repeated identical saves are
skipped.
+ private transient @Nullable Map<String, Object> lastSavedOffset;
+
+ private FileSystemOffsetRetainer(String path) {
+ this.path = path;
+ }
+
+ /** Creates a new {@code FileSystemOffsetRetainer} that stores the offset at
{@code path}. */
+ public static FileSystemOffsetRetainer of(String path) {
+ return new FileSystemOffsetRetainer(path);
+ }
+
+ private ObjectMapper mapper() {
+ if (objectMapper == null) {
+ objectMapper = new ObjectMapper();
+ }
+ return objectMapper;
+ }
+
+ /**
+ * Reads the offset JSON file and returns its contents, or {@code null} if
the file does not yet
+ * exist (first run). Throws {@link RuntimeException} if the file exists but
cannot be read, to
+ * prevent silently reprocessing data from the beginning.
+ */
+ @Override
+ public @Nullable Map<String, Object> loadOffset() {
+ try {
+ ResourceId resourceId = FileSystems.matchNewResource(path, /*
isDirectory= */ false);
+ try (ReadableByteChannel channel = FileSystems.open(resourceId);
+ InputStream stream = Channels.newInputStream(channel)) {
+ Map<String, Object> offset = mapper().readValue(stream, MAP_TYPE);
+ LOG.info("OffsetRetainer: loaded offset from {}: {}", path, offset);
+ return offset;
+ }
+ } catch (FileNotFoundException e) {
+ LOG.info("OffsetRetainer: no offset file found at {}; starting from the
beginning.", path);
+ return null;
+ } catch (IOException e) {
+ throw new RuntimeException(
+ "OffsetRetainer: failed to read offset from "
+ + path
+ + ". "
+ + "Delete the file to restart from the beginning.",
+ e);
+ }
+ }
+
+ /**
+ * Serialises {@code offset} to JSON and writes it atomically to the
configured path.
+ *
+ * <p>If the offset is identical to the last successfully written one, the
write is skipped to
+ * avoid unnecessary I/O on every checkpoint.
+ *
+ * <p>Otherwise the data is first written to a {@code .tmp} sibling file and
then renamed to the
+ * final path, so a mid-write crash leaves the previous offset intact.
+ *
+ * <p>Errors are logged as warnings and swallowed so the pipeline continues.
+ */
+ @Override
+ public void saveOffset(Map<String, Object> offset) {
+ if (offset.equals(lastSavedOffset)) {
+ LOG.debug("OffsetRetainer: offset unchanged, skipping write to {}",
path);
+ return;
+ }
+ String tmpPath = path + ".tmp";
+ try {
+ ResourceId tmpResourceId = FileSystems.matchNewResource(tmpPath, /*
isDirectory= */ false);
+ try (WritableByteChannel channel = FileSystems.create(tmpResourceId,
"application/json");
+ OutputStream stream = Channels.newOutputStream(channel)) {
+ mapper().writeValue(stream, offset);
+ }
+ ResourceId finalResourceId = FileSystems.matchNewResource(path, /*
isDirectory= */ false);
+ FileSystems.rename(
+ Collections.singletonList(tmpResourceId),
Collections.singletonList(finalResourceId));
+ lastSavedOffset = offset;
+ LOG.debug("OffsetRetainer: saved offset to {}: {}", path, offset);
+ } catch (IOException e) {
+ LOG.warn(
+ "OffsetRetainer: failed to save offset to {}."
+ + " The offset will be lost if the pipeline restarts.",
+ path,
+ e);
+ }
+ }
+}
diff --git
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
index fb4c2f21458..d298ddd9caf 100644
---
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
+++
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java
@@ -122,7 +122,21 @@ public class KafkaSourceConsumerFn<T> extends
DoFn<Map<String, String>, T> {
@GetInitialRestriction
public OffsetHolder getInitialRestriction(@Element Map<String, String>
unused)
throws IOException {
- return new OffsetHolder(null, null, null, spec.getMaxNumberOfRecords(),
spec.getMaxTimeToRun());
+ Map<String, Object> initialOffset = null;
+
+ // Retainer takes precedence: it reflects the most recently committed
position.
+ OffsetRetainer retainer = spec.getOffsetRetainer();
+ if (retainer != null) {
+ initialOffset = retainer.loadOffset();
+ }
+
+ // Fall back to the explicit one-time override when the retainer has no
saved offset.
+ if (initialOffset == null) {
+ initialOffset = spec.getStartOffset();
+ }
+
+ return new OffsetHolder(
+ initialOffset, null, null, spec.getMaxNumberOfRecords(),
spec.getMaxTimeToRun());
}
@NewTracker
@@ -284,6 +298,16 @@ public class KafkaSourceConsumerFn<T> extends
DoFn<Map<String, String>, T> {
receiver.outputWithTimestamp(json, recordInstant);
}
task.commit();
+
+ // Persist the offset after every successful commit so the pipeline
can resume
+ // from this position on restart.
+ OffsetRetainer retainer = spec.getOffsetRetainer();
+ @SuppressWarnings("unchecked")
+ Map<String, Object> committedOffset =
+ (Map<String, Object>) tracker.currentRestriction().offset;
+ if (retainer != null && committedOffset != null) {
+ retainer.saveOffset(committedOffset);
+ }
}
} catch (Exception ex) {
throw new RuntimeException("Error occurred when consuming changes from
Database. ", ex);
diff --git
a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/OffsetRetainer.java
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/OffsetRetainer.java
new file mode 100644
index 00000000000..b1fe5a58bbe
--- /dev/null
+++
b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/OffsetRetainer.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import java.io.Serializable;
+import java.util.Map;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/**
+ * Strategy interface for persisting and restoring Debezium connector offsets
across pipeline
+ * restarts.
+ *
+ * <p>When configured via {@link
DebeziumIO.Read#withOffsetRetainer(OffsetRetainer)}, the pipeline
+ * behaves as follows:
+ *
+ * <ol>
+ * <li>On startup, {@link #loadOffset()} is called once. If a non-null
offset is returned, the
+ * Debezium connector resumes from that position; otherwise it starts
from the beginning of
+ * the change stream.
+ * <li>After each successful {@code task.commit()}, {@link #saveOffset(Map)}
is called with the
+ * latest committed offset.
+ * </ol>
+ *
+ * <p>A ready-to-use filesystem-based implementation is provided by {@link
+ * FileSystemOffsetRetainer}, which supports any Beam-compatible filesystem
(local, GCS, S3, etc.).
+ *
+ * <p>Implementations must be {@link Serializable} because they are embedded
inside {@link
+ * DebeziumIO.Read}, which is a {@link
org.apache.beam.sdk.transforms.PTransform} that gets
+ * serialized and shipped to workers.
+ */
+public interface OffsetRetainer extends Serializable {
+
+ /**
+ * Returns the most recently saved offset, or {@code null} if no offset has
been saved yet.
+ *
+ * <p>A {@code null} return causes the connector to start from the beginning
of the change stream.
+ * Implementations should return {@code null} only when no saved offset exists
yet. If a stored
+ * offset exists but cannot be read, throwing (as {@link
FileSystemOffsetRetainer} does) is
+ * preferable to returning {@code null}, which would silently reprocess the
change stream from the beginning.
+ */
+ @Nullable
+ Map<String, Object> loadOffset();
+
+ /**
+ * Persists the given offset so it can be recovered after a pipeline restart.
+ *
+ * <p>Called after each successful {@code task.commit()} with the latest
committed offset.
+ * Implementations should swallow transient errors rather than throwing, so
that a failed save
+ * does not terminate the pipeline.
+ *
+ * @param offset The current connector offset, as returned by {@link
+ * org.apache.kafka.connect.source.SourceRecord#sourceOffset()}.
+ */
+ void saveOffset(Map<String, Object> offset);
+}
diff --git
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
index 88ecc4fdd90..869fa5d34a5 100644
---
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
+++
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java
@@ -17,6 +17,9 @@
*/
package org.apache.beam.io.debezium;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
@@ -24,9 +27,13 @@ import io.debezium.config.Configuration;
import io.debezium.connector.mysql.MySqlConnector;
import io.debezium.connector.mysql.MySqlConnectorConfig;
import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
import java.util.Map;
import org.apache.beam.io.debezium.DebeziumIO.ConnectorConfiguration;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.kafka.common.config.ConfigValue;
+import org.checkerframework.checker.nullness.qual.Nullable;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -95,4 +102,149 @@ public class DebeziumIOTest implements Serializable {
IllegalArgumentException.class,
() ->
MYSQL_CONNECTOR_CONFIGURATION.withUsername(username).withPassword(password));
}
+
+ @Test
+ public void testReadWithStartOffsetStoresOffset() {
+ Map<String, Object> offset = ImmutableMap.of("file", "mysql-bin.000003",
"pos", 156L);
+ DebeziumIO.Read<String> read =
+ DebeziumIO.<String>read()
+ .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION)
+ .withStartOffset(offset);
+ assertEquals(offset, read.getStartOffset());
+ }
+
+ @Test
+ public void testReadWithoutStartOffsetIsNull() {
+ DebeziumIO.Read<String> read =
+
DebeziumIO.<String>read().withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION);
+ assertNull(read.getStartOffset());
+ }
+
+ @Test
+ public void testReadWithNullStartOffsetThrows() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ DebeziumIO.<String>read()
+ .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION)
+ .withStartOffset(null));
+ }
+
+ @Test
+ public void testGetInitialRestrictionUsesStartOffset() throws Exception {
+ Map<String, Object> offset = ImmutableMap.of("file", "mysql-bin.000003",
"pos", 156L);
+ DebeziumIO.Read<String> spec =
+ DebeziumIO.<String>read()
+ .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION)
+ .withStartOffset(offset);
+ KafkaSourceConsumerFn<String> fn = new
KafkaSourceConsumerFn<>(MySqlConnector.class, spec);
+ KafkaSourceConsumerFn.OffsetHolder restriction =
fn.getInitialRestriction(null);
+ assertEquals(offset, restriction.offset);
+ }
+
+ @Test
+ public void testGetInitialRestrictionWithoutStartOffsetIsNull() throws
Exception {
+ DebeziumIO.Read<String> spec =
+
DebeziumIO.<String>read().withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION);
+ KafkaSourceConsumerFn<String> fn = new
KafkaSourceConsumerFn<>(MySqlConnector.class, spec);
+ KafkaSourceConsumerFn.OffsetHolder restriction =
fn.getInitialRestriction(null);
+ assertNull(restriction.offset);
+ }
+
+ // ---- OffsetRetainer tests -----------------------------------------------
+
+ /** Minimal in-memory retainer used only in tests. */
+ private static class InMemoryOffsetRetainer implements OffsetRetainer {
+ private final @Nullable Map<String, Object> loadResult;
+ private @Nullable Map<String, Object> lastSaved;
+
+ InMemoryOffsetRetainer(@Nullable Map<String, Object> loadResult) {
+ this.loadResult = loadResult;
+ }
+
+ @Override
+ public @Nullable Map<String, Object> loadOffset() {
+ return loadResult;
+ }
+
+ @Override
+ public void saveOffset(Map<String, Object> offset) {
+ lastSaved = new HashMap<>(offset);
+ }
+ }
+
+ @Test
+ public void testWithOffsetRetainerStoresRetainer() {
+ InMemoryOffsetRetainer retainer = new InMemoryOffsetRetainer(null);
+ DebeziumIO.Read<String> read =
+ DebeziumIO.<String>read()
+ .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION)
+ .withOffsetRetainer(retainer);
+ assertNotNull(read.getOffsetRetainer());
+ }
+
+ @Test
+ public void testWithNullOffsetRetainerThrows() {
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ DebeziumIO.<String>read()
+ .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION)
+ .withOffsetRetainer(null));
+ }
+
+ @Test
+ public void testGetInitialRestrictionUsesRetainerOffset() throws Exception {
+ Map<String, Object> savedOffset = ImmutableMap.of("lsn", 28160840L);
+ DebeziumIO.Read<String> spec =
+ DebeziumIO.<String>read()
+ .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION)
+ .withOffsetRetainer(new InMemoryOffsetRetainer(savedOffset));
+ KafkaSourceConsumerFn<String> fn = new
KafkaSourceConsumerFn<>(MySqlConnector.class, spec);
+ KafkaSourceConsumerFn.OffsetHolder restriction =
fn.getInitialRestriction(null);
+ assertEquals(savedOffset, restriction.offset);
+ }
+
+ @Test
+ public void testRetainerTakesPriorityOverWithStartOffset() throws Exception {
+ Map<String, Object> retainerOffset = ImmutableMap.of("lsn", 99L);
+ Map<String, Object> explicitOffset = ImmutableMap.of("lsn", 1L);
+ DebeziumIO.Read<String> spec =
+ DebeziumIO.<String>read()
+ .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION)
+ .withStartOffset(explicitOffset)
+ .withOffsetRetainer(new InMemoryOffsetRetainer(retainerOffset));
+ KafkaSourceConsumerFn<String> fn = new
KafkaSourceConsumerFn<>(MySqlConnector.class, spec);
+ KafkaSourceConsumerFn.OffsetHolder restriction =
fn.getInitialRestriction(null);
+ assertEquals(retainerOffset, restriction.offset);
+ }
+
+ @Test
+ public void testRetainerFallsBackToWithStartOffsetWhenLoadReturnsNull()
throws Exception {
+ Map<String, Object> explicitOffset = ImmutableMap.of("lsn", 1L);
+ DebeziumIO.Read<String> spec =
+ DebeziumIO.<String>read()
+ .withConnectorConfiguration(MYSQL_CONNECTOR_CONFIGURATION)
+ .withStartOffset(explicitOffset)
+ // Retainer has no saved offset yet (returns null).
+ .withOffsetRetainer(new InMemoryOffsetRetainer(null));
+ KafkaSourceConsumerFn<String> fn = new
KafkaSourceConsumerFn<>(MySqlConnector.class, spec);
+ KafkaSourceConsumerFn.OffsetHolder restriction =
fn.getInitialRestriction(null);
+ assertEquals(explicitOffset, restriction.offset);
+ }
+
+ @Test
+ public void testBuildExternalThrowsOnMalformedStartOffsetEntry() {
+ DebeziumTransformRegistrar.ReadBuilder.Configuration config =
+ new DebeziumTransformRegistrar.ReadBuilder.Configuration();
+ config.setUsername("user");
+ config.setPassword("pass");
+ config.setHost("localhost");
+ config.setPort("3306");
+ config.setConnectorClass("MySQL");
+ config.setStartOffset(Arrays.asList("lsn=100", "no-equals-sign"));
+ assertThrows(
+ IllegalArgumentException.class,
+ () -> new
DebeziumTransformRegistrar.ReadBuilder().buildExternal(config));
+ }
}
diff --git
a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/FileSystemOffsetRetainerTest.java
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/FileSystemOffsetRetainerTest.java
new file mode 100644
index 00000000000..e3ac0fff5da
--- /dev/null
+++
b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/FileSystemOffsetRetainerTest.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.Map;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Unit tests for {@link FileSystemOffsetRetainer}. */
+@RunWith(JUnit4.class)
+public class FileSystemOffsetRetainerTest {
+
+ @Rule public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ @Before
+ public void setUp() {
+ FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create());
+ }
+
+ @Test
+ public void testLoadOffsetReturnNullWhenFileIsMissing() {
+ String path = tmpFolder.getRoot().getAbsolutePath() + "/nonexistent.json";
+ assertNull(FileSystemOffsetRetainer.of(path).loadOffset());
+ }
+
+ @Test
+ public void testLoadOffsetThrowsWhenFileIsUnreadable() throws Exception {
+ String path = tmpFolder.getRoot().getAbsolutePath() + "/offset.json";
+ FileSystemOffsetRetainer retainer = FileSystemOffsetRetainer.of(path);
+ retainer.saveOffset(ImmutableMap.of("lsn", "100"));
+
+ // Corrupt the file so JSON parsing fails.
+    Files.newBufferedWriter(Paths.get(path), StandardCharsets.UTF_8).close(); // truncate to empty
+ assertThrows(RuntimeException.class, retainer::loadOffset);
+ }
+
+ @Test
+ public void testSaveAndLoadOffsetRoundTrip() {
+ String path = tmpFolder.getRoot().getAbsolutePath() + "/offset.json";
+ FileSystemOffsetRetainer retainer = FileSystemOffsetRetainer.of(path);
+
+    retainer.saveOffset(ImmutableMap.of("file", "binlog.000001", "pos", "156"));
+
+ Map<String, Object> loaded = retainer.loadOffset();
+ assertNotNull(loaded);
+ assertEquals("binlog.000001", loaded.get("file"));
+ assertEquals("156", loaded.get("pos"));
+ }
+
+ @Test
+ public void testSaveOffsetSkipsWriteWhenOffsetUnchanged() throws Exception {
+ String path = tmpFolder.getRoot().getAbsolutePath() + "/offset.json";
+ FileSystemOffsetRetainer retainer = FileSystemOffsetRetainer.of(path);
+ Map<String, Object> offset = ImmutableMap.of("lsn", "100");
+
+ retainer.saveOffset(offset);
+ long modifiedAfterFirstSave = new File(path).lastModified();
+
+ // Second call with the same offset should not touch the file.
+ Thread.sleep(10); // ensure mtime would differ if a write occurred
+ retainer.saveOffset(offset);
+ assertEquals(modifiedAfterFirstSave, new File(path).lastModified());
+ }
+
+ @Test
+ public void testSaveOffsetLeavesNoTmpFile() {
+ String path = tmpFolder.getRoot().getAbsolutePath() + "/offset.json";
+    FileSystemOffsetRetainer.of(path).saveOffset(ImmutableMap.of("lsn", "28160840"));
+
+ assertTrue("Final offset file should exist", new File(path).exists());
+    assertFalse("Temp file should not remain after rename", new File(path + ".tmp").exists());
+ }
+
+ @Test
+  public void testSerializedRetainerCanLoadAfterDeserialization() throws Exception {
+ String path = tmpFolder.getRoot().getAbsolutePath() + "/offset.json";
+ FileSystemOffsetRetainer original = FileSystemOffsetRetainer.of(path);
+ original.saveOffset(ImmutableMap.of("lsn", "12345"));
+
+ ByteArrayOutputStream bos = new ByteArrayOutputStream();
+ try (ObjectOutputStream oos = new ObjectOutputStream(bos)) {
+ oos.writeObject(original);
+ }
+ FileSystemOffsetRetainer deserialized;
+ try (ObjectInputStream ois =
+ new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
+ deserialized = (FileSystemOffsetRetainer) ois.readObject();
+ }
+
+ Map<String, Object> loaded = deserialized.loadOffset();
+ assertNotNull(loaded);
+ assertEquals("12345", loaded.get("lsn"));
+ }
+}
diff --git a/sdks/python/apache_beam/io/debezium.py b/sdks/python/apache_beam/io/debezium.py
index 26516fa4e4b..d1ca02aa68d 100644
--- a/sdks/python/apache_beam/io/debezium.py
+++ b/sdks/python/apache_beam/io/debezium.py
@@ -109,7 +109,9 @@ ReadFromDebeziumSchema = NamedTuple(
'ReadFromDebeziumSchema',
[('connector_class', str), ('username', str), ('password', str),
('host', str), ('port', str), ('max_number_of_records', Optional[int]),
- ('connection_properties', List[str])])
+ ('connection_properties', List[str]),
+ ('start_offset', Optional[List[str]]),
+ ('offset_storage_path', Optional[str])])
class ReadFromDebezium(PTransform):
@@ -131,6 +133,8 @@ class ReadFromDebezium(PTransform):
port,
max_number_of_records=None,
connection_properties=None,
+ start_offset=None,
+ offset_storage_path=None,
expansion_service=None):
"""
Initializes a read operation from Debezium.
@@ -144,8 +148,38 @@ class ReadFromDebezium(PTransform):
to be fetched before stop.
:param connection_properties: properties of the debezium
connection passed as string
- with with format [propertyName=property;]*
+ with format [propertyName=property;]*
+ :param start_offset: starting offset to resume the connector from
+ a previously seen position. Provided as a list
+ of "key=value" strings, where numeric values are
+ encoded as their decimal string representation.
+ Example for PostgreSQL::
+
+ start_offset=["lsn=28160840"]
+
+ Example for MySQL::
+
+ start_offset=["file=binlog.000001", "pos=156"]
+
+ Obtain the offset from the JSON output of a
+ previous pipeline run (the "metadata" field
+ contains connector-specific position info) or
+ via ``SourceRecord.sourceOffset()`` in a custom
+ Java SourceRecordMapper.
+ :param offset_storage_path: path to a file where the connector offset
+ is automatically saved after each checkpoint
+ and loaded on pipeline startup, allowing the
+ pipeline to resume from where it left off.
+ Supports any filesystem available to the
+ Beam runner (local, GCS, S3, etc.).
+ Example::
+
+ offset_storage_path=(
+ "gs://my-bucket/debezium/offset.json"
+ )
+
+ When set, takes precedence over
+ ``start_offset``.
:param expansion_service: The address (host:port)
of the ExpansionService.
"""
@@ -157,7 +191,9 @@ class ReadFromDebezium(PTransform):
host=host,
port=str(port),
max_number_of_records=max_number_of_records,
- connection_properties=connection_properties)
+ connection_properties=connection_properties,
+ start_offset=start_offset,
+ offset_storage_path=offset_storage_path)
self.expansion_service = expansion_service or default_io_expansion_service()
def expand(self, pbegin):