John Roesler created KAFKA-13002:
------------------------------------

Summary: dev branch Streams not able to fetch end offsets from pre-3.0 brokers
Key: KAFKA-13002
URL: https://issues.apache.org/jira/browse/KAFKA-13002
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: John Roesler
Fix For: 3.0.0
Attachments: soaks.png

Note: this is not a report against a released version of AK. It seems to be a problem on the trunk development branch only.

After deploying our soak test against `trunk/HEAD` on Friday, I noticed that Streams is no longer processing:

!soaks.png!

I found this stacktrace in the logs during startup:

{code:java}
[2021-06-25T16:50:44-05:00] (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) [2021-06-25 21:50:44,499] WARN [i-0691913411e8c77c3-StreamThread-1] The listOffsets request failed. (org.apache.kafka.streams.processor.internals.ClientUtils)
[2021-06-25T16:50:44-05:00] (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [7,7]. The supported range is [0,6].
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at org.apache.kafka.streams.processor.internals.ClientUtils.getEndOffsets(ClientUtils.java:147)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.populateClientStatesMap(StreamsPartitionAssignor.java:643)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToClients(StreamsPartitionAssignor.java:579)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:387)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:593)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:556)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1178)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1153)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1297)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:932)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:885)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
{code}

Just eyeballing the recent commits, I'm guessing it was due to [https://github.com/apache/kafka/commit/bd72ef1bf1e40feb3bc17349a385b479fa5fa530]. It looks like that code sets the initial "minimum version" to 7, but then should back off into compatibility mode. Therefore, maybe that stacktrace is expected (though it's not great UX regardless).

However, it does not seem like Streams is actually able to back off. The next thing I see is:

{code:java}
[2021-06-25T16:50:44-05:00] (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) [2021-06-25 21:50:44,507] WARN [i-0691913411e8c77c3-StreamThread-1] Task 3_2 had endOffsetSum=-3 smaller than offsetSum=0 on member 24e46b47-0a01-4b57-9d15-771482869097. This probably means the task is corrupted, which in turn indicates that it will need to restore from scratch if it gets assigned. The assignor will de-prioritize returning this task to this member in the hopes that some other member may be able to re-use its state. (org.apache.kafka.streams.processor.internals.assignment.ClientState)
{code}

Which is itself a problem. It looks like there's a sentinel "-3" value returned as the end offset, but since that value is lower than any real endOffset Streams will have book-kept, Streams will assume that all its local state is corrupt. The result is that Streams will delete all its local state and rebuild from the changelog. This isn't an ideal behavior on restart.

Finally, I never actually see Streams able to proceed with processing.
The only thing it logs after this point (as far as I can tell) is:

{code:java}
[2021-06-25T16:50:54-05:00] (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) [2021-06-25 21:50:52,463] INFO [i-0691913411e8c77c3-StreamThread-1] stream-thread [i-0691913411e8c77c3-StreamThread-1] End offset for changelog stream-soak-test-trunk-ccloud-alos--KSTREAM-AGGREGATE-STATE-STORE-0000000013-changelog-0 cannot be found; will retry in the next time. (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
{code}

So, it seems the version backoff simply isn't working.

Obviously, we'll need to fix these problems before we can release 3.0.
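
To make the sentinel problem concrete, here is a minimal sketch of the comparison that seems to be going wrong, and of one way it could be guarded. This is not the actual Streams code: the class name, the `TaskState` enum, and the constant `UNKNOWN_END_OFFSET_SUM` are hypothetical names for illustration, and the value -3 is taken only from the `endOffsetSum=-3` in the WARN message above. The idea is simply to check for the sentinel before comparing it against a real offset sum, so that a failed end-offset fetch is treated as "unknown" rather than "corrupted".

```java
// Illustrative sketch only; none of these names exist in Kafka Streams.
final class EndOffsetSentinelSketch {

    // Assumption: -3 is the sentinel meaning "end offsets could not be fetched",
    // inferred from the endOffsetSum=-3 in the log message.
    static final long UNKNOWN_END_OFFSET_SUM = -3L;

    enum TaskState { UNKNOWN, CORRUPTED, CAUGHT_UP, LAGGING }

    // Classify a task from the sum of its changelog end offsets and the sum of
    // the offsets recorded locally for that task.
    static TaskState classify(long endOffsetSum, long offsetSum) {
        if (endOffsetSum == UNKNOWN_END_OFFSET_SUM) {
            // The fetch failed, so there is nothing meaningful to compare against.
            return TaskState.UNKNOWN;
        }
        if (endOffsetSum < offsetSum) {
            // Local state claims to be ahead of the changelog, which should not happen.
            return TaskState.CORRUPTED;
        }
        return endOffsetSum == offsetSum ? TaskState.CAUGHT_UP : TaskState.LAGGING;
    }

    public static void main(String[] args) {
        System.out.println(classify(-3L, 0L));   // prints UNKNOWN
        System.out.println(classify(5L, 10L));   // prints CORRUPTED
        System.out.println(classify(10L, 10L));  // prints CAUGHT_UP
    }
}
```

Without the first check, the `-3, 0` case falls into the `endOffsetSum < offsetSum` branch, which matches the "probably means the task is corrupted" warning above.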