John Roesler created KAFKA-13002:
------------------------------------
Summary: dev branch Streams not able to fetch end offsets from
pre-3.0 brokers
Key: KAFKA-13002
URL: https://issues.apache.org/jira/browse/KAFKA-13002
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: John Roesler
Fix For: 3.0.0
Attachments: soaks.png
Note: this is not a report against a released version of AK. It seems to be a
problem on the trunk development branch only.
After deploying our soak test against `trunk/HEAD` on Friday, I noticed that
Streams is no longer processing:
!soaks.png!
I found this stacktrace in the logs during startup:
{code:java}
[2021-06-25T16:50:44-05:00]
(streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog)
[2021-06-25 21:50:44,499] WARN [i-0691913411e8c77c3-StreamThread-1] The
listOffsets request failed.
(org.apache.kafka.streams.processor.internals.ClientUtils)
[2021-06-25T16:50:44-05:00]
(streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog)
java.util.concurrent.ExecutionException:
org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not
support LIST_OFFSETS with version in range [7,7]. The supported range is
[0,6].
	at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
	at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
	at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
	at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
	at org.apache.kafka.streams.processor.internals.ClientUtils.getEndOffsets(ClientUtils.java:147)
	at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.populateClientStatesMap(StreamsPartitionAssignor.java:643)
	at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToClients(StreamsPartitionAssignor.java:579)
	at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:387)
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:593)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:556)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1178)
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1153)
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
	at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1297)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1238)
	at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:932)
	at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:885)
	at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
	at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
	at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:555)
{code}
Just eyeballing the recent commits, I'm guessing it was caused by
[https://github.com/apache/kafka/commit/bd72ef1bf1e40feb3bc17349a385b479fa5fa530].
It looks like that code sets the initial "minimum version" to 7 but should then
back off into compatibility mode, so maybe that stacktrace is expected (though
it's not great UX regardless).
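To make the suspected failure mode concrete, here is a minimal, hypothetical sketch of version-range negotiation with a compatibility fallback. It is not Kafka's client code: the class, the method names, the stand-in exception, and the single-retry policy are all assumptions for illustration. It relies only on what the log shows: the request asked for LIST_OFFSETS in [7,7] while the broker supports [0,6], so the ranges cannot overlap unless the client lowers its minimum.

{code:java}
/**
 * Hypothetical sketch: NOT Kafka's implementation. Names are illustrative only.
 */
public class VersionNegotiationSketch {

    /** Stand-in for the real exception; thrown when the version ranges don't overlap. */
    static class UnsupportedVersionException extends RuntimeException {
        UnsupportedVersionException(String msg) {
            super(msg);
        }
    }

    /** Returns the highest version contained in both the client's and the broker's range. */
    static int negotiate(int clientMin, int clientMax, int brokerMin, int brokerMax) {
        int low = Math.max(clientMin, brokerMin);
        int high = Math.min(clientMax, brokerMax);
        if (low > high) {
            throw new UnsupportedVersionException(String.format(
                "The broker does not support LIST_OFFSETS with version in range [%d,%d]. "
                    + "The supported range is [%d,%d].",
                clientMin, clientMax, brokerMin, brokerMax));
        }
        return high;
    }

    /**
     * Tries the preferred minimum first; if the broker rejects it, retries once
     * with an (assumed) compatibility minimum instead of failing outright.
     */
    static int negotiateWithFallback(int preferredMin, int compatMin, int clientMax,
                                     int brokerMin, int brokerMax) {
        try {
            return negotiate(preferredMin, clientMax, brokerMin, brokerMax);
        } catch (UnsupportedVersionException e) {
            // Back off into compatibility mode.
            return negotiate(compatMin, clientMax, brokerMin, brokerMax);
        }
    }

    public static void main(String[] args) {
        // Ranges from the log: request [7,7], broker [0,6].
        try {
            negotiate(7, 7, 0, 6);
        } catch (UnsupportedVersionException e) {
            System.out.println(e.getMessage());
        }
        // With the fallback, the same broker negotiates version 6.
        System.out.println(negotiateWithFallback(7, 0, 7, 0, 6));
    }
}
{code}

Running `main` prints the same error text as the log, then `6` once the fallback lowers the minimum. If Streams were backing off as intended, one would expect to end up on the second path.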
However, it does not seem like Streams is actually able to back off. The next
thing I see is:
{code:java}
[2021-06-25T16:50:44-05:00]
(streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog)
[2021-06-25 21:50:44,507] WARN [i-0691913411e8c77c3-StreamThread-1] Task 3_2
had endOffsetSum=-3 smaller than offsetSum=0 on member
24e46b47-0a01-4b57-9d15-771482869097. This probably means the task is
corrupted, which in turn indicates that it will need to restore from scratch if
it gets assigned. The assignor will de-prioritize returning this task to this
member in the hopes that some other member may be able to re-use its state.
(org.apache.kafka.streams.processor.internals.assignment.ClientState) {code}
This is itself a problem. It looks like a sentinel value of -3 is being returned
as the end offset. Since that value is lower than any real end offset Streams
will have recorded, Streams will assume that all of its local state is corrupt.
As a result, Streams will delete all of its local state and rebuild it from the
changelog, which isn't ideal behavior on restart.
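The heuristic in that WARN line can be sketched roughly as follows. This is a hypothetical illustration, not the actual `ClientState` code: the method names are made up, and the "guarded" variant (treating a negative end-offset sum as unknown rather than as a real position) is an assumed mitigation, not something Streams is known to do. It relies only on the fact that real Kafka offsets are never negative, so a negative sum can only come from a sentinel.

{code:java}
/**
 * Hypothetical sketch of the "endOffsetSum smaller than offsetSum" check.
 * NOT the real ClientState logic; names and the guard are assumptions.
 */
public class OffsetSumCheckSketch {

    /** Naive check: an end-offset sum below the locally recorded sum looks like corruption. */
    static boolean looksCorrupted(long endOffsetSum, long offsetSum) {
        return endOffsetSum < offsetSum;
    }

    /**
     * Guarded check (assumption): real offsets are non-negative, so a negative
     * endOffsetSum is a sentinel and is not treated as evidence of corruption.
     */
    static boolean looksCorruptedGuarded(long endOffsetSum, long offsetSum) {
        if (endOffsetSum < 0) {
            return false; // end offset unknown; don't flag the task
        }
        return endOffsetSum < offsetSum;
    }

    public static void main(String[] args) {
        // Values from the WARN line: endOffsetSum=-3, offsetSum=0.
        System.out.println(looksCorrupted(-3L, 0L));           // true: task flagged as corrupted
        System.out.println(looksCorruptedGuarded(-3L, 0L));    // false: not flagged
        System.out.println(looksCorruptedGuarded(100L, 250L)); // true: genuine mismatch
    }
}
{code}

With the logged values, the naive check flags the task, matching the wipe-and-restore behavior; the guarded variant still catches a genuine mismatch between two real offset sums.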
Finally, I never see Streams resume processing. As far as I can tell, the only
thing it logs after this point is:
{code:java}
[2021-06-25T16:50:54-05:00]
(streams-soak-trunk-ccloud-alos_soak_i-0691913411e8c77c3_streamslog)
[2021-06-25 21:50:52,463] INFO [i-0691913411e8c77c3-StreamThread-1]
stream-thread [i-0691913411e8c77c3-StreamThread-1] End offset for changelog
stream-soak-test-trunk-ccloud-alos--KSTREAM-AGGREGATE-STATE-STORE-0000000013-changelog-0
cannot be found; will retry in the next time.
(org.apache.kafka.streams.processor.internals.StoreChangelogReader) {code}
So, it seems the version backoff simply isn't working.
Obviously, we'll need to fix these problems before we can release 3.0.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)