This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1e45927  KAFKA-12648: fix IllegalStateException in ClientState after 
removing topologies (#11591)
1e45927 is described below

commit 1e459271d777e4721c3f7a36c5b4fbb2a5793f63
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Fri Dec 10 14:26:27 2021 -0800

    KAFKA-12648: fix IllegalStateException in ClientState after removing 
topologies (#11591)
    
    Fix for one of the causes of failure in the NamedTopologyIntegrationTest: 
org.apache.kafka.streams.errors.StreamsException: 
java.lang.IllegalStateException: Must initialize prevActiveTasks from 
ownedPartitions before initializing remaining tasks.
    
    This exception could occur if a member sent in a subscription where all of 
its ownedPartitions were from a named topology that is no longer recognized by 
the group leader, eg because it was just removed from the client. We should 
filter each ClientState based on the current topology only so the assignor only 
processes the partitions/tasks it can identify. The member with the out-of-date 
tasks will eventually clean them up when the #removeNamedTopology API is 
invoked on them
    
    Reviewers: Matthias J. Sax <[email protected]>, Guozhang Wang 
<[email protected]>, Walker Carlson <[email protected]>
---
 .../internals/StreamsPartitionAssignor.java        |  2 +-
 .../internals/assignment/ClientState.java          | 17 ++++++++++-
 .../KafkaStreamsNamedTopologyWrapper.java          |  4 +--
 .../internals/StreamsPartitionAssignorTest.java    |  8 ++---
 .../internals/assignment/ClientStateTest.java      | 34 ++++++++++++++++++----
 5 files changed, 51 insertions(+), 14 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 2ae381d..083253c 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -680,7 +680,7 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
         for (final Map.Entry<UUID, ClientMetadata> entry : 
clientMetadataMap.entrySet()) {
             final UUID uuid = entry.getKey();
             final ClientState state = entry.getValue().state;
-            state.initializePrevTasks(taskForPartition);
+            state.initializePrevTasks(taskForPartition, 
taskManager.topologyMetadata().hasNamedTopologies());
 
             state.computeTaskLags(uuid, allTaskEndOffsetSums);
             clientStates.put(uuid, state);
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
index d828f1e..46f107e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/ClientState.java
@@ -277,14 +277,29 @@ public class ClientState {
         consumerToPreviousStatefulTaskIds.put(consumerId, 
taskOffsetSums.keySet());
     }
 
-    public void initializePrevTasks(final Map<TopicPartition, TaskId> 
taskForPartitionMap) {
+    public void initializePrevTasks(final Map<TopicPartition, TaskId> 
taskForPartitionMap,
+                                    final boolean hasNamedTopologies) {
         if (!previousActiveTasks.taskIds().isEmpty() || 
!previousStandbyTasks.taskIds().isEmpty()) {
             throw new IllegalStateException("Already added previous tasks to 
this client state.");
         }
+
+        maybeFilterUnknownPrevTasksAndPartitions(taskForPartitionMap, 
hasNamedTopologies);
         initializePrevActiveTasksFromOwnedPartitions(taskForPartitionMap);
         initializeRemainingPrevTasksFromTaskOffsetSums();
     }
 
+    private void maybeFilterUnknownPrevTasksAndPartitions(final 
Map<TopicPartition, TaskId> taskForPartitionMap,
+                                                          final boolean 
hasNamedTopologies) {
+        // If this application uses named topologies, then it's possible for 
members to report tasks
+        // or partitions in their subscription that belong to a named topology 
that the group leader
+        // doesn't currently recognize, eg because it was just removed
+        if (hasNamedTopologies) {
+            ownedPartitions.keySet().retainAll(taskForPartitionMap.keySet());
+            
previousActiveTasks.taskIds().retainAll(taskForPartitionMap.values());
+            
previousStandbyTasks.taskIds().retainAll(taskForPartitionMap.values());
+        }
+    }
+
     /**
      * Compute the lag for each stateful task, including tasks this client did 
not previously have.
      */
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
index e7ed7ef..0dd7eca 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
@@ -153,7 +153,7 @@ public class KafkaStreamsNamedTopologyWrapper extends 
KafkaStreams {
      * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
      */
     public AddNamedTopologyResult addNamedTopology(final NamedTopology 
newTopology) {
-        log.debug("Adding topology: {}", newTopology.name());
+        log.info("Adding topology: {}", newTopology.name());
         if (hasStartedOrFinishedShuttingDown()) {
             throw new IllegalStateException("Cannot add a NamedTopology while 
the state is " + super.state);
         } else if (getTopologyByName(newTopology.name()).isPresent()) {
@@ -179,7 +179,7 @@ public class KafkaStreamsNamedTopologyWrapper extends 
KafkaStreams {
      * @throws TopologyException        if this topology subscribes to any 
input topics or pattern already in use
      */
     public RemoveNamedTopologyResult removeNamedTopology(final String 
topologyToRemove, final boolean resetOffsets) {
-        log.debug("Removing topology: {}", topologyToRemove);
+        log.info("Removing topology: {}", topologyToRemove);
         if (!isRunningOrRebalancing()) {
             throw new IllegalStateException("Cannot remove a NamedTopology 
while the state is " + super.state);
         } else if (!getTopologyByName(topologyToRemove).isPresent()) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 24bf5b8..971b688 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -312,7 +312,7 @@ public class StreamsPartitionAssignorTest {
         state.addPreviousTasksAndOffsetSums(CONSUMER_1, 
getTaskOffsetSums(asList(TASK_0_0, TASK_1_1, TASK_1_3), EMPTY_TASKS));
         state.addPreviousTasksAndOffsetSums(CONSUMER_2, 
getTaskOffsetSums(asList(TASK_0_3, TASK_1_0), EMPTY_TASKS));
         state.addPreviousTasksAndOffsetSums(CONSUMER_3, 
getTaskOffsetSums(asList(TASK_0_1, TASK_0_2, TASK_1_2), EMPTY_TASKS));
-        state.initializePrevTasks(emptyMap());
+        state.initializePrevTasks(emptyMap(), false);
         state.computeTaskLags(UUID_1, getTaskEndOffsetSums(allTasks));
 
         assertEquivalentAssignment(
@@ -342,7 +342,7 @@ public class StreamsPartitionAssignorTest {
         state.addPreviousTasksAndOffsetSums(CONSUMER_1, 
getTaskOffsetSums(asList(TASK_0_0, TASK_1_1, TASK_1_3), EMPTY_TASKS));
         state.addPreviousTasksAndOffsetSums(CONSUMER_2, 
getTaskOffsetSums(asList(TASK_0_3, TASK_1_0), EMPTY_TASKS));
         state.addPreviousTasksAndOffsetSums(CONSUMER_3, 
getTaskOffsetSums(asList(TASK_0_1, TASK_0_2, TASK_1_2), EMPTY_TASKS));
-        state.initializePrevTasks(emptyMap());
+        state.initializePrevTasks(emptyMap(), false);
         state.computeTaskLags(UUID_1, getTaskEndOffsetSums(allTasks));
 
         // We should be able to add a new task without sacrificing stickiness
@@ -378,7 +378,7 @@ public class StreamsPartitionAssignorTest {
         state.addPreviousTasksAndOffsetSums(CONSUMER_1, 
getTaskOffsetSums(asList(TASK_0_0, TASK_1_1, TASK_1_3), EMPTY_TASKS));
         state.addPreviousTasksAndOffsetSums(CONSUMER_2, 
getTaskOffsetSums(asList(TASK_0_3, TASK_1_0), EMPTY_TASKS));
         state.addPreviousTasksAndOffsetSums(CONSUMER_3, 
getTaskOffsetSums(asList(TASK_0_1, TASK_0_2, TASK_1_2), EMPTY_TASKS));
-        state.initializePrevTasks(emptyMap());
+        state.initializePrevTasks(emptyMap(), false);
         state.computeTaskLags(UUID_1, getTaskEndOffsetSums(allTasks));
 
         // Consumer 3 leaves the group
@@ -421,7 +421,7 @@ public class StreamsPartitionAssignorTest {
         consumers.add(CONSUMER_4);
         state.addPreviousTasksAndOffsetSums(CONSUMER_4, 
getTaskOffsetSums(EMPTY_TASKS, EMPTY_TASKS));
 
-        state.initializePrevTasks(emptyMap());
+        state.initializePrevTasks(emptyMap(), false);
         state.computeTaskLags(UUID_1, getTaskEndOffsetSums(allTasks));
 
         final Map<String, List<TaskId>> assignment = assignTasksToThreads(
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
index e8acdc3..928129c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/ClientStateTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.processor.internals.assignment;
 
+import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.internals.Task;
@@ -312,13 +313,32 @@ public class ClientStateTest {
     public void shouldAddTasksWithLatestOffsetToPrevActiveTasks() {
         final Map<TaskId, Long> taskOffsetSums = 
Collections.singletonMap(TASK_0_1, Task.LATEST_OFFSET);
         client.addPreviousTasksAndOffsetSums("c1", taskOffsetSums);
-        client.initializePrevTasks(Collections.emptyMap());
+        client.initializePrevTasks(Collections.emptyMap(), false);
         assertThat(client.prevActiveTasks(), 
equalTo(Collections.singleton(TASK_0_1)));
         assertThat(client.previousAssignedTasks(), 
equalTo(Collections.singleton(TASK_0_1)));
         assertTrue(client.prevStandbyTasks().isEmpty());
     }
 
     @Test
+    public void 
shouldThrowWhenSomeOwnedPartitionsAreNotRecognizedWhenInitializingPrevTasks() {
+        final Map<TopicPartition, TaskId> taskForPartitionMap = 
Collections.singletonMap(TP_0_1, TASK_0_1);
+        client.addOwnedPartitions(Collections.singleton(TP_0_0), "c1");
+        client.addPreviousTasksAndOffsetSums("c1", Collections.emptyMap());
+        assertThrows(IllegalStateException.class, () -> 
client.initializePrevTasks(taskForPartitionMap, false));
+    }
+
+    @Test
+    public void 
shouldFilterOutUnrecognizedPartitionsAndInitializePrevTasksWhenUsingNamedTopologies()
 {
+        final Map<TopicPartition, TaskId> taskForPartitionMap = 
Collections.singletonMap(TP_0_1, TASK_0_1);
+        client.addOwnedPartitions(Collections.singleton(TP_0_0), "c1");
+        client.addPreviousTasksAndOffsetSums("c1", Collections.emptyMap());
+        client.initializePrevTasks(taskForPartitionMap, true);
+        assertThat(client.prevActiveTasks().isEmpty(), is(true));
+        assertThat(client.previousAssignedTasks().isEmpty(), is(true));
+        assertThat(client.prevStandbyTasks().isEmpty(), is(true));
+    }
+
+    @Test
     public void shouldReturnPreviousStatefulTasksForConsumer() {
         client.addPreviousTasksAndOffsetSums("c1", mkMap(
                 mkEntry(TASK_0_0, 100L),
@@ -327,7 +347,7 @@ public class ClientStateTest {
         client.addPreviousTasksAndOffsetSums("c2", 
Collections.singletonMap(TASK_0_2, 0L));
         client.addPreviousTasksAndOffsetSums("c3", Collections.emptyMap());
 
-        client.initializePrevTasks(Collections.emptyMap());
+        client.initializePrevTasks(Collections.emptyMap(), false);
 
         assertThat(client.prevOwnedStatefulTasksByConsumer("c1"), 
equalTo(mkSet(TASK_0_0, TASK_0_1)));
         assertThat(client.prevOwnedStatefulTasksByConsumer("c2"), 
equalTo(mkSet(TASK_0_2)));
@@ -338,13 +358,15 @@ public class ClientStateTest {
     public void shouldReturnPreviousActiveStandbyTasksForConsumer() {
         client.addOwnedPartitions(mkSet(TP_0_1, TP_1_1), "c1");
         client.addOwnedPartitions(mkSet(TP_0_2, TP_1_2), "c2");
-        client.initializePrevTasks(mkMap(
+        client.initializePrevTasks(
+            mkMap(
                 mkEntry(TP_0_0, TASK_0_0),
                 mkEntry(TP_0_1, TASK_0_1),
                 mkEntry(TP_0_2, TASK_0_2),
                 mkEntry(TP_1_0, TASK_0_0),
                 mkEntry(TP_1_1, TASK_0_1),
-                mkEntry(TP_1_2, TASK_0_2))
+                mkEntry(TP_1_2, TASK_0_2)),
+            false
         );
 
         client.addPreviousTasksAndOffsetSums("c1", mkMap(
@@ -406,7 +428,7 @@ public class ClientStateTest {
             mkEntry(TASK_0_2, 100L)
         );
         client.addPreviousTasksAndOffsetSums("c1", taskOffsetSums);
-        client.initializePrevTasks(Collections.emptyMap());
+        client.initializePrevTasks(Collections.emptyMap(), false);
         assertThat(client.prevStandbyTasks(), equalTo(mkSet(TASK_0_1, 
TASK_0_2)));
         assertThat(client.previousAssignedTasks(), equalTo(mkSet(TASK_0_1, 
TASK_0_2)));
         assertTrue(client.prevActiveTasks().isEmpty());
@@ -501,7 +523,7 @@ public class ClientStateTest {
     @Test
     public void 
shouldThrowIllegalStateExceptionIfAttemptingToInitializeNonEmptyPrevTaskSets() {
         client.addPreviousActiveTasks(Collections.singleton(TASK_0_1));
-        assertThrows(IllegalStateException.class, () -> 
client.initializePrevTasks(Collections.emptyMap()));
+        assertThrows(IllegalStateException.class, () -> 
client.initializePrevTasks(Collections.emptyMap(), false));
     }
 
     @Test

Reply via email to