Ok I looked into this a bit and found the bug. I'll open a PR with the fix sometime today: https://issues.apache.org/jira/browse/KAFKA-10689
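For anyone else who hits this before the fix lands: the trigger is the combination Alex described further down in the thread -- a grouped, windowed aggregation whose result feeds a foreign-key join, with explicit Serdes along the way. Here's a minimal sketch of that shape (topic names borrowed from the demo project, but the types, serdes, and exact operators are made up for illustration; the actual repro is linked below):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;

public class ReproTopology {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        // (1) grouping, (2) windowing, (3) aggregating -- each with explicit Serdes
        KTable<String, Long> counts = builder
                .<String, String>stream("WordCounts")
                .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()))
                .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
                .aggregate(() -> 0L,
                           (word, value, total) -> total + 1L,
                           Materialized.with(Serdes.String(), Serdes.Long()))
                // unwrap the window so the result can be joined as a plain table
                .toStream()
                .map((windowedKey, count) -> KeyValue.pair(windowedKey.key(), count))
                .toTable();

        // (4) foreign-key join against a second table
        KTable<String, String> words = builder.<String, String>table("WordsForNumbers");
        counts.join(words,
                    count -> Long.toString(count),       // foreign-key extractor
                    (count, word) -> word + "=" + count)
              .toStream()
              .to("OutputTopic");

        return builder.build();
    }
}

Per the notes below, dropping any one of those pieces (the windowedBy, the foreign-key join, or the explicit Serdes) lets the assignment go through, which is part of why this was so surprising.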
I also think we can do a better job of surfacing issues like this, rather than letting the application silently spin without making progress. I left some thoughts on the JIRA ticket and will try to incorporate one of them into the fix as well.

On Mon, Nov 2, 2020 at 9:38 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:

> Ok, I tried to reproduce the issue with a minimal example and saw the same results. It seems like there's something weird going on with that exact topology that's causing it to get stuck during the assignment. Maybe it's causing an unexpected cycle in the topology that the assignor can't handle? Pretty weird that even removing the windowedBy fixes the issue, since a topology with a windowed aggregation is pretty much isomorphic to one with just a regular aggregation.
>
> Can you create a JIRA ticket for this and include your observations + a link to the example? It's definitely a bug, and we'll need to look into this to understand what's going wrong here.
>
> Sorry for the trouble, but thanks for bringing it to our attention.
>
> On Wed, Oct 28, 2020 at 12:24 PM Alex Jablonski <ajablon...@thoughtworks.com> wrote:
>
>> This block:
>>
>> @EmbeddedKafka(
>>     topics = {
>>         "WordCounts", "WordsForNumbers", "OutputTopic"
>>     }
>> )
>>
>> starts up an embedded Kafka in the test and creates the 3 topics (2 input and 1 output). By default it creates them with 2 partitions each, but changing to 1 partition didn't alter the endless-rebalancing behavior.
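>>
>> (For reference, overriding the partition count is just the annotation's partitions attribute -- e.g.:
>>
>> @EmbeddedKafka(
>>     partitions = 1,
>>     topics = {
>>         "WordCounts", "WordsForNumbers", "OutputTopic"
>>     }
>> )
>>
>> with everything else unchanged.)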
>>
>> We also see the endless rebalancing behavior in a real Kafka cluster, using input and output topics that have already been created (and are readily consumed from and written to).
>>
>> On Wed, Oct 28, 2020 at 12:45 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
>>
>>> Yeah, there's definitely something weird going on (assuming these are the full logs over that time period). The last thing we see logged from the StreamThread is this message from around the start of the task assignment process:
>>>
>>> 2020-10-28 12:22:37.879 DEBUG 27226 --- [-StreamThread-1] o.a.k.s.p.i.StreamsPartitionAssignor : stream-thread [demo-application-81060bdc-c8cc-4350-85f8-d238267e264e-StreamThread-1-consumer] Constructed client metadata {81060bdc-c8cc-4350-85f8-d238267e264e=ClientMetadata{hostInfo=null, consumers=[demo-application-81060bdc-c8cc-4350-85f8-d238267e264e-StreamThread-1-consumer-976853d9-06ad-4515-abf3-2a7398c12006], state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) prevOwnedPartitionsByConsumerId: ([]) capacity: 1]}} from the member subscriptions.
>>>
>>> which is at 12:22:37. Then there's nothing else from Streams until at least 12:25:00, where the logs end. Not sure what it could be doing inside the assignor for 2+ minutes without ever reaching another... How many partitions are on the input topics? Are you sure the input topics have been pre-created before starting the app, with the correct names, etc.?
>>>
>>> On Wed, Oct 28, 2020 at 10:29 AM Alex Jablonski <ajablon...@thoughtworks.com> wrote:
>>>
>>>> Hi Sophie,
>>>>
>>>> Thanks for your questions! Responses inline below. Also, I realized I linked to the gradle file, not the interesting bits of the example. The configuration is here: https://github.com/ajablonski/streams-issue-demo/blob/master/src/main/java/com/github/ajablonski/StreamsConfiguration.java and the test is here: https://github.com/ajablonski/streams-issue-demo/blob/master/src/test/java/com/github/ajablonski/StreamsConfigurationTest.java
>>>>
>>>> On Tue, Oct 27, 2020 at 10:11 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
>>>>
>>>>>> We've been able to get the crucial factors that cause this behavior down to a particular combination
>>>>>
>>>>> What do you mean by this -- that you only see this when all four of those operators are at play? Or do you see it with any of them?
>>>>
>>>> We see this when all four operators are in play. If you change the sample streams configuration to not do that final foreign-key join, or not use custom serdes, for example, I don't see the stuck-state issue (the application transitions to running state just fine).
>>>>
>>>>> I guess the first thing to narrow down is whether it's actually rebalancing or just restoring within this time (the REBALANCING state is somewhat misleadingly named). If this is a completely new app then it's probably not restoring, but if this app had already been running and building up state before hitting this issue then that's probably the reason. It's not at all uncommon for restoration to take more than 30 seconds.
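>>>>>
>>>>> One quick way to tell the two apart is to attach a restore listener to the client before starting it and watch whether any restore callbacks ever fire. A rough sketch (the helper name here is made up):
>>>>>
>>>>> import java.util.Properties;
>>>>> import org.apache.kafka.common.TopicPartition;
>>>>> import org.apache.kafka.streams.KafkaStreams;
>>>>> import org.apache.kafka.streams.Topology;
>>>>> import org.apache.kafka.streams.processor.StateRestoreListener;
>>>>>
>>>>> // Made-up helper: wires up a KafkaStreams instance so every restore phase is logged.
>>>>> // Total silence while the app sits in REBALANCING means the time is going to the
>>>>> // rebalance itself, not to state restoration.
>>>>> static KafkaStreams withRestoreLogging(Topology topology, Properties props) {
>>>>>     KafkaStreams streams = new KafkaStreams(topology, props);
>>>>>     streams.setGlobalStateRestoreListener(new StateRestoreListener() {
>>>>>         public void onRestoreStart(TopicPartition tp, String store, long start, long end) {
>>>>>             System.out.printf("restore start: %s %s [%d..%d]%n", store, tp, start, end);
>>>>>         }
>>>>>         public void onBatchRestored(TopicPartition tp, String store, long endOffset, long n) {
>>>>>             System.out.printf("restored %d records of %s%n", n, store);
>>>>>         }
>>>>>         public void onRestoreEnd(TopicPartition tp, String store, long total) {
>>>>>             System.out.printf("restore done: %s (%d records total)%n", store, total);
>>>>>         }
>>>>>     });
>>>>>     return streams;
>>>>> }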
>> > > >> > > 2020-10-28 12:11:19.823 INFO 27127 --- [-StreamThread-1] >> > > o.a.k.c.c.internals.AbstractCoordinator : [Consumer >> > > >> > >> clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, >> > > groupId=demo-application] Discovered group coordinator localhost:50343 >> > (id: >> > > 2147483647 rack: null) >> > > 2020-10-28 12:11:19.825 INFO 27127 --- [-StreamThread-1] >> > > o.a.k.c.c.internals.AbstractCoordinator : [Consumer >> > > >> > >> clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, >> > > groupId=demo-application] (Re-)joining group >> > > 2020-10-28 12:11:19.842 WARN 27127 --- [-StreamThread-1] >> > > org.apache.kafka.clients.NetworkClient : [Consumer >> > > >> > >> clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, >> > > groupId=demo-application] Error while fetching metadata with >> correlation >> > id >> > > 7 : >> > > >> > >> {demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000018-topic=UNKNOWN_TOPIC_OR_PARTITION, >> > > demo-application-GroupName-repartition=UNKNOWN_TOPIC_OR_PARTITION, >> > > >> > >> demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010-topic=UNKNOWN_TOPIC_OR_PARTITION} >> > > 2020-10-28 12:11:19.860 INFO 27127 --- [-StreamThread-1] >> > > o.a.k.c.c.internals.AbstractCoordinator : [Consumer >> > > >> > >> clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, >> > > groupId=demo-application] Join group failed with >> > > org.apache.kafka.common.errors.MemberIdRequiredException: The group >> > member >> > > needs to have a valid member id before actually entering a consumer >> group >> > > 2020-10-28 12:11:19.861 INFO 27127 --- [-StreamThread-1] >> > > o.a.k.c.c.internals.AbstractCoordinator : [Consumer >> > > >> > >> clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, >> > > groupId=demo-application] (Re-)joining group >> > > 2020-10-28 12:11:19.873 INFO 27127 --- [quest-handler-3] >> > > k.coordinator.group.GroupCoordinator : [GroupCoordinator 0]: >> > Preparing >> > > to rebalance group demo-application in state PreparingRebalance with >> old >> > > generation 0 (__consumer_offsets-2) (reason: Adding new member >> > > >> > >> demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282 >> > > with group instance id None) >> > > 2020-10-28 12:11:19.882 INFO 27127 --- [cutor-Rebalance] >> > > k.coordinator.group.GroupCoordinator : [GroupCoordinator 0]: >> > Stabilized >> > > group demo-application generation 1 (__consumer_offsets-2) >> > > 2020-10-28 12:11:19.947 WARN 27127 --- [-StreamThread-1] >> > > org.apache.kafka.clients.NetworkClient : [Consumer >> > > >> > >> clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, >> > > groupId=demo-application] Error while fetching metadata with >> correlation >> > id >> > > 9 : >> > > >> > >> {demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000018-topic=UNKNOWN_TOPIC_OR_PARTITION, >> > > demo-application-GroupName-repartition=UNKNOWN_TOPIC_OR_PARTITION, >> > > >> > >> demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010-topic=UNKNOWN_TOPIC_OR_PARTITION} >> > > 2020-10-28 12:11:23.462 INFO 27127 --- [er-event-thread] >> > > kafka.controller.KafkaController : [Controller id=0] >> Processing >> > > automatic preferred replica leader election >> > > 2020-10-28 12:11:29.887 INFO 27127 --- 
>>>>> If it really is rebalancing this entire time, then you need to look into the logs to figure out why. I don't see anything obviously wrong with your particular application, and even if there was, it should never result in endless rebalances like this. How many instances of the application are you running?
>>>>
>>>> In our actual application, we have 3 instances, but in the tests in that sample project, there's only 1.
>>>>
>>>> The logs that we're getting right before the application gets "stuck" are below. The one that seems most concerning to my uninformed eyes is "Member demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282 in group demo-application has failed". I've attached some DEBUG-level logs too, though nothing in them was obvious to me that would better explain the hanging behavior.
>>>>
>>>> 2020-10-28 12:11:19.823 INFO 27127 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, groupId=demo-application] Discovered group coordinator localhost:50343 (id: 2147483647 rack: null)
>>>> 2020-10-28 12:11:19.825 INFO 27127 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, groupId=demo-application] (Re-)joining group
>>>> 2020-10-28 12:11:19.842 WARN 27127 --- [-StreamThread-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, groupId=demo-application] Error while fetching metadata with correlation id 7 : {demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000018-topic=UNKNOWN_TOPIC_OR_PARTITION, demo-application-GroupName-repartition=UNKNOWN_TOPIC_OR_PARTITION, demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010-topic=UNKNOWN_TOPIC_OR_PARTITION}
>>>> 2020-10-28 12:11:19.860 INFO 27127 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, groupId=demo-application] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
>>>> 2020-10-28 12:11:19.861 INFO 27127 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, groupId=demo-application] (Re-)joining group
>>>> 2020-10-28 12:11:19.873 INFO 27127 --- [quest-handler-3] k.coordinator.group.GroupCoordinator : [GroupCoordinator 0]: Preparing to rebalance group demo-application in state PreparingRebalance with old generation 0 (__consumer_offsets-2) (reason: Adding new member demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282 with group instance id None)
>>>> 2020-10-28 12:11:19.882 INFO 27127 --- [cutor-Rebalance] k.coordinator.group.GroupCoordinator : [GroupCoordinator 0]: Stabilized group demo-application generation 1 (__consumer_offsets-2)
>>>> 2020-10-28 12:11:19.947 WARN 27127 --- [-StreamThread-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, groupId=demo-application] Error while fetching metadata with correlation id 9 : {demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000018-topic=UNKNOWN_TOPIC_OR_PARTITION, demo-application-GroupName-repartition=UNKNOWN_TOPIC_OR_PARTITION, demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010-topic=UNKNOWN_TOPIC_OR_PARTITION}
>>>> 2020-10-28 12:11:23.462 INFO 27127 --- [er-event-thread] kafka.controller.KafkaController : [Controller id=0] Processing automatic preferred replica leader election
>>>> 2020-10-28 12:11:29.887 INFO 27127 --- [cutor-Heartbeat] k.coordinator.group.GroupCoordinator : [GroupCoordinator 0]: Member demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282 in group demo-application has failed, removing it from the group
>>>> 2020-10-28 12:11:29.887 INFO 27127 --- [cutor-Heartbeat] k.coordinator.group.GroupCoordinator : [GroupCoordinator 0]: Preparing to rebalance group demo-application in state PreparingRebalance with old generation 1 (__consumer_offsets-2) (reason: removing member demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282 on heartbeat expiration)
>>>> 2020-10-28 12:11:29.888 INFO 27127 --- [cutor-Heartbeat] k.coordinator.group.GroupCoordinator : [GroupCoordinator 0]: Group demo-application with generation 2 is now empty (__consumer_offsets-2)
>>>>
>>>>> Cheers,
>>>>> Sophie
>>>>>
>>>>> On Thu, Oct 15, 2020 at 10:01 PM Alex Jablonski <ajablon...@thoughtworks.com> wrote:
>>>>>
>>>>>> Hey there!
>>>>>>
>>>>>> My team and I have run across a bit of a jam in our application where, given a particular setup, our Kafka Streams application never seems to start successfully, instead just getting stuck in the REBALANCING state. We've been able to get the crucial factors that cause this behavior down to a particular combination of (1) grouping, (2) windowing, (3) aggregating, and (4) foreign-key joining, with some of those steps specifying Serdes besides the default.
>>>>>>
>>>>>> It's probably more useful to see a minimal example, so there's one here: https://github.com/ajablonski/streams-issue-demo/blob/master/build.gradle. The underlying Kafka Streams version is 2.5.1. The first test should show the application eventually transition to the running state, but it doesn't within the 30-second timeout I've set. Interestingly, getting rid of the 'Grouped.with' argument to the 'groupBy' function and the 'Materialized.with' in 'aggregate' in the 'StreamsConfiguration' lets the application transition to "RUNNING", though without the correct Serdes that's not too valuable.
>>>>>>
>>>>>> There might be a cleaner way to organize the particular flow in the toy example, but is there something fundamentally wrong with the approach laid out in that application that would cause Streams to be stuck in REBALANCING? I'd appreciate any advice folks could give!
>>>>>>
>>>>>> Thanks!
>>>>>> Alex Jablonski