[ https://issues.apache.org/jira/browse/KAFKA-10191?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17142291#comment-17142291 ]
Sophie Blee-Goldman commented on KAFKA-10191:
---------------------------------------------

I think this could be caused by a client having a task in its prevTasks set that is not also in its taskLagTotals, i.e., the task is no longer among the current stateful tasks for this rebalance. How we could _lose_ a task, I'm not sure, but I don't think we should crash over it.

> fix flaky StreamsOptimizedTest
> ------------------------------
>
>                 Key: KAFKA-10191
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10191
>             Project: Kafka
>          Issue Type: Test
>          Components: streams, unit tests
>            Reporter: Chia-Ping Tsai
>            Assignee: Chia-Ping Tsai
>            Priority: Major
>
> {quote}Exception in thread "StreamsOptimizedTest-53c7d3b1-12b2-4d02-90b1-15757dfd2735-StreamThread-1" java.lang.IllegalStateException: Tried to lookup lag for unknown task 2_0
>   at org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:306)
>   at java.util.Comparator.lambda$comparingLong$6043328a$1(Comparator.java:511)
>   at java.util.Comparator.lambda$thenComparing$36697e65$1(Comparator.java:216)
>   at java.util.TreeMap.put(TreeMap.java:552)
>   at java.util.TreeSet.add(TreeSet.java:255)
>   at java.util.AbstractCollection.addAll(AbstractCollection.java:344)
>   at java.util.TreeSet.addAll(TreeSet.java:312)
>   at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1250)
>   at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1164)
>   at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:920)
>   at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:391)
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:583)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:689)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1400(AbstractCoordinator.java:111)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:602)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:575)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1132)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1107)
>   at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206)
>   at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169)
>   at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129)
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602)
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412)
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>   at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:419)
>   at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
>   at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:506)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1263)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1229)
>   at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1204)
>   at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:762)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:622)
>   at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:549)
>   at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:508)
> {quote}
>
> This issue may be related to
> [https://github.com/apache/kafka/commit/0f68dc7a640b26a8edea154ea4ea2b6d93b5104b],
> since the test passes if the commit is reverted.
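
To make the suspected failure mode in the comment concrete, here is a minimal sketch of how one previous task with no recorded lag can abort a lag-ordered sort. It is not the Kafka code and not the actual fix: the class LagLookupSketch, its methods, and the use of String task ids are hypothetical simplifications, and only the names prevTasks, taskLagTotals and lagFor are borrowed from the comment and the stack trace. The tolerant variant shows just one possible way to avoid crashing, by skipping tasks that have no lag entry.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for a client's assignment state (NOT the Kafka class).
class LagLookupSketch {
    private final Set<String> prevTasks = new HashSet<>();
    private final Map<String, Long> taskLagTotals = new HashMap<>();

    void addPreviousTask(String taskId) { prevTasks.add(taskId); }

    void setLag(String taskId, long lag) { taskLagTotals.put(taskId, lag); }

    // Strict lookup: throws for a task with no recorded lag, like the reported exception.
    long lagFor(String taskId) {
        Long lag = taskLagTotals.get(taskId);
        if (lag == null) {
            throw new IllegalStateException("Tried to lookup lag for unknown task " + taskId);
        }
        return lag;
    }

    // Orders tasks by lag, then by id. The comparator calls lagFor on each comparison.
    private Comparator<String> byLag() {
        return Comparator.<String>comparingLong(this::lagFor)
                .thenComparing(Comparator.naturalOrder());
    }

    // Sorts every previous task: one task missing from taskLagTotals aborts the whole sort.
    SortedSet<String> previousTasksByLagStrict() {
        SortedSet<String> sorted = new TreeSet<>(byLag());
        sorted.addAll(prevTasks);
        return sorted;
    }

    // Assumed alternative: only sort previous tasks that still have a lag entry.
    SortedSet<String> previousTasksByLagTolerant() {
        SortedSet<String> sorted = new TreeSet<>(byLag());
        for (String taskId : prevTasks) {
            if (taskLagTotals.containsKey(taskId)) {
                sorted.add(taskId);
            }
        }
        return sorted;
    }

    // Example state: task 1_0 has a lag, task 2_0 is a previous task without one.
    static LagLookupSketch exampleState() {
        LagLookupSketch state = new LagLookupSketch();
        state.addPreviousTask("1_0");
        state.setLag("1_0", 5L);
        state.addPreviousTask("2_0");
        return state;
    }

    public static void main(String[] args) {
        LagLookupSketch state = exampleState();
        try {
            state.previousTasksByLagStrict();
        } catch (IllegalStateException e) {
            System.out.println("strict sort failed: " + e.getMessage());
        }
        System.out.println("tolerant sort: " + state.previousTasksByLagTolerant());
    }
}
```

Whether silently dropping such a task is the right behaviour for the real assignor is a separate question. The sketch only shows that the exception comes from the comparator, so a single inconsistent task is enough to fail the whole assignment.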