This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 32b8e326da4 MINOR: Add 4.1.0 to streams system tests (#20480)
32b8e326da4 is described below
commit 32b8e326da4668f8533091c41e0790abe9d68aca
Author: Mickael Maison <[email protected]>
AuthorDate: Wed Sep 10 16:23:55 2025 +0200
MINOR: Add 4.1.0 to streams system tests (#20480)
This PR updates all the streams system tests to include 4.1.0.
Reviewers: Lucas Brutschy <[email protected]>
---
build.gradle | 16 +
settings.gradle | 1 +
.../org/apache/kafka/streams/StreamsConfig.java | 6 +
.../kafka/streams/internals/UpgradeFromValues.java | 3 +-
.../kafka/streams/tests/SmokeTestClient.java | 299 +++++++++
.../kafka/streams/tests/SmokeTestDriver.java | 670 +++++++++++++++++++++
.../apache/kafka/streams/tests/SmokeTestUtil.java | 131 ++++
.../kafka/streams/tests/StreamsSmokeTest.java | 100 +++
.../kafka/streams/tests/StreamsUpgradeTest.java | 120 ++++
.../streams/streams_application_upgrade_test.py | 4 +-
.../streams/streams_broker_compatibility_test.py | 6 +-
.../tests/streams/streams_upgrade_test.py | 6 +-
12 files changed, 1353 insertions(+), 9 deletions(-)
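For readers skimming the patch: the new StreamsConfig.UPGRADE_FROM_41 constant added below is the value an operator sets for "upgrade.from" on the new binaries during the first rolling bounce when upgrading an application off 4.1.x. A minimal sketch of that usage (not part of the patch below; the application id and bootstrap address are placeholders):

    import java.util.Properties;

    import org.apache.kafka.streams.StreamsConfig;

    public class UpgradeFrom41Example {
        public static void main(final String[] args) {
            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "upgrade-example");    // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
            // First rolling bounce of an upgrade from 4.1.x: keep the new instances
            // compatible with the still-running 4.1 instances; drop the setting in
            // the second bounce once every instance runs the new release.
            props.put(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_41);
            System.out.println(props);
        }
    }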
diff --git a/build.gradle b/build.gradle
index 2b4f1294e9c..26ca93d0738 100644
--- a/build.gradle
+++ b/build.gradle
@@ -3316,6 +3316,22 @@ project(':streams:upgrade-system-tests-40') {
}
}
+project(':streams:upgrade-system-tests-41') {
+ base {
+ archivesName = "kafka-streams-upgrade-system-tests-41"
+ }
+
+ dependencies {
+ testImplementation libs.kafkaStreams_41
+ testRuntimeOnly libs.junitJupiter
+ testRuntimeOnly runtimeTestLibs
+ }
+
+ systemTestLibs {
+ dependsOn testJar
+ }
+}
+
project(':jmh-benchmarks') {
apply plugin: 'com.gradleup.shadow'
diff --git a/settings.gradle b/settings.gradle
index 906ea1f8f7d..7c37a046838 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -117,6 +117,7 @@ include 'clients',
'streams:upgrade-system-tests-38',
'streams:upgrade-system-tests-39',
'streams:upgrade-system-tests-40',
+ 'streams:upgrade-system-tests-41',
'tools',
'tools:tools-api',
'transaction-coordinator',
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 1bd533fbd14..7f498e32bdc 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -443,6 +443,12 @@ public class StreamsConfig extends AbstractConfig {
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_40 = UpgradeFromValues.UPGRADE_FROM_40.toString();
+ /**
+ * Config value for parameter {@link #UPGRADE_FROM_CONFIG "upgrade.from"} for upgrading an application from version {@code 4.1.x}.
+ */
+ @SuppressWarnings("WeakerAccess")
+ public static final String UPGRADE_FROM_41 = UpgradeFromValues.UPGRADE_FROM_41.toString();
+
/**
* Config value for parameter {@link #PROCESSING_GUARANTEE_CONFIG "processing.guarantee"} for at-least-once processing guarantees.
diff --git a/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java b/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java
index 12cf3ead085..798383980b5 100644
--- a/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java
+++ b/streams/src/main/java/org/apache/kafka/streams/internals/UpgradeFromValues.java
@@ -42,7 +42,8 @@ public enum UpgradeFromValues {
UPGRADE_FROM_37("3.7"),
UPGRADE_FROM_38("3.8"),
UPGRADE_FROM_39("3.9"),
- UPGRADE_FROM_40("4.0");
+ UPGRADE_FROM_40("4.0"),
+ UPGRADE_FROM_41("4.1");
private final String value;
diff --git a/streams/upgrade-system-tests-41/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java b/streams/upgrade-system-tests-41/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
new file mode 100644
index 00000000000..dc0ad4d5601
--- /dev/null
+++ b/streams/upgrade-system-tests-41/src/test/java/org/apache/kafka/streams/tests/SmokeTestClient.java
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.KafkaThread;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.Topology;
+import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
+import org.apache.kafka.streams.kstream.KGroupedStream;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Materialized;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.kstream.Suppressed.BufferConfig;
+import org.apache.kafka.streams.kstream.TimeWindows;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.WindowStore;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.kafka.streams.kstream.Suppressed.untilWindowCloses;
+
+public class SmokeTestClient extends SmokeTestUtil {
+
+ private final String name;
+
+ private KafkaStreams streams;
+ private boolean uncaughtException = false;
+ private boolean started;
+ private volatile boolean closed;
+
+ private static void addShutdownHook(final String name, final Runnable runnable) {
+ if (name != null) {
+ Runtime.getRuntime().addShutdownHook(KafkaThread.nonDaemon(name, runnable));
+ } else {
+ Runtime.getRuntime().addShutdownHook(new Thread(runnable));
+ }
+ }
+
+ private static File tempDirectory() {
+ final String prefix = "kafka-";
+ final File file;
+ try {
+ file = Files.createTempDirectory(prefix).toFile();
+ } catch (final IOException ex) {
+ throw new RuntimeException("Failed to create a temp dir", ex);
+ }
+ file.deleteOnExit();
+
+ addShutdownHook("delete-temp-file-shutdown-hook", () -> {
+ try {
+ Utils.delete(file);
+ } catch (final IOException e) {
+ System.out.println("Error deleting " + file.getAbsolutePath());
+ e.printStackTrace(System.out);
+ }
+ });
+
+ return file;
+ }
+
+ public SmokeTestClient(final String name) {
+ this.name = name;
+ }
+
+ public boolean started() {
+ return started;
+ }
+
+ public boolean closed() {
+ return closed;
+ }
+
+ public void start(final Properties streamsProperties) {
+ final Topology build = getTopology();
+ streams = new KafkaStreams(build, getStreamsConfig(streamsProperties));
+
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+ streams.setStateListener((newState, oldState) -> {
+ System.out.printf("%s %s: %s -> %s%n", name, Instant.now(),
oldState, newState);
+ if (oldState == KafkaStreams.State.REBALANCING && newState ==
KafkaStreams.State.RUNNING) {
+ started = true;
+ countDownLatch.countDown();
+ }
+
+ if (newState == KafkaStreams.State.NOT_RUNNING) {
+ closed = true;
+ }
+ });
+
+ streams.setUncaughtExceptionHandler(e -> {
+ System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+ System.out.println(name + ": FATAL: An unexpected exception is
encountered: " + e);
+ e.printStackTrace(System.out);
+ uncaughtException = true;
+ return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
+ });
+
+ addShutdownHook("streams-shutdown-hook", this::close);
+
+ streams.start();
+ try {
+ if (!countDownLatch.await(1, TimeUnit.MINUTES)) {
+ System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION:
Didn't start in one minute");
+ }
+ } catch (final InterruptedException e) {
+ System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: " + e);
+ e.printStackTrace(System.out);
+ }
+ System.out.println(name + ": SMOKE-TEST-CLIENT-STARTED");
+ System.out.println(name + " started at " + Instant.now());
+ }
+
+ public void closeAsync() {
+ streams.close(Duration.ZERO);
+ }
+
+ public void close() {
+ final boolean closed = streams.close(Duration.ofMinutes(1));
+
+ if (closed && !uncaughtException) {
+ System.out.println(name + ": SMOKE-TEST-CLIENT-CLOSED");
+ } else if (closed) {
+ System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION");
+ } else {
+ System.out.println(name + ": SMOKE-TEST-CLIENT-EXCEPTION: Didn't
close");
+ }
+ }
+
+ private Properties getStreamsConfig(final Properties props) {
+ final Properties fullProps = new Properties(props);
+ fullProps.put(StreamsConfig.APPLICATION_ID_CONFIG, "SmokeTest");
+ fullProps.put(StreamsConfig.CLIENT_ID_CONFIG, "SmokeTest-" + name);
+ fullProps.put(StreamsConfig.STATE_DIR_CONFIG, tempDirectory().getAbsolutePath());
+ fullProps.putAll(props);
+ return fullProps;
+ }
+
+ public Topology getTopology() {
+ final StreamsBuilder builder = new StreamsBuilder();
+ final Consumed<String, Integer> stringIntConsumed = Consumed.with(stringSerde, intSerde);
+ final KStream<String, Integer> source = builder.stream("data", stringIntConsumed);
+ source.filterNot((k, v) -> k.equals("flush"))
+ .to("echo", Produced.with(stringSerde, intSerde));
+ final KStream<String, Integer> data = source.filter((key, value) -> value == null || value != END);
+ data.process(SmokeTestUtil.printProcessorSupplier("data", name));
+
+ // min
+ final KGroupedStream<String, Integer> groupedData = data.groupByKey(Grouped.with(stringSerde, intSerde));
+
+ final KTable<Windowed<String>, Integer> minAggregation = groupedData
+ .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofDays(1), Duration.ofMinutes(1)))
+ .aggregate(
+ () -> Integer.MAX_VALUE,
+ (aggKey, value, aggregate) -> (value < aggregate) ? value : aggregate,
+ Materialized
+ .<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-min")
+ .withValueSerde(intSerde)
+ .withRetention(Duration.ofHours(25))
+ );
+
+ streamify(minAggregation, "min-raw");
+
+ streamify(minAggregation.suppress(untilWindowCloses(BufferConfig.unbounded())), "min-suppressed");
+
+ minAggregation
+ .toStream(new Unwindow<>())
+ .filterNot((k, v) -> k.equals("flush"))
+ .to("min", Produced.with(stringSerde, intSerde));
+
+ final KTable<Windowed<String>, Integer> smallWindowSum = groupedData
+ .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofSeconds(2), Duration.ofSeconds(30)).advanceBy(Duration.ofSeconds(1)))
+ .reduce(Integer::sum);
+
+ streamify(smallWindowSum, "sws-raw");
+ streamify(smallWindowSum.suppress(untilWindowCloses(BufferConfig.unbounded())), "sws-suppressed");
+
+ final KTable<String, Integer> minTable = builder.table(
+ "min",
+ Consumed.with(stringSerde, intSerde),
+ Materialized.as("minStoreName"));
+
+ minTable.toStream().process(SmokeTestUtil.printProcessorSupplier("min", name));
+
+ // max
+ groupedData
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
+ .aggregate(
+ () -> Integer.MIN_VALUE,
+ (aggKey, value, aggregate) -> (value > aggregate) ? value : aggregate,
+ Materialized.<String, Integer, WindowStore<Bytes, byte[]>>as("uwin-max").withValueSerde(intSerde))
+ .toStream(new Unwindow<>())
+ .filterNot((k, v) -> k.equals("flush"))
+ .to("max", Produced.with(stringSerde, intSerde));
+
+ final KTable<String, Integer> maxTable = builder.table(
+ "max",
+ Consumed.with(stringSerde, intSerde),
+ Materialized.as("maxStoreName"));
+ maxTable.toStream().process(SmokeTestUtil.printProcessorSupplier("max", name));
+
+ // sum
+ groupedData
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
+ .aggregate(
+ () -> 0L,
+ (aggKey, value, aggregate) -> (long) value + aggregate,
+ Materialized.<String, Long, WindowStore<Bytes, byte[]>>as("win-sum").withValueSerde(longSerde))
+ .toStream(new Unwindow<>())
+ .filterNot((k, v) -> k.equals("flush"))
+ .to("sum", Produced.with(stringSerde, longSerde));
+
+ final Consumed<String, Long> stringLongConsumed = Consumed.with(stringSerde, longSerde);
+ final KTable<String, Long> sumTable = builder.table("sum", stringLongConsumed);
+ sumTable.toStream().process(SmokeTestUtil.printProcessorSupplier("sum", name));
+
+ // cnt
+ groupedData
+ .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofDays(2)))
+ .count(Materialized.as("uwin-cnt"))
+ .toStream(new Unwindow<>())
+ .filterNot((k, v) -> k.equals("flush"))
+ .to("cnt", Produced.with(stringSerde, longSerde));
+
+ final KTable<String, Long> cntTable = builder.table(
+ "cnt",
+ Consumed.with(stringSerde, longSerde),
+ Materialized.as("cntStoreName"));
+ cntTable.toStream().process(SmokeTestUtil.printProcessorSupplier("cnt", name));
+
+ // dif
+ maxTable
+ .join(
+ minTable,
+ (value1, value2) -> value1 - value2)
+ .toStream()
+ .filterNot((k, v) -> k.equals("flush"))
+ .to("dif", Produced.with(stringSerde, intSerde));
+
+ // avg
+ sumTable
+ .join(
+ cntTable,
+ (value1, value2) -> (double) value1 / (double) value2)
+ .toStream()
+ .filterNot((k, v) -> k.equals("flush"))
+ .to("avg", Produced.with(stringSerde, doubleSerde));
+
+ // test repartition
+ final Agg agg = new Agg();
+ cntTable.groupBy(agg.selector(), Grouped.with(stringSerde, longSerde))
+ .aggregate(agg.init(), agg.adder(), agg.remover(),
+ Materialized.<String, Long>as(Stores.inMemoryKeyValueStore("cntByCnt"))
+ .withKeySerde(Serdes.String())
+ .withValueSerde(Serdes.Long()))
+ .toStream()
+ .to("tagg", Produced.with(stringSerde, longSerde));
+
+ return builder.build();
+ }
+
+ private static void streamify(final KTable<Windowed<String>, Integer> windowedTable, final String topic) {
+ windowedTable
+ .toStream()
+ .filterNot((k, v) -> k.key().equals("flush"))
+ .map((key, value) -> new KeyValue<>(key.toString(), value))
+ .to(topic, Produced.with(stringSerde, intSerde));
+ }
+}
diff --git a/streams/upgrade-system-tests-41/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java b/streams/upgrade-system-tests-41/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
new file mode 100644
index 00000000000..8ab48f7cf5f
--- /dev/null
+++ b/streams/upgrade-system-tests-41/src/test/java/org/apache/kafka/streams/tests/SmokeTestDriver.java
@@ -0,0 +1,670 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static java.util.Collections.emptyMap;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+
+public class SmokeTestDriver extends SmokeTestUtil {
+ private static final String[] NUMERIC_VALUE_TOPICS = {
+ "data",
+ "echo",
+ "max",
+ "min", "min-suppressed", "min-raw",
+ "dif",
+ "sum",
+ "sws-raw", "sws-suppressed",
+ "cnt",
+ "avg",
+ "tagg"
+ };
+ private static final String[] STRING_VALUE_TOPICS = {
+ "fk"
+ };
+
+ private static final String[] TOPICS = new String[NUMERIC_VALUE_TOPICS.length + STRING_VALUE_TOPICS.length];
+ static {
+ System.arraycopy(NUMERIC_VALUE_TOPICS, 0, TOPICS, 0, NUMERIC_VALUE_TOPICS.length);
+ System.arraycopy(STRING_VALUE_TOPICS, 0, TOPICS, NUMERIC_VALUE_TOPICS.length, STRING_VALUE_TOPICS.length);
+ }
+
+ private static final int MAX_RECORD_EMPTY_RETRIES = 30;
+
+ private static class ValueList {
+ public final String key;
+ private final int[] values;
+ private int index;
+
+ ValueList(final int min, final int max) {
+ key = min + "-" + max;
+
+ values = new int[max - min + 1];
+ for (int i = 0; i < values.length; i++) {
+ values[i] = min + i;
+ }
+ // We want to randomize the order of data to test not completely predictable processing order
+ // However, values are also use as a timestamp of the record. (TODO: separate data and timestamp)
+ // We keep some correlation of time and order. Thus, the shuffling is done with a sliding window
+ shuffle(values, 10);
+
+ index = 0;
+ }
+
+ int next() {
+ return (index < values.length) ? values[index++] : -1;
+ }
+ }
+
+ public static String[] topics() {
+ return Arrays.copyOf(TOPICS, TOPICS.length);
+ }
+
+ static void generatePerpetually(final String kafka,
+ final int numKeys,
+ final int maxRecordsPerKey) {
+ final Properties producerProps = generatorProperties(kafka);
+
+ int numRecordsProduced = 0;
+
+ final ValueList[] data = new ValueList[numKeys];
+ for (int i = 0; i < numKeys; i++) {
+ data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
+ }
+
+ final Random rand = new Random();
+
+ try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
+ while (true) {
+ final int index = rand.nextInt(numKeys);
+ final String key = data[index].key;
+ final int value = data[index].next();
+
+ final ProducerRecord<byte[], byte[]> record =
+ new ProducerRecord<>(
+ "data",
+ stringSerde.serializer().serialize("", key),
+ intSerde.serializer().serialize("", value)
+ );
+ producer.send(record);
+
+ final ProducerRecord<byte[], byte[]> fkRecord =
+ new ProducerRecord<>(
+ "fk",
+ intSerde.serializer().serialize("", value),
+ stringSerde.serializer().serialize("", key)
+ );
+ producer.send(fkRecord);
+
+ numRecordsProduced++;
+ if (numRecordsProduced % 100 == 0) {
+ System.out.println(Instant.now() + " " +
numRecordsProduced + " records produced");
+ }
+ Utils.sleep(2);
+ }
+ }
+ }
+
+ public static Map<String, Set<Integer>> generate(final String kafka,
+ final int numKeys,
+ final int maxRecordsPerKey,
+ final Duration timeToSpend) {
+ final Properties producerProps = generatorProperties(kafka);
+
+ int numRecordsProduced = 0;
+
+ final Map<String, Set<Integer>> allData = new HashMap<>();
+ final ValueList[] data = new ValueList[numKeys];
+ for (int i = 0; i < numKeys; i++) {
+ data[i] = new ValueList(i, i + maxRecordsPerKey - 1);
+ allData.put(data[i].key, new HashSet<>());
+ }
+ final Random rand = new Random();
+
+ int remaining = data.length;
+
+ final long recordPauseTime = timeToSpend.toMillis() / numKeys / maxRecordsPerKey;
+
+ final List<ProducerRecord<byte[], byte[]>> dataNeedRetry = new ArrayList<>();
+ final List<ProducerRecord<byte[], byte[]>> fkNeedRetry = new ArrayList<>();
+
+ try (final KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps)) {
+ while (remaining > 0) {
+ final int index = rand.nextInt(remaining);
+ final String key = data[index].key;
+ final int value = data[index].next();
+
+ if (value < 0) {
+ remaining--;
+ data[index] = data[remaining];
+ } else {
+ final ProducerRecord<byte[], byte[]> record =
+ new ProducerRecord<>(
+ "data",
+ stringSerde.serializer().serialize("", key),
+ intSerde.serializer().serialize("", value)
+ );
+
+ producer.send(record, new TestCallback(record, dataNeedRetry));
+
+ final ProducerRecord<byte[], byte[]> fkRecord =
+ new ProducerRecord<>(
+ "fk",
+ intSerde.serializer().serialize("", value),
+ stringSerde.serializer().serialize("", key)
+ );
+
+ producer.send(fkRecord, new TestCallback(fkRecord, fkNeedRetry));
+
+ numRecordsProduced++;
+ allData.get(key).add(value);
+ if (numRecordsProduced % 100 == 0) {
+ System.out.println(Instant.now() + " " +
numRecordsProduced + " records produced");
+ }
+ Utils.sleep(Math.max(recordPauseTime, 2));
+ }
+ }
+ producer.flush();
+
+ retry(producer, dataNeedRetry, stringSerde);
+ retry(producer, fkNeedRetry, intSerde);
+
+ flush(producer,
+ "data",
+ stringSerde.serializer().serialize("", "flush"),
+ intSerde.serializer().serialize("", 0)
+ );
+ flush(producer,
+ "fk",
+ intSerde.serializer().serialize("", 0),
+ stringSerde.serializer().serialize("", "flush")
+ );
+ }
+ return Collections.unmodifiableMap(allData);
+ }
+
+ private static void retry(final KafkaProducer<byte[], byte[]> producer,
+ List<ProducerRecord<byte[], byte[]>> needRetry,
+ final Serde<?> keySerde) {
+ int remainingRetries = 5;
+ while (!needRetry.isEmpty()) {
+ final List<ProducerRecord<byte[], byte[]>> needRetry2 = new ArrayList<>();
+ for (final ProducerRecord<byte[], byte[]> record : needRetry) {
+ System.out.println(
+ "retry producing " +
keySerde.deserializer().deserialize("", record.key()));
+ producer.send(record, new TestCallback(record, needRetry2));
+ }
+ producer.flush();
+ needRetry = needRetry2;
+ if (--remainingRetries == 0 && !needRetry.isEmpty()) {
+ System.err.println("Failed to produce all records after
multiple retries");
+ Exit.exit(1);
+ }
+ }
+ }
+
+ private static void flush(final KafkaProducer<byte[], byte[]> producer,
+ final String topic,
+ final byte[] keyBytes,
+ final byte[] valBytes) {
+ // now that we've sent everything, we'll send some final records with a timestamp high enough to flush out
+ // all suppressed records.
+ final List<PartitionInfo> partitions = producer.partitionsFor(topic);
+ for (final PartitionInfo partition : partitions) {
+ producer.send(new ProducerRecord<>(
+ partition.topic(),
+ partition.partition(),
+ System.currentTimeMillis() + Duration.ofDays(2).toMillis(),
+ keyBytes,
+ valBytes
+ ));
+ }
+ }
+
+ private static Properties generatorProperties(final String kafka) {
+ final Properties producerProps = new Properties();
+ producerProps.put(ProducerConfig.CLIENT_ID_CONFIG, "SmokeTest");
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
+ producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+ return producerProps;
+ }
+
+ private static class TestCallback implements Callback {
+ private final ProducerRecord<byte[], byte[]> originalRecord;
+ private final List<ProducerRecord<byte[], byte[]>> needRetry;
+
+ TestCallback(final ProducerRecord<byte[], byte[]> originalRecord,
+ final List<ProducerRecord<byte[], byte[]>> needRetry) {
+ this.originalRecord = originalRecord;
+ this.needRetry = needRetry;
+ }
+
+ @Override
+ public void onCompletion(final RecordMetadata metadata, final Exception exception) {
+ if (exception != null) {
+ if (exception instanceof TimeoutException) {
+ needRetry.add(originalRecord);
+ } else {
+ exception.printStackTrace();
+ Exit.exit(1);
+ }
+ }
+ }
+ }
+
+ private static void shuffle(final int[] data, @SuppressWarnings("SameParameterValue") final int windowSize) {
+ final Random rand = new Random();
+ for (int i = 0; i < data.length; i++) {
+ // we shuffle data within windowSize
+ final int j = rand.nextInt(Math.min(data.length - i, windowSize)) + i;
+
+ // swap
+ final int tmp = data[i];
+ data[i] = data[j];
+ data[j] = tmp;
+ }
+ }
+
+ public static class NumberDeserializer implements Deserializer<Number> {
+ @Override
+ public Number deserialize(final String topic, final byte[] data) {
+ final Number value;
+ switch (topic) {
+ case "data":
+ case "echo":
+ case "min":
+ case "min-raw":
+ case "min-suppressed":
+ case "sws-raw":
+ case "sws-suppressed":
+ case "max":
+ case "dif":
+ value = intSerde.deserializer().deserialize(topic, data);
+ break;
+ case "sum":
+ case "cnt":
+ case "tagg":
+ value = longSerde.deserializer().deserialize(topic, data);
+ break;
+ case "avg":
+ value = doubleSerde.deserializer().deserialize(topic, data);
+ break;
+ default:
+ throw new RuntimeException("unknown topic: " + topic);
+ }
+ return value;
+ }
+ }
+
+ public static VerificationResult verify(final String kafka,
+ final Map<String, Set<Integer>> inputs,
+ final int maxRecordsPerKey) {
+ final Properties props = new Properties();
+ props.put(ConsumerConfig.CLIENT_ID_CONFIG, "verifier");
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, NumberDeserializer.class);
+ props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
+
+ final KafkaConsumer<String, Number> consumer = new KafkaConsumer<>(props);
+ final List<TopicPartition> partitions = getAllPartitions(consumer, NUMERIC_VALUE_TOPICS);
+ consumer.assign(partitions);
+ consumer.seekToBeginning(partitions);
+
+ final int recordsGenerated = inputs.size() * maxRecordsPerKey;
+ int recordsProcessed = 0;
+ final Map<String, AtomicInteger> processed =
+ Stream.of(NUMERIC_VALUE_TOPICS)
+ .collect(Collectors.toMap(t -> t, t -> new AtomicInteger(0)));
+
+ final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events = new HashMap<>();
+
+ VerificationResult verificationResult = new VerificationResult(false, "no results yet");
+ int retry = 0;
+ final long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start < TimeUnit.MINUTES.toMillis(6)) {
+ final ConsumerRecords<String, Number> records = consumer.poll(Duration.ofSeconds(5));
+ if (records.isEmpty() && recordsProcessed >= recordsGenerated) {
+ verificationResult = verifyAll(inputs, events, false);
+ if (verificationResult.passed()) {
+ break;
+ } else if (retry++ > MAX_RECORD_EMPTY_RETRIES) {
+ System.out.println(Instant.now() + " Didn't get any more
results, verification hasn't passed, and out of retries.");
+ break;
+ } else {
+ System.out.println(Instant.now() + " Didn't get any more
results, but verification hasn't passed (yet). Retrying..." + retry);
+ }
+ } else {
+ System.out.println(Instant.now() + " Get some more results
from " + records.partitions() + ", resetting retry.");
+
+ retry = 0;
+ for (final ConsumerRecord<String, Number> record : records) {
+ final String key = record.key();
+
+ final String topic = record.topic();
+ processed.get(topic).incrementAndGet();
+
+ if (topic.equals("echo")) {
+ recordsProcessed++;
+ if (recordsProcessed % 100 == 0) {
+ System.out.println("Echo records processed = " +
recordsProcessed);
+ }
+ }
+
+ events.computeIfAbsent(topic, t -> new HashMap<>())
+ .computeIfAbsent(key, k -> new LinkedList<>())
+ .add(record);
+ }
+
+ System.out.println(processed);
+ }
+ }
+ consumer.close();
+ final long finished = System.currentTimeMillis() - start;
+ System.out.println("Verification time=" + finished);
+ System.out.println("-------------------");
+ System.out.println("Result Verification");
+ System.out.println("-------------------");
+ System.out.println("recordGenerated=" + recordsGenerated);
+ System.out.println("recordProcessed=" + recordsProcessed);
+
+ if (recordsProcessed > recordsGenerated) {
+ System.out.println("PROCESSED-MORE-THAN-GENERATED");
+ } else if (recordsProcessed < recordsGenerated) {
+ System.out.println("PROCESSED-LESS-THAN-GENERATED");
+ }
+
+ boolean success;
+
+ final Map<String, Set<Number>> received =
+ events.get("echo")
+ .entrySet()
+ .stream()
+ .map(entry -> mkEntry(
+ entry.getKey(),
+ entry.getValue().stream().map(ConsumerRecord::value).collect(Collectors.toSet()))
+ )
+ .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+ success = inputs.equals(received);
+
+ if (success) {
+ System.out.println("ALL-RECORDS-DELIVERED");
+ } else {
+ int missedCount = 0;
+ for (final Map.Entry<String, Set<Integer>> entry : inputs.entrySet()) {
+ missedCount += received.get(entry.getKey()).size();
+ }
+ System.out.println("missedRecords=" + missedCount);
+ }
+
+ // give it one more try if it's not already passing.
+ if (!verificationResult.passed()) {
+ verificationResult = verifyAll(inputs, events, true);
+ }
+ success &= verificationResult.passed();
+
+ System.out.println(verificationResult.result());
+
+ System.out.println(success ? "SUCCESS" : "FAILURE");
+ return verificationResult;
+ }
+
+ public static class VerificationResult {
+ private final boolean passed;
+ private final String result;
+
+ VerificationResult(final boolean passed, final String result) {
+ this.passed = passed;
+ this.result = result;
+ }
+
+ public boolean passed() {
+ return passed;
+ }
+
+ public String result() {
+ return result;
+ }
+ }
+
+ private static VerificationResult verifyAll(final Map<String, Set<Integer>> inputs,
+ final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+ final boolean printResults) {
+ final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+ boolean pass;
+ try (final PrintStream resultStream = new PrintStream(byteArrayOutputStream)) {
+ pass = verifyTAgg(resultStream, inputs, events.get("tagg"), printResults);
+ pass &= verifySuppressed(resultStream, "min-suppressed", events, printResults);
+ pass &= verify(resultStream, "min-suppressed", inputs, events, windowedKey -> {
+ final String unwindowedKey = windowedKey.substring(1, windowedKey.length() - 1).replaceAll("@.*", "");
+ return getMin(unwindowedKey);
+ }, printResults);
+ pass &= verifySuppressed(resultStream, "sws-suppressed", events,
printResults);
+ pass &= verify(resultStream, "min", inputs, events,
SmokeTestDriver::getMin, printResults);
+ pass &= verify(resultStream, "max", inputs, events,
SmokeTestDriver::getMax, printResults);
+ pass &= verify(resultStream, "dif", inputs, events, key ->
getMax(key).intValue() - getMin(key).intValue(), printResults);
+ pass &= verify(resultStream, "sum", inputs, events,
SmokeTestDriver::getSum, printResults);
+ pass &= verify(resultStream, "cnt", inputs, events, key1 ->
getMax(key1).intValue() - getMin(key1).intValue() + 1L, printResults);
+ pass &= verify(resultStream, "avg", inputs, events,
SmokeTestDriver::getAvg, printResults);
+ }
+ return new VerificationResult(pass, new String(byteArrayOutputStream.toByteArray(), StandardCharsets.UTF_8));
+ }
+
+ private static boolean verify(final PrintStream resultStream,
+ final String topic,
+ final Map<String, Set<Integer>> inputData,
+ final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+ final Function<String, Number> keyToExpectation,
+ final boolean printResults) {
+ final Map<String, LinkedList<ConsumerRecord<String, Number>>> observedInputEvents = events.get("data");
+ final Map<String, LinkedList<ConsumerRecord<String, Number>>> outputEvents = events.getOrDefault(topic, emptyMap());
+ if (outputEvents.isEmpty()) {
+ resultStream.println(topic + " is empty");
+ return false;
+ } else {
+ resultStream.printf("verifying %s with %d keys%n", topic,
outputEvents.size());
+
+ if (outputEvents.size() != inputData.size()) {
+ resultStream.printf("fail: resultCount=%d
expectedCount=%s%n\tresult=%s%n\texpected=%s%n",
+ outputEvents.size(), inputData.size(),
outputEvents.keySet(), inputData.keySet());
+ return false;
+ }
+ for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : outputEvents.entrySet()) {
+ final String key = entry.getKey();
+ final Number expected = keyToExpectation.apply(key);
+ final Number actual = entry.getValue().getLast().value();
+ if (!expected.equals(actual)) {
+ resultStream.printf("%s fail: key=%s actual=%s
expected=%s%n", topic, key, actual, expected);
+
+ if (printResults) {
+ resultStream.printf("\t inputEvents=%n%s%n\t" +
+ "echoEvents=%n%s%n\tmaxEvents=%n%s%n\tminEvents=%n%s%n\tdifEvents=%n%s%n\tcntEvents=%n%s%n\ttaggEvents=%n%s%n",
+ indent("\t\t", observedInputEvents.get(key)),
+ indent("\t\t", events.getOrDefault("echo",
emptyMap()).getOrDefault(key, new LinkedList<>())),
+ indent("\t\t", events.getOrDefault("max",
emptyMap()).getOrDefault(key, new LinkedList<>())),
+ indent("\t\t", events.getOrDefault("min",
emptyMap()).getOrDefault(key, new LinkedList<>())),
+ indent("\t\t", events.getOrDefault("dif",
emptyMap()).getOrDefault(key, new LinkedList<>())),
+ indent("\t\t", events.getOrDefault("cnt",
emptyMap()).getOrDefault(key, new LinkedList<>())),
+ indent("\t\t", events.getOrDefault("tagg",
emptyMap()).getOrDefault(key, new LinkedList<>())));
+
+ if (!Set.of("echo", "max", "min", "dif", "cnt",
"tagg").contains(topic))
+ resultStream.printf("%sEvents=%n%s%n", topic,
indent("\t\t", entry.getValue()));
+ }
+
+ return false;
+ }
+ }
+ return true;
+ }
+ }
+
+
+ private static boolean verifySuppressed(final PrintStream resultStream,
+ @SuppressWarnings("SameParameterValue") final String topic,
+ final Map<String, Map<String, LinkedList<ConsumerRecord<String, Number>>>> events,
+ final boolean printResults) {
+ resultStream.println("verifying suppressed " + topic);
+ final Map<String, LinkedList<ConsumerRecord<String, Number>>> topicEvents = events.getOrDefault(topic, emptyMap());
+ for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : topicEvents.entrySet()) {
+ if (entry.getValue().size() != 1) {
+ final String unsuppressedTopic = topic.replace("-suppressed",
"-raw");
+ final String key = entry.getKey();
+ final String unwindowedKey = key.substring(1, key.length() - 1).replaceAll("@.*", "");
+ resultStream.printf("fail: key=%s%n\tnon-unique result:%n%s%n",
+ key,
+ indent("\t\t", entry.getValue()));
+
+ if (printResults)
+ resultStream.printf("\tresultEvents:%n%s%n\tinputEvents:%n%s%n",
+ indent("\t\t", events.get(unsuppressedTopic).get(key)),
+ indent("\t\t", events.get("data").get(unwindowedKey)));
+
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private static String indent(@SuppressWarnings("SameParameterValue") final
String prefix,
+ final Iterable<ConsumerRecord<String,
Number>> list) {
+ final StringBuilder stringBuilder = new StringBuilder();
+ for (final ConsumerRecord<String, Number> record : list) {
+ stringBuilder.append(prefix).append(record).append('\n');
+ }
+ return stringBuilder.toString();
+ }
+
+ private static Long getSum(final String key) {
+ final int min = getMin(key).intValue();
+ final int max = getMax(key).intValue();
+ return ((long) min + max) * (max - min + 1L) / 2L;
+ }
+
+ private static Double getAvg(final String key) {
+ final int min = getMin(key).intValue();
+ final int max = getMax(key).intValue();
+ return ((long) min + max) / 2.0;
+ }
+
+
+ private static boolean verifyTAgg(final PrintStream resultStream,
+ final Map<String, Set<Integer>> allData,
+ final Map<String, LinkedList<ConsumerRecord<String, Number>>> taggEvents,
+ final boolean printResults) {
+ if (taggEvents == null) {
+ resultStream.println("tagg is missing");
+ return false;
+ } else if (taggEvents.isEmpty()) {
+ resultStream.println("tagg is empty");
+ return false;
+ } else {
+ resultStream.println("verifying tagg");
+
+ // generate expected answer
+ final Map<String, Long> expected = new HashMap<>();
+ for (final String key : allData.keySet()) {
+ final int min = getMin(key).intValue();
+ final int max = getMax(key).intValue();
+ final String cnt = Long.toString(max - min + 1L);
+
+ expected.put(cnt, expected.getOrDefault(cnt, 0L) + 1);
+ }
+
+ // check the result
+ for (final Map.Entry<String, LinkedList<ConsumerRecord<String, Number>>> entry : taggEvents.entrySet()) {
+ final String key = entry.getKey();
+ Long expectedCount = expected.remove(key);
+ if (expectedCount == null) {
+ expectedCount = 0L;
+ }
+
+ if (entry.getValue().getLast().value().longValue() != expectedCount) {
+ resultStream.println("fail: key=" + key + " tagg=" + entry.getValue() + " expected=" + expectedCount);
+
+ if (printResults)
+ resultStream.println("\t taggEvents: " +
entry.getValue());
+ return false;
+ }
+ }
+
+ }
+ return true;
+ }
+
+ private static Number getMin(final String key) {
+ return Integer.parseInt(key.split("-")[0]);
+ }
+
+ private static Number getMax(final String key) {
+ return Integer.parseInt(key.split("-")[1]);
+ }
+
+ private static List<TopicPartition> getAllPartitions(final KafkaConsumer<?, ?> consumer, final String... topics) {
+ final List<TopicPartition> partitions = new ArrayList<>();
+
+ for (final String topic : topics) {
+ for (final PartitionInfo info : consumer.partitionsFor(topic)) {
+ partitions.add(new TopicPartition(info.topic(), info.partition()));
+ }
+ }
+ return partitions;
+ }
+
+}
diff --git a/streams/upgrade-system-tests-41/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java b/streams/upgrade-system-tests-41/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
new file mode 100644
index 00000000000..2e4938edfe7
--- /dev/null
+++ b/streams/upgrade-system-tests-41/src/test/java/org/apache/kafka/streams/tests/SmokeTestUtil.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.kstream.Aggregator;
+import org.apache.kafka.streams.kstream.Initializer;
+import org.apache.kafka.streams.kstream.KeyValueMapper;
+import org.apache.kafka.streams.kstream.Windowed;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+
+import java.time.Instant;
+
+public class SmokeTestUtil {
+
+ static final int END = Integer.MAX_VALUE;
+
+ static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic) {
+ return printProcessorSupplier(topic, "");
+ }
+
+ static ProcessorSupplier<Object, Object, Void, Void> printProcessorSupplier(final String topic, final String name) {
+ return () -> new ContextualProcessor<Object, Object, Void, Void>() {
+ private int numRecordsProcessed = 0;
+ private long smallestOffset = Long.MAX_VALUE;
+ private long largestOffset = Long.MIN_VALUE;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ super.init(context);
+ System.out.println("[4.1] initializing processor: topic=" +
topic + " taskId=" + context.taskId());
+ System.out.flush();
+ numRecordsProcessed = 0;
+ smallestOffset = Long.MAX_VALUE;
+ largestOffset = Long.MIN_VALUE;
+ }
+
+ @Override
+ public void process(final Record<Object, Object> record) {
+ numRecordsProcessed++;
+ if (numRecordsProcessed % 100 == 0) {
+ System.out.printf("%s: %s%n", name, Instant.now());
+ System.out.println("processed " + numRecordsProcessed + "
records from topic=" + topic);
+ }
+
+ context().recordMetadata().ifPresent(recordMetadata -> {
+ if (smallestOffset > recordMetadata.offset()) {
+ smallestOffset = recordMetadata.offset();
+ }
+ if (largestOffset < recordMetadata.offset()) {
+ largestOffset = recordMetadata.offset();
+ }
+ });
+ }
+
+ @Override
+ public void close() {
+ System.out.printf("Close processor for task %s%n",
context().taskId());
+ System.out.println("processed " + numRecordsProcessed + "
records");
+ final long processed;
+ if (largestOffset >= smallestOffset) {
+ processed = 1L + largestOffset - smallestOffset;
+ } else {
+ processed = 0L;
+ }
+ System.out.println("offset " + smallestOffset + " to " +
largestOffset + " -> processed " + processed);
+ System.out.flush();
+ }
+ };
+ }
+
+ public static final class Unwindow<K, V> implements KeyValueMapper<Windowed<K>, V, K> {
+ @Override
+ public K apply(final Windowed<K> winKey, final V value) {
+ return winKey.key();
+ }
+ }
+
+ public static class Agg {
+
+ KeyValueMapper<String, Long, KeyValue<String, Long>> selector() {
+ return (key, value) -> new KeyValue<>(value == null ? null : Long.toString(value), 1L);
+ }
+
+ public Initializer<Long> init() {
+ return () -> 0L;
+ }
+
+ Aggregator<String, Long, Long> adder() {
+ return (aggKey, value, aggregate) -> aggregate + value;
+ }
+
+ Aggregator<String, Long, Long> remover() {
+ return (aggKey, value, aggregate) -> aggregate - value;
+ }
+ }
+
+ public static Serde<String> stringSerde = Serdes.String();
+
+ public static Serde<Integer> intSerde = Serdes.Integer();
+
+ static Serde<Long> longSerde = Serdes.Long();
+
+ static Serde<Double> doubleSerde = Serdes.Double();
+
+ public static void sleep(final long duration) {
+ try {
+ Thread.sleep(duration);
+ } catch (final Exception ignore) { }
+ }
+
+}
diff --git a/streams/upgrade-system-tests-41/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java b/streams/upgrade-system-tests-41/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
new file mode 100644
index 00000000000..5803b2fbd02
--- /dev/null
+++ b/streams/upgrade-system-tests-41/src/test/java/org/apache/kafka/streams/tests/StreamsSmokeTest.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Exit;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.StreamsConfig;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generate;
+import static org.apache.kafka.streams.tests.SmokeTestDriver.generatePerpetually;
+
+public class StreamsSmokeTest {
+
+ /**
+ * args ::= kafka propFileName command disableAutoTerminate
+ * command := "run" | "process"
+ *
+ * @param args
+ */
+ public static void main(final String[] args) throws IOException {
+ if (args.length < 2) {
+ System.err.println("StreamsSmokeTest are expecting two parameters:
propFile, command; but only see " + args.length + " parameter");
+ Exit.exit(1);
+ }
+
+ final String propFileName = args[0];
+ final String command = args[1];
+ final boolean disableAutoTerminate = args.length > 2;
+
+ final Properties streamsProperties = Utils.loadProps(propFileName);
+ final String kafka = streamsProperties.getProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+ final String processingGuarantee = streamsProperties.getProperty(StreamsConfig.PROCESSING_GUARANTEE_CONFIG);
+
+ if (kafka == null) {
+ System.err.println("No bootstrap kafka servers specified in " +
StreamsConfig.BOOTSTRAP_SERVERS_CONFIG);
+ Exit.exit(1);
+ }
+
+ if ("process".equals(command)) {
+ if (!StreamsConfig.AT_LEAST_ONCE.equals(processingGuarantee) && !StreamsConfig.EXACTLY_ONCE_V2.equals(processingGuarantee)) {
+
+ System.err.println("processingGuarantee must be either " +
StreamsConfig.AT_LEAST_ONCE + " or " +
+ StreamsConfig.EXACTLY_ONCE_V2);
+
+ Exit.exit(1);
+ }
+ }
+
+ System.out.println("StreamsTest instance started (StreamsSmokeTest)");
+ System.out.println("command=" + command);
+ System.out.println("props=" + streamsProperties);
+ System.out.println("disableAutoTerminate=" + disableAutoTerminate);
+
+ switch (command) {
+ case "run":
+ // this starts the driver (data generation and result verification)
+ final int numKeys = 10;
+ final int maxRecordsPerKey = 500;
+ if (disableAutoTerminate) {
+ generatePerpetually(kafka, numKeys, maxRecordsPerKey);
+ } else {
+ // slow down data production to span 30 seconds so that system tests have time to
+ // do their bounces, etc.
+ final Map<String, Set<Integer>> allData =
+ generate(kafka, numKeys, maxRecordsPerKey, Duration.ofSeconds(30));
+ SmokeTestDriver.verify(kafka, allData, maxRecordsPerKey);
+ }
+ break;
+ case "process":
+ // this starts the stream processing app
+ new SmokeTestClient(UUID.randomUUID().toString()).start(streamsProperties);
+ break;
+ default:
+ System.out.println("unknown command: " + command);
+ }
+ }
+
+}
diff --git a/streams/upgrade-system-tests-41/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java b/streams/upgrade-system-tests-41/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
new file mode 100644
index 00000000000..350169288e3
--- /dev/null
+++ b/streams/upgrade-system-tests-41/src/test/java/org/apache/kafka/streams/tests/StreamsUpgradeTest.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.tests;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.KStream;
+import org.apache.kafka.streams.kstream.KTable;
+import org.apache.kafka.streams.kstream.Produced;
+import org.apache.kafka.streams.processor.api.ContextualProcessor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Record;
+
+import java.util.Properties;
+
+import static org.apache.kafka.streams.tests.SmokeTestUtil.intSerde;
+import static org.apache.kafka.streams.tests.SmokeTestUtil.stringSerde;
+
+
+public class StreamsUpgradeTest {
+
+ @SuppressWarnings("unchecked")
+ public static void main(final String[] args) throws Exception {
+ if (args.length < 1) {
+ System.err.println("StreamsUpgradeTest requires one argument
(properties-file) but provided none");
+ }
+ final String propFileName = args[0];
+
+ final Properties streamsProperties = Utils.loadProps(propFileName);
+
+ System.out.println("StreamsTest instance started (StreamsUpgradeTest
v4.1)");
+ System.out.println("props=" + streamsProperties);
+
+ final StreamsBuilder builder = new StreamsBuilder();
+ final KTable<String, Integer> dataTable = builder.table(
+ "data", Consumed.with(stringSerde, intSerde));
+ final KStream<String, Integer> dataStream = dataTable.toStream();
+ dataStream.process(printProcessorSupplier("data"));
+ dataStream.to("echo");
+
+ final boolean runFkJoin = Boolean.parseBoolean(streamsProperties.getProperty(
+ "test.run_fk_join",
+ "false"));
+ if (runFkJoin) {
+ try {
+ final KTable<Integer, String> fkTable = builder.table(
+ "fk", Consumed.with(intSerde, stringSerde));
+ buildFKTable(dataStream, fkTable);
+ } catch (final Exception e) {
+ System.err.println("Caught " + e.getMessage());
+ }
+ }
+
+ final Properties config = new Properties();
+ config.setProperty(
+ StreamsConfig.APPLICATION_ID_CONFIG,
+ "StreamsUpgradeTest");
+ config.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
+ config.putAll(streamsProperties);
+
+ final KafkaStreams streams = new KafkaStreams(builder.build(), config);
+ streams.start();
+
+ Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+ streams.close();
+ System.out.println("UPGRADE-TEST-CLIENT-CLOSED");
+ System.out.flush();
+ }));
+ }
+
+ private static void buildFKTable(final KStream<String, Integer> primaryTable,
+ final KTable<Integer, String> otherTable) {
+ final KStream<String, String> kStream = primaryTable.toTable()
+ .join(otherTable, v -> v, (k0, v0) -> v0)
+ .toStream();
+ kStream.process(printProcessorSupplier("fk"));
+ kStream.to("fk-result", Produced.with(stringSerde, stringSerde));
+ }
+
+ private static <KIn, VIn, KOut, VOut> ProcessorSupplier<KIn, VIn, KOut, VOut> printProcessorSupplier(final String topic) {
+ return () -> new ContextualProcessor<KIn, VIn, KOut, VOut>() {
+ private int numRecordsProcessed = 0;
+
+ @Override
+ public void init(final ProcessorContext<KOut, VOut> context) {
+ System.out.println("[4.1] initializing processor: topic=" +
topic + "taskId=" + context.taskId());
+ numRecordsProcessed = 0;
+ }
+
+ @Override
+ public void process(final Record<KIn, VIn> record) {
+ numRecordsProcessed++;
+ if (numRecordsProcessed % 100 == 0) {
+ System.out.println("processed " + numRecordsProcessed + "
records from topic=" + topic);
+ }
+ }
+
+ @Override
+ public void close() {}
+ };
+ }
+}
diff --git a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
index 6831ec896e8..dcccc65f551 100644
--- a/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_application_upgrade_test.py
@@ -22,7 +22,7 @@ from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmokeTestJobRunnerService
from kafkatest.version import LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, \
- LATEST_3_9, LATEST_4_0, DEV_VERSION, KafkaVersion
+ LATEST_3_9, LATEST_4_0, LATEST_4_1, DEV_VERSION, KafkaVersion
smoke_test_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6),
@@ -30,7 +30,7 @@ smoke_test_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6),
str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3),
str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6),
str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9),
- str(LATEST_4_0)]
+ str(LATEST_4_0), str(LATEST_4_1)]
class StreamsUpgradeTest(Test):
"""
diff --git a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
index 168da145876..b32187d84d4 100644
--- a/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
+++ b/tests/kafkatest/tests/streams/streams_broker_compatibility_test.py
@@ -22,7 +22,7 @@ from kafkatest.services.kafka import KafkaService, quorum
from kafkatest.services.streams import StreamsBrokerCompatibilityService
from kafkatest.services.verifiable_consumer import VerifiableConsumer
from kafkatest.version import LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, \
- LATEST_3_7, LATEST_3_8, LATEST_3_9, LATEST_4_0, KafkaVersion
+ LATEST_3_7, LATEST_3_8, LATEST_3_9, LATEST_4_0, LATEST_4_1, KafkaVersion
class StreamsBrokerCompatibility(Test):
@@ -57,7 +57,7 @@ class StreamsBrokerCompatibility(Test):
@cluster(num_nodes=4)
@matrix(broker_version=[str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),
str(LATEST_3_4),str(LATEST_3_5),str(LATEST_3_6),str(LATEST_3_7),
- str(LATEST_3_8),str(LATEST_3_9),str(LATEST_4_0)],
+ str(LATEST_3_8),str(LATEST_3_9),str(LATEST_4_0),str(LATEST_4_1)],
metadata_quorum=[quorum.combined_kraft]
)
def test_compatible_brokers_eos_disabled(self, broker_version, metadata_quorum):
@@ -79,7 +79,7 @@ class StreamsBrokerCompatibility(Test):
@cluster(num_nodes=4)
@matrix(broker_version=[str(LATEST_3_0),str(LATEST_3_1),str(LATEST_3_2),str(LATEST_3_3),
str(LATEST_3_4),str(LATEST_3_5),str(LATEST_3_6),str(LATEST_3_7),
- str(LATEST_3_8),str(LATEST_3_9),str(LATEST_4_0)],
+ str(LATEST_3_8),str(LATEST_3_9),str(LATEST_4_0),str(LATEST_4_1)],
metadata_quorum=[quorum.combined_kraft])
def test_compatible_brokers_eos_v2_enabled(self, broker_version, metadata_quorum):
self.kafka.set_version(KafkaVersion(broker_version))
diff --git a/tests/kafkatest/tests/streams/streams_upgrade_test.py b/tests/kafkatest/tests/streams/streams_upgrade_test.py
index 1bb223fcb2a..6b7041167ca 100644
--- a/tests/kafkatest/tests/streams/streams_upgrade_test.py
+++ b/tests/kafkatest/tests/streams/streams_upgrade_test.py
@@ -24,14 +24,14 @@ from kafkatest.services.streams import StreamsSmokeTestDriverService, StreamsSmo
from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id
from kafkatest.version import LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, \
LATEST_3_0, LATEST_3_1, LATEST_3_2, LATEST_3_3, LATEST_3_4, LATEST_3_5, LATEST_3_6, LATEST_3_7, LATEST_3_8, LATEST_3_9, \
- LATEST_4_0, DEV_BRANCH, DEV_VERSION, KafkaVersion
+ LATEST_4_0, LATEST_4_1, DEV_BRANCH, DEV_VERSION, KafkaVersion
# broker 0.10.0 is not compatible with newer Kafka Streams versions
# broker 0.10.1 and 0.10.2 do not support headers, as required by suppress() (since v2.2.1)
broker_upgrade_versions = [str(LATEST_2_8), str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2),
str(LATEST_3_3), str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6),
str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(LATEST_4_0),
- str(DEV_BRANCH)]
+ str(LATEST_4_1), str(DEV_BRANCH)]
metadata_2_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8),
str(LATEST_3_0), str(LATEST_3_1), str(LATEST_3_2), str(LATEST_3_3)]
@@ -39,7 +39,7 @@ metadata_2_versions = [str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LA
# -> https://issues.apache.org/jira/browse/KAFKA-14646
# thus, we cannot test two bounce rolling upgrade because we know it's broken
# instead we add version 2.4...3.3 to the `metadata_2_versions` upgrade list
-fk_join_versions = [str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(LATEST_4_0)]
+fk_join_versions = [str(LATEST_3_4), str(LATEST_3_5), str(LATEST_3_6), str(LATEST_3_7), str(LATEST_3_8), str(LATEST_3_9), str(LATEST_4_0), str(LATEST_4_1)]
"""