[ https://issues.apache.org/jira/browse/KAFKA-13521?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hector Geraldino updated KAFKA-13521:
-------------------------------------
    Issue Type: Bug  (was: Improvement)

> Suppress changelog schema version breaks migration
> --------------------------------------------------
>
>                 Key: KAFKA-13521
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13521
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.0, 2.5.0, 2.5.1
>            Reporter: Hector Geraldino
>            Priority: Major
>
> Hi,
> We recently upgraded the kafka-streams library in one of our apps from 
> v2.5.0 to v2.5.1. This upgrade changes the header format of the records 
> written to the suppression buffer's changelog topic (see 
> https://issues.apache.org/jira/browse/KAFKA-10173 and 
> [https://github.com/apache/kafka/pull/8905]).
> We noticed that introducing a new version of the binary schema header 
> breaks older clients: applications running v2.5.1 can parse v3, v2, v1, and 
> v0 headers, while those running v2.5.0 (and, I assume, earlier versions) 
> cannot read headers in the v3 format.
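To illustrate the failure mode described above, here is a minimal Java sketch of a reader that dispatches on the one-byte "v" record header (the exception in this report shows value = [3]) and rejects any version newer than the ones it knows. VersionedHeaderReader and its methods are hypothetical names and this is not the actual InMemoryTimeOrderedKeyValueBuffer code; treating a missing header as v0 and modeling 2.5.0 as supporting versions up to v2 are assumptions based on this report.

```java
/**
 * Hypothetical sketch, not the actual Kafka Streams implementation: a changelog
 * reader that dispatches on a one-byte "v" record header and rejects any version
 * newer than the ones it knows how to parse.
 */
public class VersionedHeaderReader {

    // Highest header version this reader can parse (assumption: 2 models 2.5.0, 3 models 2.5.1).
    private final int maxSupportedVersion;

    public VersionedHeaderReader(int maxSupportedVersion) {
        this.maxSupportedVersion = maxSupportedVersion;
    }

    /**
     * Returns the header version of a record. In this sketch a missing "v" header
     * is treated as v0; malformed or unknown versions throw IllegalArgumentException.
     */
    public int readVersion(byte[] vHeader) {
        if (vHeader == null) {
            // Assumption: records written before headers were versioned carry no "v" header.
            return 0;
        }
        if (vHeader.length != 1) {
            throw new IllegalArgumentException("Malformed version header");
        }
        int version = vHeader[0];
        if (version < 0 || version > maxSupportedVersion) {
            // The older reader has no decoder for this version, so restoration stops
            // here instead of skipping or converting the record.
            throw new IllegalArgumentException(
                "Restoring apparently invalid changelog record: header version " + version);
        }
        return version;
    }

    public static void main(String[] args) {
        VersionedHeaderReader upgraded = new VersionedHeaderReader(3); // models 2.5.1
        VersionedHeaderReader older = new VersionedHeaderReader(2);    // models 2.5.0
        byte[] v3Header = {3}; // the header value seen in the reported exception

        System.out.println("upgraded reader parsed v" + upgraded.readVersion(v3Header));
        try {
            older.readVersion(v3Header);
        } catch (IllegalArgumentException e) {
            System.out.println("older reader failed: " + e.getMessage());
        }
    }
}
```

Running main shows the upgraded reader accepting the v3 header while the older one throws an IllegalArgumentException, the same shape of error as the trace in this report.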
> The logged exception is:
>  
> {code:java}
> java.lang.IllegalArgumentException: Restoring apparently invalid changelog record: ConsumerRecord(topic = msgequator-kfns-msgequator-msgequator-suppress-buffer-store-changelog, partition = 8, leaderEpoch = 405, offset = 711400430, CreateTime = 1638828473341, serialized key size = 32, serialized value size = 90, headers = RecordHeaders(headers = [RecordHeader(key = v, value = [3])], isReadOnly = false), key = [B@5cf0e540, value = [B@40abc004)
>       at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:372) ~[msgequator-1.59.3.jar:1.59.3]
>       at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89) ~[msgequator-1.59.3.jar:1.59.3]
>       at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92) ~[msgequator-1.59.3.jar:1.59.3]
>       at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:350) ~[msgequator-1.59.3.jar:1.59.3]
>       at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) ~[msgequator-1.59.3.jar:1.59.3]
>       at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:401) ~[msgequator-1.59.3.jar:1.59.3]
>       at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779) ~[msgequator-1.59.3.jar:1.59.3]
>       at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[msgequator-1.59.3.jar:1.59.3]
>       at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[msgequator-1.59.3.jar:1.59.3]
> {code}
>  
>  
> There's no clear solution for this other than stopping and restarting all 
> instances at once. A rolling bounce that takes a long time to complete (in 
> our case, days) will break instances that haven't been upgraded yet as soon 
> as a rebalance assigns them changelog partition(s) that already contain 
> newly encoded (v3) records.
>  
> I'm not sure whether the right fix is to add a client-side flag that lists 
> the supported protocol versions (so the client behaves like Kafka consumers 
> do when picking the rebalance protocol, cooperative or eager), or simply to 
> state explicitly in the migration guide that a full stop/start migration is 
> required whenever the protocol version changes.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
