This is an automated email from the ASF dual-hosted git repository.

lianetm pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 19d8a414ef6 KAFKA-15900, KAFKA-18310: fix flaky test testOutdatedCoordinatorAssignment and AbstractCoordinatorTest (#18945)
19d8a414ef6 is described below

commit 19d8a414ef6d86596d5f8d33828d8d7560cc8678
Author: PoAn Yang <[email protected]>
AuthorDate: Mon Mar 10 23:50:35 2025 +0800

    KAFKA-15900, KAFKA-18310: fix flaky test testOutdatedCoordinatorAssignment and AbstractCoordinatorTest (#18945)
    
    Reviewers: Lianet Magrans <[email protected]>
---
 .../consumer/internals/AbstractCoordinator.java    | 74 ++++++++++------------
 .../consumer/internals/BaseHeartbeatThread.java    | 69 ++++++++++++++++++++
 .../consumer/internals/ConsumerCoordinator.java    | 41 +++++++++++-
 .../internals/AbstractCoordinatorTest.java         | 60 ++++++++----------
 .../internals/BaseHeartbeatThreadTest.java         | 65 +++++++++++++++++++
 .../internals/ConsumerCoordinatorTest.java         | 28 +++++++-
 6 files changed, 260 insertions(+), 77 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
index b530ca562b9..f8165f6656d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
@@ -67,7 +67,6 @@ import org.apache.kafka.common.requests.SyncGroupResponse;
 import org.apache.kafka.common.telemetry.internals.ClientTelemetryProvider;
 import org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter;
 import org.apache.kafka.common.utils.ExponentialBackoff;
-import org.apache.kafka.common.utils.KafkaThread;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Timer;
@@ -84,7 +83,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 
 /**
  * AbstractCoordinator implements group management for a single group member 
by interacting with
@@ -135,6 +134,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
     private final GroupCoordinatorMetrics sensors;
     private final GroupRebalanceConfig rebalanceConfig;
     private final Optional<ClientTelemetryReporter> clientTelemetryReporter;
+    private final Optional<Supplier<BaseHeartbeatThread>> 
heartbeatThreadSupplier;
 
     protected final Time time;
     protected final ConsumerNetworkClient client;
@@ -144,7 +144,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
     private String rejoinReason = "";
     private boolean rejoinNeeded = true;
     private boolean needsJoinPrepare = true;
-    private HeartbeatThread heartbeatThread = null;
+    private BaseHeartbeatThread heartbeatThread = null;
     private RequestFuture<ByteBuffer> joinFuture = null;
     private RequestFuture<Void> findCoordinatorFuture = null;
     private volatile RuntimeException fatalFindCoordinatorException = null;
@@ -165,7 +165,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
                                Metrics metrics,
                                String metricGrpPrefix,
                                Time time) {
-        this(rebalanceConfig, logContext, client, metrics, metricGrpPrefix, 
time, Optional.empty());
+        this(rebalanceConfig, logContext, client, metrics, metricGrpPrefix, 
time, Optional.empty(), Optional.empty());
     }
 
     public AbstractCoordinator(GroupRebalanceConfig rebalanceConfig,
@@ -174,7 +174,8 @@ public abstract class AbstractCoordinator implements 
Closeable {
                                Metrics metrics,
                                String metricGrpPrefix,
                                Time time,
-                               Optional<ClientTelemetryReporter> 
clientTelemetryReporter) {
+                               Optional<ClientTelemetryReporter> 
clientTelemetryReporter,
+                               Optional<Supplier<BaseHeartbeatThread>> 
heartbeatThreadSupplier) {
         Objects.requireNonNull(rebalanceConfig.groupId,
                                "Expected a non-null group id for coordinator 
construction");
         this.rebalanceConfig = rebalanceConfig;
@@ -189,6 +190,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
         this.heartbeat = new Heartbeat(rebalanceConfig, time);
         this.sensors = new GroupCoordinatorMetrics(metrics, metricGrpPrefix);
         this.clientTelemetryReporter = clientTelemetryReporter;
+        this.heartbeatThreadSupplier = heartbeatThreadSupplier;
     }
 
     /**
@@ -361,7 +363,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
      */
     protected synchronized void pollHeartbeat(long now) {
         if (heartbeatThread != null) {
-            if (heartbeatThread.hasFailed()) {
+            if (heartbeatThread.isFailed()) {
                 // set the heartbeat thread to null and raise an exception. If 
the user catches it,
                 // the next call to ensureActiveGroup() will spawn a new 
heartbeat thread.
                 RuntimeException cause = heartbeatThread.failureCause();
@@ -381,7 +383,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
         // we don't need to send heartbeats
         if (state.hasNotJoinedGroup())
             return Long.MAX_VALUE;
-        if (heartbeatThread != null && heartbeatThread.hasFailed()) {
+        if (heartbeatThread != null && heartbeatThread.isFailed()) {
             // if an exception occurs in the heartbeat thread, raise it.
             throw heartbeatThread.failureCause();
         }
@@ -417,13 +419,13 @@ public abstract class AbstractCoordinator implements 
Closeable {
 
     private synchronized void startHeartbeatThreadIfNeeded() {
         if (heartbeatThread == null) {
-            heartbeatThread = new HeartbeatThread();
+            heartbeatThread = 
heartbeatThreadSupplier.orElse(HeartbeatThread::new).get();
             heartbeatThread.start();
         }
     }
 
     private void closeHeartbeatThread() {
-        HeartbeatThread thread;
+        BaseHeartbeatThread thread;
         synchronized (this) {
             if (heartbeatThread == null)
                 return;
@@ -1330,6 +1332,13 @@ public abstract class AbstractCoordinator implements 
Closeable {
                         String.format("The total number of %s", 
descriptiveName)));
     }
 
+    /**
+     * Visible for testing.
+     */
+    protected BaseHeartbeatThread heartbeatThread() {
+        return heartbeatThread;
+    }
+
     private class GroupCoordinatorMetrics {
         public final String metricGrpName;
 
@@ -1436,56 +1445,40 @@ public abstract class AbstractCoordinator implements 
Closeable {
         }
     }
 
-    private class HeartbeatThread extends KafkaThread implements AutoCloseable 
{
-        private boolean enabled = false;
-        private boolean closed = false;
-        private final AtomicReference<RuntimeException> failed = new 
AtomicReference<>(null);
+    private class HeartbeatThread extends BaseHeartbeatThread {
 
         private HeartbeatThread() {
             super(HEARTBEAT_THREAD_PREFIX + (rebalanceConfig.groupId.isEmpty() 
? "" : " | " + rebalanceConfig.groupId), true);
         }
 
+        @Override
         public void enable() {
             synchronized (AbstractCoordinator.this) {
                 log.debug("Enabling heartbeat thread");
-                this.enabled = true;
+                super.enable();
                 heartbeat.resetTimeouts();
                 AbstractCoordinator.this.notify();
             }
         }
 
-        public void disable() {
-            synchronized (AbstractCoordinator.this) {
-                log.debug("Disabling heartbeat thread");
-                this.enabled = false;
-            }
-        }
-
+        @Override
         public void close() {
             synchronized (AbstractCoordinator.this) {
-                this.closed = true;
+                super.close();
                 AbstractCoordinator.this.notify();
             }
         }
 
-        private boolean hasFailed() {
-            return failed.get() != null;
-        }
-
-        private RuntimeException failureCause() {
-            return failed.get();
-        }
-
         @Override
         public void run() {
             try {
                 log.debug("Heartbeat thread started");
                 while (true) {
                     synchronized (AbstractCoordinator.this) {
-                        if (closed)
+                        if (isClosed())
                             return;
 
-                        if (!enabled) {
+                        if (!isEnabled()) {
                             AbstractCoordinator.this.wait();
                             continue;
                         }
@@ -1493,7 +1486,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
                         // we do not need to heartbeat we are not part of a 
group yet;
                         // also if we already have fatal error, the client 
will be
                         // crashed soon, hence we do not need to continue 
heartbeating either
-                        if (state.hasNotJoinedGroup() || hasFailed()) {
+                        if (state.hasNotJoinedGroup() || isFailed()) {
                             disable();
                             continue;
                         }
@@ -1547,7 +1540,7 @@ public abstract class AbstractCoordinator implements 
Closeable {
                                             heartbeat.receiveHeartbeat();
                                         } else if (e instanceof 
FencedInstanceIdException) {
                                             log.error("Caught fenced 
group.instance.id {} error in heartbeat thread", 
rebalanceConfig.groupInstanceId);
-                                            heartbeatThread.failed.set(e);
+                                            setFailureCause(e);
                                         } else {
                                             heartbeat.failHeartbeat();
                                             // wake up the thread if it's 
sleeping to reschedule the heartbeat
@@ -1561,28 +1554,27 @@ public abstract class AbstractCoordinator implements 
Closeable {
                 }
             } catch (AuthenticationException e) {
                 log.error("An authentication error occurred in the heartbeat 
thread", e);
-                this.failed.set(e);
+                setFailureCause(e);
             } catch (GroupAuthorizationException e) {
                 log.error("A group authorization error occurred in the 
heartbeat thread", e);
-                this.failed.set(e);
+                setFailureCause(e);
             } catch (InterruptedException | InterruptException e) {
                 Thread.interrupted();
                 log.error("Unexpected interrupt received in heartbeat thread", 
e);
-                this.failed.set(new RuntimeException(e));
+                setFailureCause(new RuntimeException(e));
             } catch (Throwable e) {
                 log.error("Heartbeat thread failed due to unexpected error", 
e);
                 if (e instanceof RuntimeException)
-                    this.failed.set((RuntimeException) e);
+                    setFailureCause((RuntimeException) e);
                 else
-                    this.failed.set(new RuntimeException(e));
+                    setFailureCause(new RuntimeException(e));
             } finally {
                 log.debug("Heartbeat thread has closed");
                 synchronized (AbstractCoordinator.this) {
-                    this.closed = true;
+                    super.close();
                 }
             }
         }
-
     }
 
     protected static class Generation {
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/BaseHeartbeatThread.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/BaseHeartbeatThread.java
new file mode 100644
index 00000000000..c9d96d807e3
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/BaseHeartbeatThread.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.KafkaThread;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Base class for heartbeat threads. This class provides a mechanism to 
enable/disable the heartbeat thread.
+ * The heartbeat thread should check whether it's enabled by calling {@link 
BaseHeartbeatThread#isEnabled()}
+ * before sending heartbeat requests.
+ */
+public class BaseHeartbeatThread extends KafkaThread implements AutoCloseable {
+    private final AtomicBoolean enabled = new AtomicBoolean(false);
+    private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final AtomicReference<RuntimeException> failureCause = new 
AtomicReference<>(null);
+
+    public BaseHeartbeatThread(String name, boolean daemon) {
+        super(name, daemon);
+    }
+
+    public void enable() {
+        enabled.set(true);
+    }
+
+    public void disable() {
+        enabled.set(false);
+    }
+
+    public boolean isEnabled() {
+        return enabled.get();
+    }
+
+    public void setFailureCause(RuntimeException e) {
+        failureCause.set(e);
+    }
+
+    public boolean isFailed() {
+        return failureCause.get() != null;
+    }
+
+    public RuntimeException failureCause() {
+        return failureCause.get();
+    }
+
+    public void close() {
+        closed.set(true);
+    }
+
+    public boolean isClosed() {
+        return closed.get();
+    }
+}
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
index 584a03736f9..784907936f4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java
@@ -86,6 +86,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static 
org.apache.kafka.clients.consumer.ConsumerConfig.ASSIGN_FROM_SUBSCRIBED_ASSIGNORS;
@@ -176,13 +177,51 @@ public final class ConsumerCoordinator extends 
AbstractCoordinator {
                                boolean throwOnFetchStableOffsetsUnsupported,
                                String rackId,
                                Optional<ClientTelemetryReporter> 
clientTelemetryReporter) {
+        this(rebalanceConfig,
+            logContext,
+            client,
+            assignors,
+            metadata,
+            subscriptions,
+            metrics,
+            metricGrpPrefix,
+            time,
+            autoCommitEnabled,
+            autoCommitIntervalMs,
+            interceptors,
+            throwOnFetchStableOffsetsUnsupported,
+            rackId,
+            clientTelemetryReporter,
+            Optional.empty());
+    }
+
+    /**
+     * Initialize the coordination manager.
+     */
+    public ConsumerCoordinator(GroupRebalanceConfig rebalanceConfig,
+                               LogContext logContext,
+                               ConsumerNetworkClient client,
+                               List<ConsumerPartitionAssignor> assignors,
+                               ConsumerMetadata metadata,
+                               SubscriptionState subscriptions,
+                               Metrics metrics,
+                               String metricGrpPrefix,
+                               Time time,
+                               boolean autoCommitEnabled,
+                               int autoCommitIntervalMs,
+                               ConsumerInterceptors<?, ?> interceptors,
+                               boolean throwOnFetchStableOffsetsUnsupported,
+                               String rackId,
+                               Optional<ClientTelemetryReporter> 
clientTelemetryReporter,
+                               Optional<Supplier<BaseHeartbeatThread>> 
heartbeatThreadSupplier) {
         super(rebalanceConfig,
               logContext,
               client,
               metrics,
               metricGrpPrefix,
               time,
-              clientTelemetryReporter);
+              clientTelemetryReporter,
+              heartbeatThreadSupplier);
         this.rebalanceConfig = rebalanceConfig;
         this.log = logContext.logger(ConsumerCoordinator.class);
         this.metadata = metadata;
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
index f144ccf5061..aedc4977a03 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinatorTest.java
@@ -51,7 +51,6 @@ import org.apache.kafka.common.requests.LeaveGroupResponse;
 import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
-import org.apache.kafka.common.test.api.Flaky;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -75,6 +74,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 
 import static java.util.Collections.emptyMap;
 import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -86,6 +86,8 @@ import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.junit.jupiter.api.Assertions.fail;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
 
 public class AbstractCoordinatorTest {
     private static final ByteBuffer EMPTY_DATA = ByteBuffer.wrap(new byte[0]);
@@ -120,15 +122,15 @@ public class AbstractCoordinatorTest {
 
     private void setupCoordinator() {
         setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, 
REBALANCE_TIMEOUT_MS,
-            Optional.empty());
+            Optional.empty(), Optional.empty());
     }
 
     private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs) {
         setupCoordinator(retryBackoffMs, retryBackoffMaxMs, 
REBALANCE_TIMEOUT_MS,
-            Optional.empty());
+            Optional.empty(), Optional.empty());
     }
 
-    private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs, 
int rebalanceTimeoutMs, Optional<String> groupInstanceId) {
+    private void setupCoordinator(int retryBackoffMs, int retryBackoffMaxMs, 
int rebalanceTimeoutMs, Optional<String> groupInstanceId, 
Optional<Supplier<BaseHeartbeatThread>> heartbeatThreadSupplier) {
         LogContext logContext = new LogContext();
         this.mockTime = new MockTime();
         ConsumerMetadata metadata = new ConsumerMetadata(retryBackoffMs, 
retryBackoffMaxMs, 60 * 60 * 1000L,
@@ -160,7 +162,8 @@ public class AbstractCoordinatorTest {
         this.coordinator = new DummyCoordinator(rebalanceConfig,
                                                 consumerClient,
                                                 metrics,
-                                                mockTime);
+                                                mockTime,
+                                                heartbeatThreadSupplier);
     }
 
     private void joinGroup() {
@@ -349,8 +352,7 @@ public class AbstractCoordinatorTest {
 
     @Test
     public void testJoinGroupRequestTimeout() {
-        setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, 
REBALANCE_TIMEOUT_MS,
-            Optional.empty());
+        setupCoordinator();
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
         coordinator.ensureCoordinatorReady(mockTime.timer(0));
 
@@ -367,7 +369,7 @@ public class AbstractCoordinatorTest {
     @Test
     public void 
testJoinGroupRequestTimeoutLowerBoundedByDefaultRequestTimeout() {
         int rebalanceTimeoutMs = REQUEST_TIMEOUT_MS - 10000;
-        setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, 
rebalanceTimeoutMs, Optional.empty());
+        setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, 
rebalanceTimeoutMs, Optional.empty(), Optional.empty());
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
         coordinator.ensureCoordinatorReady(mockTime.timer(0));
 
@@ -387,7 +389,7 @@ public class AbstractCoordinatorTest {
         // Ensure we can handle the maximum allowed rebalance timeout
 
         setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, 
Integer.MAX_VALUE,
-            Optional.empty());
+            Optional.empty(), Optional.empty());
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
         coordinator.ensureCoordinatorReady(mockTime.timer(0));
 
@@ -1094,7 +1096,7 @@ public class AbstractCoordinatorTest {
     }
 
     private void checkLeaveGroupRequestSent(Optional<String> groupInstanceId) {
-        setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, 
Integer.MAX_VALUE, groupInstanceId);
+        setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, 
Integer.MAX_VALUE, groupInstanceId, Optional.empty());
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
         mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, 
leaderId, Errors.NONE));
@@ -1189,7 +1191,7 @@ public class AbstractCoordinatorTest {
     private RequestFuture<Void> setupLeaveGroup(LeaveGroupResponse 
leaveGroupResponse,
                                                 String leaveReason,
                                                 String expectedLeaveReason) {
-        setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, 
Integer.MAX_VALUE, Optional.empty());
+        setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, 
Integer.MAX_VALUE, Optional.empty(), Optional.empty());
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
         mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, 
leaderId, Errors.NONE));
@@ -1435,10 +1437,10 @@ public class AbstractCoordinatorTest {
         awaitFirstHeartbeat(heartbeatReceived);
     }
 
-    @Flaky("KAFKA-18310")
     @Test
     public void testWakeupAfterSyncGroupSentExternalCompletion() throws 
Exception {
-        setupCoordinator();
+        setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, 
REBALANCE_TIMEOUT_MS,
+            Optional.empty(), Optional.of(() -> 
mock(BaseHeartbeatThread.class)));
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
         mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, 
leaderId, Errors.NONE));
@@ -1454,13 +1456,13 @@ public class AbstractCoordinatorTest {
                 return isSyncGroupRequest;
             }
         }, syncGroupResponse(Errors.NONE));
-        AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         assertThrows(WakeupException.class, () -> 
coordinator.ensureActiveGroup(), "Should have woken up from 
ensureActiveGroup()");
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(0, coordinator.onJoinCompleteInvokes);
-        assertFalse(heartbeatReceived.get());
+        assertNotNull(coordinator.heartbeatThread());
+        verify(coordinator.heartbeatThread()).enable();
 
         // the join group completes in this poll()
         consumerClient.poll(mockTime.timer(0));
@@ -1468,14 +1470,12 @@ public class AbstractCoordinatorTest {
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
-
-        awaitFirstHeartbeat(heartbeatReceived);
     }
 
-    @Flaky("KAFKA-18310")
     @Test
     public void testWakeupAfterSyncGroupReceived() throws Exception {
-        setupCoordinator();
+        setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, 
REBALANCE_TIMEOUT_MS,
+            Optional.empty(), Optional.of(() -> 
mock(BaseHeartbeatThread.class)));
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
         mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, 
leaderId, Errors.NONE));
@@ -1486,7 +1486,6 @@ public class AbstractCoordinatorTest {
                 consumerClient.wakeup();
             return isSyncGroupRequest;
         }, syncGroupResponse(Errors.NONE));
-        AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         try {
             coordinator.ensureActiveGroup();
@@ -1496,20 +1495,19 @@ public class AbstractCoordinatorTest {
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(0, coordinator.onJoinCompleteInvokes);
-        assertFalse(heartbeatReceived.get());
+        assertNotNull(coordinator.heartbeatThread());
+        verify(coordinator.heartbeatThread()).enable();
 
         coordinator.ensureActiveGroup();
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
-
-        awaitFirstHeartbeat(heartbeatReceived);
     }
 
-    @Flaky("KAFKA-15474,KAFKA-18310")
     @Test
     public void testWakeupAfterSyncGroupReceivedExternalCompletion() throws 
Exception {
-        setupCoordinator();
+        setupCoordinator(RETRY_BACKOFF_MS, RETRY_BACKOFF_MAX_MS, 
REBALANCE_TIMEOUT_MS,
+            Optional.empty(), Optional.of(() -> 
mock(BaseHeartbeatThread.class)));
 
         mockClient.prepareResponse(groupCoordinatorResponse(node, 
Errors.NONE));
         mockClient.prepareResponse(joinGroupFollowerResponse(1, memberId, 
leaderId, Errors.NONE));
@@ -1520,20 +1518,18 @@ public class AbstractCoordinatorTest {
                 consumerClient.wakeup();
             return isSyncGroupRequest;
         }, syncGroupResponse(Errors.NONE));
-        AtomicBoolean heartbeatReceived = prepareFirstHeartbeat();
 
         assertThrows(WakeupException.class, () -> 
coordinator.ensureActiveGroup(), "Should have woken up from 
ensureActiveGroup()");
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(0, coordinator.onJoinCompleteInvokes);
-        assertFalse(heartbeatReceived.get());
+        assertNotNull(coordinator.heartbeatThread());
+        verify(coordinator.heartbeatThread()).enable();
 
         coordinator.ensureActiveGroup();
 
         assertEquals(1, coordinator.onJoinPrepareInvokes);
         assertEquals(1, coordinator.onJoinCompleteInvokes);
-
-        awaitFirstHeartbeat(heartbeatReceived);
     }
 
     @Test
@@ -1707,8 +1703,9 @@ public class AbstractCoordinatorTest {
         DummyCoordinator(GroupRebalanceConfig rebalanceConfig,
                          ConsumerNetworkClient client,
                          Metrics metrics,
-                         Time time) {
-            super(rebalanceConfig, new LogContext(), client, metrics, 
METRIC_GROUP_PREFIX, time);
+                         Time time,
+                         Optional<Supplier<BaseHeartbeatThread>> 
heartbeatThreadSupplier) {
+            super(rebalanceConfig, new LogContext(), client, metrics, 
METRIC_GROUP_PREFIX, time, Optional.empty(), heartbeatThreadSupplier);
         }
 
         @Override
@@ -1750,5 +1747,4 @@ public class AbstractCoordinatorTest {
             onJoinCompleteInvokes++;
         }
     }
-
 }
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BaseHeartbeatThreadTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BaseHeartbeatThreadTest.java
new file mode 100644
index 00000000000..359a58ea261
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/BaseHeartbeatThreadTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.errors.FencedInstanceIdException;
+
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class BaseHeartbeatThreadTest {
+
+    @Test
+    public void testIsEnabled() {
+        try (BaseHeartbeatThread baseHeartbeatThread = new 
BaseHeartbeatThread("test", true)) {
+            assertFalse(baseHeartbeatThread.isEnabled());
+
+            baseHeartbeatThread.enable();
+            assertTrue(baseHeartbeatThread.isEnabled());
+
+            baseHeartbeatThread.disable();
+            assertFalse(baseHeartbeatThread.isEnabled());
+        }
+    }
+
+    @Test
+    public void testIsFailed() {
+        try (BaseHeartbeatThread baseHeartbeatThread = new 
BaseHeartbeatThread("test", true)) {
+            assertFalse(baseHeartbeatThread.isFailed());
+            assertNull(baseHeartbeatThread.failureCause());
+
+            FencedInstanceIdException exception = new 
FencedInstanceIdException("test");
+            baseHeartbeatThread.setFailureCause(exception);
+            assertTrue(baseHeartbeatThread.isFailed());
+            assertEquals(exception, baseHeartbeatThread.failureCause());
+        }
+    }
+
+    @Test
+    public void testIsClosed() {
+        try (BaseHeartbeatThread baseHeartbeatThread = new 
BaseHeartbeatThread("test", true)) {
+            assertFalse(baseHeartbeatThread.isClosed());
+
+            baseHeartbeatThread.close();
+            assertTrue(baseHeartbeatThread.isClosed());
+        }
+    }
+}
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
index 3dd91dc7639..c41ea0029ba 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java
@@ -76,7 +76,6 @@ import 
org.apache.kafka.common.requests.OffsetFetchResponse.PartitionData;
 import org.apache.kafka.common.requests.RequestTestUtils;
 import org.apache.kafka.common.requests.SyncGroupRequest;
 import org.apache.kafka.common.requests.SyncGroupResponse;
-import org.apache.kafka.common.test.api.Flaky;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.common.utils.Time;
@@ -1010,10 +1009,9 @@ public abstract class ConsumerCoordinatorTest {
         assertEquals(getAdded(owned, assigned), rebalanceListener.assigned);
     }
 
-
-    @Flaky("KAFKA-15900")
     @Test
     public void testOutdatedCoordinatorAssignment() {
+        createMockHeartbeatThreadCoordinator();
         final String consumerId = "outdated_assignment";
         final List<TopicPartition> owned = Collections.emptyList();
         final List<String> oldSubscription = singletonList(topic2);
@@ -4145,4 +4143,28 @@ public abstract class ConsumerCoordinatorTest {
             return super.assign(partitionsPerTopic, subscriptions);
         }
     }
+
+    private void createMockHeartbeatThreadCoordinator() {
+        metrics.close();
+        coordinator.close(time.timer(0));
+
+        metrics = new Metrics(time);
+        coordinator = new ConsumerCoordinator(
+            rebalanceConfig,
+            new LogContext(),
+            consumerClient,
+            assignors,
+            metadata,
+            subscriptions,
+            metrics,
+            consumerId + groupId,
+            time,
+            false,
+            autoCommitIntervalMs,
+            null,
+            false,
+            null,
+            Optional.empty(),
+            Optional.of(() -> Mockito.mock(BaseHeartbeatThread.class)));
+    }
 }

Reply via email to