John Roesler created KAFKA-13002:
------------------------------------

             Summary: dev branch Streams not able to fetch end offsets from 
pre-3.0 brokers
                 Key: KAFKA-13002
                 URL: https://issues.apache.org/jira/browse/KAFKA-13002
             Project: Kafka
          Issue Type: Bug
          Components: streams
            Reporter: John Roesler
             Fix For: 3.0.0
         Attachments: soaks.png

Note: this is not a report against a released version of Apache Kafka. It seems to be a problem on the trunk development branch only.

After deploying our soak test against `trunk/HEAD` on Friday, I noticed that 
Streams is no longer processing:

!soaks.png!

I found this stacktrace in the logs during startup:
{code:java}
[2021-06-25T16:50:44-05:00] (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) [2021-06-25 21:50:44,499] WARN [i-0691913411e8c77c3-StreamThread-1] The listOffsets request failed. (org.apache.kafka.streams.processor.internals.ClientUtils)
[2021-06-25T16:50:44-05:00] (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support LIST_OFFSETS with version in range [7,7]. The supported range is [0,6].
    at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
    at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
    at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
    at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
    at org.apache.kafka.streams.processor.internals.ClientUtils.getEndOffsets(ClientUtils.java:147)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.populateClientStatesMap(StreamsPartitionAssignor.java:643)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToClients(StreamsPartitionAssignor.java:579)
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:387)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:593)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:556)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1178)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1153)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1297)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:932)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:885)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
{code}
Just eyeballing the recent commits, I'm guessing this was caused by [https://github.com/apache/kafka/commit/bd72ef1bf1e40feb3bc17349a385b479fa5fa530]. That change sets the initial "minimum version" to 7, but it should then back off into a compatibility mode, so maybe the stacktrace above is expected (though it's not great UX regardless).
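For context, a minimal sketch of the downgrade behavior I would expect. This is not Kafka's actual client code; the broker stub, method names, and the dummy offset are all hypothetical, purely to illustrate "start at the newest request version and retry lower on rejection":

```java
// Hypothetical sketch (NOT Kafka's real client code) of version backoff:
// the client first tries its newest LIST_OFFSETS version and steps down
// when the broker rejects it, instead of failing permanently.
public class VersionFallbackSketch {

    // Stand-in for org.apache.kafka.common.errors.UnsupportedVersionException
    static class UnsupportedVersionException extends RuntimeException {
        UnsupportedVersionException(String msg) { super(msg); }
    }

    // Pretend pre-3.0 broker that only supports LIST_OFFSETS versions [0,6]
    static long listOffsets(int apiVersion) {
        if (apiVersion > 6) {
            throw new UnsupportedVersionException(
                "The broker does not support LIST_OFFSETS with version in range ["
                + apiVersion + "," + apiVersion + "]. The supported range is [0,6].");
        }
        return 42L; // dummy end offset for illustration
    }

    // The "compatibility mode" backoff: retry with successively lower versions
    // until one is accepted or the client's oldest supported version is exhausted.
    static long listOffsetsWithFallback(int newestVersion, int oldestVersion) {
        for (int v = newestVersion; v >= oldestVersion; v--) {
            try {
                return listOffsets(v);
            } catch (UnsupportedVersionException e) {
                // rejected; fall through and retry with the next-lower version
            }
        }
        throw new IllegalStateException("no mutually supported version");
    }

    public static void main(String[] args) {
        // Client supports [0,7], broker supports [0,6]: v7 is rejected,
        // v6 succeeds, and processing can continue.
        System.out.println(listOffsetsWithFallback(7, 0)); // prints 42
    }
}
```

In this sketch the first rejection still surfaces (hence a WARN like the one above would be expected), but the request ultimately succeeds at version 6, which is what does not appear to happen on trunk.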

However, it does not seem like Streams is actually able to back off. The next 
thing I see is:
{code:java}
[2021-06-25T16:50:44-05:00] (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) [2021-06-25 21:50:44,507] WARN [i-0691913411e8c77c3-StreamThread-1] Task 3_2 had endOffsetSum=-3 smaller than offsetSum=0 on member 24e46b47-0a01-4b57-9d15-771482869097. This probably means the task is corrupted, which in turn indicates that it will need to restore from scratch if it gets assigned. The assignor will de-prioritize returning this task to this member in the hopes that some other member may be able to re-use its state. (org.apache.kafka.streams.processor.internals.assignment.ClientState) {code}
That warning is itself a problem. It looks like a sentinel value of "-3" is returned as the end offset, but since that value is lower than any real end offset Streams will have book-kept, Streams assumes that all of its local state is corrupt. As a result, Streams deletes all of its local state and rebuilds it from the changelog, which is not an ideal behavior on restart.
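To make the failure mode concrete, here is a minimal sketch of the comparison as the warning describes it. This is not the real ClientState logic, and the constant name is hypothetical; it only shows why a sentinel end offset must be excluded before the corruption check:

```java
// Hypothetical sketch (NOT the real ClientState code): why a sentinel
// end-offset value poisons the "task looks corrupted" lag check.
public class LagCheckSketch {

    // Assumed sentinel meaning "end offset unknown / not yet fetched";
    // the actual constant and its name in Kafka may differ.
    static final long UNKNOWN_OFFSET_SUM = -3L;

    // Check as described in the warning: any endOffsetSum smaller than the
    // locally book-kept offsetSum flags the task as corrupted. The sentinel
    // -3 is smaller than every real offset, so it always triggers this.
    static boolean looksCorruptedNaive(long endOffsetSum, long offsetSum) {
        return endOffsetSum < offsetSum;
    }

    // Guarded version: an unknown end offset is not evidence of corruption,
    // so skip the comparison entirely in that case.
    static boolean looksCorruptedGuarded(long endOffsetSum, long offsetSum) {
        if (endOffsetSum == UNKNOWN_OFFSET_SUM) {
            return false; // cannot judge lag without a real end offset
        }
        return endOffsetSum < offsetSum;
    }

    public static void main(String[] args) {
        // The case from the log: endOffsetSum=-3, offsetSum=0.
        System.out.println(looksCorruptedNaive(-3L, 0L));   // prints true (false positive)
        System.out.println(looksCorruptedGuarded(-3L, 0L)); // prints false
    }
}
```

With a guard like this, the unknown end offset would at worst delay the assignment decision rather than make Streams discard healthy local state.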

Finally, I never actually see Streams able to proceed with processing. The only 
thing it logs after this point (as far as I can tell) is:
{code:java}
[2021-06-25T16:50:54-05:00] (streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog) [2021-06-25 21:50:52,463] INFO [i-0691913411e8c77c3-StreamThread-1] stream-thread [i-0691913411e8c77c3-StreamThread-1] End offset for changelog stream-soak-test-trunk-ccloud-alos--KSTREAM-AGGREGATE-STATE-STORE-0000000013-changelog-0 cannot be found; will retry in the next time. (org.apache.kafka.streams.processor.internals.StoreChangelogReader) {code}
So, it seems the version backoff simply isn't working.

Obviously, we'll need to fix these problems before we can release 3.0.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
