This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c62c3899aaa KAFKA-12829: Remove deprecated
StreamsBuilder#addGlobalStore of old Processor API (#17059)
c62c3899aaa is described below
commit c62c3899aaa1787b72ec8236e5a2ab290e2989ac
Author: JohnHuang <[email protected]>
AuthorDate: Thu Sep 12 05:22:08 2024 +0800
KAFKA-12829: Remove deprecated StreamsBuilder#addGlobalStore of old
Processor API (#17059)
Reviewers: Chia-Ping Tsai <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../org/apache/kafka/streams/StreamsBuilder.java | 48 ---------------
...amsUncaughtExceptionHandlerIntegrationTest.java | 71 +++++++++++++---------
.../kafka/streams/scala/StreamsBuilder.scala | 23 +------
3 files changed, 44 insertions(+), 98 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index efc66dd35c3..2879436e500 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -33,8 +33,6 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.ProcessorAdapter;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
import org.apache.kafka.streams.processor.internals.SourceNode;
import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -523,52 +521,6 @@ public class StreamsBuilder {
return this;
}
- /**
- * Adds a global {@link StateStore} to the topology.
- * The {@link StateStore} sources its data from all partitions of the
provided input topic.
- * There will be exactly one instance of this {@link StateStore} per Kafka
Streams instance.
- * <p>
- * A {@link SourceNode} with the provided sourceName will be added to
consume the data arriving from the partitions
- * of the input topic.
- * <p>
- * The provided {@link
org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an
{@link ProcessorNode} that will receive all
- * records forwarded from the {@link SourceNode}.
- * This {@link ProcessorNode} should be used to keep the {@link
StateStore} up-to-date.
- * The default {@link TimestampExtractor} as specified in the {@link
StreamsConfig config} is used.
- * <p>
- * It is not required to connect a global store to {@link
org.apache.kafka.streams.processor.api.Processor Processors},
- * {@link org.apache.kafka.streams.kstream.Transformer Transformers},
- * or {@link org.apache.kafka.streams.kstream.ValueTransformer
ValueTransformer}; those have read-only access to all global stores by default.
- * <p>
- * The supplier should always generate a new instance each time {@link
ProcessorSupplier#get()} gets called. Creating
- * a single {@link Processor} object and returning the same object
reference in {@link ProcessorSupplier#get()} would be
- * a violation of the supplier pattern and leads to runtime exceptions.
- *
- * @param storeBuilder user defined {@link StoreBuilder}; can't
be {@code null}
- * @param topic the topic to source the data from
- * @param consumed the instance of {@link Consumed} used to
define optional parameters; can't be {@code null}
- * @param stateUpdateSupplier the instance of {@link
org.apache.kafka.streams.processor.ProcessorSupplier}
- * @return itself
- * @throws TopologyException if the processor of state is already
registered
- * @deprecated Since 2.7.0; use {@link #addGlobalStore(StoreBuilder,
String, Consumed, ProcessorSupplier)} instead.
- */
- @Deprecated
- public synchronized <K, V> StreamsBuilder addGlobalStore(final
StoreBuilder<?> storeBuilder,
- final String
topic,
- final Consumed<K,
V> consumed,
- final
org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier)
{
- Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
- Objects.requireNonNull(consumed, "consumed can't be null");
- internalStreamsBuilder.addGlobalStore(
- new StoreBuilderWrapper(storeBuilder),
- topic,
- new ConsumedInternal<>(consumed),
- () -> ProcessorAdapter.adapt(stateUpdateSupplier.get()),
- true
- );
- return this;
- }
-
/**
* Adds a global {@link StateStore} to the topology.
* The {@link StateStore} sources its data from all partitions of the
provided input topic.
diff --git
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 005efe92d10..476c1857422 100644
---
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -36,6 +36,9 @@ import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
import org.apache.kafka.test.TestUtils;
@@ -127,7 +130,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
outputTopic2 = "output2" + testId;
IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic,
inputTopic2, outputTopic, outputTopic2);
final KStream<String, String> stream = builder.stream(inputTopic);
- stream.process(() -> new ShutdownProcessor(processorValueCollector),
Named.as("process"));
+ stream.process(() -> new ShutdownProcessor<>(processorValueCollector),
Named.as("process"));
properties = basicProps();
}
@@ -198,18 +201,48 @@ public class
StreamsUncaughtExceptionHandlerIntegrationTest {
testShutdownApplication(1);
}
+ private static class ShutdownProcessor<KIn, VIn, KOut, VOut> implements
Processor<KIn, VIn, KOut, VOut> {
+
+ private ProcessorContext<KOut, VOut> context;
+
+ final List<String> valueList;
+
+ ShutdownProcessor(final List<String> valueList) {
+ this.valueList = valueList;
+ }
+
+ @Override
+ public void init(final ProcessorContext<KOut, VOut> context) {
+ this.context = context;
+ }
+
+ @Override
+ public void close() {
+ // do nothing
+ }
+
+ @Override
+ public void process(final Record<KIn, VIn> record) {
+ valueList.add(record.value().toString());
+ if (throwError.get()) {
+ throw new StreamsException(Thread.currentThread().getName());
+ }
+ throwError.set(true);
+ }
+ }
+
@Test
public void shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread()
throws Exception {
builder.addGlobalStore(
- new KeyValueStoreBuilder<>(
- Stores.persistentKeyValueStore("globalStore"),
- Serdes.String(),
- Serdes.String(),
- CLUSTER.time
- ),
- inputTopic2,
- Consumed.with(Serdes.String(), Serdes.String()),
- () -> new ShutdownProcessor(processorValueCollector)
+ new KeyValueStoreBuilder<>(
+ Stores.persistentKeyValueStore("globalStore"),
+ Serdes.String(),
+ Serdes.String(),
+ CLUSTER.time
+ ),
+ inputTopic2,
+ Consumed.with(Serdes.String(), Serdes.String()),
+ () -> new ShutdownProcessor<String, String, Void,
Void>(processorValueCollector)
);
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0);
@@ -224,7 +257,6 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest
{
assertThat(processorValueCollector.size(), equalTo(1));
}
-
}
@Test
@@ -319,23 +351,6 @@ public class
StreamsUncaughtExceptionHandlerIntegrationTest {
timestamp);
}
- private static class ShutdownProcessor extends
org.apache.kafka.streams.processor.AbstractProcessor<String, String> {
- final List<String> valueList;
-
- ShutdownProcessor(final List<String> valueList) {
- this.valueList = valueList;
- }
-
- @Override
- public void process(final String key, final String value) {
- valueList.add(value + " " + context.taskId());
- if (throwError.get()) {
- throw new StreamsException(Thread.currentThread().getName());
- }
- throwError.set(true);
- }
- }
-
private void testShutdownApplication(final int numThreads) throws
Exception {
properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
diff --git
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index 9430a511f71..25f5ce339b0 100644
---
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -20,7 +20,7 @@ import java.util.Properties
import java.util.regex.Pattern
import org.apache.kafka.streams.kstream.GlobalKTable
-import org.apache.kafka.streams.processor.{ProcessorSupplier, StateStore}
+import org.apache.kafka.streams.processor.StateStore
import org.apache.kafka.streams.state.StoreBuilder
import org.apache.kafka.streams.{StreamsBuilder => StreamsBuilderJ, Topology}
import org.apache.kafka.streams.scala.kstream.{Consumed, KStream, KTable,
Materialized}
@@ -164,27 +164,6 @@ class StreamsBuilder(inner: StreamsBuilderJ = new
StreamsBuilderJ) {
*/
def addStateStore(builder: StoreBuilder[_ <: StateStore]): StreamsBuilderJ =
inner.addStateStore(builder)
- /**
- * Adds a global `StateStore` to the topology. Global stores should not be
added to `Processor`, `Transformer`,
- * or `ValueTransformer` (in contrast to regular stores).
- * <p>
- * It is not required to connect a global store to `Processor`,
`Transformer`, or `ValueTransformer`;
- * those have read-only access to all global stores by default.
- *
- * @see `org.apache.kafka.streams.StreamsBuilder#addGlobalStore`
- */
- @deprecated(
- "Use #addGlobalStore(StoreBuilder, String, Consumed,
org.apache.kafka.streams.processor.api.ProcessorSupplier) instead.",
- "2.7.0"
- )
- def addGlobalStore[K, V](
- storeBuilder: StoreBuilder[_ <: StateStore],
- topic: String,
- consumed: Consumed[K, V],
- stateUpdateSupplier: ProcessorSupplier[K, V]
- ): StreamsBuilderJ =
- inner.addGlobalStore(storeBuilder, topic, consumed, stateUpdateSupplier)
-
/**
* Adds a global `StateStore` to the topology. Global stores should not be
added to `Processor`, `Transformer`,
* or `ValueTransformer` (in contrast to regular stores).