[ https://issues.apache.org/jira/browse/KAFKA-13521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hector Geraldino updated KAFKA-13521:
-------------------------------------
    Issue Type: Bug  (was: Improvement)

> Supress changelog schema version breaks migration
> -------------------------------------------------
>
>                 Key: KAFKA-13521
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13521
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0, 2.5.0, 2.5.1
>            Reporter: Hector Geraldino
>            Priority: Major
>
> Hi,
> We recently updated the kafka-streams library in one of our apps from v2.5.0
> to v2.5.1. This upgrade changes the header format of the state store for
> suppress changelog topics (see
> https://issues.apache.org/jira/browse/KAFKA-10173 and
> https://github.com/apache/kafka/pull/8905).
> What we noticed was that introducing a new version of the binary schema
> header breaks older clients, i.e. applications running on v2.5.1 can parse
> the v3, v2, v1 and v0 headers, while the ones running on v2.5.0 (and, I
> assume, previous versions) cannot read headers in v3 format.
> The logged exception is:
>
> {code:java}
> java.lang.IllegalArgumentException: Restoring apparently invalid changelog record: ConsumerRecord(topic = msgequator-kfns-msgequator-msgequator-suppress-buffer-store-changelog, partition = 8, leaderEpoch = 405, offset = 711400430, CreateTime = 1638828473341, serialized key size = 32, serialized value size = 90, headers = RecordHeaders(headers = [RecordHeader(key = v, value = [3])], isReadOnly = false), key = [B@5cf0e540, value = [B@40abc004)
>     at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:372) ~[msgequator-1.59.3.jar:1.59.3]
>     at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89) ~[msgequator-1.59.3.jar:1.59.3]
>     at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92) ~[msgequator-1.59.3.jar:1.59.3]
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:350) ~[msgequator-1.59.3.jar:1.59.3]
>     at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) ~[msgequator-1.59.3.jar:1.59.3]
>     at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:401) ~[msgequator-1.59.3.jar:1.59.3]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779) ~[msgequator-1.59.3.jar:1.59.3]
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[msgequator-1.59.3.jar:1.59.3]
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[msgequator-1.59.3.jar:1.59.3]
> {code}
>
> There's obviously no clear solution for this other than stopping/starting all
> instances at once.
> A rolling bounce that takes some time to complete (in our case, days) will
> break instances that haven't been upgraded yet: after a rebalance, older
> clients can pick up changelog partition(s) already written in the new
> encoding.
>
> I don't know whether the fix is to add a client-side flag that lists the
> supported protocol versions (so it behaves like Kafka Consumers when picking
> the rebalance protocol, cooperative or eager), or whether the migration guide
> just needs to state explicitly that a full stop/start migration is required
> whenever the protocol version changes.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)
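
The idea in the report of having clients list their supported protocol versions could look roughly like the sketch below. It is hypothetical and not Kafka Streams code: the class `ChangelogVersionNegotiator`, the method `highestCommonVersion`, and the notion that each instance advertises the header versions it can read are all assumptions made for illustration. The sketch picks the highest header version that every instance can read, so writers would stay on an older format until the rolling bounce has finished.

```java
import java.util.Collection;
import java.util.List;
import java.util.OptionalInt;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper for illustration only; nothing here is a Kafka Streams API.
final class ChangelogVersionNegotiator {

    private ChangelogVersionNegotiator() {}

    // Returns the highest header version supported by every instance,
    // or an empty OptionalInt if there is no instance or no shared version.
    static OptionalInt highestCommonVersion(
            Collection<? extends Collection<Integer>> supportedByInstance) {
        TreeSet<Integer> common = null;
        for (Collection<Integer> supported : supportedByInstance) {
            if (common == null) {
                common = new TreeSet<>(supported);   // start from the first instance
            } else {
                common.retainAll(supported);         // keep only versions all can read
            }
        }
        if (common == null || common.isEmpty()) {
            return OptionalInt.empty();
        }
        return OptionalInt.of(common.last());
    }

    public static void main(String[] args) {
        // Illustrative values: an upgraded instance that reads v0 to v3 and an
        // older instance that is assumed here to read v0 to v2.
        List<Set<Integer>> group = List.of(Set.of(0, 1, 2, 3), Set.of(0, 1, 2));
        System.out.println(highestCommonVersion(group));
    }
}
```

With a scheme like this, the group would only start writing v3 headers once no instance limited to older versions remains, which is the same spirit as the consumer rebalance protocol selection mentioned in the report.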