Hi,

I have a Kafka Streams application that is too heavyweight, with 9 sub-topologies. I am trying to disable an unneeded part of the topology that is completely independent of the rest. Since my state stores have fixed, predictable names, I compared the topologies and I believe it should be safe to trim some sub-topologies. After trimming the unused ones, the application now has 6 sub-topologies. Nevertheless, it won't start. It seems to be trying to recover previous tasks that shouldn't exist anymore. I left the application down for 30 minutes so that any timeouts, such as session or poll timeouts, could expire. Still, when the application starts, it reads the task states from somewhere and fails to recover them. Here's the log (note the "unknown task 7_0", which makes sense since the number of sub-topologies fell from 9 to 6):
2022-01-26 02:28:17.552 [asdasdasd-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor - Decided on assignment: {1699fdc2-3121-4987-9fb2-26fc5bd4fb48=[activeTasks: ([0_0, 0_1, 1_0, 1_1, 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1]) standbyTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([0_0, 0_1, 1_0, 1_1, 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1, 7_0, 7_1, 8_0, 8_1, 9_0, 9_1]) changelogOffsetTotalsByTask: ([0_0=1244818, 0_1=625988, 1_0=15255, 1_1=64645, 2_0=670938, 2_1=100636, 3_0=6379662, 3_1=5600072, 4_0=2362, 4_1=15224, 5_0=19577, 5_1=113994, 6_0=7403980, 6_1=9195079, 7_0=226722, 7_1=76623, 8_0=7334, 8_1=66344, 9_0=0, 9_1=39]) taskLagTotals: ([0_0=3, 0_1=3, 1_0=1, 1_1=1, 2_0=1, 2_1=1, 3_0=7, 3_1=7, 4_0=1, 4_1=1, 5_0=1, 5_1=1, 6_0=1, 6_1=1]) capacity: 1 assigned: 14]} with no followup probing rebalance.
2022-01-26 02:28:17.558 [asdasdasd-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor - stream-thread [asdasdasd-StreamThread-1-consumer] Assigned tasks [0_0, 0_1, 1_0, 1_1, 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1] including stateful [0_0, 0_1, 1_0, 1_1, 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1] to clients as: 1699fdc2-3121-4987-9fb2-26fc5bd4fb48=[activeTasks: ([0_0, 0_1, 1_0, 1_1, 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1]) standbyTasks: ([])].
2022-01-26 02:28:17.566 [asdasdasd-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer instanceId=asdasdasd-1, clientId=asdasdasd-StreamThread-1-consumer, groupId=inventory-assembler-4] Rebalance failed.
java.lang.IllegalStateException: Tried to lookup lag for unknown task 7_0
    at org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:318) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor$$Lambda$534.00000000F9BC5F20.applyAsLong(Unknown Source) ~[?:?]
    at java.util.Comparator.lambda$comparingLong$6043328a$1(Unknown Source) ~[?:?]
    at java.util.Comparator$$Lambda$535.00000000F9B1D820.compare(Unknown Source) ~[?:?]
    at java.util.Comparator.lambda$thenComparing$36697e65$1(Unknown Source) ~[?:?]
    at java.util.Comparator$$Lambda$399.00000000FEBE6620.compare(Unknown Source) ~[?:?]
    at java.util.TreeMap.put(Unknown Source) ~[?:?]
    at java.util.TreeSet.add(Unknown Source) ~[?:?]
    at java.util.AbstractCollection.addAll(Unknown Source) ~[?:?]
    at java.util.TreeSet.addAll(Unknown Source) ~[?:?]
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1205) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1119) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:845) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:405) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589) [app.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:693) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:599) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:562) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1182) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1157) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296) [app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) [app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:925) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:885) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556) [app.jar:?]

Any idea how I could overcome this? I even tried changing ConsumerConfig.GROUP_INSTANCE_ID_CONFIG and StreamsConfig.CLIENT_ID_CONFIG, but that didn't seem to work...
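To make the mismatch in the log concrete, here is a minimal JDK-only sketch (not Kafka Streams code) that diffs the prevStandbyTasks list from the log against the tasks assigned for the trimmed topology and groups the leftovers by sub-topology. It assumes task IDs have the form <subtopologyId>_<partition>, as printed in the log. The staleTaskDirs helper is hypothetical and assumes the local state layout <state.dir>/<application.id>/<taskId>/, which may be one place the previous task IDs are picked up from across restarts.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class UnknownTaskCheck {

    // Task IDs in the log look like "<subtopologyId>_<partition>", e.g. "7_0".
    static int subtopologyOf(String taskId) {
        return Integer.parseInt(taskId.substring(0, taskId.indexOf('_')));
    }

    // Previously reported tasks that are not part of the current task set,
    // kept in the order they were reported.
    static List<String> unknownTasks(List<String> previous, Set<String> current) {
        List<String> unknown = new ArrayList<>();
        for (String task : previous) {
            if (!current.contains(task)) {
                unknown.add(task);
            }
        }
        return unknown;
    }

    // Hypothetical helper, assuming the layout <state.dir>/<application.id>/<taskId>/:
    // lists subdirectories named like task IDs that are not in the current task set.
    static List<Path> staleTaskDirs(Path appStateDir, Set<String> current) throws IOException {
        try (Stream<Path> children = Files.list(appStateDir)) {
            return children
                    .filter(Files::isDirectory)
                    .filter(p -> p.getFileName().toString().matches("\\d+_\\d+"))
                    .filter(p -> !current.contains(p.getFileName().toString()))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Copied from "prevStandbyTasks" in the log.
        List<String> previous = Arrays.asList(
                "0_0", "0_1", "1_0", "1_1", "2_0", "2_1", "3_0", "3_1", "4_0", "4_1",
                "5_0", "5_1", "6_0", "6_1", "7_0", "7_1", "8_0", "8_1", "9_0", "9_1");
        // Copied from "activeTasks" in the log (the trimmed topology's tasks).
        Set<String> current = new TreeSet<>(Arrays.asList(
                "0_0", "0_1", "1_0", "1_1", "2_0", "2_1", "3_0", "3_1", "4_0", "4_1",
                "5_0", "5_1", "6_0", "6_1"));

        List<String> unknown = unknownTasks(previous, current);
        Set<Integer> removed = new TreeSet<>();
        for (String task : unknown) {
            removed.add(subtopologyOf(task));
        }
        System.out.println("Unknown tasks: " + unknown);
        System.out.println("Removed sub-topologies: " + removed);

        // Optional: pass <state.dir>/<application.id> as the only argument.
        if (args.length == 1) {
            System.out.println("Stale task directories: "
                    + staleTaskDirs(Paths.get(args[0]), current));
        }
    }
}
```

Run without arguments, it prints "Unknown tasks: [7_0, 7_1, 8_0, 8_1, 9_0, 9_1]" and "Removed sub-topologies: [7, 8, 9]", which lines up with the "unknown task 7_0" in the exception being the first of those leftovers.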
Thanks,
Murilo