This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 4.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/4.0 by this push:
     new ed23f6be900 KAFKA-18839: Drop EAGER rebalancing support in Kafka Streams (#18988)
ed23f6be900 is described below

commit ed23f6be9007f6c0add40d2ff89a7bb28107a20b
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Feb 25 19:05:03 2025 -0800

    KAFKA-18839: Drop EAGER rebalancing support in Kafka Streams (#18988)
    
    In 3.1 we deprecated the eager rebalancing protocol and marked it for
    removal in a later release. We aim to officially drop support and remove
    the protocol from Streams in 4.0.
    
    The effect of this PR is that it will no longer be possible to perform a
    live upgrade of Kafka Streams directly to 4.0 from version 2.3 or below.
    Users will have to go through a bridge release (any version from 2.4 to
    3.9) instead; see the sketch below.
    
    Reviewers: Matthias J. Sax <[email protected]>
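
    As a reference for the bridge-release path described above, here is a
    minimal sketch (not part of this patch; the application id and bootstrap
    servers are placeholders) of the two rolling bounces performed on a
    bridge release such as 3.9 before the binaries move to 4.0:

        import java.util.Properties;

        import org.apache.kafka.streams.StreamsConfig;

        // First rolling bounce, still on the bridge release binaries:
        // tell Streams which (old) version it is being upgraded from.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23);
        // ... build the topology and start KafkaStreams as usual ...

        // Second rolling bounce on the same bridge release: restart without
        // UPGRADE_FROM_CONFIG so the cooperative protocol takes over; only
        // after that bounce should the binaries move to 4.0+.
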
---
 .../org/apache/kafka/streams/StreamsConfig.java    |  12 +-
 .../internals/StreamsPartitionAssignor.java        |  18 +-
 .../assignment/AssignorConfiguration.java          | 104 +----------
 .../internals/StreamsPartitionAssignorTest.java    |  92 +--------
 .../assignment/AssignorConfigurationTest.java      |  28 +--
 .../StreamsUpgradeToCooperativeRebalanceTest.java  | 136 --------------
 .../streams/streams_application_upgrade_test.py    |  19 +-
 .../streams_cooperative_rebalance_upgrade_test.py  | 206 ---------------------
 .../tests/streams/streams_upgrade_test.py          |   3 +-
 9 files changed, 47 insertions(+), 571 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index b92431ed2be..2e156d43d94 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -794,13 +794,11 @@ public class StreamsConfig extends AbstractConfig {
     /** {@code upgrade.from} */
     @SuppressWarnings("WeakerAccess")
     public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
-    private static final String UPGRADE_FROM_DOC = "Allows upgrading in a backward compatible way. " +
-        "This is needed when upgrading from [0.10.0, 1.1] to 2.0+, or when upgrading from [2.0, 2.3] to 2.4+. " +
-        "When upgrading from 3.3 to a newer version it is not required to specify this config. Default is `null`. " +
-        "Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" +
-        UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" +
-        UPGRADE_FROM_11 + "\", \"" + UPGRADE_FROM_20 + "\", \"" + UPGRADE_FROM_21 + "\", \"" +
-        UPGRADE_FROM_22 + "\", \"" + UPGRADE_FROM_23 + "\", \"" + UPGRADE_FROM_24 + "\", \"" +
+    private static final String UPGRADE_FROM_DOC = "Allows live upgrading (and downgrading in some cases -- see upgrade guide) in a backward compatible way. Default is `null`. " +
+        "Please refer to the Kafka Streams upgrade guide for instructions on how and when to use this config. " +
+        "Note that when upgrading from 3.5 to a newer version it is never required to specify this config, " +
+        "while upgrading live directly to 4.0+ from 2.3 or below is no longer supported even with this config. " +
+        "Accepted values are \"" + UPGRADE_FROM_24 + "\", \"" +
         UPGRADE_FROM_25 + "\", \"" + UPGRADE_FROM_26 + "\", \"" + UPGRADE_FROM_27 + "\", \"" +
         UPGRADE_FROM_28 + "\", \"" + UPGRADE_FROM_30 + "\", \"" + UPGRADE_FROM_31 + "\", \"" +
         UPGRADE_FROM_32 + "\", \"" + UPGRADE_FROM_33 + "\", \"" + UPGRADE_FROM_34 + "\", \"" +
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
index 4e26b697e05..f458629cf1f 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
@@ -92,6 +92,7 @@ import java.util.function.Function;
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import static java.util.Collections.singletonList;
 import static java.util.Collections.unmodifiableSet;
 import static java.util.Map.Entry.comparingByKey;
 import static org.apache.kafka.common.utils.Utils.filterMap;
@@ -216,11 +217,13 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
     private Queue<StreamsException> nonFatalExceptionsToHandle;
     private Time time;
 
+    // since live upgrades from 2.3 (or earlier) to 4.0 or above are no longer supported, we can always
+    // start with the latest supported metadata version since version probing will take
+    // care of downgrading it if/when necessary
     protected int usedSubscriptionMetadataVersion = LATEST_SUPPORTED_VERSION;
 
     private InternalTopicManager internalTopicManager;
     private CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
-    private RebalanceProtocol rebalanceProtocol;
     private AssignmentListener assignmentListener;
 
     private 
Supplier<Optional<org.apache.kafka.streams.processor.assignment.TaskAssignor>>
@@ -242,7 +245,6 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
 
         logPrefix = assignorConfiguration.logPrefix();
         log = new LogContext(logPrefix).logger(getClass());
-        usedSubscriptionMetadataVersion = 
assignorConfiguration.configuredMetadataVersion(usedSubscriptionMetadataVersion);
 
         final ReferenceContainer referenceContainer = 
assignorConfiguration.referenceContainer();
         mainConsumerSupplier = () -> 
Objects.requireNonNull(referenceContainer.mainConsumer, "Main consumer was not 
specified");
@@ -258,7 +260,6 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
         userEndPoint = assignorConfiguration.userEndPoint();
         internalTopicManager = assignorConfiguration.internalTopicManager();
         copartitionedTopicsEnforcer = 
assignorConfiguration.copartitionedTopicsEnforcer();
-        rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
         customTaskAssignorSupplier = assignorConfiguration::customTaskAssignor;
         legacyTaskAssignorSupplier = assignorConfiguration::taskAssignor;
         assignmentListener = assignorConfiguration.assignmentListener();
@@ -273,12 +274,7 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
 
     @Override
     public List<RebalanceProtocol> supportedProtocols() {
-        final List<RebalanceProtocol> supportedProtocols = new ArrayList<>();
-        supportedProtocols.add(RebalanceProtocol.EAGER);
-        if (rebalanceProtocol == RebalanceProtocol.COOPERATIVE) {
-            supportedProtocols.add(rebalanceProtocol);
-        }
-        return supportedProtocols;
+        return singletonList(RebalanceProtocol.COOPERATIVE);
     }
 
     @Override
@@ -1669,10 +1665,6 @@ public class StreamsPartitionAssignor implements 
ConsumerPartitionAssignor, Conf
         this.internalTopicManager = internalTopicManager;
     }
 
-    RebalanceProtocol rebalanceProtocol() {
-        return rebalanceProtocol;
-    }
-
     protected String userEndPoint() {
         return userEndPoint;
     }
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
index bc2324044da..b210e638eca 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfiguration.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.processor.internals.assignment;
 
 import org.apache.kafka.clients.CommonClientConfigs;
-import 
org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.utils.LogContext;
@@ -37,7 +36,6 @@ import java.util.Optional;
 import static org.apache.kafka.common.utils.Utils.getHost;
 import static org.apache.kafka.common.utils.Utils.getPort;
 import static 
org.apache.kafka.streams.StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS;
-import static 
org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;
 
 public final class AssignorConfiguration {
     private final String internalTaskAssignorClass;
@@ -61,6 +59,8 @@ public final class AssignorConfiguration {
         final LogContext logContext = new LogContext(logPrefix);
         log = logContext.logger(getClass());
 
+        validateUpgradeFrom();
+
         {
             final Object o = 
configs.get(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR);
             if (o == null) {
@@ -94,7 +94,9 @@ public final class AssignorConfiguration {
         return referenceContainer;
     }
 
-    public RebalanceProtocol rebalanceProtocol() {
+    // cooperative rebalancing was introduced in 2.4 and the old protocol (eager rebalancing) was removed
+    // in 4.0, meaning live upgrades from 2.3 or below to 4.0+ are no longer possible without a bridge release
+    public void validateUpgradeFrom() {
         final String upgradeFrom = 
streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
         if (upgradeFrom != null) {
             switch (UpgradeFromValues.fromString(upgradeFrom)) {
@@ -108,106 +110,20 @@ public final class AssignorConfiguration {
                 case UPGRADE_FROM_21:
                 case UPGRADE_FROM_22:
                 case UPGRADE_FROM_23:
-                    // ATTENTION: The following log messages is used for 
verification in system test
-                    // 
streams/streams_cooperative_rebalance_upgrade_test.py::StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance
-                    // If you change it, please do also change the system test 
accordingly and
-                    // verify whether the test passes.
-                    log.info("Eager rebalancing protocol is enabled now for 
upgrade from {}.x", upgradeFrom);
-                    log.warn("The eager rebalancing protocol is deprecated and 
will stop being supported in a future release." +
-                        " Please be prepared to remove the 'upgrade.from' 
config soon.");
-                    return RebalanceProtocol.EAGER;
-                case UPGRADE_FROM_24:
-                case UPGRADE_FROM_25:
-                case UPGRADE_FROM_26:
-                case UPGRADE_FROM_27:
-                case UPGRADE_FROM_28:
-                case UPGRADE_FROM_30:
-                case UPGRADE_FROM_31:
-                case UPGRADE_FROM_32:
-                case UPGRADE_FROM_33:
-                case UPGRADE_FROM_34:
-                case UPGRADE_FROM_35:
-                case UPGRADE_FROM_36:
-                case UPGRADE_FROM_37:
-                case UPGRADE_FROM_38:
-                case UPGRADE_FROM_39:
-                    // we need to add new version when new "upgrade.from" 
values become available
+                    final String errMsg = String.format(
+                        "The eager rebalancing protocol is no longer supported 
in 4.0 which means live upgrades from 2.3 or below are not possible."
+                            + " Please see the Streams upgrade guide for the 
bridge releases and recommended upgrade path. Got upgrade.from='%s'", 
upgradeFrom);
+                    log.error(errMsg);
+                    throw new ConfigException(errMsg);
 
-                    // This config is for explicitly sending FK response to a 
requested partition
-                    // and should not affect the rebalance protocol
-                    break;
-                default:
-                    throw new IllegalArgumentException("Unknown configuration 
value for parameter 'upgrade.from': " + upgradeFrom);
             }
         }
-        // ATTENTION: The following log messages is used for verification in 
system test
-        // 
streams/streams_cooperative_rebalance_upgrade_test.py::StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance
-        // If you change it, please do also change the system test accordingly 
and
-        // verify whether the test passes.
-        log.info("Cooperative rebalancing protocol is enabled now");
-        return RebalanceProtocol.COOPERATIVE;
     }
 
     public String logPrefix() {
         return logPrefix;
     }
 
-    public int configuredMetadataVersion(final int priorVersion) {
-        final String upgradeFrom = 
streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
-        if (upgradeFrom != null) {
-            switch (UpgradeFromValues.fromString(upgradeFrom)) {
-                case UPGRADE_FROM_0100:
-                    log.info(
-                        "Downgrading metadata.version from {} to 1 for upgrade 
from 0.10.0.x.",
-                        LATEST_SUPPORTED_VERSION
-                    );
-                    return 1;
-                case UPGRADE_FROM_0101:
-                case UPGRADE_FROM_0102:
-                case UPGRADE_FROM_0110:
-                case UPGRADE_FROM_10:
-                case UPGRADE_FROM_11:
-                    log.info(
-                        "Downgrading metadata.version from {} to 2 for upgrade 
from {}.x.",
-                        LATEST_SUPPORTED_VERSION,
-                        upgradeFrom
-                    );
-                    return 2;
-                case UPGRADE_FROM_20:
-                case UPGRADE_FROM_21:
-                case UPGRADE_FROM_22:
-                case UPGRADE_FROM_23:
-                    // These configs are for cooperative rebalancing and 
should not affect the metadata version
-                    break;
-                case UPGRADE_FROM_24:
-                case UPGRADE_FROM_25:
-                case UPGRADE_FROM_26:
-                case UPGRADE_FROM_27:
-                case UPGRADE_FROM_28:
-                case UPGRADE_FROM_30:
-                case UPGRADE_FROM_31:
-                case UPGRADE_FROM_32:
-                case UPGRADE_FROM_33:
-                case UPGRADE_FROM_34:
-                case UPGRADE_FROM_35:
-                case UPGRADE_FROM_36:
-                case UPGRADE_FROM_37:
-                case UPGRADE_FROM_38:
-                case UPGRADE_FROM_39:
-                    // we need to add new version when new "upgrade.from" 
values become available
-
-                    // This config is for explicitly sending FK response to a 
requested partition
-                    // and should not affect the metadata version
-                    break;
-                default:
-                    throw new IllegalArgumentException(
-                        "Unknown configuration value for parameter 
'upgrade.from': " + upgradeFrom
-                    );
-            }
-        }
-        return priorVersion;
-    }
-
     public String userEndPoint() {
         final String configuredUserEndpoint = 
streamsConfig.getString(StreamsConfig.APPLICATION_SERVER_CONFIG);
         if (configuredUserEndpoint != null && 
!configuredUserEndpoint.isEmpty()) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
index 592326fae87..29fa204a579 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignorTest.java
@@ -159,7 +159,6 @@ import static org.hamcrest.Matchers.anEmptyMap;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -371,23 +370,11 @@ public class StreamsPartitionAssignorTest {
 
     @ParameterizedTest
     @MethodSource("parameter")
-    public void shouldUseEagerRebalancingProtocol(final Map<String, Object> 
parameterizedConfig) {
-        setUp(parameterizedConfig, false);
-        createDefaultMockTaskManager();
-        
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG,
 StreamsConfig.UPGRADE_FROM_23), parameterizedConfig);
-
-        assertEquals(1, partitionAssignor.supportedProtocols().size());
-        
assertTrue(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.EAGER));
-        
assertFalse(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.COOPERATIVE));
-    }
-
-    @ParameterizedTest
-    @MethodSource("parameter")
-    public void shouldUseCooperativeRebalancingProtocol(final Map<String, 
Object> parameterizedConfig) {
+    public void shouldSupportOnlyCooperativeRebalancingProtocol(final 
Map<String, Object> parameterizedConfig) {
         setUp(parameterizedConfig, false);
         configureDefault(parameterizedConfig);
 
-        assertEquals(2, partitionAssignor.supportedProtocols().size());
+        assertEquals(1, partitionAssignor.supportedProtocols().size());
         
assertTrue(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.COOPERATIVE));
     }
 
@@ -586,7 +573,7 @@ public class StreamsPartitionAssignorTest {
 
     @ParameterizedTest
     @MethodSource("parameter")
-    public void testEagerSubscription(final Map<String, Object> 
parameterizedConfig) {
+    public void shouldThrowOnEagerSubscription(final Map<String, Object> 
parameterizedConfig) {
         setUp(parameterizedConfig, false);
         builder.addSource(null, "source1", null, null, null, "topic1");
         builder.addSource(null, "source2", null, null, null, "topic2");
@@ -600,17 +587,10 @@ public class StreamsPartitionAssignorTest {
         );
 
         createMockTaskManager(prevTasks, standbyTasks);
-        
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG,
 StreamsConfig.UPGRADE_FROM_23), parameterizedConfig);
-        assertThat(partitionAssignor.rebalanceProtocol(), 
equalTo(RebalanceProtocol.EAGER));
-
-        final Set<String> topics = Set.of("topic1", "topic2");
-        final Subscription subscription = new Subscription(new 
ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics));
-
-        Collections.sort(subscription.topics());
-        assertEquals(asList("topic1", "topic2"), subscription.topics());
-
-        final SubscriptionInfo info = getInfo(PID_1, prevTasks, standbyTasks, 
uniqueField);
-        assertEquals(info, SubscriptionInfo.decode(subscription.userData()));
+        assertThrows(
+            ConfigException.class,
+            () -> 
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG,
 StreamsConfig.UPGRADE_FROM_23), parameterizedConfig)
+        );
     }
 
     @ParameterizedTest
@@ -2135,64 +2115,6 @@ public class StreamsPartitionAssignorTest {
         
assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(),
 equalTo(smallestVersion));
     }
 
-    @ParameterizedTest
-    @MethodSource("parameter")
-    public void shouldDownGradeSubscriptionToVersion1(final Map<String, 
Object> parameterizedConfig) {
-        setUp(parameterizedConfig, false);
-        createDefaultMockTaskManager();
-        
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG,
 StreamsConfig.UPGRADE_FROM_0100), parameterizedConfig);
-
-        final Set<String> topics = Set.of("topic1");
-        final Subscription subscription = new Subscription(new 
ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics));
-
-        assertThat(SubscriptionInfo.decode(subscription.userData()).version(), 
equalTo(1));
-    }
-
-    @ParameterizedTest
-    @MethodSource("parameter")
-    public void shouldDownGradeSubscriptionToVersion2For0101(final Map<String, 
Object> parameterizedConfig) {
-        setUp(parameterizedConfig, false);
-        shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0101, 
parameterizedConfig);
-    }
-
-    @ParameterizedTest
-    @MethodSource("parameter")
-    public void shouldDownGradeSubscriptionToVersion2For0102(final Map<String, 
Object> parameterizedConfig) {
-        setUp(parameterizedConfig, false);
-        shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0102, 
parameterizedConfig);
-    }
-
-    @ParameterizedTest
-    @MethodSource("parameter")
-    public void shouldDownGradeSubscriptionToVersion2For0110(final Map<String, 
Object> parameterizedConfig) {
-        setUp(parameterizedConfig, false);
-        shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0110, 
parameterizedConfig);
-    }
-
-    @ParameterizedTest
-    @MethodSource("parameter")
-    public void shouldDownGradeSubscriptionToVersion2For10(final Map<String, 
Object> parameterizedConfig) {
-        setUp(parameterizedConfig, false);
-        shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_10, 
parameterizedConfig);
-    }
-
-    @ParameterizedTest
-    @MethodSource("parameter")
-    public void shouldDownGradeSubscriptionToVersion2For11(final Map<String, 
Object> parameterizedConfig) {
-        setUp(parameterizedConfig, false);
-        shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_11, 
parameterizedConfig);
-    }
-
-    private void shouldDownGradeSubscriptionToVersion2(final Object 
upgradeFromValue, final Map<String, Object> parameterizedConfig) {
-        createDefaultMockTaskManager();
-        
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG,
 upgradeFromValue), parameterizedConfig);
-
-        final Set<String> topics = Set.of("topic1");
-        final Subscription subscription = new Subscription(new 
ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics));
-
-        assertThat(SubscriptionInfo.decode(subscription.userData()).version(), 
equalTo(2));
-    }
-
     @ParameterizedTest
     @MethodSource("parameter")
     public void 
shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenNewConsumerJoins(final
 Map<String, Object> parameterizedConfig) {
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java
index e6ac8aa50ad..261f718f09b 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/AssignorConfigurationTest.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import static 
org.apache.kafka.streams.processor.internals.assignment.AssignmentTestUtils.EMPTY_RACK_AWARE_ASSIGNMENT_TAGS;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
+import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.mockito.Mockito.mock;
 
@@ -54,30 +55,21 @@ public class AssignorConfigurationTest {
     }
 
     @Test
-    public void rebalanceProtocolShouldSupportAllUpgradeFromVersions() {
+    public void 
shouldSupportAllUpgradeFromVersionsFromCooperativeRebalancingOn() {
+        boolean beforeCooperative = true;
         for (final UpgradeFromValues upgradeFrom : UpgradeFromValues.values()) 
{
-            config.put(StreamsConfig.UPGRADE_FROM_CONFIG, 
upgradeFrom.toString());
-            final AssignorConfiguration assignorConfiguration = new 
AssignorConfiguration(config);
-
-            try {
-                assignorConfiguration.rebalanceProtocol();
-            } catch (final Exception error) {
-                throw new AssertionError("Upgrade from " + upgradeFrom + " 
failed with " + error.getMessage() + "!");
+            if (upgradeFrom.toString().equals("2.4")) {
+                beforeCooperative = false;
             }
-        }
-    }
 
-    @Test
-    public void configuredMetadataVersionShouldSupportAllUpgradeFromVersions() 
{
-        for (final UpgradeFromValues upgradeFrom : UpgradeFromValues.values()) 
{
             config.put(StreamsConfig.UPGRADE_FROM_CONFIG, 
upgradeFrom.toString());
-            final AssignorConfiguration assignorConfiguration = new 
AssignorConfiguration(config);
 
-            try {
-                assignorConfiguration.configuredMetadataVersion(0);
-            } catch (final Exception error) {
-                throw new AssertionError("Upgrade from " + upgradeFrom + " 
failed with " + error.getMessage() + "!");
+            if (beforeCooperative) {
+                assertThrows(ConfigException.class, () -> new 
AssignorConfiguration(config));
+            } else {
+                assertDoesNotThrow(() -> new AssignorConfiguration(config));
             }
         }
     }
+
 }
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
deleted file mode 100644
index 0a7bbe14f5c..00000000000
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeToCooperativeRebalanceTest.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.tests;
-
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.KafkaStreams.State;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.TaskMetadata;
-import org.apache.kafka.streams.ThreadMetadata;
-import org.apache.kafka.streams.kstream.ForeachAction;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-
-public class StreamsUpgradeToCooperativeRebalanceTest {
-
-
-    public static void main(final String[] args) throws Exception {
-        if (args.length < 1) {
-            System.err.println("StreamsUpgradeToCooperativeRebalanceTest 
requires one argument (properties-file) but no args provided");
-        }
-        System.out.println("Args are " + Arrays.toString(args));
-        final String propFileName = args[0];
-        final Properties streamsProperties = Utils.loadProps(propFileName);
-
-        final Properties config = new Properties();
-        System.out.println("StreamsTest instance started 
(StreamsUpgradeToCooperativeRebalanceTest)");
-        System.out.println("props=" + streamsProperties);
-
-        config.put(StreamsConfig.APPLICATION_ID_CONFIG, 
"cooperative-rebalance-upgrade");
-        config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
-        config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
-        config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000L);
-        config.putAll(streamsProperties);
-
-        final String sourceTopic = 
streamsProperties.getProperty("source.topic", "source");
-        final String sinkTopic = streamsProperties.getProperty("sink.topic", 
"sink");
-        final String taskDelimiter = "#";
-        final int reportInterval = 
Integer.parseInt(streamsProperties.getProperty("report.interval", "100"));
-        final String upgradePhase = 
streamsProperties.getProperty("upgrade.phase", "");
-
-        final StreamsBuilder builder = new StreamsBuilder();
-
-        builder.<String, String>stream(sourceTopic)
-            .peek(new ForeachAction<String, String>() {
-                int recordCounter = 0;
-
-                @Override
-                public void apply(final String key, final String value) {
-                    if (recordCounter++ % reportInterval == 0) {
-                        System.out.printf("%sProcessed %d records so far%n", 
upgradePhase, recordCounter);
-                        System.out.flush();
-                    }
-                }
-            }
-            ).to(sinkTopic);
-
-        final KafkaStreams streams = new KafkaStreams(builder.build(), config);
-
-        streams.setStateListener((newState, oldState) -> {
-            if (newState == State.RUNNING && oldState == State.REBALANCING) {
-                System.out.printf("%sSTREAMS in a RUNNING State%n", 
upgradePhase);
-                final Set<ThreadMetadata> allThreadMetadata = 
streams.metadataForLocalThreads();
-                final StringBuilder taskReportBuilder = new StringBuilder();
-                final List<String> activeTasks = new ArrayList<>();
-                final List<String> standbyTasks = new ArrayList<>();
-                for (final ThreadMetadata threadMetadata : allThreadMetadata) {
-                    getTasks(threadMetadata.activeTasks(), activeTasks);
-                    if (!threadMetadata.standbyTasks().isEmpty()) {
-                        getTasks(threadMetadata.standbyTasks(), standbyTasks);
-                    }
-                }
-                addTasksToBuilder(activeTasks, taskReportBuilder);
-                taskReportBuilder.append(taskDelimiter);
-                if (!standbyTasks.isEmpty()) {
-                    addTasksToBuilder(standbyTasks, taskReportBuilder);
-                }
-                System.out.println("TASK-ASSIGNMENTS:" + taskReportBuilder);
-            }
-
-            if (newState == State.REBALANCING) {
-                System.out.printf("%sStarting a REBALANCE%n", upgradePhase);
-            }
-        });
-
-
-        streams.start();
-
-        Exit.addShutdownHook("streams-shutdown-hook", () -> {
-            streams.close();
-            System.out.printf("%sCOOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED%n", 
upgradePhase);
-            System.out.flush();
-        });
-    }
-
-    private static void addTasksToBuilder(final List<String> tasks, final 
StringBuilder builder) {
-        if (!tasks.isEmpty()) {
-            for (final String task : tasks) {
-                builder.append(task).append(",");
-            }
-            builder.setLength(builder.length() - 1);
-        }
-    }
-
-    private static void getTasks(final Set<TaskMetadata> taskMetadata,
-                                 final List<String> taskList) {
-        for (final TaskMetadata task : taskMetadata) {
-            final Set<TopicPartition> topicPartitions = task.topicPartitions();
-            for (final TopicPartition topicPartition : topicPartitions) {
-                taskList.add(topicPartition.toString());
-            }
-        }
-    }
-}
diff --git a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
index 70d30b35012..1ae56ac0d95 100644
--- a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
@@ -23,19 +23,18 @@ from kafkatest.services.streams import 
StreamsSmokeTestDriverService, StreamsSmo
 from kafkatest.version import LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, 
LATEST_2_6, LATEST_2_7, LATEST_2_8, \
   LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, 
LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, DEV_VERSION, KafkaVersion
 
-smoke_test_versions = [str(LATEST_2_2), str(LATEST_2_3), str(LATEST_2_4),
-                       str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7),
-                       str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1),
-                       str(LATEST_3_2), str(LATEST_3_3), str(LATEST_3_4),
-                       str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7),
-                       str(LATEST_3_8), str(LATEST_3_9)]
+
+smoke_test_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6),
+                       str(LATEST_2_7), str(LATEST_2_8), str(LATEST_3_0),
+                       str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3),
+                       str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6),
+                       str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9)]
 
 class StreamsUpgradeTest(Test):
     """
-    Test upgrading Kafka Streams (all version combination)
-    If metadata was changes, upgrade is more difficult
-    Metadata version was bumped in 0.10.1.0 and
-    subsequently bumped in 2.0.0
+    Test upgrading Kafka Streams (all possible version combinations)
+    Directly upgrading from 2.3 or below is no longer supported as of version 4.0
     """
 
     def __init__(self, test_context):
diff --git a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py b/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py
deleted file mode 100644
index a478f11f340..00000000000
--- a/tests/kafkatest/tests/streams/streams_cooperative_rebalance_upgrade_test.py
+++ /dev/null
@@ -1,206 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import time
-from ducktape.mark import matrix
-from ducktape.mark.resource import cluster
-from ducktape.tests.test import Test
-from kafkatest.services.kafka import KafkaService, quorum
-from kafkatest.services.verifiable_producer import VerifiableProducer
-from kafkatest.version import LATEST_2_1, LATEST_2_2, LATEST_2_3
-from kafkatest.services.streams import CooperativeRebalanceUpgradeService
-from kafkatest.tests.streams.utils import verify_stopped, stop_processors, 
verify_running
-
-
-class StreamsCooperativeRebalanceUpgradeTest(Test):
-    """
-    Test of a rolling upgrade from eager rebalance to
-    cooperative rebalance
-    """
-
-    source_topic = "source"
-    sink_topic = "sink"
-    task_delimiter = "#"
-    report_interval = "1000"
-    processing_message = "Processed [0-9]* records so far"
-    stopped_message = "COOPERATIVE-REBALANCE-TEST-CLIENT-CLOSED"
-    running_state_msg = "STREAMS in a RUNNING State"
-    cooperative_turned_off_msg = "Eager rebalancing protocol is enabled now 
for upgrade from %s"
-    cooperative_enabled_msg = "Cooperative rebalancing protocol is enabled now"
-    first_bounce_phase = "first_bounce_phase-"
-    second_bounce_phase = "second_bounce_phase-"
-
-    # !!CAUTION!!: THIS LIST OF VERSIONS IS FIXED, NO VERSIONS MUST BE ADDED
-    streams_eager_rebalance_upgrade_versions = [str(LATEST_2_1), 
str(LATEST_2_2), str(LATEST_2_3)]
-
-    def __init__(self, test_context):
-        super(StreamsCooperativeRebalanceUpgradeTest, 
self).__init__(test_context)
-        self.topics = {
-            self.source_topic: {'partitions': 9},
-            self.sink_topic: {'partitions': 9}
-        }
-
-        self.kafka = KafkaService(self.test_context, num_nodes=3,
-                                  zk=None, topics=self.topics,
-                                  controller_num_nodes_override=1)
-
-        self.producer = VerifiableProducer(self.test_context,
-                                           1,
-                                           self.kafka,
-                                           self.source_topic,
-                                           throughput=1000,
-                                           acks=1)
-
-    @cluster(num_nodes=8)
-    @matrix(upgrade_from_version=streams_eager_rebalance_upgrade_versions, 
metadata_quorum=[quorum.combined_kraft])
-    def test_upgrade_to_cooperative_rebalance(self, upgrade_from_version, 
metadata_quorum):
-        self.kafka.start()
-
-        processor1 = CooperativeRebalanceUpgradeService(self.test_context, 
self.kafka)
-        processor2 = CooperativeRebalanceUpgradeService(self.test_context, 
self.kafka)
-        processor3 = CooperativeRebalanceUpgradeService(self.test_context, 
self.kafka)
-
-        processors = [processor1, processor2, processor3]
-
-        # produce records continually during the test
-        self.producer.start()
-
-        # start all processors without upgrade_from config; normal operations 
mode
-        self.logger.info("Starting all streams clients in normal running mode")
-        for processor in processors:
-            processor.set_version(upgrade_from_version)
-            self.set_props(processor)
-            processor.CLEAN_NODE_ENABLED = False
-            # can't use state as older version don't have state listener
-            # so just verify up and running
-            verify_running(processor, self.processing_message)
-
-        # all running rebalancing has ceased
-        for processor in processors:
-            self.verify_processing(processor, self.processing_message)
-
-        # first rolling bounce with "upgrade.from" config set
-        previous_phase = ""
-        self.maybe_upgrade_rolling_bounce_and_verify(processors,
-                                                     previous_phase,
-                                                     self.first_bounce_phase,
-                                                     upgrade_from_version)
-
-        # All nodes processing, rebalancing has ceased
-        for processor in processors:
-            self.verify_processing(processor, self.first_bounce_phase + 
self.processing_message)
-
-        # second rolling bounce without "upgrade.from" config
-        self.maybe_upgrade_rolling_bounce_and_verify(processors,
-                                                     self.first_bounce_phase,
-                                                     self.second_bounce_phase)
-
-        # All nodes processing, rebalancing has ceased
-        for processor in processors:
-            self.verify_processing(processor, self.second_bounce_phase + 
self.processing_message)
-
-        # now verify tasks are unique
-        for processor in processors:
-            self.get_tasks_for_processor(processor)
-            self.logger.info("Active tasks %s" % processor.active_tasks)
-
-        overlapping_tasks = 
processor1.active_tasks.intersection(processor2.active_tasks)
-        assert len(overlapping_tasks) == int(0), \
-            "Final task assignments are not unique %s %s" % 
(processor1.active_tasks, processor2.active_tasks)
-
-        overlapping_tasks = 
processor1.active_tasks.intersection(processor3.active_tasks)
-        assert len(overlapping_tasks) == int(0), \
-            "Final task assignments are not unique %s %s" % 
(processor1.active_tasks, processor3.active_tasks)
-
-        overlapping_tasks = 
processor2.active_tasks.intersection(processor3.active_tasks)
-        assert len(overlapping_tasks) == int(0), \
-            "Final task assignments are not unique %s %s" % 
(processor2.active_tasks, processor3.active_tasks)
-
-        # test done close all down
-        stop_processors(processors, self.second_bounce_phase + 
self.stopped_message)
-
-        self.producer.stop()
-        self.kafka.stop()
-
-    def maybe_upgrade_rolling_bounce_and_verify(self,
-                                                processors,
-                                                previous_phase,
-                                                current_phase,
-                                                upgrade_from_version=None):
-        for processor in processors:
-            # stop the processor in prep for setting "update.from" or removing 
"update.from"
-            verify_stopped(processor, previous_phase + self.stopped_message)
-            # upgrade to version with cooperative rebalance
-            processor.set_version("")
-            processor.set_upgrade_phase(current_phase)
-
-            if upgrade_from_version is not None:
-                # need to remove minor version numbers for check of valid 
upgrade from numbers
-                upgrade_version = 
upgrade_from_version[:upgrade_from_version.rfind('.')]
-                rebalance_mode_msg = self.cooperative_turned_off_msg % 
upgrade_version
-            else:
-                upgrade_version = None
-                rebalance_mode_msg = self.cooperative_enabled_msg
-
-            self.set_props(processor, upgrade_version)
-            node = processor.node
-            with node.account.monitor_log(processor.STDOUT_FILE) as 
stdout_monitor:
-                with node.account.monitor_log(processor.LOG_FILE) as 
log_monitor:
-                    processor.start()
-                    # verify correct rebalance mode either turned off for 
upgrade or enabled after upgrade
-                    log_monitor.wait_until(rebalance_mode_msg,
-                                           timeout_sec=60,
-                                           err_msg="Never saw '%s' message " % 
rebalance_mode_msg + str(processor.node.account))
-
-                # verify rebalanced into a running state
-                rebalance_msg = current_phase + self.running_state_msg
-                stdout_monitor.wait_until(rebalance_msg,
-                                          timeout_sec=60,
-                                          err_msg="Never saw '%s' message " % 
rebalance_msg + str(
-                                              processor.node.account))
-
-                # verify processing
-                verify_processing_msg = current_phase + self.processing_message
-                stdout_monitor.wait_until(verify_processing_msg,
-                                          timeout_sec=60,
-                                          err_msg="Never saw '%s' message " % 
verify_processing_msg + str(
-                                              processor.node.account))
-
-    def verify_processing(self, processor, pattern):
-        self.logger.info("Verifying %s processing pattern in STDOUT_FILE" % 
pattern)
-        with processor.node.account.monitor_log(processor.STDOUT_FILE) as 
monitor:
-            monitor.wait_until(pattern,
-                               timeout_sec=60,
-                               err_msg="Never saw processing of %s " % pattern 
+ str(processor.node.account))
-
-    def get_tasks_for_processor(self, processor):
-        retries = 0
-        while retries < 5:
-            found_tasks = list(processor.node.account.ssh_capture("grep 
TASK-ASSIGNMENTS %s | tail -n 1" % processor.STDOUT_FILE, allow_fail=True))
-            self.logger.info("Returned %s from assigned task check" % 
found_tasks)
-            if len(found_tasks) > 0:
-                task_string = str(found_tasks[0]).strip()
-                self.logger.info("Converted %s from assigned task check" % 
task_string)
-                processor.set_tasks(task_string)
-                return
-            retries += 1
-            time.sleep(1)
-        return
-
-    def set_props(self, processor, upgrade_from=None):
-        processor.SOURCE_TOPIC = self.source_topic
-        processor.SINK_TOPIC = self.sink_topic
-        processor.REPORT_INTERVAL = self.report_interval
-        processor.UPGRADE_FROM = upgrade_from
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 10ec025f194..f44680f2b43 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -33,8 +33,7 @@ broker_upgrade_versions = [str(LATEST_2_8), str(LATEST_3_0), 
str(LATEST_3_1), st
                            str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), 
str(LATEST_3_6),
                            str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), 
str(DEV_BRANCH)]
 
-metadata_2_versions = [str(LATEST_0_11), str(LATEST_1_0), str(LATEST_1_1), 
str(LATEST_2_0),
-                       str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), 
str(LATEST_2_7), str(LATEST_2_8),
+metadata_2_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), 
str(LATEST_2_7), str(LATEST_2_8),
                        str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), 
str(LATEST_3_3)]
 # upgrading from version (2.4...3.3) is broken and only fixed later in 3.3.3 
(unreleased) and 3.4.0
 # -> https://issues.apache.org/jira/browse/KAFKA-14646
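
For completeness, a minimal hypothetical sketch (not part of this patch) of the
fail-fast behavior the new validateUpgradeFrom() check introduces when binaries
are moved to 4.0 while upgrade.from still points at 2.3 or below; the quoted
error text comes from the AssignorConfiguration change above:

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    // Hypothetical 4.0 application still carrying a pre-2.4 upgrade.from value.
    final Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");    // placeholder
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
    props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23);

    // When the StreamsPartitionAssignor is configured at startup, the
    // validateUpgradeFrom() check is expected to fail fast with an
    // org.apache.kafka.common.config.ConfigException whose message begins
    // "The eager rebalancing protocol is no longer supported in 4.0 ...".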

