Hi
I have a KafkaStreams application that is too heavyweight, with 10
sub-topologies (task IDs 0_x through 9_x).
I am trying to disable an unneeded part of the topology that is
completely independent of the rest. Since my state stores have fixed,
predictable names, I compared the old and new topologies and I believe it
should be safe to trim some sub-topologies.
After trimming the unused ones, the application now has 7 sub-topologies
(task IDs 0_x through 6_x).
Nevertheless, the application won't start. It seems to be trying to recover
previous tasks that shouldn't exist anymore.
I left the application down for 30 minutes so that any timeouts, such as the
session or poll timeouts, could expire, but when the application starts it
still reads the old task states from somewhere and fails to recover them...
Here's the log (note the "unknown task 7_0", which makes sense since the
number of sub-topologies fell from 10 to 7):

2022-01-26 02:28:17.552 [asdasdasd-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.assignment.HighAvailabilityTaskAssignor - Decided on assignment:
    {1699fdc2-3121-4987-9fb2-26fc5bd4fb48=[activeTasks: ([0_0, 0_1, 1_0, 1_1, 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1])
    standbyTasks: ([])
    prevActiveTasks: ([])
    prevStandbyTasks: ([0_0, 0_1, 1_0, 1_1, 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1, 7_0, 7_1, 8_0, 8_1, 9_0, 9_1])
    changelogOffsetTotalsByTask: ([0_0=1244818, 0_1=625988, 1_0=15255, 1_1=64645, 2_0=670938, 2_1=100636, 3_0=6379662, 3_1=5600072, 4_0=2362, 4_1=15224, 5_0=19577, 5_1=113994, 6_0=7403980, 6_1=9195079, 7_0=226722, 7_1=76623, 8_0=7334, 8_1=66344, 9_0=0, 9_1=39])
    taskLagTotals: ([0_0=3, 0_1=3, 1_0=1, 1_1=1, 2_0=1, 2_1=1, 3_0=7, 3_1=7, 4_0=1, 4_1=1, 5_0=1, 5_1=1, 6_0=1, 6_1=1])
    capacity: 1 assigned: 14]} with no followup probing rebalance.
2022-01-26 02:28:17.558 [asdasdasd-StreamThread-1] INFO org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor - stream-thread [asdasdasd-StreamThread-1-consumer] Assigned tasks [0_0, 0_1, 1_0, 1_1, 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1] including stateful [0_0, 0_1, 1_0, 1_1, 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1] to clients as:
    1699fdc2-3121-4987-9fb2-26fc5bd4fb48=[activeTasks: ([0_0, 0_1, 1_0, 1_1, 2_0, 2_1, 3_0, 3_1, 4_0, 4_1, 5_0, 5_1, 6_0, 6_1]) standbyTasks: ([])].
2022-01-26 02:28:17.566 [asdasdasd-StreamThread-1] INFO org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer instanceId=asdasdasd-1, clientId=asdasdasd-StreamThread-1-consumer, groupId=inventory-assembler-4] Rebalance failed.
java.lang.IllegalStateException: Tried to lookup lag for unknown task 7_0
    at org.apache.kafka.streams.processor.internals.assignment.ClientState.lagFor(ClientState.java:318) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor$$Lambda$534.00000000F9BC5F20.applyAsLong(Unknown Source) ~[?:?]
    at java.util.Comparator.lambda$comparingLong$6043328a$1(Unknown Source) ~[?:?]
    at java.util.Comparator$$Lambda$535.00000000F9B1D820.compare(Unknown Source) ~[?:?]
    at java.util.Comparator.lambda$thenComparing$36697e65$1(Unknown Source) ~[?:?]
    at java.util.Comparator$$Lambda$399.00000000FEBE6620.compare(Unknown Source) ~[?:?]
    at java.util.TreeMap.put(Unknown Source) ~[?:?]
    at java.util.TreeSet.add(Unknown Source) ~[?:?]
    at java.util.AbstractCollection.addAll(Unknown Source) ~[?:?]
    at java.util.TreeSet.addAll(Unknown Source) ~[?:?]
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.getPreviousTasksByLag(StreamsPartitionAssignor.java:1205) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assignTasksToThreads(StreamsPartitionAssignor.java:1119) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.computeNewAssignment(StreamsPartitionAssignor.java:845) ~[app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamsPartitionAssignor.assign(StreamsPartitionAssignor.java:405) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:589) [app.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:693) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1000(AbstractCoordinator.java:111) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:599) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:562) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1182) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1157) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1296) [app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1237) [app.jar:?]
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1210) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:925) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:885) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:720) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:583) [app.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:556) [app.jar:?]
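Reading the first log line against the stack trace: prevStandbyTasks still lists 7_0 to 9_1 (and changelogOffsetTotalsByTask still has offsets for them), while taskLagTotals only covers 0_0 to 6_1, and the exception is thrown from a comparator inside TreeSet.addAll() called by getPreviousTasksByLag(). The stdlib-only model below is a simplified sketch of that interaction under those assumptions. The class and method names are hypothetical, it is not the actual Kafka Streams source, and the tie-breaking by task ID is a guess.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/**
 * Simplified, hypothetical model of the failing lag lookup (not the Kafka Streams source).
 */
public class LagLookupModel {

    // Lags are only known for tasks of the current topology (0_x to 6_x in the log);
    // a few entries copied from taskLagTotals are enough for the model.
    static final Map<String, Long> TASK_LAG_TOTALS = Map.of("5_0", 1L, "6_0", 1L, "6_1", 1L);

    // Plays the role of ClientState.lagFor(): asking for an unknown task is an error.
    static long lagFor(String task) {
        Long lag = TASK_LAG_TOTALS.get(task);
        if (lag == null) {
            throw new IllegalStateException("Tried to lookup lag for unknown task " + task);
        }
        return lag;
    }

    // Plays the role of getPreviousTasksByLag(): sort previous tasks by lag,
    // breaking ties by task ID (the tie-breaker is an assumption).
    static TreeSet<String> previousTasksByLag(List<String> prevTasks) {
        TreeSet<String> sorted = new TreeSet<>(
                Comparator.comparingLong(LagLookupModel::lagFor)
                        .thenComparing(Comparator.<String>naturalOrder()));
        sorted.addAll(prevTasks); // the comparator calls lagFor() on each inserted task
        return sorted;
    }

    public static void main(String[] args) {
        // The client still reports 7_0, left over from the old sub-topology layout.
        List<String> prevTasks = List.of("5_0", "6_0", "6_1", "7_0");
        try {
            System.out.println(previousTasksByLag(prevTasks));
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running main() prints "Tried to lookup lag for unknown task 7_0". If the model is right, lags are only computed for tasks of the current topology, so any previous task the client still reports from the old topology would make this sort fail before an assignment is produced.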

Any idea how I could overcome this? I even tried changing
ConsumerConfig.GROUP_INSTANCE_ID_CONFIG and StreamsConfig.CLIENT_ID_CONFIG,
but that didn't seem to work...
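One possible reason why changing those two settings made no difference, offered as an assumption rather than a confirmed diagnosis: Kafka Streams keeps local task state under <state.dir>/<application.id>/<taskId>, and the application.id (which is also the consumer group id, inventory-assembler-4 in the log) did not change, so the old 7_x to 9_x task directories may still be on disk and be reported as previous tasks. The read-only sketch below lists task directories whose sub-topology ID is at or above the new sub-topology count. The default path and the count of 7 are hypothetical inputs, the class name is made up for illustration, and nothing is deleted.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
 * Read-only check for task directories left over from removed sub-topologies.
 * ASSUMPTION: the default layout <state.dir>/<application.id>/<subtopologyId>_<partition>.
 */
public class StaleTaskDirs {

    // Task directory names such as "7_0": sub-topology ID, underscore, partition.
    private static final Pattern TASK_DIR = Pattern.compile("(\\d+)_(\\d+)");

    static List<String> findStaleTaskDirs(Path appStateDir, int subtopologyCount) throws IOException {
        try (Stream<Path> entries = Files.list(appStateDir)) {
            return entries
                    .filter(Files::isDirectory)
                    .map(path -> path.getFileName().toString())
                    .filter(name -> {
                        Matcher m = TASK_DIR.matcher(name);
                        // Sub-topology IDs are 0-based, so IDs >= count no longer exist.
                        return m.matches() && Integer.parseInt(m.group(1)) >= subtopologyCount;
                    })
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical defaults: adjust to the real state.dir and application.id.
        Path appStateDir = Paths.get(args.length > 0 ? args[0] : "/tmp/kafka-streams/inventory-assembler-4");
        int subtopologyCount = args.length > 1 ? Integer.parseInt(args[1]) : 7;
        findStaleTaskDirs(appStateDir, subtopologyCount).forEach(System.out::println);
    }
}
```

If this prints 7_0 through 9_1 for the real state directory, that would support the idea that the leftover local state, not the consumer or client IDs, is what the assignor is seeing.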

Thanks
Murilo
