This is an automated email from the ASF dual-hosted git repository.
vvcephei pushed a commit to branch 2.7
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.7 by this push:
new 190acf7 KAFKA-10605: Deprecate old PAPI registration methods (#9448)
190acf7 is described below
commit 190acf7453fe3b58d46138611055c188b4752c97
Author: John Roesler <[email protected]>
AuthorDate: Mon Oct 19 15:29:27 2020 -0500
KAFKA-10605: Deprecate old PAPI registration methods (#9448)
Add deprecation annotations to the methods replaced in KIP-478.
Reviewers: Bill Bejeck <[email protected]>
---
.../examples/wordcount/WordCountProcessorDemo.java | 32 ++++++++++----------
.../examples/wordcount/WordCountProcessorTest.java | 34 +++++++++++-----------
.../java/org/apache/kafka/streams/Topology.java | 6 ++++
.../apache/kafka/streams/TopologyDescription.java | 2 +-
.../org/apache/kafka/streams/KafkaStreamsTest.java | 1 +
.../org/apache/kafka/streams/TopologyTest.java | 2 ++
.../processor/internals/ProcessorTopologyTest.java | 7 +++++
7 files changed, 49 insertions(+), 35 deletions(-)
diff --git
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
index c3f47da..646a016 100644
---
a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
+++
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java
@@ -22,10 +22,11 @@ import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
-import org.apache.kafka.streams.processor.Processor;
-import org.apache.kafka.streams.processor.ProcessorContext;
-import org.apache.kafka.streams.processor.ProcessorSupplier;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
@@ -54,17 +55,17 @@ import java.util.concurrent.CountDownLatch;
*/
public final class WordCountProcessorDemo {
- static class MyProcessorSupplier implements ProcessorSupplier<String,
String> {
+ static class MyProcessorSupplier implements ProcessorSupplier<String,
String, String, String> {
@Override
- public Processor<String, String> get() {
- return new Processor<String, String>() {
+ public Processor<String, String, String, String> get() {
+ return new Processor<String, String, String, String>() {
private ProcessorContext context;
private KeyValueStore<String, Integer> kvStore;
@Override
@SuppressWarnings("unchecked")
- public void init(final ProcessorContext context) {
+ public void init(final ProcessorContext<String, String>
context) {
this.context = context;
this.context.schedule(Duration.ofSeconds(1),
PunctuationType.STREAM_TIME, timestamp -> {
try (final KeyValueIterator<String, Integer> iter =
kvStore.all()) {
@@ -75,30 +76,27 @@ public final class WordCountProcessorDemo {
System.out.println("[" + entry.key + ", " +
entry.value + "]");
- context.forward(entry.key,
entry.value.toString());
+ context.forward(new Record<>(entry.key,
entry.value.toString(), timestamp));
}
}
});
- this.kvStore = (KeyValueStore<String, Integer>)
context.getStateStore("Counts");
+ kvStore = context.getStateStore("Counts");
}
@Override
- public void process(final String dummy, final String line) {
- final String[] words =
line.toLowerCase(Locale.getDefault()).split(" ");
+ public void process(final Record<String, String> record) {
+ final String[] words =
record.value().toLowerCase(Locale.getDefault()).split(" ");
for (final String word : words) {
- final Integer oldValue = this.kvStore.get(word);
+ final Integer oldValue = kvStore.get(word);
if (oldValue == null) {
- this.kvStore.put(word, 1);
+ kvStore.put(word, 1);
} else {
- this.kvStore.put(word, oldValue + 1);
+ kvStore.put(word, oldValue + 1);
}
}
}
-
- @Override
- public void close() {}
};
}
}
diff --git
a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
index ba52990..f75d635 100644
---
a/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
+++
b/streams/examples/src/test/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorTest.java
@@ -17,27 +17,27 @@
package org.apache.kafka.streams.examples.wordcount;
import org.apache.kafka.common.serialization.Serdes;
-import org.apache.kafka.streams.KeyValue;
-import org.apache.kafka.streams.processor.MockProcessorContext;
-import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.api.MockProcessorContext;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.junit.Test;
-import java.util.Iterator;
+import java.util.Arrays;
+import java.util.List;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertTrue;
/**
* Demonstrate the use of {@link MockProcessorContext} for testing the {@link
Processor} in the {@link WordCountProcessorDemo}.
*/
public class WordCountProcessorTest {
- @SuppressWarnings("deprecation") // TODO will be fixed in KAFKA-10437
@Test
public void test() {
- final MockProcessorContext context = new MockProcessorContext();
+ final MockProcessorContext<String, String> context = new
MockProcessorContext<String, String>();
// Create, initialize, and register the state store.
final KeyValueStore<String, Integer> store =
@@ -45,15 +45,14 @@ public class WordCountProcessorTest {
.withLoggingDisabled() // Changelog is not supported by
MockProcessorContext.
// Caching is disabled by default, but FYI: caching is also
not supported by MockProcessorContext.
.build();
- store.init(context, store);
- context.register(store, null);
+ store.init(context.getStateStoreContext(), store);
// Create and initialize the processor under test
- final Processor<String, String> processor = new
WordCountProcessorDemo.MyProcessorSupplier().get();
+ final Processor<String, String, String, String> processor = new
WordCountProcessorDemo.MyProcessorSupplier().get();
processor.init(context);
// send a record to the processor
- processor.process("key", "alpha beta gamma alpha");
+ processor.process(new Record<>("key", "alpha beta gamma alpha", 0L));
// note that the processor does not forward during process()
assertTrue(context.forwarded().isEmpty());
@@ -62,10 +61,11 @@ public class WordCountProcessorTest {
context.scheduledPunctuators().get(0).getPunctuator().punctuate(0L);
// finally, we can verify the output.
- final Iterator<MockProcessorContext.CapturedForward> capturedForwards
= context.forwarded().iterator();
- assertEquals(new KeyValue<>("alpha", "2"),
capturedForwards.next().keyValue());
- assertEquals(new KeyValue<>("beta", "1"),
capturedForwards.next().keyValue());
- assertEquals(new KeyValue<>("gamma", "1"),
capturedForwards.next().keyValue());
- assertFalse(capturedForwards.hasNext());
+ final List<MockProcessorContext.CapturedForward<String, String>>
expected = Arrays.asList(
+ new MockProcessorContext.CapturedForward<>(new Record<>("alpha",
"2", 0L)),
+ new MockProcessorContext.CapturedForward<>(new Record<>("beta",
"1", 0L)),
+ new MockProcessorContext.CapturedForward<>(new Record<>("gamma",
"1", 0L))
+ );
+ assertThat(context.forwarded(), is(expected));
}
}
diff --git a/streams/src/main/java/org/apache/kafka/streams/Topology.java
b/streams/src/main/java/org/apache/kafka/streams/Topology.java
index 9a1cf42..8753e54 100644
--- a/streams/src/main/java/org/apache/kafka/streams/Topology.java
+++ b/streams/src/main/java/org/apache/kafka/streams/Topology.java
@@ -655,8 +655,10 @@ public class Topology {
* and process
* @return itself
* @throws TopologyException if parent processor is not added yet, or if
this processor's name is equal to the parent's name
+ * @deprecated Since 2.7.0 Use {@link this#addProcessor(String,
ProcessorSupplier, String...)} instead.
*/
@SuppressWarnings("rawtypes")
+ @Deprecated
public synchronized Topology addProcessor(final String name,
final
org.apache.kafka.streams.processor.ProcessorSupplier supplier,
final String... parentNames) {
@@ -740,7 +742,9 @@ public class Topology {
* @param stateUpdateSupplier the instance of {@link
org.apache.kafka.streams.processor.ProcessorSupplier}
* @return itself
* @throws TopologyException if the processor of state is already
registered
+ * @deprecated Since 2.7.0. Use {@link this#addGlobalStore(StoreBuilder,
String, Deserializer, Deserializer, String, String, ProcessorSupplier)} instead.
*/
+ @Deprecated
public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?>
storeBuilder,
final String sourceName,
final Deserializer<K>
keyDeserializer,
@@ -784,7 +788,9 @@ public class Topology {
* @param stateUpdateSupplier the instance of {@link
org.apache.kafka.streams.processor.ProcessorSupplier}
* @return itself
* @throws TopologyException if the processor of state is already
registered
+ * @deprecated Since 2.7.0. Use {@link this#addGlobalStore(StoreBuilder,
String, TimestampExtractor, Deserializer, Deserializer, String, String,
ProcessorSupplier)} instead.
*/
+ @Deprecated
public synchronized <K, V> Topology addGlobalStore(final StoreBuilder<?>
storeBuilder,
final String sourceName,
final
TimestampExtractor timestampExtractor,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
index a90f782..5d9e1cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
+++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java
@@ -60,7 +60,7 @@ public interface TopologyDescription {
/**
* Represents a {@link
Topology#addGlobalStore(org.apache.kafka.streams.state.StoreBuilder, String,
* org.apache.kafka.common.serialization.Deserializer,
org.apache.kafka.common.serialization.Deserializer, String,
- * String, org.apache.kafka.streams.processor.ProcessorSupplier) global
store}.
+ * String, org.apache.kafka.streams.processor.api.ProcessorSupplier)
global store}.
* Adding a global store results in adding a source node and one stateful
processor node.
* Note, that all added global stores form a single unit (similar to a
{@link Subtopology}) even if different
* global stores are not connected to each other.
diff --git
a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
index a47998e..f259696 100644
--- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java
@@ -937,6 +937,7 @@ public class KafkaStreamsTest {
}
@SuppressWarnings("unchecked")
+ @Deprecated // testing old PAPI
private Topology getStatefulTopology(final String inputTopic,
final String outputTopic,
final String globalTopicName,
diff --git a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
index f1a7749..9ecab31 100644
--- a/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
@@ -382,6 +382,7 @@ public class TopologyTest {
}
}
+ @Deprecated // testing old PAPI
@Test(expected = TopologyException.class)
public void
shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() {
EasyMock.expect(globalStoreBuilder.name()).andReturn("anyName").anyTimes();
@@ -1227,6 +1228,7 @@ public class TopologyTest {
return expectedSinkNode;
}
+ @Deprecated // testing old PAPI
private void addGlobalStoreToTopologyAndExpectedDescription(final String
globalStoreName,
final String
sourceName,
final String
globalTopicName,
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
index bb63637..b69ae87 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java
@@ -293,6 +293,7 @@ public class ProcessorTopologyTest {
assertNull(store.get("key4"));
}
+ @Deprecated // testing old PAPI
@Test
public void
testDrivingConnectedStateStoreInDifferentProcessorsTopologyWithOldAPI() {
final String storeName = "connectedStore";
@@ -355,6 +356,7 @@ public class ProcessorTopologyTest {
assertNull(store.get("key4"));
}
+ @Deprecated // testing old PAPI
@Test
public void shouldDriveGlobalStore() {
final String storeName = "my-store";
@@ -621,6 +623,7 @@ public class ProcessorTopologyTest {
return topology.getInternalBuilder("anyAppId").buildTopology();
}
+ @Deprecated // testing old PAPI
private ProcessorTopology createGlobalStoreTopology(final
KeyValueBytesStoreSupplier storeSupplier) {
final TopologyWrapper topology = new TopologyWrapper();
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
@@ -682,6 +685,7 @@ public class ProcessorTopologyTest {
.addSink("sink2", OUTPUT_TOPIC_2, constantPartitioner(partition),
"child2");
}
+ @Deprecated // testing old PAPI
private Topology createMultiplexingTopology() {
return topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
@@ -690,6 +694,7 @@ public class ProcessorTopologyTest {
.addSink("sink2", OUTPUT_TOPIC_2, "processor");
}
+ @Deprecated // testing old PAPI
private Topology createMultiplexByNameTopology() {
return topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
@@ -698,6 +703,7 @@ public class ProcessorTopologyTest {
.addSink("sink1", OUTPUT_TOPIC_2, "processor");
}
+ @Deprecated // testing old PAPI
private Topology createStatefulTopology(final String storeName) {
return topology
.addSource("source", STRING_DESERIALIZER, STRING_DESERIALIZER,
INPUT_TOPIC_1)
@@ -706,6 +712,7 @@ public class ProcessorTopologyTest {
.addSink("counts", OUTPUT_TOPIC_1, "processor");
}
+ @Deprecated // testing old PAPI
private Topology createConnectedStateStoreTopology(final String storeName)
{
final StoreBuilder<KeyValueStore<String, String>> storeBuilder =
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(storeName),
Serdes.String(), Serdes.String());
return topology