gemini-code-assist[bot] commented on PR #37750: URL: https://github.com/apache/beam/pull/37750#issuecomment-3997936072
## Summary of Changes

This pull request significantly improves the resilience and usability of DebeziumIO by introducing robust offset management features. It allows users to either explicitly define a starting offset for Debezium connectors or leverage an automatic file system-based mechanism to persist and restore offsets, ensuring seamless pipeline restarts and fault tolerance.

### Highlights

* **Enhanced Offset Management**: Introduced `startOffset` and `OffsetStoragePath` capabilities for DebeziumIO in both Go and Java SDKs, allowing users to specify a point from which the Debezium connector should resume consuming changes or automatically manage offsets via a file system.
* **Automatic Offset Persistence**: Implemented an `OffsetRetainer` interface and a `FileSystemOffsetRetainer` class in Java, enabling automatic saving and loading of Debezium connector offsets to any Beam-compatible filesystem (e.g., GCS, S3) after each successful checkpoint.
* **Cross-Language Consistency**: Ensured that the new offset management features are consistently exposed and configurable across the Go, Java, and Python SDKs, providing a unified experience for DebeziumIO users.
* **Improved Pipeline Resilience**: Significantly enhanced the robustness of Debezium-based pipelines by allowing them to gracefully restart from a known good state, preventing data loss or reprocessing of already handled events.

<details>
<summary><b>Changelog</b></summary>

* **sdks/go/pkg/beam/io/xlang/debeziumio/debezium.go**
    * Added `StartOffset` and `OffsetStoragePath` fields to the `readFromDebeziumSchema` struct.
    * Implemented new `StartOffset` and `OffsetStoragePath` functions as read options for configuring the Debezium connector.
* **sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumIO.java**
    * Added abstract `getStartOffset`, `getOffsetRetainer`, `setStartOffset`, and `setOffsetRetainer` methods to the `Read` and `Builder` classes.
    * Introduced `withStartOffset` and `withOffsetRetainer` builder methods with detailed Javadoc for configuring offset management.
* **sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/DebeziumTransformRegistrar.java**
    * Imported `HashMap` for map manipulation.
    * Added `startOffset` and `offsetStoragePath` fields to the `Configuration` class and their corresponding setters.
    * Implemented logic to parse `startOffset` strings into a `Map<String, Object>` and to configure `OffsetRetainer` based on `offsetStoragePath`.
* **sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/FileSystemOffsetRetainer.java**
    * Added a new class `FileSystemOffsetRetainer` to persist and load Debezium connector offsets as JSON files using Beam's `FileSystems`.
* **sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java**
    * Modified `getInitialRestriction` to prioritize loading offsets from an `OffsetRetainer` or falling back to an explicit `startOffset`.
    * Added logic to save the committed offset using the `OffsetRetainer` after each successful task commit.
* **sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/OffsetRetainer.java**
    * Added a new `OffsetRetainer` interface for defining strategies to persist and restore Debezium connector offsets.
* **sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/DebeziumIOTest.java**
    * Added new test cases to verify the functionality of `withStartOffset` and `withOffsetRetainer`, including priority handling and null checks.
* **sdks/python/apache_beam/io/debezium.py**
    * Extended `ReadFromDebeziumSchema` to include optional `start_offset` and `offset_storage_path` fields.
    * Updated the `ReadFromDebezium` constructor to accept and document `start_offset` and `offset_storage_path` parameters.

</details>

<details>
<summary><b>Activity</b></summary>

* A summary of changes was provided by `gemini-code-assist[bot]`.
* Reviewers were assigned by `github-actions[bot]`.
* `tvalentyn` requested assistance in finding a reviewer.
* `tkaymak` suggested `Abacn` as a potential reviewer.
* `Abacn` provided feedback, suggesting a more comprehensive `OffsetRetainer` interface for restartable offsets.
* `tkaymak` agreed with `Abacn`'s vision for a better solution.
* `tkaymak` requested a new summary.

</details>

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
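
The changelog entry for `KafkaSourceConsumerFn.java` describes a resolution order: a saved offset from an `OffsetRetainer` takes priority, with the explicit `startOffset` as the fallback, and the committed offset is saved again after each successful commit. The following is a minimal, self-contained sketch of that idea only. It does not use Beam or Debezium classes; `SketchOffsetRetainer`, `LocalFileRetainer`, and `resolveInitialOffset` are hypothetical names invented for illustration, the offsets are stored in a local `.properties` file rather than as JSON through Beam's `FileSystems`, and starting from an empty offset when neither source is available is an assumption, not something the summary states.

```java
import java.io.IOException;
import java.io.Reader;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/** Illustrative sketch only; none of these types are the PR's actual classes. */
final class OffsetResolutionSketch {

  /** Hypothetical stand-in for a "persist and restore offsets" contract. */
  interface SketchOffsetRetainer {
    /** Returns the last saved offset, or null if nothing has been saved yet. */
    Map<String, String> loadOffset() throws IOException;

    /** Persists the offset of the most recent successful commit. */
    void saveOffset(Map<String, String> offset) throws IOException;
  }

  /** Stores the offset in a local properties file (assumption: the PR uses JSON instead). */
  static final class LocalFileRetainer implements SketchOffsetRetainer {
    private final Path path;

    LocalFileRetainer(Path path) {
      this.path = path;
    }

    @Override
    public Map<String, String> loadOffset() throws IOException {
      if (!Files.exists(path)) {
        return null;
      }
      Properties props = new Properties();
      try (Reader reader = Files.newBufferedReader(path, StandardCharsets.UTF_8)) {
        props.load(reader);
      }
      Map<String, String> offset = new HashMap<>();
      for (String key : props.stringPropertyNames()) {
        offset.put(key, props.getProperty(key));
      }
      return offset;
    }

    @Override
    public void saveOffset(Map<String, String> offset) throws IOException {
      Properties props = new Properties();
      props.putAll(offset);
      // Write to a temporary sibling first so a crash cannot leave a half-written file.
      Path tmp = path.resolveSibling(path.getFileName() + ".tmp");
      try (Writer writer = Files.newBufferedWriter(tmp, StandardCharsets.UTF_8)) {
        props.store(writer, null);
      }
      Files.move(tmp, path, StandardCopyOption.REPLACE_EXISTING);
    }
  }

  /** Saved offset first, explicit start offset second, empty offset last (assumed default). */
  static Map<String, String> resolveInitialOffset(
      SketchOffsetRetainer retainer, Map<String, String> startOffset) throws IOException {
    if (retainer != null) {
      Map<String, String> saved = retainer.loadOffset();
      if (saved != null) {
        return saved;
      }
    }
    return startOffset != null ? startOffset : new HashMap<>();
  }

  public static void main(String[] args) throws IOException {
    Path dir = Files.createTempDirectory("debezium-offsets");
    SketchOffsetRetainer retainer = new LocalFileRetainer(dir.resolve("offset.properties"));

    // Example keys and values are illustrative, not taken from the PR.
    Map<String, String> startOffset = new HashMap<>();
    startOffset.put("file", "mysql-bin.000001");
    startOffset.put("pos", "4");

    // First run: nothing saved yet, so the explicit start offset is used.
    System.out.println("first run: " + resolveInitialOffset(retainer, startOffset));

    // After a successful commit the offset is persisted.
    Map<String, String> committed = new HashMap<>();
    committed.put("file", "mysql-bin.000003");
    committed.put("pos", "156");
    retainer.saveOffset(committed);

    // Restart: the saved offset now takes priority over the explicit one.
    System.out.println("after restart: " + resolveInitialOffset(retainer, startOffset));
  }
}
```

In this sketch the retainer is consulted before the explicit offset so that a restarted pipeline resumes from its last commit instead of rewinding to the configured starting point; whether the PR handles missing or unreadable offset files the same way is not stated in the summary.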
