This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7e57300  KAFKA-12486: Enforce Rebalance when a TaskCorruptedException 
is throw… (#11076)
7e57300 is described below

commit 7e573001484427dc73d821cc232a4c3bb3b5f5bb
Author: vamossagar12 <[email protected]>
AuthorDate: Wed Sep 29 05:20:16 2021 +0530

    KAFKA-12486: Enforce Rebalance when a TaskCorruptedException is throw… 
(#11076)
    
    This PR aims to use the HighAvailabilityTaskAssignor to avoid downtime on 
corrupted tasks. The idea is that, when we hit a TaskCorruptedException on an 
active task and exactly-once processing (EOS) is enabled, a rebalance is 
triggered after we've wiped out the corrupted state stores. This allows the 
assignor to temporarily redirect the task to another client that can resume 
work on it while the original owner restores the state from scratch.
    
    Reviewers: Anna Sophie Blee-Goldman <[email protected]>
---
 .../streams/processor/internals/StreamThread.java  |  10 +-
 .../streams/processor/internals/TaskManager.java   |   3 +-
 .../processor/internals/StreamThreadTest.java      | 140 ++++++++++++++++++++-
 3 files changed, 150 insertions(+), 3 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index a1da373..addfe15 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -310,6 +310,7 @@ public class StreamThread extends Thread {
     // These are used to signal from outside the stream thread, but the 
variables themselves are internal to the thread
     private final AtomicLong cacheResizeSize = new AtomicLong(-1L);
     private final AtomicBoolean leaveGroupRequested = new AtomicBoolean(false);
+    private final boolean eosEnabled;
 
     public static StreamThread create(final TopologyMetadata topologyMetadata,
                                       final StreamsConfig config,
@@ -547,6 +548,7 @@ public class StreamThread extends Thread {
         this.commitTimeMs = 
config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG);
 
         this.numIterations = 1;
+        this.eosEnabled = eosEnabled(config);
     }
 
     private static final class InternalConsumerConfig extends ConsumerConfig {
@@ -609,7 +611,13 @@ public class StreamThread extends Thread {
                 log.warn("Detected the states of tasks " + e.corruptedTasks() 
+ " are corrupted. " +
                          "Will close the task as dirty and re-create and 
bootstrap from scratch.", e);
                 try {
-                    taskManager.handleCorruption(e.corruptedTasks());
+                    // check if any active task got corrupted. We will trigger 
a rebalance in that case.
+                    // once the task corruptions have been handled
+                    final boolean enforceRebalance = 
taskManager.handleCorruption(e.corruptedTasks());
+                    if (enforceRebalance && eosEnabled) {
+                        log.info("Active task(s) got corrupted. Triggering a 
rebalance.");
+                        mainConsumer.enforceRebalance();
+                    }
                 } catch (final TaskMigratedException taskMigrated) {
                     handleTaskMigrated(taskMigrated);
                 }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 9269c9d..67113a7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -170,7 +170,7 @@ public class TaskManager {
     /**
      * @throws TaskMigratedException
      */
-    void handleCorruption(final Set<TaskId> corruptedTasks) {
+    boolean handleCorruption(final Set<TaskId> corruptedTasks) {
         final Set<Task> corruptedActiveTasks = new HashSet<>();
         final Set<Task> corruptedStandbyTasks = new HashSet<>();
 
@@ -210,6 +210,7 @@ public class TaskManager {
         }
 
         closeDirtyAndRevive(corruptedActiveTasks, true);
+        return !corruptedActiveTasks.isEmpty();
     }
 
     private void closeDirtyAndRevive(final Collection<Task> 
taskWithChangelogs, final boolean markAsCorrupted) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index e57c565..6337a13 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -148,6 +148,7 @@ public class StreamThreadTest {
     private final String stateDir = TestUtils.tempDirectory().getPath();
     private final MockClientSupplier clientSupplier = new MockClientSupplier();
     private final StreamsConfig config = new StreamsConfig(configProps(false));
+    private final StreamsConfig eosEnabledConfig = new 
StreamsConfig(configProps(true));
     private final ConsumedInternal<Object, Object> consumed = new 
ConsumedInternal<>();
     private final ChangelogReader changelogReader = new MockChangelogReader();
     private final StateDirectory stateDirectory = new StateDirectory(config, 
mockTime, true, false);
@@ -2287,7 +2288,7 @@ public class StreamThreadTest {
         expect(task2.state()).andReturn(Task.State.RUNNING).anyTimes();
         expect(task2.id()).andReturn(taskId2).anyTimes();
 
-        taskManager.handleCorruption(corruptedTasks);
+        expect(taskManager.handleCorruption(corruptedTasks)).andReturn(true);
 
         EasyMock.replay(task1, task2, taskManager, consumer);
 
@@ -2468,6 +2469,143 @@ public class StreamThreadTest {
     }
 
     @Test
+    @SuppressWarnings("unchecked")
+    public void 
shouldEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnActiveTask() {
+        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
+        
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
+        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
+        final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
+        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
+        
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
+        consumer.subscribe((Collection<String>) anyObject(), anyObject());
+        EasyMock.expectLastCall().anyTimes();
+        consumer.unsubscribe();
+        EasyMock.expectLastCall().anyTimes();
+        EasyMock.replay(consumerGroupMetadata);
+        final Task task1 = mock(Task.class);
+        final Task task2 = mock(Task.class);
+
+        final TaskId taskId1 = new TaskId(0, 0);
+        final TaskId taskId2 = new TaskId(0, 2);
+
+        final Set<TaskId> corruptedTasks = singleton(taskId1);
+
+        expect(task1.state()).andReturn(Task.State.RUNNING).anyTimes();
+        expect(task1.id()).andReturn(taskId1).anyTimes();
+        expect(task2.state()).andReturn(Task.State.CREATED).anyTimes();
+        expect(task2.id()).andReturn(taskId2).anyTimes();
+        expect(taskManager.handleCorruption(corruptedTasks)).andReturn(true);
+
+        consumer.enforceRebalance();
+        expectLastCall();
+
+        EasyMock.replay(task1, task2, taskManager, consumer);
+
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
+        final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
+        topologyMetadata.buildAndRewriteTopology();
+        final StreamThread thread = new StreamThread(
+            mockTime,
+            eosEnabledConfig,
+            null,
+            consumer,
+            consumer,
+            null,
+            null,
+            taskManager,
+            streamsMetrics,
+            topologyMetadata,
+            CLIENT_ID,
+            new LogContext(""),
+            new AtomicInteger(),
+            new AtomicLong(Long.MAX_VALUE),
+            null,
+            HANDLER,
+            null
+        ) {
+            @Override
+            void runOnce() {
+                setState(State.PENDING_SHUTDOWN);
+                throw new TaskCorruptedException(corruptedTasks);
+            }
+        }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+
+        thread.setState(StreamThread.State.STARTING);
+        thread.runLoop();
+
+        verify(taskManager);
+        verify(consumer);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void 
shouldNotEnforceRebalanceWhenTaskCorruptedExceptionIsThrownForAnInactiveTask() {
+        final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
+        
expect(taskManager.producerClientIds()).andStubReturn(Collections.emptySet());
+        final Consumer<byte[], byte[]> consumer = mock(Consumer.class);
+        final ConsumerGroupMetadata consumerGroupMetadata = 
mock(ConsumerGroupMetadata.class);
+        expect(consumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
+        
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
+        consumer.subscribe((Collection<String>) anyObject(), anyObject());
+        EasyMock.expectLastCall().anyTimes();
+        consumer.unsubscribe();
+        EasyMock.expectLastCall().anyTimes();
+        EasyMock.replay(consumerGroupMetadata);
+        final Task task1 = mock(Task.class);
+        final Task task2 = mock(Task.class);
+
+        final TaskId taskId1 = new TaskId(0, 0);
+        final TaskId taskId2 = new TaskId(0, 2);
+
+        final Set<TaskId> corruptedTasks = singleton(taskId1);
+
+        expect(task1.state()).andReturn(Task.State.CLOSED).anyTimes();
+        expect(task1.id()).andReturn(taskId1).anyTimes();
+        expect(task2.state()).andReturn(Task.State.CLOSED).anyTimes();
+        expect(task2.id()).andReturn(taskId2).anyTimes();
+        expect(taskManager.handleCorruption(corruptedTasks)).andReturn(false);
+
+        EasyMock.replay(task1, task2, taskManager, consumer);
+
+        final StreamsMetricsImpl streamsMetrics =
+            new StreamsMetricsImpl(metrics, CLIENT_ID, 
StreamsConfig.METRICS_LATEST, mockTime);
+        final TopologyMetadata topologyMetadata = new 
TopologyMetadata(internalTopologyBuilder, config);
+        topologyMetadata.buildAndRewriteTopology();
+        final StreamThread thread = new StreamThread(
+            mockTime,
+            eosEnabledConfig,
+            null,
+            consumer,
+            consumer,
+            null,
+            null,
+            taskManager,
+            streamsMetrics,
+            topologyMetadata,
+            CLIENT_ID,
+            new LogContext(""),
+            new AtomicInteger(),
+            new AtomicLong(Long.MAX_VALUE),
+            null,
+            HANDLER,
+            null
+        ) {
+            @Override
+            void runOnce() {
+                setState(State.PENDING_SHUTDOWN);
+                throw new TaskCorruptedException(corruptedTasks);
+            }
+        }.updateThreadMetadata(getSharedAdminClientId(CLIENT_ID));
+
+        thread.setState(StreamThread.State.STARTING);
+        thread.runLoop();
+
+        verify(taskManager);
+        verify(consumer);
+    }
+
+    @Test
     public void shouldNotCommitNonRunningNonRestoringTasks() {
         final TaskManager taskManager = 
EasyMock.createNiceMock(TaskManager.class);
         final Consumer<byte[], byte[]> consumer = mock(Consumer.class);

Reply via email to