Nice! Sorry for not creating the ticket, but I appreciate your investigation (and fix!)
Thanks again!

On Thu, Nov 5, 2020 at 5:31 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:

> Ok, I looked into this a bit and found the bug. I'll open a PR with the
> fix sometime today: https://issues.apache.org/jira/browse/KAFKA-10689
>
> I also think we can do a better job of surfacing issues like this, rather
> than letting the application silently spin without making progress. I
> left some thoughts on the JIRA ticket and will try to incorporate one of
> them into the fix as well.
>
> On Mon, Nov 2, 2020 at 9:38 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
>
>> Ok, I tried to reproduce the issue with a minimal example and saw the
>> same results. It seems like there's something weird going on with that
>> exact topology that's causing it to get stuck during the assignment.
>> Maybe it's causing an unexpected cycle in the topology that the
>> assignor can't handle? Pretty weird that even removing the windowedBy
>> fixes the issue, since a topology with a windowed aggregation is pretty
>> much isomorphic to one with just a regular aggregation.
>>
>> Can you create a JIRA ticket for this and include your observations +
>> link to the example? It's definitely a bug, and we'll need to look into
>> this to understand what's going wrong here.
>>
>> Sorry for the trouble, but thanks for bringing it to our attention.
>>
>> On Wed, Oct 28, 2020 at 12:24 PM Alex Jablonski <ajablon...@thoughtworks.com> wrote:
>>
>>> This block:
>>>
>>> @EmbeddedKafka(
>>>     topics = {
>>>         "WordCounts", "WordsForNumbers", "OutputTopic"
>>>     }
>>> )
>>>
>>> starts up an embedded Kafka in the test and creates the 3 topics (2
>>> input and 1 output). By default it creates them with 2 partitions
>>> each, but changing to 1 partition didn't alter the endless-rebalancing
>>> behavior.
>>>
>>> We also see the endless rebalancing behavior in a real Kafka cluster,
>>> using input and output topics that have already been created (and are
>>> readily consumed from and written to).
>>>
>>> On Wed, Oct 28, 2020 at 12:45 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
>>>
>>>> Yeah, there's definitely something weird going on (assuming these are
>>>> the full logs over that time period). The last thing we see logged
>>>> from the StreamThread is this message from around the start of the
>>>> task assignment process:
>>>>
>>>> 2020-10-28 12:22:37.879 DEBUG 27226 --- [-StreamThread-1]
>>>> o.a.k.s.p.i.StreamsPartitionAssignor : stream-thread
>>>> [demo-application-81060bdc-c8cc-4350-85f8-d238267e264e-StreamThread-1-consumer]
>>>> Constructed client metadata
>>>> {81060bdc-c8cc-4350-85f8-d238267e264e=ClientMetadata{hostInfo=null,
>>>> consumers=[demo-application-81060bdc-c8cc-4350-85f8-d238267e264e-StreamThread-1-consumer-976853d9-06ad-4515-abf3-2a7398c12006],
>>>> state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([])
>>>> prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([])
>>>> prevOwnedPartitionsByConsumerId: ([]) capacity: 1]}} from the member
>>>> subscriptions.
>>>>
>>>> which is at 12:22:37. Then there's nothing else from Streams until at
>>>> least 12:25:00, where the logs end. Not sure what it could be doing
>>>> inside the assignor for 2+ minutes without ever reaching another...
>>>> how many partitions are on the input topics? Are you sure the input
>>>> topics have been pre-created before starting the app, with the
>>>> correct names, etc?
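>>>>
>>>> If it's easy to check, something like the sketch below (plain Java
>>>> AdminClient; the bootstrap address here is just a placeholder, and
>>>> the topic names are assumed from your example) would confirm that all
>>>> three topics exist with the partition counts you expect:
>>>>
>>>> import java.util.Arrays;
>>>> import java.util.Properties;
>>>> import org.apache.kafka.clients.admin.AdminClient;
>>>> import org.apache.kafka.clients.admin.AdminClientConfig;
>>>>
>>>> public class TopicCheck {
>>>>     public static void main(String[] args) throws Exception {
>>>>         Properties props = new Properties();
>>>>         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>>>>         try (AdminClient admin = AdminClient.create(props)) {
>>>>             // The returned future completes exceptionally with
>>>>             // UnknownTopicOrPartitionException if a topic is missing
>>>>             admin.describeTopics(Arrays.asList("WordCounts", "WordsForNumbers", "OutputTopic"))
>>>>                  .all().get()
>>>>                  .forEach((name, desc) -> System.out.println(
>>>>                      name + ": " + desc.partitions().size() + " partitions"));
>>>>         }
>>>>     }
>>>> }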
>>>>
>>>> On Wed, Oct 28, 2020 at 10:29 AM Alex Jablonski <ajablon...@thoughtworks.com> wrote:
>>>>
>>>>> Hi Sophie,
>>>>>
>>>>> Thanks for your questions! Responses inline below. Also, I realized
>>>>> I linked to the gradle file, not the interesting bits of the
>>>>> example. This
>>>>> <https://github.com/ajablonski/streams-issue-demo/blob/master/src/main/java/com/github/ajablonski/StreamsConfiguration.java>
>>>>> is the configuration and this
>>>>> <https://github.com/ajablonski/streams-issue-demo/blob/master/src/test/java/com/github/ajablonski/StreamsConfigurationTest.java>
>>>>> is the test.
>>>>>
>>>>> On Tue, Oct 27, 2020 at 10:11 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
>>>>>
>>>>>>> We've been able to get the crucial factors that cause this
>>>>>>> behavior down to a particular combination
>>>>>>
>>>>>> What do you mean by this -- that you only see this when all four of
>>>>>> those operators are at play? Or do you see it with any of them?
>>>>>
>>>>> We see this when all four operators are in play. If you change the
>>>>> sample streams configuration to not do that final foreign-key join,
>>>>> or to not use custom serdes, for example, I don't see the
>>>>> stuck-state issue (the application transitions to the running state
>>>>> just fine).
>>>>>
>>>>>> I guess the first thing to narrow down is whether it's actually
>>>>>> rebalancing or just restoring within this time (the REBALANCING
>>>>>> state is somewhat misleadingly named). If this is a completely new
>>>>>> app then it's probably not restoring, but if this app had already
>>>>>> been running and building up state before hitting this issue then
>>>>>> that's probably the reason. It's not at all uncommon for
>>>>>> restoration to take more than 30 seconds.
>>>>>
>>>>> This is happening with the app in a completely new state -- in the
>>>>> test, for example, there's no pre-loaded data when we're asserting
>>>>> that the app should eventually get to RUNNING, and none of the
>>>>> internal topics exist.
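>>>>>
>>>>> (For reference, the assertion boils down to polling the KafkaStreams
>>>>> state until it reaches RUNNING, roughly like the sketch below. This
>>>>> is a simplified stand-in, not the literal test code, and the trivial
>>>>> topology here is just a placeholder for the real one in
>>>>> StreamsConfiguration:)
>>>>>
>>>>> import java.util.Properties;
>>>>> import org.apache.kafka.streams.KafkaStreams;
>>>>> import org.apache.kafka.streams.StreamsBuilder;
>>>>> import org.apache.kafka.streams.StreamsConfig;
>>>>>
>>>>> public class WaitForRunning {
>>>>>     public static void main(String[] args) throws Exception {
>>>>>         Properties props = new Properties();
>>>>>         props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-application");
>>>>>         props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>>>>>
>>>>>         StreamsBuilder builder = new StreamsBuilder();
>>>>>         builder.stream("WordCounts").to("OutputTopic"); // placeholder topology
>>>>>
>>>>>         KafkaStreams streams = new KafkaStreams(builder.build(), props);
>>>>>         streams.start();
>>>>>         long deadline = System.currentTimeMillis() + 30_000; // 30s, like the test
>>>>>         while (streams.state() != KafkaStreams.State.RUNNING) {
>>>>>             if (System.currentTimeMillis() > deadline) {
>>>>>                 streams.close();
>>>>>                 throw new AssertionError("App never reached RUNNING");
>>>>>             }
>>>>>             Thread.sleep(100);
>>>>>         }
>>>>>         System.out.println("Reached RUNNING");
>>>>>         streams.close();
>>>>>     }
>>>>> }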
>>>>>
>>>>>> If it really is rebalancing this entire time, then you need to look
>>>>>> into the logs to figure out why. I don't see anything obviously
>>>>>> wrong with your particular application, and even if there were, it
>>>>>> should never result in endless rebalances like this. How many
>>>>>> instances of the application are you running?
>>>>>
>>>>> In our actual application we have 3 instances, but in the tests in
>>>>> that sample project there's only 1.
>>>>>
>>>>> The logs that we're getting right before the application gets
>>>>> "stuck" are below. The one that seems most concerning to my
>>>>> uninformed eyes is "Member
>>>>> demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282
>>>>> in group demo-application has failed". I've attached some
>>>>> DEBUG-level logs too, though nothing in them was obvious to me that
>>>>> would better explain the hanging behavior.
>>>>>
>>>>> 2020-10-28 12:11:19.823 INFO 27127 --- [-StreamThread-1]
>>>>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer
>>>>> clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
>>>>> groupId=demo-application] Discovered group coordinator
>>>>> localhost:50343 (id: 2147483647 rack: null)
>>>>>
>>>>> 2020-10-28 12:11:19.825 INFO 27127 --- [-StreamThread-1]
>>>>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer
>>>>> clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
>>>>> groupId=demo-application] (Re-)joining group
>>>>>
>>>>> 2020-10-28 12:11:19.842 WARN 27127 --- [-StreamThread-1]
>>>>> org.apache.kafka.clients.NetworkClient : [Consumer
>>>>> clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
>>>>> groupId=demo-application] Error while fetching metadata with
>>>>> correlation id 7 :
>>>>> {demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000018-topic=UNKNOWN_TOPIC_OR_PARTITION,
>>>>> demo-application-GroupName-repartition=UNKNOWN_TOPIC_OR_PARTITION,
>>>>> demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010-topic=UNKNOWN_TOPIC_OR_PARTITION}
>>>>>
>>>>> 2020-10-28 12:11:19.860 INFO 27127 --- [-StreamThread-1]
>>>>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer
>>>>> clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
>>>>> groupId=demo-application] Join group failed with
>>>>> org.apache.kafka.common.errors.MemberIdRequiredException: The group
>>>>> member needs to have a valid member id before actually entering a
>>>>> consumer group
>>>>>
>>>>> 2020-10-28 12:11:19.861 INFO 27127 --- [-StreamThread-1]
>>>>> o.a.k.c.c.internals.AbstractCoordinator : [Consumer
>>>>> clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
>>>>> groupId=demo-application] (Re-)joining group
>>>>>
>>>>> 2020-10-28 12:11:19.873 INFO 27127 --- [quest-handler-3]
>>>>> k.coordinator.group.GroupCoordinator : [GroupCoordinator 0]:
>>>>> Preparing to rebalance group demo-application in state
>>>>> PreparingRebalance with old generation 0 (__consumer_offsets-2)
>>>>> (reason: Adding new member
>>>>> demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282
>>>>> with group instance id None)
>>>>>
>>>>> 2020-10-28 12:11:19.882 INFO 27127 --- [cutor-Rebalance]
>>>>> k.coordinator.group.GroupCoordinator : [GroupCoordinator 0]:
>>>>> Stabilized group demo-application generation 1 (__consumer_offsets-2)
>>>>>
>>>>> 2020-10-28 12:11:19.947 WARN 27127 --- [-StreamThread-1]
>>>>> org.apache.kafka.clients.NetworkClient : [Consumer
>>>>> clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
>>>>> groupId=demo-application] Error while fetching metadata with
>>>>> correlation id 9 :
>>>>> {demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000018-topic=UNKNOWN_TOPIC_OR_PARTITION,
>>>>> demo-application-GroupName-repartition=UNKNOWN_TOPIC_OR_PARTITION,
>>>>> demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010-topic=UNKNOWN_TOPIC_OR_PARTITION}
>>>>>
>>>>> 2020-10-28 12:11:23.462 INFO 27127 --- [er-event-thread]
>>>>> kafka.controller.KafkaController : [Controller id=0] Processing
>>>>> automatic preferred replica leader election
>>>>>
>>>>> 2020-10-28 12:11:29.887 INFO 27127 --- [cutor-Heartbeat]
>>>>> k.coordinator.group.GroupCoordinator : [GroupCoordinator 0]: Member
>>>>> demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282
>>>>> in group demo-application has failed, removing it from the group
>>>>>
>>>>> 2020-10-28 12:11:29.887 INFO 27127 --- [cutor-Heartbeat]
>>>>> k.coordinator.group.GroupCoordinator : [GroupCoordinator 0]:
>>>>> Preparing to rebalance group demo-application in state
>>>>> PreparingRebalance with old generation 1 (__consumer_offsets-2)
>>>>> (reason: removing member
>>>>> demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282
>>>>> on heartbeat expiration)
>>>>>
>>>>> 2020-10-28 12:11:29.888 INFO 27127 --- [cutor-Heartbeat]
>>>>> k.coordinator.group.GroupCoordinator : [GroupCoordinator 0]: Group
>>>>> demo-application with generation 2 is now empty (__consumer_offsets-2)
>>>>>
>>>>>> Cheers,
>>>>>> Sophie
>>>>>>
>>>>>> On Thu, Oct 15, 2020 at 10:01 PM Alex Jablonski <ajablon...@thoughtworks.com> wrote:
>>>>>>
>>>>>>> Hey there!
>>>>>>>
>>>>>>> My team and I have run across a bit of a jam in our application
>>>>>>> where, given a particular setup, our Kafka Streams application
>>>>>>> never seems to start successfully, instead just getting stuck in
>>>>>>> the REBALANCING state. We've been able to get the crucial factors
>>>>>>> that cause this behavior down to a particular combination of (1)
>>>>>>> grouping, (2) windowing, (3) aggregating, and (4) foreign-key
>>>>>>> joining, with some of those steps specifying Serdes besides the
>>>>>>> default.
>>>>>>>
>>>>>>> It's probably more useful to see a minimal example, so there's one
>>>>>>> here
>>>>>>> <https://github.com/ajablonski/streams-issue-demo/blob/master/build.gradle>.
>>>>>>> The underlying Kafka Streams version is 2.5.1. The first test
>>>>>>> should show the application eventually transition to the running
>>>>>>> state, but it doesn't within the 30-second timeout I've set.
>>>>>>> Interestingly, getting rid of the 'Grouped.with' argument to the
>>>>>>> 'groupBy' function and the 'Materialized.with' in 'aggregate' in
>>>>>>> the 'StreamsConfiguration' lets the application transition to
>>>>>>> "RUNNING", though without the correct Serdes that's not too
>>>>>>> valuable.
>>>>>>>
>>>>>>> There might be a cleaner way to organize the particular flow in
>>>>>>> the toy example, but is there something fundamentally wrong with
>>>>>>> the approach laid out in that application that would cause Streams
>>>>>>> to be stuck in REBALANCING? I'd appreciate any advice folks could
>>>>>>> give!
>>>>>>>
>>>>>>> Thanks!
>>>>>>> Alex Jablonski
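>>>>>>>
>>>>>>> P.S. For a rough sense of the shape without clicking through: the
>>>>>>> flow is approximately the sketch below. The names, types, and join
>>>>>>> logic here are simplified stand-ins, not the exact code -- that
>>>>>>> lives in StreamsConfiguration.java in the repo above.
>>>>>>>
>>>>>>> import java.time.Duration;
>>>>>>> import org.apache.kafka.common.serialization.Serdes;
>>>>>>> import org.apache.kafka.streams.StreamsBuilder;
>>>>>>> import org.apache.kafka.streams.Topology;
>>>>>>> import org.apache.kafka.streams.kstream.*;
>>>>>>>
>>>>>>> public class TopologySketch {
>>>>>>>     static Topology buildTopology() {
>>>>>>>         StreamsBuilder builder = new StreamsBuilder();
>>>>>>>
>>>>>>>         // (1) group with explicit Serdes, (2) window, and
>>>>>>>         // (3) aggregate with explicit Serdes...
>>>>>>>         KTable<Windowed<String>, Long> counts = builder
>>>>>>>             .stream("WordCounts", Consumed.with(Serdes.String(), Serdes.Long()))
>>>>>>>             .groupBy((word, count) -> word,
>>>>>>>                 Grouped.with(Serdes.String(), Serdes.Long()))
>>>>>>>             .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
>>>>>>>             .aggregate(() -> 0L, (word, count, total) -> total + count,
>>>>>>>                 Materialized.with(Serdes.String(), Serdes.Long()));
>>>>>>>
>>>>>>>         // ...then (4) a foreign-key join against a second table
>>>>>>>         KTable<String, String> other = builder.table("WordsForNumbers",
>>>>>>>             Consumed.with(Serdes.String(), Serdes.String()));
>>>>>>>         counts.join(other,
>>>>>>>                 total -> total.toString(), // foreign-key extractor
>>>>>>>                 (total, label) -> label + ":" + total)
>>>>>>>             .toStream((windowedKey, value) -> windowedKey.key())
>>>>>>>             .to("OutputTopic", Produced.with(Serdes.String(), Serdes.String()));
>>>>>>>
>>>>>>>         return builder.build();
>>>>>>>     }
>>>>>>> }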