This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1528264f021 MINOR: update EosIntegrationTest (#16697)
1528264f021 is described below

commit 1528264f0217820a86f01964219dbe11b3a60c0e
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Wed Jul 31 11:25:44 2024 -0700

    MINOR: update EosIntegrationTest (#16697)
    
    Refactor test to move off deprecated `transform()` in favor of
    `process()`.
    
    Reviewers: Bill Bejeck <b...@confluent.io>
---
 .../streams/integration/EosIntegrationTest.java    | 125 ++++++++++-----------
 1 file changed, 59 insertions(+), 66 deletions(-)

diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
index 38e7e5cc0ca..94e48ee3d49 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
@@ -43,12 +43,10 @@ import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
 import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
-import org.apache.kafka.streams.kstream.Transformer;
-import org.apache.kafka.streams.kstream.TransformerSupplier;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateRestoreListener;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.StreamThread;
 import org.apache.kafka.streams.query.QueryResult;
@@ -863,8 +861,8 @@ public class EosIntegrationTest {
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, 
applicationId);
         streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.Integer().getClass());
-        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.IntegerSerde.class);
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
         streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
eosConfig);
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1);
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
@@ -901,10 +899,10 @@ public class EosIntegrationTest {
             .addSource("source", MULTI_PARTITION_INPUT_TOPIC)
             .addProcessor("processor", () -> new Processor<Integer, String, 
Integer, String>() {
                 KeyValueStore<Integer, String> stateStore;
-                
org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String> 
context;
+                ProcessorContext<Integer, String> context;
 
                 @Override
-                public void init(final 
org.apache.kafka.streams.processor.api.ProcessorContext<Integer, String> 
context) {
+                public void init(final ProcessorContext<Integer, String> 
context) {
                     Processor.super.init(context);
                     this.context = context;
                     stateStore = context.getStateStore(stateStoreName);
@@ -1014,7 +1012,7 @@ public class EosIntegrationTest {
             final long topicEndOffset = consumer.position(tp);
 
             assertTrue(topicEndOffset >= checkpointedOffset,
-                    "changelog topic end " + topicEndOffset + " is less than 
checkpointed offset " + checkpointedOffset);
+                "changelog topic end " + topicEndOffset + " is less than 
checkpointed offset " + checkpointedOffset);
 
             consumer.seekToBeginning(partitions);
 
@@ -1070,80 +1068,75 @@ public class EosIntegrationTest {
         }
 
         final KStream<Long, Long> input = 
builder.stream(MULTI_PARTITION_INPUT_TOPIC);
-        input.transform(new TransformerSupplier<Long, Long, KeyValue<Long, 
Long>>() {
-            @SuppressWarnings("unchecked")
-            @Override
-            public Transformer<Long, Long, KeyValue<Long, Long>> get() {
-                return new Transformer<Long, Long, KeyValue<Long, Long>>() {
-                    ProcessorContext context;
-                    KeyValueStore<Long, Long> state = null;
+        input.process(() -> new Processor<Long, Long, Long, Long>() {
+                ProcessorContext<Long, Long> context;
+                KeyValueStore<Long, Long> state = null;
 
-                    @Override
-                    public void init(final ProcessorContext context) {
-                        this.context = context;
+                @Override
+                public void init(final ProcessorContext<Long, Long> context) {
+                    this.context = context;
 
-                        if (withState) {
-                            state = context.getStateStore(storeName);
-                        }
+                    if (withState) {
+                        state = context.getStateStore(storeName);
                     }
+                }
 
-                    @Override
-                    public KeyValue<Long, Long> transform(final Long key, 
final Long value) {
-                        if (stallInjected.compareAndSet(true, false)) {
-                            LOG.info(dummyHostName + " is executing the 
injected stall");
-                            stallingHost.set(dummyHostName);
-                            while (doStall) {
-                                final Thread thread = Thread.currentThread();
-                                if (thread.isInterrupted()) {
+                @Override
+                public void process(final Record<Long, Long> record) {
+                    if (stallInjected.compareAndSet(true, false)) {
+                        LOG.info(dummyHostName + " is executing the injected 
stall");
+                        stallingHost.set(dummyHostName);
+                        while (doStall) {
+                            final Thread thread = Thread.currentThread();
+                            if (thread.isInterrupted()) {
+                                throw new RuntimeException("Detected we've 
been interrupted.");
+                            }
+                            if (!processingThreadsEnabled) {
+                                if (!((StreamThread) thread).isRunning()) {
                                     throw new RuntimeException("Detected we've 
been interrupted.");
                                 }
-                                if (!processingThreadsEnabled) {
-                                    if (!((StreamThread) thread).isRunning()) {
-                                        throw new RuntimeException("Detected 
we've been interrupted.");
-                                    }
-                                }
-                                try {
-                                    Thread.sleep(100);
-                                } catch (final InterruptedException e) {
-                                    throw new RuntimeException(e);
-                                }
                             }
-                        }
-
-                        if ((value + 1) % 10 == 0) {
-                            context.commit();
-                            commitRequested.incrementAndGet();
-                        }
-
-                        if (state != null) {
-                            Long sum = state.get(key);
-
-                            if (sum == null) {
-                                sum = value;
-                            } else {
-                                sum += value;
+                            try {
+                                Thread.sleep(100);
+                            } catch (final InterruptedException e) {
+                                throw new RuntimeException(e);
                             }
-                            state.put(key, sum);
-                            state.flush();
                         }
+                    }
 
+                    final long key = record.key();
+                    final long value = record.value();
 
-                        if (errorInjected.compareAndSet(true, false)) {
-                            // only tries to fail once on one of the task
-                            throw new RuntimeException("Injected test 
exception.");
-                        }
+                    if ((value + 1) % 10 == 0) {
+                        context.commit();
+                        commitRequested.incrementAndGet();
+                    }
+
+                    if (state != null) {
+                        Long sum = state.get(key);
 
-                        if (state != null) {
-                            return new KeyValue<>(key, state.get(key));
+                        if (sum == null) {
+                            sum = value;
                         } else {
-                            return new KeyValue<>(key, value);
+                            sum += value;
                         }
+                        state.put(key, sum);
+                        state.flush();
                     }
 
-                    @Override
-                    public void close() { }
-                };
-            } }, storeNames)
+
+                    if (errorInjected.compareAndSet(true, false)) {
+                        // only tries to fail once on one of the task
+                        throw new RuntimeException("Injected test exception.");
+                    }
+
+                    if (state != null) {
+                        context.forward(record.withValue(state.get(key)));
+                    } else {
+                        context.forward(record);
+                    }
+                }
+            }, storeNames)
             .to(SINGLE_PARTITION_OUTPUT_TOPIC);
 
         stateTmpDir = TestUtils.tempDirectory().getPath() + File.separator;

Reply via email to