Nice! Sorry for not creating the ticket, but I appreciate your
investigation (and fix!)

Thanks again!

On Thu, Nov 5, 2020 at 5:31 PM Sophie Blee-Goldman <sop...@confluent.io>
wrote:

> Ok I looked into this a bit and found the bug. I'll open a PR with the fix
> sometime today:
> https://issues.apache.org/jira/browse/KAFKA-10689
>
> I also think we can do a better job of surfacing issues like this, rather
> than letting the application silently spin without making progress. I left
> some thoughts on the JIRA ticket and will try to incorporate one of them
> into the fix as well.
>
> On Mon, Nov 2, 2020 at 9:38 PM Sophie Blee-Goldman <sop...@confluent.io>
> wrote:
>
> > Ok I tried to reproduce the issue with a minimal example, and saw the same
> > results. It seems like there's something weird going on with that exact
> > topology that's causing it to get stuck during the assignment. Maybe it's
> > causing an unexpected cycle in the topology that the assignor can't handle?
> > Pretty weird that even removing the windowedBy fixes the issue, since a
> > topology with a windowed aggregation is pretty much isomorphic to one with
> > just a regular aggregation.
> >
> > Can you create a JIRA ticket for this and include your observations + link
> > to the example? It's definitely a bug, and we'll need to look into this to
> > understand what's going wrong here.
> >
> > Sorry for the trouble, but thanks for bringing it to our attention.
> >
> > On Wed, Oct 28, 2020 at 12:24 PM Alex Jablonski <
> > ajablon...@thoughtworks.com> wrote:
> >
> >> This block:
> >>
> >> @EmbeddedKafka(
> >>         topics = {
> >>                 "WordCounts", "WordsForNumbers", "OutputTopic"
> >>         }
> >> )
> >>
> >> starts up an embedded Kafka in the test and creates the 3 topics (2
> >> input and 1 output). By default it creates them with 2 partitions
> >> each, but changing to 1 partition didn't alter the endless-rebalancing
> >> behavior.
> >>
> >> We also see the endless rebalancing behavior in a real Kafka cluster,
> >> using input and output topics that have already been created (and are
> >> readily consumed from and written to).
> >>
> >>
> >>
> >>
> >> On Wed, Oct 28, 2020 at 12:45 PM Sophie Blee-Goldman <sop...@confluent.io>
> >> wrote:
> >>
> >> > Yeah there's definitely something weird going on (assuming this is the
> >> > full logs over that time period). The last thing we see logged from the
> >> > StreamThread is this message from around the start of the task
> >> > assignment process:
> >> >
> >> > 2020-10-28 12:22:37.879 DEBUG 27226 --- [-StreamThread-1]
> >> > o.a.k.s.p.i.StreamsPartitionAssignor     : stream-thread
> >> > [demo-application-81060bdc-c8cc-4350-85f8-d238267e264e-StreamThread-1-consumer]
> >> > Constructed client metadata
> >> > {81060bdc-c8cc-4350-85f8-d238267e264e=ClientMetadata{hostInfo=null,
> >> > consumers=[demo-application-81060bdc-c8cc-4350-85f8-d238267e264e-StreamThread-1-consumer-976853d9-06ad-4515-abf3-2a7398c12006],
> >> > state=[activeTasks: ([]) standbyTasks: ([]) assignedTasks: ([])
> >> > prevActiveTasks: ([]) prevStandbyTasks: ([]) prevAssignedTasks: ([])
> >> > prevOwnedPartitionsByConsumerId: ([]) capacity: 1]}} from the member
> >> > subscriptions.
> >> >
> >> >
> >> >
> >> > which is at 12:22:37. Then there's nothing else from Streams until at
> >> > least 12:25:00, where the logs end. Not sure what it could be doing
> >> > inside the assignor for 2+ minutes without ever reaching another...how
> >> > many partitions are on the input topics? Are you sure the input topics
> >> > have been pre-created before starting the app, with the correct names,
> >> > etc?
> >> >
> >> > On Wed, Oct 28, 2020 at 10:29 AM Alex Jablonski <
> >> > ajablon...@thoughtworks.com>
> >> > wrote:
> >> >
> >> > > Hi Sophie,
> >> > >
> >> > > Thanks for your questions! Responses inline below. Also, I realized I
> >> > > linked to the gradle file, not the interesting bits of the example. This
> >> > > <https://github.com/ajablonski/streams-issue-demo/blob/master/src/main/java/com/github/ajablonski/StreamsConfiguration.java>
> >> > > is the configuration and this
> >> > > <https://github.com/ajablonski/streams-issue-demo/blob/master/src/test/java/com/github/ajablonski/StreamsConfigurationTest.java>
> >> > > is the test.
> >> > >
> >> > > On Tue, Oct 27, 2020 at 10:11 PM Sophie Blee-Goldman <sop...@confluent.io>
> >> > > wrote:
> >> > >
> >> > >> >
> >> > >> > We've been able to get the crucial factors that cause this behavior
> >> > >> > down to a particular combination
> >> > >>
> >> > >> What do you mean by this -- that you only see this when all four of
> >> > >> those operators are at play? Or do you see it with any of them?
> >> > >>
> >> > >
> >> > > We see this when all four operators are in play. If you change the
> >> > > sample streams configuration to not do that final foreign key join, or
> >> > > not use custom serdes for example, I don't see the stuck-state issue
> >> > > (the application transitions to running state just fine).
> >> > >
> >> > >
> >> > >>
> >> > >> I guess the first thing to narrow down is whether it's actually
> >> > >> rebalancing or just restoring within this time (the REBALANCING state
> >> > >> is somewhat misleadingly-named). If this is a completely new app then
> >> > >> it's probably not restoring, but if this app had already been running
> >> > >> and building up state before hitting this issue then that's probably
> >> > >> the reason. It's not at all uncommon for restoration to take more than
> >> > >> 30 seconds.
> >> > >>
> >> > >
> >> > > This is happening with the app in a completely new state -- in the
> >> > > test, for example, there's no pre-loaded data when we're asserting that
> >> > > the app should eventually get to RUNNING, and none of the internal
> >> > > topics exist.
> >> > >
> >> > >
> >> > >> If it really is rebalancing this entire time, then you need to look
> >> > >> into the logs to figure out why. I don't see anything obviously wrong
> >> > >> with your particular application, and even if there was it should
> >> > >> never result in endless rebalances like this. How many instances of
> >> > >> the application are you running?
> >> > >>
> >> > >
> >> > > In our actual application, we have 3 instances, but in the tests in
> >> > > that sample project, there's only 1.
> >> > >
> >> > > The logs that we're getting right before the application gets "stuck"
> >> > > are below. The one that seems most concerning to my uninformed eyes is
> >> > > "Member
> >> > > demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282
> >> > > in group demo-application has failed". I've attached some DEBUG level
> >> > > logs too, though nothing was obvious to me that would better explain
> >> > > the hanging behavior.
> >> > >
> >> > > 2020-10-28 12:11:19.823  INFO 27127 --- [-StreamThread-1]
> >> > > o.a.k.c.c.internals.AbstractCoordinator  : [Consumer
> >> > > clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
> >> > > groupId=demo-application] Discovered group coordinator localhost:50343
> >> > > (id: 2147483647 rack: null)
> >> > > 2020-10-28 12:11:19.825  INFO 27127 --- [-StreamThread-1]
> >> > > o.a.k.c.c.internals.AbstractCoordinator  : [Consumer
> >> > > clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
> >> > > groupId=demo-application] (Re-)joining group
> >> > > 2020-10-28 12:11:19.842  WARN 27127 --- [-StreamThread-1]
> >> > > org.apache.kafka.clients.NetworkClient   : [Consumer
> >> > > clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
> >> > > groupId=demo-application] Error while fetching metadata with
> >> > > correlation id 7 :
> >> > > {demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000018-topic=UNKNOWN_TOPIC_OR_PARTITION,
> >> > > demo-application-GroupName-repartition=UNKNOWN_TOPIC_OR_PARTITION,
> >> > > demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010-topic=UNKNOWN_TOPIC_OR_PARTITION}
> >> > > 2020-10-28 12:11:19.860  INFO 27127 --- [-StreamThread-1]
> >> > > o.a.k.c.c.internals.AbstractCoordinator  : [Consumer
> >> > > clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
> >> > > groupId=demo-application] Join group failed with
> >> > > org.apache.kafka.common.errors.MemberIdRequiredException: The group
> >> > > member needs to have a valid member id before actually entering a
> >> > > consumer group
> >> > > 2020-10-28 12:11:19.861  INFO 27127 --- [-StreamThread-1]
> >> > > o.a.k.c.c.internals.AbstractCoordinator  : [Consumer
> >> > > clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
> >> > > groupId=demo-application] (Re-)joining group
> >> > > 2020-10-28 12:11:19.873  INFO 27127 --- [quest-handler-3]
> >> > > k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]:
> >> > > Preparing to rebalance group demo-application in state
> >> > > PreparingRebalance with old generation 0 (__consumer_offsets-2)
> >> > > (reason: Adding new member
> >> > > demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282
> >> > > with group instance id None)
> >> > > 2020-10-28 12:11:19.882  INFO 27127 --- [cutor-Rebalance]
> >> > > k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]:
> >> > > Stabilized group demo-application generation 1 (__consumer_offsets-2)
> >> > > 2020-10-28 12:11:19.947  WARN 27127 --- [-StreamThread-1]
> >> > > org.apache.kafka.clients.NetworkClient   : [Consumer
> >> > > clientId=demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer,
> >> > > groupId=demo-application] Error while fetching metadata with
> >> > > correlation id 9 :
> >> > > {demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-0000000018-topic=UNKNOWN_TOPIC_OR_PARTITION,
> >> > > demo-application-GroupName-repartition=UNKNOWN_TOPIC_OR_PARTITION,
> >> > > demo-application-KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-0000000010-topic=UNKNOWN_TOPIC_OR_PARTITION}
> >> > > 2020-10-28 12:11:23.462  INFO 27127 --- [er-event-thread]
> >> > > kafka.controller.KafkaController         : [Controller id=0]
> >> > > Processing automatic preferred replica leader election
> >> > > 2020-10-28 12:11:29.887  INFO 27127 --- [cutor-Heartbeat]
> >> > > k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]:
> >> > > Member
> >> > > demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282
> >> > > in group demo-application has failed, removing it from the group
> >> > > 2020-10-28 12:11:29.887  INFO 27127 --- [cutor-Heartbeat]
> >> > > k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]:
> >> > > Preparing to rebalance group demo-application in state
> >> > > PreparingRebalance with old generation 1 (__consumer_offsets-2)
> >> > > (reason: removing member
> >> > > demo-application-714a21af-5fe5-4b9c-8450-53033309a406-StreamThread-1-consumer-cabbd9ce-83a7-4691-8599-b2ffe77da282
> >> > > on heartbeat expiration)
> >> > > 2020-10-28 12:11:29.888  INFO 27127 --- [cutor-Heartbeat]
> >> > > k.coordinator.group.GroupCoordinator     : [GroupCoordinator 0]:
> >> > > Group demo-application with generation 2 is now empty
> >> > > (__consumer_offsets-2)
> >> > >
> >> > >
> >> > >>
> >> > >> Cheers,
> >> > >> Sophie
> >> > >>
> >> > >> On Thu, Oct 15, 2020 at 10:01 PM Alex Jablonski <
> >> > >> ajablon...@thoughtworks.com>
> >> > >> wrote:
> >> > >>
> >> > >> > Hey there!
> >> > >> >
> >> > >> > My team and I have run across a bit of a jam in our application where,
> >> > >> > given a particular setup, our Kafka Streams application never seems to
> >> > >> > start successfully, instead just getting stuck in the REBALANCING
> >> > >> > state. We've been able to get the crucial factors that cause this
> >> > >> > behavior down to a particular combination of (1) grouping, (2)
> >> > >> > windowing, (3) aggregating, and (4) foreign-key joining, with some of
> >> > >> > those steps specifying Serdes besides the default.
> >> > >> >
> >> > >> > It's probably more useful to see a minimal example, so there's one here
> >> > >> > <https://github.com/ajablonski/streams-issue-demo/blob/master/build.gradle>.
> >> > >> > The underlying Kafka Streams version is 2.5.1. The first test should
> >> > >> > show the application eventually transition to running state, but it
> >> > >> > doesn't within the 30 second timeout I've set. Interestingly, getting
> >> > >> > rid of the 'Grouped.with' argument to the 'groupBy' function and the
> >> > >> > 'Materialized.with' in 'aggregate' in the 'StreamsConfiguration' lets
> >> > >> > the application transition to "RUNNING", though without the correct
> >> > >> > Serdes that's not too valuable.
> >> > >> >
> >> > >> > There might be a cleaner way to organize the particular flow in the
> >> > >> > toy example, but is there something fundamentally wrong with the
> >> > >> > approach laid out in that application that would cause Streams to be
> >> > >> > stuck in REBALANCING? I'd appreciate any advice folks could give!
> >> > >> >
> >> > >> > Thanks!
> >> > >> > Alex Jablonski
> >> > >> >
> >> > >>
> >> > >
> >> >
> >>
> >
>
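
For anyone hitting the same symptom before picking up the fix: the point made
above about surfacing a stuck startup, rather than letting the application spin
silently, can be approximated on the application side with a bounded wait on
the reported state. This is only a rough sketch and not the change proposed in
KAFKA-10689. The class name StartupWatchdog and the method awaitState are
made up for illustration, and the state is passed in as a plain string supplier
so the example runs without Kafka on the classpath.

```java
import java.time.Duration;
import java.util.function.Supplier;

// Hypothetical helper, not part of Kafka Streams.
final class StartupWatchdog {

    // Polls currentState until it equals target or the timeout elapses.
    // Returns true if target was observed in time, false otherwise
    // (including when the waiting thread is interrupted).
    static boolean awaitState(Supplier<String> currentState, String target,
                              Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (target.equals(currentState.get())) {
                return true;
            }
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        // Stand-in for an application that never leaves REBALANCING.
        // With a real KafkaStreams instance named streams (an assumption),
        // the supplier could be: () -> streams.state().name()
        boolean running = awaitState(() -> "REBALANCING", "RUNNING",
                Duration.ofMillis(200), Duration.ofMillis(20));
        if (!running) {
            System.out.println("Not RUNNING within the timeout; failing fast");
        }
    }
}
```

The caller decides what "failing fast" means (log and exit, fail the test,
alert); the sketch only turns an open-ended wait into an explicit timeout.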
