[ https://issues.apache.org/jira/browse/KAFKA-12780?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17344131#comment-17344131 ]
A. Sophie Blee-Goldman commented on KAFKA-12780:
------------------------------------------------

Hey [~iirekm2]

This exception means there's a discrepancy between the tasks that a client is reporting, based on its current assignment plus its local state on disk, and the tasks that the assignor has parsed from the current topology. It means something has gone wrong with your app, but there can be a few different causes.

Given your description of hitting this "every second or third time [you] rerun the application", my best guess is that your application may be generating at least some subtopologies in a random order. This has been reported before by [someone who was using Spring Boot|https://issues.apache.org/jira/browse/KAFKA-5882?focusedCommentId=16554237&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-16554237]. Is it possible your app is experiencing the same issue? You can print out the topology on startup and compare it across reruns to check.

> java.lang.IllegalStateException: Tried to lookup lag for unknown task 2_0
> -------------------------------------------------------------------------
>
> Key: KAFKA-12780
> URL: https://issues.apache.org/jira/browse/KAFKA-12780
> Project: Kafka
> Issue Type: Bug
> Reporter: Ireneusz Matysiewicz
> Priority: Major
>
> Whenever I try doing a stream.join(table, joiner), I get the following error
> every second or third time I rerun the application.
> Kafka Streams version: 2.8.0
> The default stores config is used (simply writing to the /tmp directory).
> Changing state.dir to another location doesn't help.
>
> {code:java}
> 22:36:35.205 [xxxxxx-94ada1ab-8573-4245-9742-3322e412598b-StreamThread-1] WARN o.a.k.s.p.i.assignment.ClientState - Task 1_22 had endOffsetSum=3 smaller than offsetSum=13 on member 94ada1ab-8573-4245-9742-3322e412598b. This probably means the task is corrupted, which in turn indicates that it will need to restore from scratch if it gets assigned. The assignor will de-prioritize returning this task to this member in the hopes that some other member may be able to re-use its state.
> 22:36:35.205 [xxxxxx-94ada1ab-8573-4245-9742-3322e412598b-StreamThread-1] WARN o.a.k.s.p.i.assignment.ClientState - Task 1_22 had endOffsetSum=3 smaller than offsetSum=13 on member 94ada1ab-8573-4245-9742-3322e412598b. This probably means the task is corrupted, which in turn indicates that it will need to restore from scratch if it gets assigned. The assignor will de-prioritize returning this task to this member in the hopes that some other member may be able to re-use its state.
> 22:36:35.224 [xxxxxx-94ada1ab-8573-4245-9742-3322e412598b-StreamThread-1] ERROR o.apache.kafka.streams.KafkaStreams - stream-client [xxxxxx-94ada1ab-8573-4245-9742-3322e412598b] Encountered the following exception during processing and the registered exception handler opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
> java.lang.IllegalStateException: Tried to lookup lag for unknown task 2_0
> at org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:318)
> at java.base/java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511)
> at java.base/java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216)
> at java.base/java.util.TreeMap.put(TreeMap.java:550)
> at java.base/java.util.TreeSet.add(TreeSet.java:255)
> at java.base/java.util.AbstractCollection.addAll(AbstractCollection.java:352)
> at java.base/java.util.TreeSet.addAll(TreeSet.java:312)
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1205)
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1119)
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:845)
> at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:405)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:691)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:597)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:560)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1177)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1152)
> at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
> at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
> at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
> at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210)
> at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:925)
> at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:885)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556)
> {code}

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
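A simplified, hypothetical model of the lag lookup that fails in the trace may help show why a topology that changes between reruns can produce this error. It assumes, as the task names in the log suggest, that a task id has the form subtopology_partition (so 2_0 is partition 0 of subtopology 2), and that lags are computed only for the tasks of the topology parsed in the current run. A task the client still reports from an earlier run, for instance because subtopologies were numbered differently then, has no lag entry, so sorting the client's previous tasks by lag throws inside TreeSet.add, much like the ClientState.lagFor, Comparator and TreeMap.put frames in the trace. This is not the actual Kafka Streams code: LagLookupSketch, ClientLags, recordLags and previousTasksByLag are made-up names, lagFor only borrows the frame name and exception message, and the rule "use the whole end offset sum as the lag when the local offset sum is larger" is an assumption based on the WARN message about restoring from scratch.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical, simplified model of the assignor's lag lookup.
// NOT the real Kafka Streams ClientState / StreamsPartitionAssignor code.
public class LagLookupSketch {

    static final class ClientLags {
        // Lag per task id ("subtopology_partition"), known only for tasks of the
        // topology parsed in the current run (assumption of this model).
        private final Map<String, Long> taskLagTotals = new HashMap<>();

        // endOffsetSums: changelog end offsets summed per task (current topology).
        // localOffsetSums: checkpointed offsets summed per task, as reported by the client.
        void recordLags(Map<String, Long> endOffsetSums, Map<String, Long> localOffsetSums) {
            for (Map.Entry<String, Long> e : endOffsetSums.entrySet()) {
                long end = e.getValue();
                long local = localOffsetSums.getOrDefault(e.getKey(), 0L);
                // Local sum larger than end sum: treat local state as unusable and
                // count the whole end offset sum as lag (assumed restore-from-scratch rule).
                taskLagTotals.put(e.getKey(), end < local ? end : end - local);
            }
        }

        long lagFor(String task) {
            Long lag = taskLagTotals.get(task);
            if (lag == null) {
                throw new IllegalStateException("Tried to lookup lag for unknown task " + task);
            }
            return lag;
        }
    }

    // Orders previously owned tasks by lag, ties broken by task id. The comparator
    // calls lagFor, so an unknown task fails inside TreeSet.add.
    static SortedSet<String> previousTasksByLag(ClientLags lags, List<String> previousTasks) {
        Comparator<String> order = Comparator.<String>comparingLong(lags::lagFor)
                .thenComparing(Comparator.<String>naturalOrder());
        SortedSet<String> byLag = new TreeSet<>(order);
        byLag.addAll(previousTasks);
        return byLag;
    }

    public static void main(String[] args) {
        ClientLags lags = new ClientLags();
        // Made-up numbers for the stateful tasks of the current topology; 1_22
        // mirrors the WARN line (endOffsetSum=3, offsetSum=13).
        lags.recordLags(Map.of("0_0", 10L, "1_22", 3L), Map.of("1_22", 13L));
        System.out.println(lags.lagFor("1_22")); // prints 3

        try {
            // "2_0" is reported from an earlier run but is unknown to this topology.
            previousTasksByLag(lags, List.of("1_22", "2_0"));
        } catch (IllegalStateException e) {
            // prints: Tried to lookup lag for unknown task 2_0
            System.out.println(e.getMessage());
        }
    }
}
```

Under the same assumptions, a renumbered topology could plausibly also explain the WARN about 1_22: if the state on disk under 1_22 was written by a different subtopology in an earlier run, its checkpointed offsets need not line up with the changelog of the current 1_22, so it looks corrupted.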
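To act on the suggestion to print the topology on startup and compare it across reruns, one option is to log a short fingerprint of the topology description. Kafka Streams' Topology#describe() returns a TopologyDescription whose toString() lists the sub-topologies; the minimal sketch below hashes such a string with SHA-256 and reports the first differing line between two runs. The class and method names (TopologyFingerprint, fingerprint, firstDifference) are hypothetical, and the two description strings in main are made-up placeholders, not real Kafka Streams output; in an application you would pass topology.describe().toString() instead.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// Hypothetical helper for comparing topology descriptions across reruns.
public class TopologyFingerprint {

    // SHA-256 of the description as lowercase hex; short enough to log at startup.
    static String fingerprint(String description) {
        try {
            MessageDigest sha256 = MessageDigest.getInstance("SHA-256");
            byte[] digest = sha256.digest(description.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to support SHA-256.
            throw new IllegalStateException("SHA-256 not available", e);
        }
    }

    // First differing line (1-based) between two descriptions, or null if identical.
    static String firstDifference(String previous, String current) {
        String[] a = previous.split("\n", -1);
        String[] b = current.split("\n", -1);
        int common = Math.min(a.length, b.length);
        for (int i = 0; i < common; i++) {
            if (!a[i].equals(b[i])) {
                return "line " + (i + 1) + ": '" + a[i] + "' vs '" + b[i] + "'";
            }
        }
        return a.length == b.length ? null : "descriptions have different line counts";
    }

    public static void main(String[] args) {
        // Placeholders only; in an app, use topology.describe().toString().
        String run1 = "Sub-topology: 0 (join)\nSub-topology: 1 (aggregate)";
        String run2 = "Sub-topology: 0 (aggregate)\nSub-topology: 1 (join)";
        System.out.println(fingerprint(run1).equals(fingerprint(run2))); // prints false
        // prints: line 1: 'Sub-topology: 0 (join)' vs 'Sub-topology: 0 (aggregate)'
        System.out.println(firstDifference(run1, run2));
    }
}
```

If the fingerprint logged on startup differs between reruns of the same build, the sub-topologies are probably being assembled in a nondeterministic order, which would be consistent with the Spring Boot report linked in the comment.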