Hector Geraldino created KAFKA-13521:
----------------------------------------

             Summary: Suppress changelog schema version breaks migration
                 Key: KAFKA-13521
                 URL: https://issues.apache.org/jira/browse/KAFKA-13521
             Project: Kafka
          Issue Type: Improvement
          Components: streams
    Affects Versions: 2.5.1, 2.5.0, 2.4.0
            Reporter: Hector Geraldino


Hi,

We recently updated the kafka-streams library in one of our apps from v2.5.0 to 
v2.5.1. This upgrade changes the record header format used for the suppress 
changelog topics (see https://issues.apache.org/jira/browse/KAFKA-10173 and 
https://github.com/apache/kafka/pull/8905).

What we noticed is that introducing a new version of the binary schema header 
breaks older clients: applications running v2.5.1 can parse the v3, v2, v1 and 
v0 headers, while those running 2.5.0 (and, I assume, earlier versions) cannot 
read headers in the v3 format.

The logged exception is:

 
{code:java}
java.lang.IllegalArgumentException: Restoring apparently invalid changelog record: ConsumerRecord(topic = msgequator-kfns-msgequator-msgequator-suppress-buffer-store-changelog, partition = 8, leaderEpoch = 405, offset = 711400430, CreateTime = 1638828473341, serialized key size = 32, serialized value size = 90, headers = RecordHeaders(headers = [RecordHeader(key = v, value = [3])], isReadOnly = false), key = [B@5cf0e540, value = [B@40abc004)
        at org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueBuffer.restoreBatch(InMemoryTimeOrderedKeyValueBuffer.java:372) ~[msgequator-1.59.3.jar:1.59.3]
        at org.apache.kafka.streams.processor.internals.CompositeRestoreListener.restoreBatch(CompositeRestoreListener.java:89) ~[msgequator-1.59.3.jar:1.59.3]
        at org.apache.kafka.streams.processor.internals.StateRestorer.restore(StateRestorer.java:92) ~[msgequator-1.59.3.jar:1.59.3]
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.processNext(StoreChangelogReader.java:350) ~[msgequator-1.59.3.jar:1.59.3]
        at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:94) ~[msgequator-1.59.3.jar:1.59.3]
        at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:401) ~[msgequator-1.59.3.jar:1.59.3]
        at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:779) ~[msgequator-1.59.3.jar:1.59.3]
        at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697) ~[msgequator-1.59.3.jar:1.59.3]
        at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670) ~[msgequator-1.59.3.jar:1.59.3]
{code}
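
For context, the failure comes from the version check in the restore path: the restoring instance reads the "v" record header (visible in the RecordHeaders above) and rejects any version it does not recognize. The following is a minimal illustrative sketch of that kind of gate - it is not the actual Kafka Streams source, and the per-version decoding is elided:

{code:java}
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.header.Header;

// Illustrative sketch only: a restore path that dispatches on the "v" header
// and throws for any version it does not know about. A 2.5.0 instance only
// knows the older versions, so a v3 record written by a 2.5.1 instance falls
// through to the exception shown in the stack trace above.
public class SuppressRestoreSketch {

    static void restoreRecord(final ConsumerRecord<byte[], byte[]> record) {
        final Header versionHeader = record.headers().lastHeader("v");
        final int version = versionHeader == null ? 0 : versionHeader.value()[0];
        switch (version) {
            case 0: // decode the legacy (header-less) format
            case 1: // decode the v1 format
            case 2: // decode the v2 format
                break;
            // a 2.5.1+ instance would also handle "case 3" here
            default:
                throw new IllegalArgumentException(
                        "Restoring apparently invalid changelog record: " + record);
        }
    }
}
{code}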
 

 

There's obviously no clear solution for this other than stopping and starting 
all instances at once. A rolling bounce that takes some time to complete (in our 
case, days) will break instances that haven't been upgraded yet, as soon as a 
rebalance causes an older client to pick up a changelog partition that already 
contains records in the new encoding.

 

I don't know whether the better fix is adding a flag on the client side that 
lists the supported protocol versions (so it behaves like Kafka consumers do 
when picking the rebalance protocol - cooperative or eager), or whether the 
migration guide just needs to state explicitly that a full stop/start migration 
is required whenever the protocol version changes.
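
For reference, the consumer-side negotiation mentioned above looks roughly like the snippet below: each consumer lists every assignor it supports, and the group only uses the cooperative protocol once all members support it. The Streams-side flag suggested above is purely hypothetical; the suppress.changelog.supported.versions name in the comment is made up for illustration and does not exist.

{code:java}
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.CooperativeStickyAssignor;
import org.apache.kafka.clients.consumer.RangeAssignor;

public class ProtocolNegotiationSketch {

    public static void main(final String[] args) {
        final Properties props = new Properties();

        // Existing consumer behavior: the client advertises every assignor it
        // supports, and the group settles on a protocol that all members
        // understand, so a mixed-version group keeps working during a rolling
        // bounce.
        props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
                Arrays.asList(CooperativeStickyAssignor.class.getName(),
                              RangeAssignor.class.getName()));

        // Hypothetical Streams-side analogue (this config does NOT exist today):
        // instances would advertise the suppress-changelog header versions they
        // can read, and writers would only emit a newer version once every
        // instance in the group supports it, e.g.:
        // props.put("suppress.changelog.supported.versions", "0,1,2,3");
    }
}
{code}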



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
