nicktelford commented on PR #21738:
URL: https://github.com/apache/kafka/pull/21738#issuecomment-4074065130
> @nicktelford @bbejeck Can you please validate that `kafkatest.tests.streams.streams_application_upgrade_test.StreamsUpgradeTest.test_app_upgrade` works with these changes? It seems it is failing starting with this commit.
>
> Claude analysis:
>
> The previous analysis identified [KAFKA-19434](https://issues.apache.org/jira/browse/KAFKA-19434) (`initializeStartupStores`) as the cause. While [KAFKA-19434](https://issues.apache.org/jira/browse/KAFKA-19434) (Feb 20) introduced the startup initialization code path, the failure only started recently because [KAFKA-19712](https://issues.apache.org/jira/browse/KAFKA-19712) (commit [5740a26](https://github.com/apache/kafka/commit/5740a26525a27df9eacd847a6d4ed6eb23fda0dc), March 16) fundamentally changed how offsets are read within that path:
>
> Before [KAFKA-19712](https://issues.apache.org/jira/browse/KAFKA-19712): `initializeStoreOffsetsFromCheckpoint()` read offsets from the `.checkpoint` file and used `changelogOffsetFromCheckpointedOffset()` to convert the `OFFSET_UNKNOWN` sentinel (-4) back to `null`. This properly handled state written by older Kafka versions.
>
> After [KAFKA-19712](https://issues.apache.org/jira/browse/KAFKA-19712): The method was rewritten to `initializeStoreOffsets()`, which calls `store.stateStore.committedOffset(store.changelogPartition)` instead of reading the checkpoint file. RocksDB stores now manage their own offsets (`managesOffsets() == true`) and are no longer wrapped in `LegacyCheckpointingStateStore`. The `changelogOffsetFromCheckpointedOffset()` method, which handled the -4 → `null` conversion, was removed entirely.
>
> When opening state directories written by an older Kafka Streams version during upgrade, the new RocksDB store's `committedOffset()` returns raw values that were never intended to be exposed directly. This produces the -3 value (from -4 + 1) that fails the sentinel validation in `sumOfChangelogOffsets()`.
>
> [KAFKA-20257](https://issues.apache.org/jira/browse/KAFKA-20257) (commit [823f0cf](https://github.com/apache/kafka/commit/823f0cfc8cb4d6295f9385ce1dca2b172f3af7a8), same day) makes the same delegation change for `GlobalStateManagerImpl` and may have similar upgrade implications.
>
> Both commits landed on March 16 at ~17:15 UTC, approximately 4 hours before the test workflow started (21:00 UTC), confirming they were in the tested build.

@lucasbru Thanks for letting me know. I keep forgetting to run the smoke tests! :grimacing:

Thanks to your Claude analysis, I think I've identified the bug: when migrating legacy offsets from `.checkpoint` to the `RocksDBStore`, `LegacyCheckpointingStateStore#migrateLegacyOffsets` was not using `changelogOffsetFromCheckpointedOffset` to convert the old checkpoint offsets before calling `StateStore#commit`. As a result, the `OFFSET_UNKNOWN` sentinel value was written to the `RocksDBStore` offsets CF (column family).

I'm running the test suites now to validate the fix; if everything passes, I'll raise a PR ASAP and tag you and Bill as reviewers.
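To make the failure mode concrete, here is a minimal, self-contained Java sketch of the migration step described above. It is not the Kafka Streams code: `OffsetManagingStore`, `migrateLegacyOffsetsBuggy`, and the partition names are hypothetical, and `changelogOffsetFromCheckpointedOffset`, `migrateLegacyOffsets`, `commit`, and `committedOffset` only mirror the roles those names play in this discussion. It assumes, as the analysis states, that the legacy `OFFSET_UNKNOWN` sentinel is -4 and that an unknown offset should surface as `null` from `committedOffset()`.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical model of migrating legacy .checkpoint offsets into a store that
 * manages its own offsets. Names mirror the discussion, but this is NOT the
 * Kafka Streams implementation.
 */
public class LegacyOffsetMigrationSketch {

    // Assumed sentinel for "offset unknown" in legacy .checkpoint files (-4, per the analysis).
    static final long OFFSET_UNKNOWN = -4L;

    // Plays the role of changelogOffsetFromCheckpointedOffset(): map the sentinel back to null.
    static Long changelogOffsetFromCheckpointedOffset(final long checkpointed) {
        return checkpointed == OFFSET_UNKNOWN ? null : checkpointed;
    }

    // Hypothetical stand-in for a store persisting its own offsets (e.g. in an offsets column family).
    static final class OffsetManagingStore {
        private final Map<String, Long> offsets = new HashMap<>();

        void commit(final Map<String, Long> changelogOffsets) {
            // A null offset means "unknown": drop any stored value instead of persisting a sentinel.
            changelogOffsets.forEach((partition, offset) -> {
                if (offset == null) {
                    offsets.remove(partition);
                } else {
                    offsets.put(partition, offset);
                }
            });
        }

        Long committedOffset(final String partition) {
            return offsets.get(partition); // null when no offset is known
        }
    }

    // Buggy shape: raw checkpoint values, including the sentinel, are committed as-is.
    static void migrateLegacyOffsetsBuggy(final Map<String, Long> checkpoint, final OffsetManagingStore store) {
        store.commit(new HashMap<>(checkpoint));
    }

    // Fixed shape: convert each checkpointed offset before committing it.
    static void migrateLegacyOffsets(final Map<String, Long> checkpoint, final OffsetManagingStore store) {
        final Map<String, Long> converted = new HashMap<>();
        checkpoint.forEach((partition, offset) ->
            converted.put(partition, changelogOffsetFromCheckpointedOffset(offset)));
        store.commit(converted);
    }

    public static void main(final String[] args) {
        final Map<String, Long> checkpoint = new HashMap<>();
        checkpoint.put("app-store-changelog-0", 42L);
        checkpoint.put("app-store-changelog-1", OFFSET_UNKNOWN);

        final OffsetManagingStore buggy = new OffsetManagingStore();
        migrateLegacyOffsetsBuggy(checkpoint, buggy);
        // The sentinel leaks out of committedOffset() as if it were a real offset.
        System.out.println("buggy p1 = " + buggy.committedOffset("app-store-changelog-1"));

        final OffsetManagingStore fixed = new OffsetManagingStore();
        migrateLegacyOffsets(checkpoint, fixed);
        System.out.println("fixed p0 = " + fixed.committedOffset("app-store-changelog-0"));
        System.out.println("fixed p1 = " + fixed.committedOffset("app-store-changelog-1"));
    }
}
```

Running `main` prints `buggy p1 = -4`, `fixed p0 = 42`, and `fixed p1 = null`. The buggy path hands the raw sentinel back from `committedOffset()`, which is the kind of raw value the analysis says later surfaces as -3 (-4 + 1). Treating `null` as "no stored offset" rather than persisting a placeholder keeps the sentinel confined to the legacy file format.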
