This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b6c1cb0eec4 MINOR: update CachingPersistentWindowStoreTest (#16701)
b6c1cb0eec4 is described below

commit b6c1cb0eec4cad56f8db6eba05d24f6b3c44a211
Author: Matthias J. Sax <matth...@confluent.io>
AuthorDate: Mon Jul 29 12:45:13 2024 -0700

    MINOR: update CachingPersistentWindowStoreTest (#16701)
    
    Refactor test to move off deprecated `transform()` in favor of
    `process()`.
    
    Reviewers: Bill Bejeck <b...@confluent.io>
---
 .../CachingPersistentWindowStoreTest.java          | 41 ++++++++++------------
 1 file changed, 18 insertions(+), 23 deletions(-)

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
index 07293d2d4b9..6a055f51cf9 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/state/internals/CachingPersistentWindowStoreTest.java
@@ -32,11 +32,12 @@ import org.apache.kafka.streams.TopologyTestDriver;
 import org.apache.kafka.streams.errors.InvalidStateStoreException;
 import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.TimeWindowedDeserializer;
-import org.apache.kafka.streams.kstream.Transformer;
 import org.apache.kafka.streams.kstream.Windowed;
 import org.apache.kafka.streams.kstream.internals.TimeWindow;
-import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
 import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
 import org.apache.kafka.streams.query.Position;
@@ -102,7 +103,7 @@ public class CachingPersistentWindowStoreTest {
     private static final String TOPIC = "topic";
     private static final String CACHE_NAMESPACE = "0_0-store-name";
 
-    private InternalMockProcessorContext context;
+    private InternalMockProcessorContext<?, ?> context;
     private RocksDBSegmentedBytesStore bytesStore;
     private WindowStore<Bytes, byte[]> underlyingStore;
     private CachingWindowStore cachingStore;
@@ -138,8 +139,8 @@ public class CachingPersistentWindowStoreTest {
         final WindowStore<Bytes, byte[]> inner = mock(WindowStore.class);
         final CachingWindowStore outer = new CachingWindowStore(inner, 
WINDOW_SIZE, SEGMENT_INTERVAL);
         when(inner.name()).thenReturn("store");
-        outer.init((ProcessorContext) context, outer);
-        verify(inner).init((ProcessorContext) context, outer);
+        outer.init((org.apache.kafka.streams.processor.ProcessorContext) 
context, outer);
+        
verify(inner).init((org.apache.kafka.streams.processor.ProcessorContext) 
context, outer);
     }
 
     @SuppressWarnings("unchecked")
@@ -153,30 +154,28 @@ public class CachingPersistentWindowStoreTest {
     }
 
     @Test
-    @SuppressWarnings("deprecation")
     public void shouldNotReturnDuplicatesInRanges() {
         final StreamsBuilder builder = new StreamsBuilder();
 
         final StoreBuilder<WindowStore<String, String>> storeBuilder = 
Stores.windowStoreBuilder(
-            Stores.persistentWindowStore("store-name", ofHours(1L), 
ofMinutes(1L), false),
-            Serdes.String(),
-            Serdes.String())
+                Stores.persistentWindowStore("store-name", ofHours(1L), 
ofMinutes(1L), false),
+                Serdes.String(),
+                Serdes.String())
             .withCachingEnabled();
 
         builder.addStateStore(storeBuilder);
 
         builder.stream(TOPIC,
             Consumed.with(Serdes.String(), Serdes.String()))
-            .transform(() -> new Transformer<String, String, KeyValue<String, 
String>>() {
+            .process(() -> new Processor<String, String, String, String>() {
                 private WindowStore<String, String> store;
                 private int numRecordsProcessed;
-                private ProcessorContext context;
+                private ProcessorContext<String, String> context;
 
-                @SuppressWarnings("unchecked")
                 @Override
-                public void init(final ProcessorContext processorContext) {
+                public void init(final ProcessorContext<String, String> 
processorContext) {
                     this.context = processorContext;
-                    this.store = (WindowStore<String, String>) 
processorContext.getStateStore("store-name");
+                    this.store = processorContext.getStateStore("store-name");
                     int count = 0;
 
                     try (final KeyValueIterator<Windowed<String>, String> all 
= store.all()) {
@@ -190,7 +189,7 @@ public class CachingPersistentWindowStoreTest {
                 }
 
                 @Override
-                public KeyValue<String, String> transform(final String key, 
final String value) {
+                public void process(final Record<String, String> record) {
                     int count = 0;
 
                     try (final KeyValueIterator<Windowed<String>, String> all 
= store.all()) {
@@ -202,22 +201,18 @@ public class CachingPersistentWindowStoreTest {
 
                     assertThat(count, equalTo(numRecordsProcessed));
 
-                    store.put(value, value, context.timestamp());
+                    store.put(record.value(), record.value(), 
record.timestamp());
 
                     numRecordsProcessed++;
 
-                    return new KeyValue<>(key, value);
-                }
-
-                @Override
-                public void close() {
+                    context.forward(record);
                 }
             }, "store-name");
 
         final Properties streamsConfiguration = new Properties();
         streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 
"earliest");
-        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
-        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass().getName());
+        streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
+        
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.StringSerde.class);
         streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG, 
TestUtils.tempDirectory().getPath());
         streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 
1000L);
 

Reply via email to