Ok I looked into this a bit and found the bug. I'll open a PR with the fix
sometime today:
https://issues.apache.org/jira/browse/KAFKA-10689

I also think we can do a better job of surfacing issues like this, rather
than letting the application silently spin without making progress. I left
some thoughts on the JIRA ticket and will try to incorporate one of them
into the fix as well.
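
In the meantime, for anyone who wants to catch this from the application
side: here's a minimal sketch of one way to do it, using a state listener
plus a watchdog thread. This is not the fix itself, and the threshold and
logging below are just placeholders:

import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.kafka.streams.KafkaStreams;

public class RebalanceWatchdog {

    // Placeholder threshold -- tune to whatever "stuck" means for your app.
    private static final Duration MAX_REBALANCING = Duration.ofMinutes(2);

    public static void attach(KafkaStreams streams) {
        AtomicReference<Instant> rebalancingSince = new AtomicReference<>();

        // Remember when we entered REBALANCING; clear it on any other transition.
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.REBALANCING) {
                rebalancingSince.set(Instant.now());
            } else {
                rebalancingSince.set(null);
            }
        });

        Thread watchdog = new Thread(() -> {
            while (true) {
                Instant since = rebalancingSince.get();
                if (since != null
                        && Duration.between(since, Instant.now()).compareTo(MAX_REBALANCING) > 0) {
                    // Alert however you like instead of spinning silently.
                    System.err.println("Streams has been REBALANCING since " + since);
                }
                try {
                    Thread.sleep(10_000);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
        watchdog.setDaemon(true);
        watchdog.start();
    }
}

Call RebalanceWatchdog.attach(streams) before streams.start() and you at
least get a loud signal when the app stops making progress.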

On Mon, Nov 2, 2020 at 9:38 PM Sophie Blee-Goldman <sop...@confluent.io>
wrote:

> Ok I tried to reproduce the issue with a minimal example, and saw the same
> results. It seems like there's something weird going on with that exact
> topology that's causing it to get stuck during the assignment. Maybe it's
> causing an unexpected cycle in the topology that the assignor can't handle?
> Pretty weird that even removing the windowedBy fixes the issue, since a
> topology with a windowed aggregation is pretty much isomorphic to one with
> just a regular aggregation.
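>
> One thing that can help when poking at assignment weirdness like this is
> printing the built topology and diffing the sub-topologies and internal
> topics with and without the windowedBy. Rough sketch, assuming a
> StreamsBuilder named builder as in the demo:
>
> import org.apache.kafka.streams.Topology;
>
> // Prints the sub-topologies with their source/sink topics and state store names.
> Topology topology = builder.build();
> System.out.println(topology.describe());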
>
> Can you create a JIRA ticket for this and include your observations + link
> to the example?
> It's definitely a bug, and we'll need to look into this to understand
> what's going wrong here.
>
> Sorry for the trouble, but thanks for bringing it to our attention.
>
> On Wed, Oct 28, 2020 at 12:24 PM Alex Jablonski <ajablon...@thoughtworks.com> wrote:
>
>> This block:
>>
>> @EmbeddedKafka(
>>         topics = {
>>                 "WordCounts", "WordsForNumbers", "OutputTopic"
>>         }
>> )
>>
>> starts up an embedded Kafka in the test and creates the 3 topics (2
>> input and 1 output). By default it creates them with 2 partitions
>> each, but changing to 1 partition didn't alter the endless-rebalancing
>> behavior.
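>>
>> (For the single-partition run the only change was the partition count on
>> the annotation, along these lines -- if I'm reading spring-kafka right,
>> partitions defaults to 2:)
>>
>> @EmbeddedKafka(
>>         partitions = 1,
>>         topics = {
>>                 "WordCounts", "WordsForNumbers", "OutputTopic"
>>         }
>> )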
>>
>> We also see the endless rebalancing behavior in a real Kafka cluster,
>> using input and output topics that have already been created (and are
>> readily consumed from and written to).
>>
>>
>>
>>
>> On Wed, Oct 28, 2020 at 12:45 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
>>
>> > Yeah there's definitely something weird going on (assuming this is the
>> > full logs over that time period). The last thing we see logged from the
>> > StreamThread is this message from around the start of the task
>> > assignment process:
>> >
>> > 2020-10-28 12:22:37.879 DEBUG 27226 --- [-StreamThread-1] o.a.k.s.p.i.StreamsPartitionAssignor     : stream-thread [demo-application-81060bdc-c8cc-4350-85f8-d238267e264e-StreamThread-1-consumer] Constructed client metadata {81060bdc-c8cc-4350-85f8-d238267e264e=ClientMetadata{hostInfo=null, consumers=[demo-application-81060bdc-c8cc-4350-85f8-d238267e264e-StreamThread-1-consumer-976853d9-06ad-4515-abf3-2a7398c12006], state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([]) prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([]) prevOwnedPartitionsByConsumerId: ([]) capacity: 1]}} from the member subscriptions.
>> >
>> > which is at 12:22:37. Then there's nothing else from Streams until at
>> > least 12:25:00, where the logs end. Not sure what it could be doing
>> > inside the assignor for 2+ minutes without ever reaching another... how
>> > many partitions are on the input topics? Are you sure the input topics
>> > have been pre-created before starting the app, with the correct names,
>> > etc?
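>> >
>> > If it helps, here's a quick way to double-check what the broker actually
>> > has (just a sketch -- topic names taken from your test, the bootstrap
>> > address is a placeholder):
>> >
>> > import java.util.Arrays;
>> > import java.util.Map;
>> > import java.util.Properties;
>> > import org.apache.kafka.clients.admin.AdminClient;
>> > import org.apache.kafka.clients.admin.AdminClientConfig;
>> > import org.apache.kafka.clients.admin.TopicDescription;
>> >
>> > public class TopicCheck {
>> >     public static void main(String[] args) throws Exception {
>> >         Properties props = new Properties();
>> >         props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
>> >         try (AdminClient admin = AdminClient.create(props)) {
>> >             // Describe the input/output topics and print their partition counts.
>> >             Map<String, TopicDescription> topics = admin
>> >                 .describeTopics(Arrays.asList("WordCounts", "WordsForNumbers", "OutputTopic"))
>> >                 .all().get();
>> >             topics.forEach((name, desc) ->
>> >                 System.out.println(name + ": " + desc.partitions().size() + " partitions"));
>> >         }
>> >     }
>> > }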
>> >
>> > On Wed, Oct 28, 2020 at 10:29 AM Alex Jablonski <ajablon...@thoughtworks.com> wrote:
>> >
>> > > Hi Sophie,
>> > >
>> > > Thanks for your questions! Responses inline below. Also, I realized I
>> > > linked to the gradle file, not the interesting bits of the example.
>> > > This
>> > > <https://github.com/ajablonski/streams-issue-demo/blob/master/src/main/java/com/github/ajablonski/StreamsConfiguration.java>
>> > > is the configuration and this
>> > > <https://github.com/ajablonski/streams-issue-demo/blob/master/src/test/java/com/github/ajablonski/StreamsConfigurationTest.java>
>> > > is the test.
>> > >
>> > >> On Tue, Oct 27, 2020 at 10:11 PM Sophie Blee-Goldman <sop...@confluent.io> wrote:
>> > >
>> > >> >
>> > >> > We've been able to get the crucial factors that cause this behavior
>> > >> > down to a particular combination
>> > >>
>> > >> What do you mean by this -- that you only see this when all four of
>> > >> those operators are at play? Or do you see it with any of them?
>> > >>
>> > >
>> > > We see this when all four operators are in play. If you change the
>> > > sample streams configuration to not do that final foreign key join, or
>> > > not use custom serdes for example, I don't see the stuck-state issue
>> > > (the application transitions to running state just fine).
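>> > >
>> > > For reference, the shape of the stream is roughly the following
>> > > (simplified from the linked StreamsConfiguration; the topic names,
>> > > types, and serdes here are placeholders, not the exact demo code):
>> > >
>> > > import java.time.Duration;
>> > > import org.apache.kafka.common.serialization.Serdes;
>> > > import org.apache.kafka.streams.StreamsBuilder;
>> > > import org.apache.kafka.streams.kstream.Consumed;
>> > > import org.apache.kafka.streams.kstream.Grouped;
>> > > import org.apache.kafka.streams.kstream.KTable;
>> > > import org.apache.kafka.streams.kstream.Materialized;
>> > > import org.apache.kafka.streams.kstream.TimeWindows;
>> > > import org.apache.kafka.streams.kstream.Windowed;
>> > >
>> > > public class TopologySketch {
>> > >     public static void main(String[] args) {
>> > >         StreamsBuilder builder = new StreamsBuilder();
>> > >
>> > >         // Table used as the right-hand side of the foreign-key join.
>> > >         KTable<String, String> wordsForNumbers = builder.table("WordsForNumbers",
>> > >                 Consumed.with(Serdes.String(), Serdes.String()));
>> > >
>> > >         // (1) group with explicit serdes, (2) window, (3) aggregate with explicit serdes.
>> > >         KTable<Windowed<String>, Long> counts = builder
>> > >                 .stream("WordCounts", Consumed.with(Serdes.String(), Serdes.String()))
>> > >                 .groupBy((key, value) -> value, Grouped.with(Serdes.String(), Serdes.String()))
>> > >                 .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
>> > >                 .aggregate(() -> 0L, (key, value, agg) -> agg + 1,
>> > >                         Materialized.with(Serdes.String(), Serdes.Long()));
>> > >
>> > >         // (4) foreign-key join from the aggregated table to the other table.
>> > >         KTable<Windowed<String>, String> joined = counts.join(wordsForNumbers,
>> > >                 count -> String.valueOf(count),       // foreign-key extractor
>> > >                 (count, word) -> word + ":" + count); // value joiner
>> > >
>> > >         // In the real app the result is written to OutputTopic; omitted here.
>> > >         builder.build();
>> > >     }
>> > > }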
>> > >
>> > >
>> > >>
>> > >> I guess the first thing to narrow down is whether it's actually
>> > >> rebalancing or just restoring within this time (the REBALANCING state
>> > >> is somewhat misleadingly-named). If this is a completely new app then
>> > >> it's probably not restoring, but if this app had already been running
>> > >> and building up state before hitting this issue then that's probably
>> > >> the reason. It's not at all uncommon for restoration to take more than
>> > >> 30 seconds.
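>> > >>
>> > >> If you want to rule restoration in or out quickly, you can hang a
>> > >> restore listener off the KafkaStreams instance and see whether
>> > >> anything is actually being restored. Rough sketch, assuming a
>> > >> KafkaStreams instance named streams, registered before start()
>> > >> (StateRestoreListener is org.apache.kafka.streams.processor.StateRestoreListener,
>> > >> TopicPartition is org.apache.kafka.common.TopicPartition):
>> > >>
>> > >> streams.setGlobalStateRestoreListener(new StateRestoreListener() {
>> > >>     @Override
>> > >>     public void onRestoreStart(TopicPartition partition, String storeName,
>> > >>                                long startingOffset, long endingOffset) {
>> > >>         System.out.println("Restoring " + storeName + " from " + partition
>> > >>                 + " (" + (endingOffset - startingOffset) + " records)");
>> > >>     }
>> > >>
>> > >>     @Override
>> > >>     public void onBatchRestored(TopicPartition partition, String storeName,
>> > >>                                 long batchEndOffset, long numRestored) {
>> > >>         // No-op here; could log per-batch progress if you want.
>> > >>     }
>> > >>
>> > >>     @Override
>> > >>     public void onRestoreEnd(TopicPartition partition, String storeName,
>> > >>                              long totalRestored) {
>> > >>         System.out.println("Done restoring " + storeName
>> > >>                 + " (" + totalRestored + " records)");
>> > >>     }
>> > >> });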
>> > >>
>> > >
>> > > This is happening with the app in a completely new state -- in the
>> > > test, for example, there's no pre-loaded data when we're asserting that
>> > > the app should eventually get to RUNNING, and none of the internal
>> > > topics exist.
>> > >
>> > >
>> > >> If it really is rebalancing this entire time, then you need to look
>> > >> into the logs to figure out why. I don't see anything obviously wrong
>> > >> with your particular application, and even if there was it should
>> > >> never result in endless rebalances like this. How many instances of
>> > >> the application are you running?
>> > >>
>> > >
>> > > In our actual application, we have 3 instances, but in the tests in
>> > > that sample project, there's only 1.
>> > >
>> > > The logs that we're getting right before the application gets "stuck"
>> > > are below. The one that seems most concerning to my uninformed eyes is
>> > > "Member
>> > > demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282
>> > > in group demo-application has failed". I've attached some DEBUG level
>> > > logs too, though nothing was obvious to me that would better explain
>> > > the hanging behavior.
>> > >
>> > > 2020-10-28 12:11:19.823  INFO 27127 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, groupId=demo-application] Discovered group coordinator localhost:50343 (id: 2147483647 rack: null)
>> > > 2020-10-28 12:11:19.825  INFO 27127 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, groupId=demo-application] (Re-)joining group
>> > > 2020-10-28 12:11:19.842  WARN 27127 --- [-StreamThread-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, groupId=demo-application] Error while fetching metadata with correlation id 7 : {demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000018-topic=UNKNOWN_TOPIC_OR_PARTITION, demo-application-GroupName-repartition=UNKNOWN_TOPIC_OR_PARTITION, demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010-topic=UNKNOWN_TOPIC_OR_PARTITION}
>> > > 2020-10-28 12:11:19.860  INFO 27127 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, groupId=demo-application] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
>> > > 2020-10-28 12:11:19.861  INFO 27127 --- [-StreamThread-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, groupId=demo-application] (Re-)joining group
>> > > 2020-10-28 12:11:19.873  INFO 27127 --- [quest-handler-3] k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]: Preparing to rebalance group demo-application in state PreparingRebalance with old generation 0 (__consumer_offsets-2) (reason: Adding new member demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282 with group instance id None)
>> > > 2020-10-28 12:11:19.882  INFO 27127 --- [cutor-Rebalance] k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]: Stabilized group demo-application generation 1 (__consumer_offsets-2)
>> > > 2020-10-28 12:11:19.947  WARN 27127 --- [-StreamThread-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer, groupId=demo-application] Error while fetching metadata with correlation id 9 : {demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000018-topic=UNKNOWN_TOPIC_OR_PARTITION, demo-application-GroupName-repartition=UNKNOWN_TOPIC_OR_PARTITION, demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010-topic=UNKNOWN_TOPIC_OR_PARTITION}
>> > > 2020-10-28 12:11:23.462  INFO 27127 --- [er-event-thread] kafka.controller.KafkaController         : [Controller id=0] Processing automatic preferred replica leader election
>> > > 2020-10-28 12:11:29.887  INFO 27127 --- [cutor-Heartbeat] k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]: Member demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282 in group demo-application has failed, removing it from the group
>> > > 2020-10-28 12:11:29.887  INFO 27127 --- [cutor-Heartbeat] k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]: Preparing to rebalance group demo-application in state PreparingRebalance with old generation 1 (__consumer_offsets-2) (reason: removing member demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282 on heartbeat expiration)
>> > > 2020-10-28 12:11:29.888  INFO 27127 --- [cutor-Heartbeat] k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]: Group demo-application with generation 2 is now empty (__consumer_offsets-2)
>> > >
>> > >
>> > >>
>> > >> Cheers,
>> > >> Sophie
>> > >>
>> > >> On Thu, Oct 15, 2020 at 10:01 PM Alex Jablonski <ajablon...@thoughtworks.com> wrote:
>> > >>
>> > >> > Hey there!
>> > >> >
>> > >> > My team and I have run across a bit of a jam in our application
>> > >> > where, given a particular setup, our Kafka Streams application never
>> > >> > seems to start successfully, instead just getting stuck in the
>> > >> > REBALANCING state. We've been able to get the crucial factors that
>> > >> > cause this behavior down to a particular combination of (1)
>> > >> > grouping, (2) windowing, (3) aggregating, and (4) foreign-key
>> > >> > joining, with some of those steps specifying Serdes besides the
>> > >> > default.
>> > >> >
>> > >> > It's probably more useful to see a minimal example, so there's one
>> > >> > here
>> > >> > <https://github.com/ajablonski/streams-issue-demo/blob/master/build.gradle>.
>> > >> > The underlying Kafka Streams version is 2.5.1. The first test should
>> > >> > show the application eventually transition to running state, but it
>> > >> > doesn't within the 30 second timeout I've set. Interestingly,
>> > >> > getting rid of the 'Grouped.with' argument to the 'groupBy' function
>> > >> > and the 'Materialized.with' in 'aggregate' in the
>> > >> > 'StreamsConfiguration' lets the application transition to "RUNNING",
>> > >> > though without the correct Serdes that's not too valuable.
>> > >> >
>> > >> > There might be a cleaner way to organize the particular flow in the
>> > >> > toy example, but is there something fundamentally wrong with the
>> > >> > approach laid out in that application that would cause Streams to be
>> > >> > stuck in REBALANCING? I'd appreciate any advice folks could give!
>> > >> >
>> > >> > Thanks!
>> > >> > Alex Jablonski
>> > >> >
>> > >>
>> > >
>> >
>>
>
