[ https://issues.apache.org/jira/browse/KAFKA-13002?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17370653#comment-17370653 ]

Tom Scott commented on KAFKA-13002:
-----------------------------------

I think I've determined the problem; I'll get a PR together ASAP.

> dev branch Streams not able to fetch end offsets from pre-3.0 brokers
> ---------------------------------------------------------------------
>
>                 Key: KAFKA-13002
>                 URL: https://issues.apache.org/jira/browse/KAFKA-13002
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: John Roesler
>            Assignee: Tom Scott
>            Priority: Blocker
>             Fix For: 3.0.0
>
>         Attachments: soaks.png
>
>
> Note: this is not a report against a released version of AK. It seems to be a 
> problem on the trunk development branch only.
> After deploying our soak test against `trunk/HEAD` on Friday, I noticed that 
> Streams is no longer processing:
> !soaks.png!
> I found this stacktrace in the logs during startup:
> {code:java}
> [2021-06-25T16:50:44-05:00] (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) [2021-06-25 21:50:44,499] WARN [i-0691913411e8c77c3-StreamThread-1] The listOffsets request failed. (org.apache.kafka.streams.processor.internals.ClientUtils)
> [2021-06-25T16:50:44-05:00] (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [7,7]. The supported range is [0,6].
>     at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
>     at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
>     at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
>     at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
>     at org.apache.kafka.streams.processor.internals.ClientUtils.getEndOffsets(ClientUtils.java:147)
>     at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.populateClientStatesMap(StreamsPartitionAssignor.java:643)
>     at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToClients(StreamsPartitionAssignor.java:579)
>     at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:387)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:593)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:556)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1178)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1153)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
>     at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1297)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:932)
>     at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:885)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
> {code}
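> For context, the frame at ClientUtils.getEndOffsets corresponds to an ordinary end-offset lookup through the public Admin API, roughly like the sketch below (a minimal, self-contained illustration, not the actual Streams code; the class and helper names are made up):
> {code:java}
> import java.util.Map;
> import java.util.Set;
> import java.util.concurrent.ExecutionException;
> import java.util.function.Function;
> import java.util.stream.Collectors;
> 
> import org.apache.kafka.clients.admin.Admin;
> import org.apache.kafka.clients.admin.ListOffsetsResult.ListOffsetsResultInfo;
> import org.apache.kafka.clients.admin.OffsetSpec;
> import org.apache.kafka.common.TopicPartition;
> 
> public class EndOffsetsSketch {
>     // Fetch the latest ("end") offsets for a set of partitions, which is the kind of
>     // request Streams issues during assignment. The admin client sends a LIST_OFFSETS
>     // request under the hood; against a pre-3.0 broker that request is rejected with
>     // the UnsupportedVersionException that surfaces here as the cause of the
>     // ExecutionException in the stacktrace above.
>     static Map<TopicPartition, Long> endOffsets(Admin admin, Set<TopicPartition> partitions)
>             throws ExecutionException, InterruptedException {
>         Map<TopicPartition, OffsetSpec> request = partitions.stream()
>                 .collect(Collectors.toMap(Function.identity(), tp -> OffsetSpec.latest()));
>         Map<TopicPartition, ListOffsetsResultInfo> result = admin.listOffsets(request).all().get();
>         return result.entrySet().stream()
>                 .collect(Collectors.toMap(Map.Entry::getKey, e -> e.getValue().offset()));
>     }
> }
> {code}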
> Just eyeballing the recent commits, I'm guessing it was due to 
> [https://github.com/apache/kafka/commit/bd72ef1bf1e40feb3bc17349a385b479fa5fa530].
> It looks like that commit sets the initial "minimum version" to 7, but the client 
> should then back off into compatibility mode. Therefore, maybe that stacktrace is 
> expected (though it's not great UX regardless).
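> For illustration only (the names below are assumed, not the real client internals), the version negotiation amounts to intersecting the range the request builder allows with the range the broker advertises; pinning the builder's minimum to 7 against a broker that only supports [0,6] leaves an empty intersection and yields exactly the error above:
> {code:java}
> import org.apache.kafka.common.errors.UnsupportedVersionException;
> 
> // Hypothetical sketch of the version negotiation, not the actual NetworkClient code.
> public class VersionNegotiationSketch {
>     // Pick the highest request version both sides can use, or fail if the ranges don't overlap.
>     static short latestUsableVersion(String api,
>                                      short clientMin, short clientMax,   // range the request builder allows
>                                      short brokerMin, short brokerMax) { // range the broker advertises
>         short usableMin = (short) Math.max(clientMin, brokerMin);
>         short usableMax = (short) Math.min(clientMax, brokerMax);
>         if (usableMin > usableMax) {
>             throw new UnsupportedVersionException("The broker does not support " + api
>                     + " with version in range [" + clientMin + "," + clientMax + "]."
>                     + " The supported range is [" + brokerMin + "," + brokerMax + "].");
>         }
>         return usableMax;
>     }
> 
>     public static void main(String[] args) {
>         // A pre-3.0 broker advertises LIST_OFFSETS [0,6]; a builder pinned to [7,7] has nothing to fall back to.
>         latestUsableVersion("LIST_OFFSETS", (short) 7, (short) 7, (short) 0, (short) 6);
>     }
> }
> {code}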
> However, it does not seem like Streams is actually able to back off. The next 
> thing I see is:
> {code:java}
> [2021-06-25T16:50:44-05:00] (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) [2021-06-25 21:50:44,507] WARN [i-0691913411e8c77c3-StreamThread-1] Task 3_2 had endOffsetSum=-3 smaller than offsetSum=0 on member 24e46b47-0a01-4b57-9d15-771482869097. This probably means the task is corrupted, which in turn indicates that it will need to restore from scratch if it gets assigned. The assignor will de-prioritize returning this task to this member in the hopes that some other member may be able to re-use its state. (org.apache.kafka.streams.processor.internals.assignment.ClientState)
> {code}
> Which is itself a problem. It looks like there's a sentinel "-3" value 
> returned as the end offset, but since that value is lower than any real 
> endOffset Streams will have book-kept, Streams will assume that all its local 
> state is corrupt. The result is that Streams will delete all its local state 
> and rebuild from the changelog. This isn't an ideal behavior on restart.
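> To make that failure mode concrete, here is a minimal sketch of the kind of lag check the log message describes (the names and the exact handling are assumed, not the actual ClientState code): any sentinel end offset below the locally book-kept offset sum trips the "corrupted task" path:
> {code:java}
> // Hypothetical sketch; not the actual ClientState implementation.
> public class LagCheckSketch {
>     // Sentinel the assignor apparently sees when the end offset could not be fetched.
>     static final long UNKNOWN_END_OFFSET = -3L;
> 
>     // Compute a task's lag, treating any end offset smaller than the locally
>     // book-kept offset sum as corrupted state that must be restored from scratch.
>     static long computeLag(long endOffsetSum, long offsetSum) {
>         if (endOffsetSum < offsetSum) {
>             // This is the branch hit above: endOffsetSum=-3 < offsetSum=0, so perfectly
>             // healthy local state is flagged as corrupt and rebuilt from the changelog.
>             return Long.MAX_VALUE;
>         }
>         return endOffsetSum - offsetSum;
>     }
> 
>     public static void main(String[] args) {
>         System.out.println(computeLag(UNKNOWN_END_OFFSET, 0L)); // Long.MAX_VALUE -> treated as corrupted
>     }
> }
> {code}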
> Finally, I never actually see Streams able to proceed with processing. The 
> only thing it logs after this point (as far as I can tell) is:
> {code:java}
> [2021-06-25T16:50:54-05:00] (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) [2021-06-25 21:50:52,463] INFO [i-0691913411e8c77c3-StreamThread-1] stream-thread [i-0691913411e8c77c3-StreamThread-1] End offset for changelog stream-soak-test-trunk-ccloud-alos--KSTREAM-AGGREGATE-STATE-STORE-0000000013-changelog-0 cannot be found; will retry in the next time. (org.apache.kafka.streams.processor.internals.StoreChangelogReader)
> {code}
> So, it seems the version backoff simply isn't working.
> Obviously, we'll need to fix these problems before we can release 3.0.


