This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ad2845f7156 KAFKA-8080: Remove streams_eos_test system test (#21030)
ad2845f7156 is described below

commit ad2845f71567aa182c6b0026774cd539e559cb85
Author: Jinhe Zhang <[email protected]>
AuthorDate: Wed Dec 3 20:30:54 2025 -0500

    KAFKA-8080: Remove streams_eos_test system test (#21030)
    
    After https://github.com/apache/kafka/pull/6382, the system test
    streams_eos_test.py is redundant. As with
    https://github.com/apache/kafka/pull/20718, the verification logic has
    already been migrated, so only the related system tests need to be
    deleted.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../apache/kafka/streams/tests/EosTestClient.java  | 225 -------
 .../apache/kafka/streams/tests/EosTestDriver.java  | 678 ---------------------
 .../apache/kafka/streams/tests/StreamsEosTest.java |  91 ---
 tests/kafkatest/services/streams.py                |  51 --
 tests/kafkatest/tests/streams/streams_eos_test.py  | 183 ------
 6 files changed, 1 insertion(+), 1229 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 40b50d2b5ad..c09526fbb91 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -250,7 +250,7 @@
               files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|StreamThreadTest|TaskManagerTest|StreamTaskTest).java"/>
 
     <suppress checks="NPathComplexity"
-              files="(EosV2UpgradeIntegrationTest|EosTestDriver|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|TopologyTestDriverTest|IQv2StoreIntegrationTest).java"/>
+              files="(EosV2UpgradeIntegrationTest|KStreamKStreamJoinTest|KTableKTableForeignKeyJoinIntegrationTest|RelationalSmokeTest|MockProcessorContextStateStoreTest|TopologyTestDriverTest|IQv2StoreIntegrationTest).java"/>
 
    <suppress checks="(FinalLocalVariable|WhitespaceAround|LocalVariableName|ImportControl|AvoidStarImport)"
               files="Murmur3Test.java"/>
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
deleted file mode 100644
index 80a46b3437e..00000000000
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
+++ /dev/null
@@ -1,225 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.tests;
-
-import org.apache.kafka.clients.consumer.ConsumerGroupMetadata;
-import org.apache.kafka.clients.consumer.internals.AsyncKafkaConsumer;
-import org.apache.kafka.clients.consumer.internals.StreamsRebalanceData;
-import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.streams.KafkaStreams;
-import org.apache.kafka.streams.StreamsBuilder;
-import org.apache.kafka.streams.StreamsConfig;
-import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
-import org.apache.kafka.streams.internals.ConsumerWrapper;
-import org.apache.kafka.streams.kstream.KGroupedStream;
-import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.Materialized;
-import org.apache.kafka.streams.kstream.Produced;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-public class EosTestClient extends SmokeTestUtil {
-
-    static final String APP_ID = "EosTest";
-    private final Properties properties;
-    private final boolean withRepartitioning;
-    private final AtomicBoolean notRunningCallbackReceived = new AtomicBoolean(false);
-    private static final List<CapturingConsumerWrapper> CAPTURING_CONSUMER_WRAPPERS = new ArrayList<>();
-    private int minGroupEpoch = 0;
-
-    private KafkaStreams streams;
-    private boolean uncaughtException;
-
-    EosTestClient(final Properties properties, final boolean withRepartitioning) {
-        super();
-        this.properties = properties;
-        this.withRepartitioning = withRepartitioning;
-        this.properties.put(StreamsConfig.InternalConfig.INTERNAL_CONSUMER_WRAPPER, CapturingConsumerWrapper.class);
-        CAPTURING_CONSUMER_WRAPPERS.clear();
-    }
-
-    private volatile boolean isRunning = true;
-
-    public void start() {
-        Exit.addShutdownHook("streams-shutdown-hook", () -> {
-            isRunning = false;
-            streams.close(Duration.ofSeconds(300));
-
-            // need to wait for callback to avoid race condition
-            // -> make sure the callback printout to stdout is there as it is expected test output
-            waitForStateTransitionCallback();
-
-            // do not remove these printouts since they are needed for health scripts
-            if (!uncaughtException) {
-                System.out.println(System.currentTimeMillis());
-                System.out.println("EOS-TEST-CLIENT-CLOSED");
-                System.out.flush();
-            }
-        });
-
-        while (isRunning) {
-            if (streams == null) {
-                uncaughtException = false;
-
-                streams = createKafkaStreams(properties);
-                streams.setUncaughtExceptionHandler(e -> {
-                    System.out.println(System.currentTimeMillis());
-                    System.out.println("EOS-TEST-CLIENT-EXCEPTION");
-                    e.printStackTrace();
-                    System.out.flush();
-                    uncaughtException = true;
-                    return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
-                });
-                streams.setStateListener((newState, oldState) -> {
-                    // don't remove this -- it's required test output
-                    System.out.println(System.currentTimeMillis());
-                    System.out.println("StateChange: " + oldState + " -> " + 
newState);
-                    System.out.flush();
-                    if (newState == KafkaStreams.State.NOT_RUNNING) {
-                        notRunningCallbackReceived.set(true);
-                    }
-                });
-                streams.start();
-            }
-            if (uncaughtException) {
-                streams.close(Duration.ofSeconds(60_000L));
-                streams = null;
-            }
-            logGroupEpochBump();
-            sleep(100);
-        }
-    }
-
-    private KafkaStreams createKafkaStreams(final Properties props) {
-        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
-        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
-        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 2);
-        props.put(StreamsConfig.PROBING_REBALANCE_INTERVAL_MS_CONFIG, Duration.ofMinutes(1).toMillis());
-        props.put(StreamsConfig.MAX_WARMUP_REPLICAS_CONFIG, Integer.MAX_VALUE);
-        props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
-        props.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
-        props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 5000L); // increase commit interval to make sure a client is killed having an open transaction
-        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
-        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass());
-
-        final StreamsBuilder builder = new StreamsBuilder();
-        final KStream<String, Integer> data = builder.stream("data");
-
-        data.to("echo");
-        data.process(SmokeTestUtil.printProcessorSupplier("data"));
-
-        final KGroupedStream<String, Integer> groupedData = data.groupByKey();
-        // min
-        groupedData
-            .aggregate(
-                () -> Integer.MAX_VALUE,
-                (aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate,
-                Materialized.with(null, intSerde))
-            .toStream()
-            .to("min", Produced.with(stringSerde, intSerde));
-
-        // sum
-        groupedData.aggregate(
-            () -> 0L,
-            (aggKey, value, aggregate) -> (long) value + aggregate,
-            Materialized.with(null, longSerde))
-            .toStream()
-            .to("sum", Produced.with(stringSerde, longSerde));
-
-        if (withRepartitioning) {
-            data.to("repartition");
-            final KStream<String, Integer> repartitionedData = builder.stream("repartition");
-
-            repartitionedData.process(SmokeTestUtil.printProcessorSupplier("repartition"));
-
-            final KGroupedStream<String, Integer> groupedDataAfterRepartitioning = repartitionedData.groupByKey();
-            // max
-            groupedDataAfterRepartitioning
-                .aggregate(
-                    () -> Integer.MIN_VALUE,
-                    (aggKey, value, aggregate) -> (value > aggregate) ? value : aggregate,
-                    Materialized.with(null, intSerde))
-                .toStream()
-                .to("max", Produced.with(stringSerde, intSerde));
-
-            // count
-            groupedDataAfterRepartitioning.count()
-                .toStream()
-                .to("cnt", Produced.with(stringSerde, longSerde));
-        }
-
-        return new KafkaStreams(builder.build(), props);
-    }
-
-    private void waitForStateTransitionCallback() {
-        final long maxWaitTime = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(300);
-        while (!notRunningCallbackReceived.get() && System.currentTimeMillis() < maxWaitTime) {
-            try {
-                Thread.sleep(500);
-            } catch (final InterruptedException ignoreAndSwallow) { /* just keep waiting */ }
-        }
-        if (!notRunningCallbackReceived.get()) {
-            System.err.println("State transition callback to NOT_RUNNING never received. Timed out after 5 minutes.");
-            System.err.flush();
-        }
-    }
-
-    // Used in the streams group protocol
-    // Detect a completed rebalance by checking if the group epoch has been bumped for all threads.
-    private void logGroupEpochBump() {
-        int currentMin = Integer.MAX_VALUE;
-        for (final CapturingConsumerWrapper consumer : CAPTURING_CONSUMER_WRAPPERS) {
-            final int groupEpoch = consumer.lastSeenGroupEpoch;
-            if (groupEpoch < currentMin) {
-                currentMin = groupEpoch;
-            }
-        }
-        if (currentMin > minGroupEpoch) {
-            System.out.println("MemberEpochBump");
-        }
-        if (currentMin != Integer.MAX_VALUE) {
-            minGroupEpoch = currentMin;
-        }
-    }
-
-    public static class CapturingConsumerWrapper extends ConsumerWrapper {
-
-        public volatile int lastSeenGroupEpoch = 0;
-
-        @Override
-        public void wrapConsumer(final AsyncKafkaConsumer<byte[], byte[]> delegate, final Map<String, Object> config, final Optional<StreamsRebalanceData> streamsRebalanceData) {
-            CAPTURING_CONSUMER_WRAPPERS.add(this);
-            super.wrapConsumer(delegate, config, streamsRebalanceData);
-        }
-
-        @Override
-        public ConsumerGroupMetadata groupMetadata() {
-            final ConsumerGroupMetadata consumerGroupMetadata = delegate.groupMetadata();
-            lastSeenGroupEpoch = consumerGroupMetadata.generationId();
-            return consumerGroupMetadata;
-        }
-    }
-
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
deleted file mode 100644
index 0815c49db07..00000000000
--- a/streams/src/test/java/org/apache/kafka/streams/tests/EosTestDriver.java
+++ /dev/null
@@ -1,678 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.tests;
-
-import org.apache.kafka.clients.admin.Admin;
-import org.apache.kafka.clients.admin.ConsumerGroupDescription;
-import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
-import org.apache.kafka.clients.admin.StreamsGroupDescription;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.IsolationLevel;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.TimeoutException;
-import org.apache.kafka.common.serialization.ByteArrayDeserializer;
-import org.apache.kafka.common.serialization.IntegerDeserializer;
-import org.apache.kafka.common.serialization.IntegerSerializer;
-import org.apache.kafka.common.serialization.LongDeserializer;
-import org.apache.kafka.common.serialization.StringDeserializer;
-import org.apache.kafka.common.serialization.StringSerializer;
-import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.common.utils.Utils;
-
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
-public class EosTestDriver extends SmokeTestUtil {
-
-    private static final int MAX_NUMBER_OF_KEYS = 20000;
-    private static final long MAX_IDLE_TIME_MS = 600000L;
-
-    private static volatile boolean isRunning = true;
-    private static final CountDownLatch TERMINATED = new CountDownLatch(1);
-
-    private static int numRecordsProduced = 0;
-
-    private static synchronized void updateNumRecordsProduces(final int delta) {
-        numRecordsProduced += delta;
-    }
-
-    static void generate(final String kafka) {
-        Exit.addShutdownHook("streams-eos-test-driver-shutdown-hook", () -> {
-            System.out.println("Terminating");
-            isRunning = false;
-
-            try {
-                if (TERMINATED.await(5L, TimeUnit.MINUTES)) {
-                    System.out.println("Terminated");
-                } else {
-                    System.out.println("Terminated with timeout");
-                }
-            } catch (final InterruptedException swallow) {
-                swallow.printStackTrace(System.err);
-                System.out.println("Terminated with error");
-            }
-            System.err.flush();
-            System.out.flush();
-        });
-
-        final Properties producerProps = new Properties();
-        producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "EosTest");
-        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
-        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
-        producerProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
-
-        final Map<Integer, List<Long>> offsets = new HashMap<>();
-
-        try {
-            try (final KafkaProducer<String, Integer> producer = new KafkaProducer<>(producerProps)) {
-                final Random rand = new Random(System.currentTimeMillis());
-
-                while (isRunning) {
-                    final String key = "" + rand.nextInt(MAX_NUMBER_OF_KEYS);
-                    final int value = rand.nextInt(10000);
-
-                    final ProducerRecord<String, Integer> record = new ProducerRecord<>("data", key, value);
-
-                    producer.send(record, (metadata, exception) -> {
-                        if (exception != null) {
-                            exception.printStackTrace(System.err);
-                            System.err.flush();
-                            if (exception instanceof TimeoutException) {
-                                try {
-                                    // message == org.apache.kafka.common.errors.TimeoutException: Expiring 4 record(s) for data-0: 30004 ms has passed since last attempt plus backoff time
-                                    final int expired = Integer.parseInt(exception.getMessage().split(" ")[2]);
-                                    updateNumRecordsProduces(-expired);
-                                } catch (final Exception ignore) {
-                                }
-                            }
-                        } else {
-                            offsets.getOrDefault(metadata.partition(), new LinkedList<>()).add(metadata.offset());
-                        }
-                    });
-
-                    updateNumRecordsProduces(1);
-                    if (numRecordsProduced % 1000 == 0) {
-                        System.out.println(numRecordsProduced + " records produced");
-                        System.out.flush();
-                    }
-                    Utils.sleep(rand.nextInt(10));
-                }
-            }
-            System.out.println("Producer closed: " + numRecordsProduced + " 
records produced");
-            System.out.flush();
-
-            // verify offsets
-            for (final Map.Entry<Integer, List<Long>> offsetsOfPartition : offsets.entrySet()) {
-                offsetsOfPartition.getValue().sort(Long::compareTo);
-                for (int i = 0; i < offsetsOfPartition.getValue().size() - 1; ++i) {
-                    if (offsetsOfPartition.getValue().get(i) != i) {
-                        System.err.println("Offset for partition " + offsetsOfPartition.getKey() + " is not " + i + " as expected but " + offsetsOfPartition.getValue().get(i));
-                        System.err.flush();
-                    }
-                }
-                System.out.println("Max offset of partition " + offsetsOfPartition.getKey() + " is " + offsetsOfPartition.getValue().get(offsetsOfPartition.getValue().size() - 1));
-            }
-
-            final Properties props = new Properties();
-            props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
-            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-            props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString());
-
-            try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
-                final List<TopicPartition> partitions = getAllPartitions(consumer, "data");
-                System.out.println("Partitions: " + partitions);
-                System.out.flush();
-                consumer.assign(partitions);
-                consumer.seekToEnd(partitions);
-
-                for (final TopicPartition tp : partitions) {
-                    System.out.println("End-offset for " + tp + " is " + 
consumer.position(tp));
-                    System.out.flush();
-                }
-            }
-            System.out.flush();
-        } finally {
-            TERMINATED.countDown();
-        }
-    }
-
-    public static void verify(final String kafka, final boolean withRepartitioning, final String groupProtocol) {
-        final Properties props = new Properties();
-        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
-        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, IsolationLevel.READ_COMMITTED.toString());
-
-        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
-            verifyAllTransactionFinished(consumer, kafka, withRepartitioning);
-        } catch (final Exception e) {
-            e.printStackTrace(System.err);
-            System.out.println("FAILED");
-            return;
-        }
-
-        final Map<TopicPartition, Long> committedOffsets;
-        try (final Admin adminClient = Admin.create(props)) {
-            ensureStreamsApplicationDown(adminClient, groupProtocol);
-
-            committedOffsets = getCommittedOffsets(adminClient, withRepartitioning);
-        }
-
-        final String[] allInputTopics;
-        final String[] allOutputTopics;
-        if (withRepartitioning) {
-            allInputTopics = new String[] {"data", "repartition"};
-            allOutputTopics = new String[] {"echo", "min", "sum", "repartition", "max", "cnt"};
-        } else {
-            allInputTopics = new String[] {"data"};
-            allOutputTopics = new String[] {"echo", "min", "sum"};
-        }
-
-        final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> inputRecordsPerTopicPerPartition;
-        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
-            final List<TopicPartition> partitions = getAllPartitions(consumer, allInputTopics);
-            consumer.assign(partitions);
-            consumer.seekToBeginning(partitions);
-
-            inputRecordsPerTopicPerPartition = getRecords(consumer, committedOffsets, withRepartitioning, true);
-        } catch (final Exception e) {
-            e.printStackTrace(System.err);
-            System.out.println("FAILED");
-            return;
-        }
-
-        final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> outputRecordsPerTopicPerPartition;
-        try (final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
-            final List<TopicPartition> partitions = getAllPartitions(consumer, allOutputTopics);
-            consumer.assign(partitions);
-            consumer.seekToBeginning(partitions);
-
-            outputRecordsPerTopicPerPartition = getRecords(consumer, consumer.endOffsets(partitions), withRepartitioning, false);
-        } catch (final Exception e) {
-            e.printStackTrace(System.err);
-            System.out.println("FAILED");
-            return;
-        }
-
-        verifyReceivedAllRecords(inputRecordsPerTopicPerPartition.get("data"), 
outputRecordsPerTopicPerPartition.get("echo"));
-        if (withRepartitioning) {
-            
verifyReceivedAllRecords(inputRecordsPerTopicPerPartition.get("data"), 
outputRecordsPerTopicPerPartition.get("repartition"));
-        }
-
-        verifyMin(inputRecordsPerTopicPerPartition.get("data"), 
outputRecordsPerTopicPerPartition.get("min"));
-        verifySum(inputRecordsPerTopicPerPartition.get("data"), 
outputRecordsPerTopicPerPartition.get("sum"));
-
-        if (withRepartitioning) {
-            verifyMax(inputRecordsPerTopicPerPartition.get("repartition"), 
outputRecordsPerTopicPerPartition.get("max"));
-            verifyCnt(inputRecordsPerTopicPerPartition.get("repartition"), 
outputRecordsPerTopicPerPartition.get("cnt"));
-        }
-
-        // do not modify: required test output
-        System.out.println("ALL-RECORDS-DELIVERED");
-        System.out.flush();
-    }
-
-    private static void ensureStreamsApplicationDown(final Admin adminClient, final String groupProtocol) {
-        final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
-        boolean isEmpty;
-        do {
-            if (Objects.equals(groupProtocol, "streams")) {
-                final StreamsGroupDescription description = getStreamsGroupDescription(adminClient);
-                isEmpty = description.members().isEmpty();
-                if (System.currentTimeMillis() > maxWaitTime && !isEmpty) {
-                    throwNotDownException(description);
-                }
-            } else {
-                final ConsumerGroupDescription description = getConsumerGroupDescription(adminClient);
-                isEmpty = description.members().isEmpty();
-                if (System.currentTimeMillis() > maxWaitTime && !isEmpty) {
-                    throwNotDownException(description);
-                }
-            }
-            sleep(1000L);
-        } while (!isEmpty);
-    }
-
-    private static void throwNotDownException(final Object description) {
-        throw new RuntimeException(
-            "Streams application not down after " + MAX_IDLE_TIME_MS / 1000L + 
" seconds. " +
-                "Group: " + description
-        );
-    }
-
-    private static Map<TopicPartition, Long> getCommittedOffsets(final Admin adminClient,
-                                                                 final boolean withRepartitioning) {
-        final Map<TopicPartition, OffsetAndMetadata> topicPartitionOffsetAndMetadataMap;
-
-        try {
-            final ListConsumerGroupOffsetsResult listConsumerGroupOffsetsResult = adminClient.listConsumerGroupOffsets(EosTestClient.APP_ID);
-            topicPartitionOffsetAndMetadataMap = listConsumerGroupOffsetsResult.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
-        } catch (final Exception e) {
-            e.printStackTrace();
-            throw new RuntimeException(e);
-        }
-
-        final Map<TopicPartition, Long> committedOffsets = new HashMap<>();
-
-        for (final Map.Entry<TopicPartition, OffsetAndMetadata> entry : topicPartitionOffsetAndMetadataMap.entrySet()) {
-            final String topic = entry.getKey().topic();
-            if (topic.equals("data") || withRepartitioning && topic.equals("repartition")) {
-                committedOffsets.put(entry.getKey(), entry.getValue().offset());
-            }
-        }
-
-        return committedOffsets;
-    }
-
-    private static Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> getRecords(final KafkaConsumer<byte[], byte[]> consumer,
-                                                                                                      final Map<TopicPartition, Long> readEndOffsets,
-                                                                                                      final boolean withRepartitioning,
-                                                                                                      final boolean isInputTopic) {
-        System.out.println("read end offset: " + readEndOffsets);
-        final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition = new HashMap<>();
-        final Map<TopicPartition, Long> maxReceivedOffsetPerPartition = new HashMap<>();
-        final Map<TopicPartition, Long> maxConsumerPositionPerPartition = new HashMap<>();
-
-        long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
-        boolean allRecordsReceived = false;
-        while (!allRecordsReceived && System.currentTimeMillis() < maxWaitTime) {
-            final ConsumerRecords<byte[], byte[]> receivedRecords = consumer.poll(Duration.ofSeconds(1L));
-
-            for (final ConsumerRecord<byte[], byte[]> record : receivedRecords) {
-                maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
-                final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
-                maxReceivedOffsetPerPartition.put(tp, record.offset());
-                final long readEndOffset = readEndOffsets.get(tp);
-                if (record.offset() < readEndOffset) {
-                    addRecord(record, recordPerTopicPerPartition, withRepartitioning);
-                } else if (!isInputTopic) {
-                    throw new RuntimeException("FAIL: did receive more records than expected for " + tp
-                        + " (expected EOL offset: " + readEndOffset + "; current offset: " + record.offset());
-                }
-            }
-
-            for (final TopicPartition tp : readEndOffsets.keySet()) {
-                maxConsumerPositionPerPartition.put(tp, consumer.position(tp));
-                if (consumer.position(tp) >= readEndOffsets.get(tp)) {
-                    consumer.pause(Collections.singletonList(tp));
-                }
-            }
-
-            allRecordsReceived = consumer.paused().size() == readEndOffsets.keySet().size();
-        }
-
-        if (!allRecordsReceived) {
-            System.err.println("Pause partitions (ie, received all data): " + consumer.paused());
-            System.err.println("Max received offset per partition: " + maxReceivedOffsetPerPartition);
-            System.err.println("Max consumer position per partition: " + maxConsumerPositionPerPartition);
-            throw new RuntimeException("FAIL: did not receive all records after " + (MAX_IDLE_TIME_MS / 1000L) + " sec idle time.");
-        }
-
-        return recordPerTopicPerPartition;
-    }
-
-    private static void addRecord(final ConsumerRecord<byte[], byte[]> record,
-                                  final Map<String, Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>>> recordPerTopicPerPartition,
-                                  final boolean withRepartitioning) {
-
-        final String topic = record.topic();
-        final TopicPartition partition = new TopicPartition(topic, record.partition());
-
-        if (verifyTopic(topic, withRepartitioning)) {
-            final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> topicRecordsPerPartition =
-                recordPerTopicPerPartition.computeIfAbsent(topic, k -> new HashMap<>());
-
-            final List<ConsumerRecord<byte[], byte[]>> records =
-                topicRecordsPerPartition.computeIfAbsent(partition, k -> new ArrayList<>());
-
-            records.add(record);
-        } else {
-            throw new RuntimeException("FAIL: received data from unexpected 
topic: " + record);
-        }
-    }
-
-    private static boolean verifyTopic(final String topic,
-                                       final boolean withRepartitioning) {
-        final boolean validTopic = "data".equals(topic) || 
"echo".equals(topic) || "min".equals(topic) || "sum".equals(topic);
-
-        if (withRepartitioning) {
-            return validTopic || "repartition".equals(topic) || 
"max".equals(topic) || "cnt".equals(topic);
-        }
-
-        return validTopic;
-    }
-
-    private static void verifyReceivedAllRecords(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> expectedRecords,
-                                                 final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> receivedRecords) {
-        if (expectedRecords.size() != receivedRecords.size()) {
-            throw new RuntimeException("Result verification failed. Received " + receivedRecords.size() + " records but expected " + expectedRecords.size());
-        }
-
-        final StringDeserializer stringDeserializer = new StringDeserializer();
-        final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
-        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : receivedRecords.entrySet()) {
-            final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
-            final List<ConsumerRecord<byte[], byte[]>> receivedRecordsForPartition = partitionRecords.getValue();
-            final List<ConsumerRecord<byte[], byte[]>> expectedRecordsForPartition = expectedRecords.get(inputTopicPartition);
-
-            System.out.println(partitionRecords.getKey() + " with " + receivedRecordsForPartition.size() + ", " +
-                    inputTopicPartition + " with " + expectedRecordsForPartition.size());
-
-            final Iterator<ConsumerRecord<byte[], byte[]>> expectedRecord = expectedRecordsForPartition.iterator();
-            RuntimeException exception = null;
-            for (final ConsumerRecord<byte[], byte[]> receivedRecord : receivedRecordsForPartition) {
-                if (!expectedRecord.hasNext()) {
-                    exception = new RuntimeException("Result verification failed for " + receivedRecord + " since there's no more expected record");
-                }
-
-                final ConsumerRecord<byte[], byte[]> expected = expectedRecord.next();
-
-                final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
-                final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
-                final String expectedKey = stringDeserializer.deserialize(expected.topic(), expected.key());
-                final int expectedValue = integerDeserializer.deserialize(expected.topic(), expected.value());
-
-                if (!receivedKey.equals(expectedKey) || receivedValue != expectedValue) {
-                    exception = new RuntimeException("Result verification failed for " + receivedRecord + " expected <" + expectedKey + "," + expectedValue + "> but was <" + receivedKey + "," + receivedValue + ">");
-                }
-            }
-
-            if (exception != null) {
-                throw exception;
-            }
-        }
-    }
-
-    private static void verifyMin(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
-                                  final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> minPerTopicPerPartition) {
-        final StringDeserializer stringDeserializer = new StringDeserializer();
-        final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
-
-        final HashMap<String, Integer> currentMinPerKey = new HashMap<>();
-        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : minPerTopicPerPartition.entrySet()) {
-            final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
-            final List<ConsumerRecord<byte[], byte[]>> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition);
-            final List<ConsumerRecord<byte[], byte[]>> partitionMin = partitionRecords.getValue();
-
-            if (partitionInput.size() != partitionMin.size()) {
-                throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
-                    + partitionRecords.getKey() + " but received " + partitionMin.size());
-            }
-
-            final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
-
-            for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionMin) {
-                final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
-
-                final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
-                final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
-                final String key = stringDeserializer.deserialize(input.topic(), input.key());
-                final int value = integerDeserializer.deserialize(input.topic(), input.value());
-
-                Integer min = currentMinPerKey.get(key);
-                if (min == null) {
-                    min = value;
-                } else {
-                    min = Math.min(min, value);
-                }
-                currentMinPerKey.put(key, min);
-
-                if (!receivedKey.equals(key) || receivedValue != min) {
-                    throw new RuntimeException("Result verification failed for 
" + receivedRecord + " expected <" + key + "," + min + "> but was <" + 
receivedKey + "," + receivedValue + ">");
-                }
-            }
-        }
-    }
-
-    private static void verifySum(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
-                                  final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> minPerTopicPerPartition) {
-        final StringDeserializer stringDeserializer = new StringDeserializer();
-        final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
-        final LongDeserializer longDeserializer = new LongDeserializer();
-
-        final HashMap<String, Long> currentSumPerKey = new HashMap<>();
-        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : minPerTopicPerPartition.entrySet()) {
-            final TopicPartition inputTopicPartition = new TopicPartition("data", partitionRecords.getKey().partition());
-            final List<ConsumerRecord<byte[], byte[]>> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition);
-            final List<ConsumerRecord<byte[], byte[]>> partitionSum = partitionRecords.getValue();
-
-            if (partitionInput.size() != partitionSum.size()) {
-                throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
-                    + partitionRecords.getKey() + " but received " + partitionSum.size());
-            }
-
-            final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
-
-            for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionSum) {
-                final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
-
-                final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
-                final long receivedValue = longDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
-                final String key = stringDeserializer.deserialize(input.topic(), input.key());
-                final int value = integerDeserializer.deserialize(input.topic(), input.value());
-
-                Long sum = currentSumPerKey.get(key);
-                if (sum == null) {
-                    sum = (long) value;
-                } else {
-                    sum += value;
-                }
-                currentSumPerKey.put(key, sum);
-
-                if (!receivedKey.equals(key) || receivedValue != sum) {
-                    throw new RuntimeException("Result verification failed for 
" + receivedRecord + " expected <" + key + "," + sum + "> but was <" + 
receivedKey + "," + receivedValue + ">");
-                }
-            }
-        }
-    }
-
-    private static void verifyMax(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
-                                  final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> maxPerTopicPerPartition) {
-        final StringDeserializer stringDeserializer = new StringDeserializer();
-        final IntegerDeserializer integerDeserializer = new IntegerDeserializer();
-
-        final HashMap<String, Integer> currentMinPerKey = new HashMap<>();
-        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : maxPerTopicPerPartition.entrySet()) {
-            final TopicPartition inputTopicPartition = new TopicPartition("repartition", partitionRecords.getKey().partition());
-            final List<ConsumerRecord<byte[], byte[]>> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition);
-            final List<ConsumerRecord<byte[], byte[]>> partitionMax = partitionRecords.getValue();
-
-            if (partitionInput.size() != partitionMax.size()) {
-                throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
-                    + partitionRecords.getKey() + " but received " + partitionMax.size());
-            }
-
-            final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
-
-            for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionMax) {
-                final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
-
-                final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
-                final int receivedValue = integerDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
-                final String key = stringDeserializer.deserialize(input.topic(), input.key());
-                final int value = integerDeserializer.deserialize(input.topic(), input.value());
-
-
-                Integer max = currentMinPerKey.get(key);
-                if (max == null) {
-                    max = Integer.MIN_VALUE;
-                }
-                max = Math.max(max, value);
-                currentMinPerKey.put(key, max);
-
-                if (!receivedKey.equals(key) || receivedValue != max) {
-                    throw new RuntimeException("Result verification failed for 
" + receivedRecord + " expected <" + key + "," + max + "> but was <" + 
receivedKey + "," + receivedValue + ">");
-                }
-            }
-        }
-    }
-
-    private static void verifyCnt(final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> inputPerTopicPerPartition,
-                                  final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> cntPerTopicPerPartition) {
-        final StringDeserializer stringDeserializer = new StringDeserializer();
-        final LongDeserializer longDeserializer = new LongDeserializer();
-
-        final HashMap<String, Long> currentSumPerKey = new HashMap<>();
-        for (final Map.Entry<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords : cntPerTopicPerPartition.entrySet()) {
-            final TopicPartition inputTopicPartition = new TopicPartition("repartition", partitionRecords.getKey().partition());
-            final List<ConsumerRecord<byte[], byte[]>> partitionInput = inputPerTopicPerPartition.get(inputTopicPartition);
-            final List<ConsumerRecord<byte[], byte[]>> partitionCnt = partitionRecords.getValue();
-
-            if (partitionInput.size() != partitionCnt.size()) {
-                throw new RuntimeException("Result verification failed: expected " + partitionInput.size() + " records for "
-                    + partitionRecords.getKey() + " but received " + partitionCnt.size());
-            }
-
-            final Iterator<ConsumerRecord<byte[], byte[]>> inputRecords = partitionInput.iterator();
-
-            for (final ConsumerRecord<byte[], byte[]> receivedRecord : partitionCnt) {
-                final ConsumerRecord<byte[], byte[]> input = inputRecords.next();
-
-                final String receivedKey = stringDeserializer.deserialize(receivedRecord.topic(), receivedRecord.key());
-                final long receivedValue = longDeserializer.deserialize(receivedRecord.topic(), receivedRecord.value());
-                final String key = stringDeserializer.deserialize(input.topic(), input.key());
-
-                Long cnt = currentSumPerKey.get(key);
-                if (cnt == null) {
-                    cnt = 0L;
-                }
-                currentSumPerKey.put(key, ++cnt);
-
-                if (!receivedKey.equals(key) || receivedValue != cnt) {
-                    throw new RuntimeException("Result verification failed for 
" + receivedRecord + " expected <" + key + "," + cnt + "> but was <" + 
receivedKey + "," + receivedValue + ">");
-                }
-            }
-        }
-    }
-
-    private static void verifyAllTransactionFinished(final KafkaConsumer<byte[], byte[]> consumer,
-                                                     final String kafka,
-                                                     final boolean withRepartitioning) {
-        final String[] topics;
-        if (withRepartitioning) {
-            topics = new String[] {"echo", "min", "sum", "repartition", "max", "cnt"};
-        } else {
-            topics = new String[] {"echo", "min", "sum"};
-        }
-
-        final List<TopicPartition> partitions = getAllPartitions(consumer, topics);
-        consumer.assign(partitions);
-        consumer.seekToEnd(partitions);
-        for (final TopicPartition tp : partitions) {
-            System.out.println(tp + " at position " + consumer.position(tp));
-        }
-
-        final Properties consumerProps = new Properties();
-        consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "consumer-uncommitted");
-        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
-        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
-
-
-        final long maxWaitTime = System.currentTimeMillis() + MAX_IDLE_TIME_MS;
-        try (final KafkaConsumer<byte[], byte[]> consumerUncommitted = new KafkaConsumer<>(consumerProps)) {
-            while (!partitions.isEmpty() && System.currentTimeMillis() < maxWaitTime) {
-                consumer.seekToEnd(partitions);
-                final Map<TopicPartition, Long> topicEndOffsets = consumerUncommitted.endOffsets(partitions);
-
-                final Iterator<TopicPartition> iterator = partitions.iterator();
-                while (iterator.hasNext()) {
-                    final TopicPartition topicPartition = iterator.next();
-                    final long position = consumer.position(topicPartition);
-
-                    if (position == topicEndOffsets.get(topicPartition)) {
-                        iterator.remove();
-                        System.out.println("Removing " + topicPartition + " at 
position " + position);
-                    } else if (consumer.position(topicPartition) > 
topicEndOffsets.get(topicPartition)) {
-                        throw new IllegalStateException("Offset for partition 
" + topicPartition + " is larger than topic endOffset: " + position + " > " + 
topicEndOffsets.get(topicPartition));
-                    } else {
-                        System.out.println("Retry " + topicPartition + " at 
position " + position);
-                    }
-                }
-                sleep(1000L);
-            }
-        }
-
-        if (!partitions.isEmpty()) {
-            throw new RuntimeException("Could not read all verification 
records. Did not receive any new record within the last " + (MAX_IDLE_TIME_MS / 
1000L) + " sec.");
-        }
-    }
-
-    private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer,
-                                                         final String... topics) {
-        final ArrayList<TopicPartition> partitions = new ArrayList<>();
-
-        for (final String topic : topics) {
-            for (final PartitionInfo info : consumer.partitionsFor(topic)) {
-                partitions.add(new TopicPartition(info.topic(), info.partition()));
-            }
-        }
-        return partitions;
-    }
-
-    private static ConsumerGroupDescription getConsumerGroupDescription(final Admin adminClient) {
-        final ConsumerGroupDescription description;
-        try {
-            description = adminClient.describeConsumerGroups(Collections.singleton(EosTestClient.APP_ID))
-                .describedGroups()
-                .get(EosTestClient.APP_ID)
-                .get(10, TimeUnit.SECONDS);
-        } catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
-            e.printStackTrace();
-            throw new RuntimeException("Unexpected Exception getting group description", e);
-        }
-        return description;
-    }
-
-    private static StreamsGroupDescription getStreamsGroupDescription(final Admin adminClient) {
-        final StreamsGroupDescription description;
-        try {
-            description = adminClient.describeStreamsGroups(Collections.singleton(EosTestClient.APP_ID))
-                .describedGroups()
-                .get(EosTestClient.APP_ID)
-                .get(10, TimeUnit.SECONDS);
-        } catch (final InterruptedException | ExecutionException | java.util.concurrent.TimeoutException e) {
-            e.printStackTrace();
-            throw new RuntimeException("Unexpected Exception getting group description", e);
-        }
-        return description;
-    }
-}
diff --git a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java b/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
deleted file mode 100644
index 9bb57e286d3..00000000000
--- a/streams/src/test/java/org/apache/kafka/streams/tests/StreamsEosTest.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.tests;
-
-import org.apache.kafka.common.utils.Exit;
-import org.apache.kafka.common.utils.Utils;
-import org.apache.kafka.streams.StreamsConfig;
-
-import java.io.IOException;
-import java.util.Properties;
-
-public class StreamsEosTest {
-
-    /**
-     *  args ::= kafka propFileName command
-     *  command := "run" | "process" | "verify"
-     */
-    public static void main(final String[] args) throws IOException {
-        if (args.length < 2) {
-            System.err.println("StreamsEosTest are expecting two parameters: 
propFile, command; but only see " + args.length + " parameter");
-            Exit.exit(1);
-        }
-
-        final String propFileName = args[0];
-        final String command = args[1];
-
-        final Properties streamsProperties = Utils.loadProps(propFileName);
-        final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
-        final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
-
-        if (kafka == null) {
-            System.err.println("No bootstrap kafka servers specified in " + 
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
-            Exit.exit(1);
-        }
-
-        if ("process".equals(command) || "process-complex".equals(command)) {
-            if (!StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)) {
-
-                System.err.println("processingGuarantee must be " + 
StreamsConfig.EXACTLY_ONCE_V2);
-                Exit.exit(1);
-            }
-        }
-
-        System.out.println("StreamsTest instance started");
-        System.out.println("kafka=" + kafka);
-        System.out.println("props=" + streamsProperties);
-        System.out.println("command=" + command);
-        System.out.flush();
-
-        if (command == null || propFileName == null) {
-            Exit.exit(-1);
-        }
-
-        switch (command) {
-            case "run":
-                EosTestDriver.generate(kafka);
-                break;
-            case "process":
-                new EosTestClient(streamsProperties, false).start();
-                break;
-            case "process-complex":
-                new EosTestClient(streamsProperties, true).start();
-                break;
-            case "verify":
-                EosTestDriver.verify(kafka, false, streamsProperties.getProperty("group.protocol"));
-                break;
-            case "verify-complex":
-                EosTestDriver.verify(kafka, true, streamsProperties.getProperty("group.protocol"));
-                break;
-            default:
-                System.out.println("unknown command: " + command);
-                System.out.flush();
-                Exit.exit(-1);
-        }
-    }
-
-}
diff --git a/tests/kafkatest/services/streams.py b/tests/kafkatest/services/streams.py
index 3a35792a387..4b49017fc42 100644
--- a/tests/kafkatest/services/streams.py
+++ b/tests/kafkatest/services/streams.py
@@ -382,35 +382,6 @@ class StreamsSmokeTestBaseService(StreamsTestBaseService):
 
         return cmd
 
-class StreamsEosTestBaseService(StreamsTestBaseService):
-    """Base class for Streams EOS Test services providing some common settings 
and functionality"""
-
-    clean_node_enabled = True
-
-    def __init__(self, test_context, kafka, command, group_protocol):
-        super(StreamsEosTestBaseService, self).__init__(test_context,
-                                                        kafka,
-                                                        "org.apache.kafka.streams.tests.StreamsEosTest",
-                                                        command)
-        self.group_protocol = group_protocol
-
-    def prop_file(self):
-        properties = {streams_property.STATE_DIR: self.PERSISTENT_ROOT,
-                      streams_property.KAFKA_SERVERS: self.kafka.bootstrap_servers(),
-                      streams_property.PROCESSING_GUARANTEE: "exactly_once_v2",
-                      "acceptable.recovery.lag": "9223372036854775807", # enable a one-shot assignment
-                      "session.timeout.ms": "10000", # set back to 10s for tests. See KIP-735
-                      "group.protocol": self.group_protocol
-                      }
-
-        cfg = KafkaConfig(**properties)
-        return cfg.render()
-
-    def clean_node(self, node):
-        if self.clean_node_enabled:
-            super(StreamsEosTestBaseService, self).clean_node(node)
-
-
 class StreamsSmokeTestDriverService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka):
         super(StreamsSmokeTestDriverService, self).__init__(test_context, kafka, "run")
@@ -443,28 +414,6 @@ class StreamsSmokeTestJobRunnerService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka, processing_guarantee, group_protocol = 'classic', num_threads = 3, replication_factor = 3):
         super(StreamsSmokeTestJobRunnerService, self).__init__(test_context, kafka, "process", processing_guarantee, group_protocol, num_threads, replication_factor)
 
-class StreamsEosTestDriverService(StreamsEosTestBaseService):
-    def __init__(self, test_context, kafka):
-        super(StreamsEosTestDriverService, self).__init__(test_context, kafka, "run", "classic")
-
-class StreamsEosTestJobRunnerService(StreamsEosTestBaseService):
-    def __init__(self, test_context, kafka, group_protocol):
-        super(StreamsEosTestJobRunnerService, self).__init__(test_context, kafka, "process", group_protocol)
-
-class StreamsComplexEosTestJobRunnerService(StreamsEosTestBaseService):
-    def __init__(self, test_context, kafka, group_protocol):
-        super(StreamsComplexEosTestJobRunnerService, self).__init__(test_context, kafka, "process-complex", group_protocol)
-
-class StreamsEosTestVerifyRunnerService(StreamsEosTestBaseService):
-    def __init__(self, test_context, kafka, group_protocol):
-        super(StreamsEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify", group_protocol)
-
-
-class StreamsComplexEosTestVerifyRunnerService(StreamsEosTestBaseService):
-    def __init__(self, test_context, kafka, group_protocol):
-        super(StreamsComplexEosTestVerifyRunnerService, self).__init__(test_context, kafka, "verify-complex", group_protocol)
-
-
 class StreamsSmokeTestShutdownDeadlockService(StreamsSmokeTestBaseService):
     def __init__(self, test_context, kafka):
         super(StreamsSmokeTestShutdownDeadlockService, self).__init__(test_context, kafka, "close-deadlock-test")
diff --git a/tests/kafkatest/tests/streams/streams_eos_test.py b/tests/kafkatest/tests/streams/streams_eos_test.py
deleted file mode 100644
index fcab8adfe91..00000000000
--- a/tests/kafkatest/tests/streams/streams_eos_test.py
+++ /dev/null
@@ -1,183 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ducktape.mark import matrix
-from ducktape.mark.resource import cluster
-from kafkatest.services.kafka import quorum
-from kafkatest.services.streams import StreamsEosTestDriverService, StreamsEosTestJobRunnerService, \
-    StreamsComplexEosTestJobRunnerService, StreamsEosTestVerifyRunnerService, StreamsComplexEosTestVerifyRunnerService
-from kafkatest.tests.streams.base_streams_test import BaseStreamsTest
-
-class StreamsEosTest(BaseStreamsTest):
-    """
-    Test of Kafka Streams exactly-once semantics
-    """
-
-    def __init__(self, test_context):
-        super(StreamsEosTest, self).__init__(test_context, num_controllers=1, num_brokers=3, topics={
-            'data': {'partitions': 5, 'replication-factor': 2},
-            'echo': {'partitions': 5, 'replication-factor': 2},
-            'min': {'partitions': 5, 'replication-factor': 2},
-            'sum': {'partitions': 5, 'replication-factor': 2},
-            'repartition': {'partitions': 5, 'replication-factor': 2},
-            'max': {'partitions': 5, 'replication-factor': 2},
-            'cnt': {'partitions': 5, 'replication-factor': 2}
-        })
-        self.driver = StreamsEosTestDriverService(test_context, self.kafka)
-        self.test_context = test_context
-
-    @cluster(num_nodes=8)
-    @matrix(metadata_quorum=[quorum.combined_kraft],
-            group_protocol=["classic", "streams"])
-    def test_rebalance_simple(self, metadata_quorum, group_protocol):
-        self.group_protocol = group_protocol
-        self.run_rebalance(StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                           StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                           StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                           StreamsEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
-    @cluster(num_nodes=8)
-    @matrix(metadata_quorum=[quorum.combined_kraft],
-            group_protocol=["classic", "streams"])
-    def test_rebalance_complex(self, metadata_quorum, group_protocol):
-        self.group_protocol = group_protocol
-        self.run_rebalance(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                           StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                           StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                           StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
-
-    def run_rebalance(self, processor1, processor2, processor3, verifier):
-        """
-        Starts and stops two test clients a few times.
-        Ensure that all records are delivered exactly-once.
-        """
-
-        self.driver.start()
-
-        self.add_streams(processor1)
-        processor1.clean_node_enabled = False
-        self.add_streams2(processor1, processor2)
-        self.add_streams3(processor1, processor2, processor3)
-        self.stop_streams3(processor2, processor3, processor1)
-        self.add_streams3(processor2, processor3, processor1)
-        self.stop_streams3(processor1, processor3, processor2)
-        self.stop_streams2(processor1, processor3)
-        self.stop_streams(processor1)
-        processor1.clean_node_enabled = True
-
-        self.driver.stop()
-
-        verifier.start()
-        verifier.wait()
-
-        verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % 
verifier.STDOUT_FILE, allow_fail=False)
-
-    @cluster(num_nodes=8)
-    @matrix(metadata_quorum=[quorum.combined_kraft],
-            group_protocol=["classic", "streams"])
-    def test_failure_and_recovery(self, metadata_quorum, group_protocol):
-        self.group_protocol = group_protocol
-        self.run_failure_and_recovery(StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                                      StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                                      StreamsEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                                      StreamsEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
-    @cluster(num_nodes=8)
-    @matrix(metadata_quorum=[quorum.combined_kraft],
-            group_protocol=["classic", "streams"])
-    def test_failure_and_recovery_complex(self, metadata_quorum, group_protocol):
-        self.group_protocol = group_protocol
-        self.run_failure_and_recovery(StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                                      StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                                      StreamsComplexEosTestJobRunnerService(self.test_context, self.kafka, group_protocol),
-                                      StreamsComplexEosTestVerifyRunnerService(self.test_context, self.kafka, group_protocol))
-
-    def run_failure_and_recovery(self, processor1, processor2, processor3, verifier):
-        """
-        Starts two test clients, then abort (kill -9) and restart them a few times.
-        Ensure that all records are delivered exactly-once.
-        """
-
-        self.driver.start()
-
-        self.add_streams(processor1)
-        processor1.clean_node_enabled = False
-        self.add_streams2(processor1, processor2)
-        self.add_streams3(processor1, processor2, processor3)
-        self.abort_streams(processor2, processor3, processor1)
-        self.add_streams3(processor2, processor3, processor1)
-        self.abort_streams(processor2, processor3, processor1)
-        self.add_streams3(processor2, processor3, processor1)
-        self.abort_streams(processor1, processor3, processor2)
-        self.stop_streams2(processor1, processor3)
-        self.stop_streams(processor1)
-        processor1.clean_node_enabled = True
-
-        self.driver.stop()
-
-        verifier.start()
-        verifier.wait()
-
-        verifier.node.account.ssh("grep ALL-RECORDS-DELIVERED %s" % 
verifier.STDOUT_FILE, allow_fail=False)
-
-    def add_streams(self, processor):
-        with processor.node.account.monitor_log(processor.STDOUT_FILE) as monitor:
-            processor.start()
-            self.wait_for_startup(monitor, processor)
-
-    def add_streams2(self, running_processor, processor_to_be_started):
-        with running_processor.node.account.monitor_log(running_processor.STDOUT_FILE) as monitor:
-            self.add_streams(processor_to_be_started)
-            self.wait_for_startup(monitor, running_processor)
-
-    def add_streams3(self, running_processor1, running_processor2, processor_to_be_started):
-        with running_processor1.node.account.monitor_log(running_processor1.STDOUT_FILE) as monitor:
-            self.add_streams2(running_processor2, processor_to_be_started)
-            self.wait_for_startup(monitor, running_processor1)
-
-    def stop_streams(self, processor_to_be_stopped):
-        with processor_to_be_stopped.node.account.monitor_log(processor_to_be_stopped.STDOUT_FILE) as monitor2:
-            processor_to_be_stopped.stop()
-            self.wait_for(monitor2, processor_to_be_stopped, "StateChange: 
PENDING_SHUTDOWN -> NOT_RUNNING")
-
-    def stop_streams2(self, keep_alive_processor, processor_to_be_stopped):
-        with keep_alive_processor.node.account.monitor_log(keep_alive_processor.STDOUT_FILE) as monitor:
-            self.stop_streams(processor_to_be_stopped)
-            self.wait_for_startup(monitor, keep_alive_processor)
-
-    def stop_streams3(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_stopped):
-        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor:
-            self.stop_streams2(keep_alive_processor2, processor_to_be_stopped)
-            self.wait_for_startup(monitor, keep_alive_processor1)
-
-    def abort_streams(self, keep_alive_processor1, keep_alive_processor2, processor_to_be_aborted):
-        with keep_alive_processor1.node.account.monitor_log(keep_alive_processor1.STDOUT_FILE) as monitor1:
-            with keep_alive_processor2.node.account.monitor_log(keep_alive_processor2.STDOUT_FILE) as monitor2:
-                processor_to_be_aborted.stop_nodes(False)
-            self.wait_for_startup(monitor2, keep_alive_processor2)
-        self.wait_for_startup(monitor1, keep_alive_processor1)
-
-    def wait_for_startup(self, monitor, processor):
-        if self.group_protocol == "classic":
-            self.wait_for(monitor, processor, "StateChange: REBALANCING -> 
RUNNING")
-        else:
-            # In the streams group protocol, not all members will take part in the rebalance.
-            # We can indirectly observe the progress of the group by seeing the member epoch being bumped.
-            self.wait_for(monitor, processor, "MemberEpochBump")
-        self.wait_for(monitor, processor, "processed [0-9]* records from 
topic")
-
-    @staticmethod
-    def wait_for(monitor, processor, output):
-        monitor.wait_until(output,
-                           timeout_sec=480,
-                           err_msg=("Never saw output '%s' on " % output) + str(processor.node.account))
