This is an automated email from the ASF dual-hosted git repository.
dajac pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c93b717836 KAFKA-13542: Add rebalance reason in Kafka Streams (#12018)
c93b717836 is described below
commit c93b717836b6d92d0f2e9fb101ea6ff3e823ffca
Author: Hao Li <[email protected]>
AuthorDate: Wed Apr 13 04:49:31 2022 -0700
KAFKA-13542: Add rebalance reason in Kafka Streams (#12018)
Reviewers: Bruno Cadonna <[email protected]>, David Jacot
<[email protected]>
---
.../org/apache/kafka/streams/processor/internals/StreamThread.java | 6 +++---
.../apache/kafka/streams/processor/internals/StreamThreadTest.java | 5 ++---
2 files changed, 5 insertions(+), 6 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index ab4b094696..15e4903e63 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -594,7 +594,7 @@ public class StreamThread extends Thread {
runOnce();
if (nextProbingRebalanceMs.get() < time.milliseconds()) {
log.info("Triggering the followup rebalance scheduled for
{} ms.", nextProbingRebalanceMs.get());
- mainConsumer.enforceRebalance();
+ mainConsumer.enforceRebalance("Scheduled probing
rebalance");
nextProbingRebalanceMs.set(Long.MAX_VALUE);
}
} catch (final TaskCorruptedException e) {
@@ -606,7 +606,7 @@ public class StreamThread extends Thread {
final boolean enforceRebalance =
taskManager.handleCorruption(e.corruptedTasks());
if (enforceRebalance && eosEnabled) {
log.info("Active task(s) got corrupted. Triggering a
rebalance.");
- mainConsumer.enforceRebalance();
+ mainConsumer.enforceRebalance("Active tasks
corrupted");
}
} catch (final TaskMigratedException taskMigrated) {
handleTaskMigrated(taskMigrated);
@@ -648,7 +648,7 @@ public class StreamThread extends Thread {
if (assignmentErrorCode.get() ==
AssignorError.SHUTDOWN_REQUESTED.code()) {
log.warn("Detected that shutdown was requested. " +
"All clients in this app will now begin to shutdown");
- mainConsumer.enforceRebalance();
+ mainConsumer.enforceRebalance("Shutdown requested");
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index af02e55511..11bd1d42a7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -516,6 +516,7 @@ public class StreamThreadTest {
final EasyMockConsumerClientSupplier mockClientSupplier = new
EasyMockConsumerClientSupplier(mockConsumer);
mockClientSupplier.setCluster(createCluster());
+ mockConsumer.enforceRebalance("Scheduled probing rebalance");
EasyMock.replay(mockConsumer);
final TopologyMetadata topologyMetadata = new
TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
@@ -538,8 +539,6 @@ public class StreamThreadTest {
null
);
- mockConsumer.enforceRebalance();
-
mockClientSupplier.nextRebalanceMs().set(mockTime.milliseconds() - 1L);
thread.start();
@@ -2430,7 +2429,7 @@ public class StreamThreadTest {
expect(task2.id()).andReturn(taskId2).anyTimes();
expect(taskManager.handleCorruption(corruptedTasks)).andReturn(true);
- consumer.enforceRebalance();
+ consumer.enforceRebalance("Active tasks corrupted");
expectLastCall();
EasyMock.replay(task1, task2, taskManager, consumer);