This is an automated email from the ASF dual-hosted git repository.

vvcephei pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.7 by this push:
     new 190acf7  KAFKA-10605: Deprecate old PAPI registration methods (#9448)
190acf7 is described below

commit 190acf7453fe3b58d46138611055c188b4752c97
Author: John Roesler <[email protected]>
AuthorDate: Mon Oct 19 15:29:27 2020 -0500

    KAFKA-10605: Deprecate old PAPI registration methods (#9448)
    
    Add deprecation annotations to the methods replaced in KIP-478.
    
    Reviewers: Bill Bejeck <[email protected]>
---
 .../examples/wordcount/WordCountProcessorDemo.java | 32 ++++++++++----------
 .../examples/wordcount/WordCountProcessorTest.java | 34 +++++++++++-----------
 .../java/org/apache/kafka/streams/Topology.java    |  6 ++++
 .../apache/kafka/streams/TopologyDescription.java  |  2 +-
 .../org/apache/kafka/streams/KafkaStreamsTest.java |  1 +
 .../org/apache/kafka/streams/TopologyTest.java     |  2 ++
 .../processor/internals/ProcessorTopologyTest.java |  7 +++++
 7 files changed, 49 insertions(+), 35 deletions(-)

diff --git 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index c3f47da..646a016 100644
--- 
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -22,10 +22,11 @@ import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.KeyValueIterator;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
@@ -54,17 +55,17 @@ import java.util.concurrent.CountDownLatch;
  */
 public final class WordCountProcessorDemo {
 
-    static class MyProcessorSupplier implements ProcessorSupplier<String, 
String> {
+    static class MyProcessorSupplier implements ProcessorSupplier<String, 
String, String, String> {
 
         @Override
-        public Processor<String, String> get() {
-            return new Processor<String, String>() {
+        public Processor<String, String, String, String> get() {
+            return new Processor<String, String, String, String>() {
                 private ProcessorContext context;
                 private KeyValueStore<String, Integer> kvStore;
 
                 @Override
                 @SuppressWarnings("unchecked")
-                public void init(final ProcessorContext context) {
+                public void init(final ProcessorContext<String, String> 
context) {
                     this.context = context;
                     this.context.schedule(Duration.ofSeconds(1), 
PunctuationType.STREAM_TIME, timestamp -> {
                         try (final KeyValueIterator<String, Integer> iter = 
kvStore.all()) {
@@ -75,30 +76,27 @@ public final class WordCountProcessorDemo {
 
                                 System.out.println("[" + entry.key + ", " + 
entry.value + "]");
 
-                                context.forward(entry.key, 
entry.value.toString());
+                                context.forward(new Record<>(entry.key, 
entry.value.toString(), timestamp));
                             }
                         }
                     });
-                    this.kvStore = (KeyValueStore<String, Integer>) 
context.getStateStore("Counts");
+                    kvStore = context.getStateStore("Counts");
                 }
 
                 @Override
-                public void process(final String dummy, final String line) {
-                    final String[] words = 
line.toLowerCase(Locale.getDefault()).split(" ");
+                public void process(final Record<String, String> record) {
+                    final String[] words = 
record.value().toLowerCase(Locale.getDefault()).split(" ");
 
                     for (final String word : words) {
-                        final Integer oldValue = this.kvStore.get(word);
+                        final Integer oldValue = kvStore.get(word);
 
                         if (oldValue == null) {
-                            this.kvStore.put(word, 1);
+                            kvStore.put(word, 1);
                         } else {
-                            this.kvStore.put(word, oldValue + 1);
+                            kvStore.put(word, oldValue + 1);
                         }
                     }
                 }
-
-                @Override
-                public void close() {}
             };
         }
     }
diff --git 
a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
 
b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
index ba52990..f75d635 100644
--- 
a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
+++ 
b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
@@ -17,27 +17,27 @@
 package org.apache.kafka.streams.examples.wordcount;
 
 import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.MockProcessorContext;
-import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.Stores;
 import org.junit.Test;
 
-import java.util.Iterator;
+import java.util.Arrays;
+import java.util.List;
 
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.Assert.assertTrue;
 
 /**
  * Demonstrate the use of {@link MockProcessorContext} for testing the {@link 
Processor} in the {@link WordCountProcessorDemo}.
  */
 public class WordCountProcessorTest {
-    @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
     @Test
     public void test() {
-        final MockProcessorContext context = new MockProcessorContext();
+        final MockProcessorContext<String, String> context = new 
MockProcessorContext<String, String>();
 
         // Create, initialize, and register the state store.
         final KeyValueStore<String, Integer> store =
@@ -45,15 +45,14 @@ public class WordCountProcessorTest {
                 .withLoggingDisabled() // Changelog is not supported by 
MockProcessorContext.
                 // Caching is disabled by default, but FYI: caching is also 
not supported by MockProcessorContext.
                 .build();
-        store.init(context, store);
-        context.register(store, null);
+        store.init(context.getStateStoreContext(), store);
 
         // Create and initialize the processor under test
-        final Processor<String, String> processor = new 
WordCountProcessorDemo.MyProcessorSupplier().get();
+        final Processor<String, String, String, String> processor = new 
WordCountProcessorDemo.MyProcessorSupplier().get();
         processor.init(context);
 
         // send a record to the processor
-        processor.process("key", "alpha beta gamma alpha");
+        processor.process(new Record<>("key", "alpha beta gamma alpha", 0L));
 
         // note that the processor does not forward during process()
         assertTrue(context.forwarded().isEmpty());
@@ -62,10 +61,11 @@ public class WordCountProcessorTest {
         context.scheduledPunctuators().get(0).getPunctuator().punctuate(0L);
 
         // finally, we can verify the output.
-        final Iterator<MockProcessorContext.CapturedForward> capturedForwards 
= context.forwarded().iterator();
-        assertEquals(new KeyValue<>("alpha", "2"), 
capturedForwards.next().keyValue());
-        assertEquals(new KeyValue<>("beta", "1"), 
capturedForwards.next().keyValue());
-        assertEquals(new KeyValue<>("gamma", "1"), 
capturedForwards.next().keyValue());
-        assertFalse(capturedForwards.hasNext());
+        final List<MockProcessorContext.CapturedForward<String, String>> 
expected = Arrays.asList(
+            new MockProcessorContext.CapturedForward<>(new Record<>("alpha", 
"2", 0L)),
+            new MockProcessorContext.CapturedForward<>(new Record<>("beta", 
"1", 0L)),
+            new MockProcessorContext.CapturedForward<>(new Record<>("gamma", 
"1", 0L))
+        );
+        assertThat(context.forwarded(), is(expected));
     }
 }
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java 
b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 9a1cf42..8753e54 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -655,8 +655,10 @@ public class Topology {
      * and process
      * @return itself
      * @throws TopologyException if parent processor is not added yet, or if 
this processor's name is equal to the parent's name
+     * @deprecated Since 2.7.0. Use {@link Topology#addProcessor(String, 
ProcessorSupplier, String...)} instead.
      */
     @SuppressWarnings("rawtypes")
+    @Deprecated
     public synchronized Topology addProcessor(final String name,
                                               final 
org.apache.kafka.streams.processor.ProcessorSupplier supplier,
                                               final String... parentNames) {
@@ -740,7 +742,9 @@ public class Topology {
      * @param stateUpdateSupplier   the instance of {@link 
org.apache.kafka.streams.processor.ProcessorSupplier}
      * @return itself
     * @throws TopologyException if the processor or state is already 
registered
+     * @deprecated Since 2.7.0. Use {@link Topology#addGlobalStore(StoreBuilder, 
String, Deserializer, Deserializer, String, String, ProcessorSupplier)} instead.
      */
+    @Deprecated
     public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?> 
storeBuilder,
                                                        final String sourceName,
                                                        final Deserializer<K> 
keyDeserializer,
@@ -784,7 +788,9 @@ public class Topology {
      * @param stateUpdateSupplier   the instance of {@link 
org.apache.kafka.streams.processor.ProcessorSupplier}
      * @return itself
     * @throws TopologyException if the processor or state is already 
registered
+     * @deprecated Since 2.7.0. Use {@link Topology#addGlobalStore(StoreBuilder, 
String, TimestampExtractor, Deserializer, Deserializer, String, String, 
ProcessorSupplier)} instead.
      */
+    @Deprecated
     public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?> 
storeBuilder,
                                                        final String sourceName,
                                                        final 
TimestampExtractor timestampExtractor,
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java 
b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index a90f782..5d9e1cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -60,7 +60,7 @@ public interface TopologyDescription {
     /**
      * Represents a {@link 
Topology#addGlobalStore(org.apache.kafka.streams.state.StoreBuilder, String,
      * org.apache.kafka.common.serialization.Deserializer, 
org.apache.kafka.common.serialization.Deserializer, String,
-     * String, org.apache.kafka.streams.processor.ProcessorSupplier) global 
store}.
+     * String, org.apache.kafka.streams.processor.api.ProcessorSupplier) 
global store}.
      * Adding a global store results in adding a source node and one stateful 
processor node.
      * Note, that all added global stores form a single unit (similar to a 
{@link Subtopology}) even if different
      * global stores are not connected to each other.
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java 
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index a47998e..f259696 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -937,6 +937,7 @@ public class KafkaStreamsTest {
     }
 
     @SuppressWarnings("unchecked")
+    @Deprecated // testing old PAPI
     private Topology getStatefulTopology(final String inputTopic,
                                          final String outputTopic,
                                          final String globalTopicName,
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java 
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index f1a7749..9ecab31 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -382,6 +382,7 @@ public class TopologyTest {
         }
     }
 
+    @Deprecated // testing old PAPI
     @Test(expected = TopologyException.class)
     public void 
shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
         
EasyMock.expect(globalStoreBuilder.name()).andReturn("anyName").anyTimes();
@@ -1227,6 +1228,7 @@ public class TopologyTest {
         return expectedSinkNode;
     }
 
+    @Deprecated // testing old PAPI
     private void addGlobalStoreToTopologyAndExpectedDescription(final String 
globalStoreName,
                                                                 final String 
sourceName,
                                                                 final String 
globalTopicName,
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index bb63637..b69ae87 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -293,6 +293,7 @@ public class ProcessorTopologyTest {
         assertNull(store.get("key4"));
     }
 
+    @Deprecated // testing old PAPI
     @Test
     public void 
testDrivingConnectedStateStoreInDifferentProcessorsTopologyWithOldAPI() {
         final String storeName = "connectedStore";
@@ -355,6 +356,7 @@ public class ProcessorTopologyTest {
         assertNull(store.get("key4"));
     }
 
+    @Deprecated // testing old PAPI
     @Test
     public void shouldDriveGlobalStore() {
         final String storeName = "my-store";
@@ -621,6 +623,7 @@ public class ProcessorTopologyTest {
         return topology.getInternalBuilder("anyAppId").buildTopology();
     }
 
+    @Deprecated // testing old PAPI
     private ProcessorTopology createGlobalStoreTopology(final 
KeyValueBytesStoreSupplier storeSupplier) {
         final TopologyWrapper topology = new TopologyWrapper();
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
@@ -682,6 +685,7 @@ public class ProcessorTopologyTest {
             .addSink("sink2", OUTPUT_TOPIC_2, constantPartitioner(partition), 
"child2");
     }
 
+    @Deprecated // testing old PAPI
     private Topology createMultiplexingTopology() {
         return topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
@@ -690,6 +694,7 @@ public class ProcessorTopologyTest {
             .addSink("sink2", OUTPUT_TOPIC_2, "processor");
     }
 
+    @Deprecated // testing old PAPI
     private Topology createMultiplexByNameTopology() {
         return topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
@@ -698,6 +703,7 @@ public class ProcessorTopologyTest {
             .addSink("sink1", OUTPUT_TOPIC_2, "processor");
     }
 
+    @Deprecated // testing old PAPI
     private Topology createStatefulTopology(final String storeName) {
         return topology
             .addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER, 
INPUT_TOPIC_1)
@@ -706,6 +712,7 @@ public class ProcessorTopologyTest {
             .addSink("counts", OUTPUT_TOPIC_1, "processor");
     }
 
+    @Deprecated // testing old PAPI
     private Topology createConnectedStateStoreTopology(final String storeName) 
{
         final StoreBuilder<KeyValueStore<String, String>> storeBuilder = 
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName), 
Serdes.String(), Serdes.String());
         return topology

Reply via email to