This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c62c3899aaa KAFKA-12829: Remove deprecated 
StreamsBuilder#addGlobalStore of old Processor API (#17059)
c62c3899aaa is described below

commit c62c3899aaa1787b72ec8236e5a2ab290e2989ac
Author: JohnHuang <[email protected]>
AuthorDate: Thu Sep 12 05:22:08 2024 +0800

    KAFKA-12829: Remove deprecated StreamsBuilder#addGlobalStore of old 
Processor API (#17059)
    
    Reviewers: Chia-Ping Tsai <[email protected]>, Matthias J. Sax 
<[email protected]>
---
 .../org/apache/kafka/streams/StreamsBuilder.java   | 48 ---------------
 ...amsUncaughtExceptionHandlerIntegrationTest.java | 71 +++++++++++++---------
 .../kafka/streams/scala/StreamsBuilder.scala       | 23 +------
 3 files changed, 44 insertions(+), 98 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
index efc66dd35c3..2879436e500 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsBuilder.java
@@ -33,8 +33,6 @@ import org.apache.kafka.streams.processor.TimestampExtractor;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder;
-import org.apache.kafka.streams.processor.internals.ProcessorAdapter;
-import org.apache.kafka.streams.processor.internals.ProcessorNode;
 import org.apache.kafka.streams.processor.internals.SourceNode;
 import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -523,52 +521,6 @@ public class StreamsBuilder {
         return this;
     }
 
-    /**
-     * Adds a global {@link StateStore} to the topology.
-     * The {@link StateStore} sources its data from all partitions of the 
provided input topic.
-     * There will be exactly one instance of this {@link StateStore} per Kafka 
Streams instance.
-     * <p>
-     * A {@link SourceNode} with the provided sourceName will be added to 
consume the data arriving from the partitions
-     * of the input topic.
-     * <p>
-     * The provided {@link 
org.apache.kafka.streams.processor.ProcessorSupplier} will be used to create an 
{@link ProcessorNode} that will receive all
-     * records forwarded from the {@link SourceNode}.
-     * This {@link ProcessorNode} should be used to keep the {@link 
StateStore} up-to-date.
-     * The default {@link TimestampExtractor} as specified in the {@link 
StreamsConfig config} is used.
-     * <p>
-     * It is not required to connect a global store to {@link 
org.apache.kafka.streams.processor.api.Processor Processors},
-     * {@link org.apache.kafka.streams.kstream.Transformer Transformers},
-     * or {@link org.apache.kafka.streams.kstream.ValueTransformer 
ValueTransformer}; those have read-only access to all global stores by default.
-     * <p>
-     * The supplier should always generate a new instance each time {@link  
ProcessorSupplier#get()} gets called. Creating
-     * a single {@link Processor} object and returning the same object 
reference in {@link ProcessorSupplier#get()} would be
-     * a violation of the supplier pattern and leads to runtime exceptions.
-     *
-     * @param storeBuilder          user defined {@link StoreBuilder}; can't 
be {@code null}
-     * @param topic                 the topic to source the data from
-     * @param consumed              the instance of {@link Consumed} used to 
define optional parameters; can't be {@code null}
-     * @param stateUpdateSupplier   the instance of {@link 
org.apache.kafka.streams.processor.ProcessorSupplier}
-     * @return itself
-     * @throws TopologyException if the processor of state is already 
registered
-     * @deprecated Since 2.7.0; use {@link #addGlobalStore(StoreBuilder, 
String, Consumed, ProcessorSupplier)} instead.
-     */
-    @Deprecated
-    public synchronized <K, V> StreamsBuilder addGlobalStore(final 
StoreBuilder<?> storeBuilder,
-                                                             final String 
topic,
-                                                             final Consumed<K, 
V> consumed,
-                                                             final 
org.apache.kafka.streams.processor.ProcessorSupplier<K, V> stateUpdateSupplier) 
{
-        Objects.requireNonNull(storeBuilder, "storeBuilder can't be null");
-        Objects.requireNonNull(consumed, "consumed can't be null");
-        internalStreamsBuilder.addGlobalStore(
-            new StoreBuilderWrapper(storeBuilder),
-            topic,
-            new ConsumedInternal<>(consumed),
-            () -> ProcessorAdapter.adapt(stateUpdateSupplier.get()),
-            true
-        );
-        return this;
-    }
-
     /**
      * Adds a global {@link StateStore} to the topology.
      * The {@link StateStore} sources its data from all partitions of the 
provided input topic.
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
index 005efe92d10..476c1857422 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamsUncaughtExceptionHandlerIntegrationTest.java
@@ -36,6 +36,9 @@ import org.apache.kafka.streams.kstream.Consumed;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.Materialized;
 import org.apache.kafka.streams.kstream.Named;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.state.Stores;
 import org.apache.kafka.streams.state.internals.KeyValueStoreBuilder;
 import org.apache.kafka.test.TestUtils;
@@ -127,7 +130,7 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest 
{
         outputTopic2 = "output2" + testId;
         IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic, 
inputTopic2, outputTopic, outputTopic2);
         final KStream<String, String> stream = builder.stream(inputTopic);
-        stream.process(() -> new ShutdownProcessor(processorValueCollector), 
Named.as("process"));
+        stream.process(() -> new ShutdownProcessor<>(processorValueCollector), 
Named.as("process"));
         properties = basicProps();
     }
 
@@ -198,18 +201,48 @@ public class 
StreamsUncaughtExceptionHandlerIntegrationTest {
         testShutdownApplication(1);
     }
 
+    private static class ShutdownProcessor<KIn, VIn, KOut, VOut> implements 
Processor<KIn, VIn, KOut, VOut> {
+        // Test processor: collects each record value and throws once throwError is set.
+        private ProcessorContext<KOut, VOut> context;
+
+        final List<String> valueList;
+
+        ShutdownProcessor(final List<String> valueList) {
+            this.valueList = valueList;
+        }
+
+        @Override
+        public void init(final ProcessorContext<KOut, VOut> context) {
+            this.context = context;
+        }
+
+        @Override
+        public void close() {
+            // nothing to release: this processor holds no resources of its own
+        }
+
+        @Override
+        public void process(final Record<KIn, VIn> record) {
+            valueList.add(record.value().toString());
+            if (throwError.get()) {
+                throw new StreamsException(Thread.currentThread().getName());
+            }
+            throwError.set(true);
+        }
+    }
+
     @Test
     public void shouldShutDownClientIfGlobalStreamThreadWantsToReplaceThread() 
throws Exception {
         builder.addGlobalStore(
-            new KeyValueStoreBuilder<>(
-                Stores.persistentKeyValueStore("globalStore"),
-                Serdes.String(),
-                Serdes.String(),
-                CLUSTER.time
-            ),
-            inputTopic2,
-            Consumed.with(Serdes.String(), Serdes.String()),
-            () -> new ShutdownProcessor(processorValueCollector)
+                new KeyValueStoreBuilder<>(
+                        Stores.persistentKeyValueStore("globalStore"),
+                        Serdes.String(),
+                        Serdes.String(),
+                        CLUSTER.time
+                ),
+                inputTopic2,
+                Consumed.with(Serdes.String(), Serdes.String()),
+                () -> new ShutdownProcessor<String, String, Void, 
Void>(processorValueCollector)
         );
         properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 0);
 
@@ -224,7 +257,6 @@ public class StreamsUncaughtExceptionHandlerIntegrationTest 
{
 
             assertThat(processorValueCollector.size(), equalTo(1));
         }
-
     }
 
     @Test
@@ -319,23 +351,6 @@ public class 
StreamsUncaughtExceptionHandlerIntegrationTest {
             timestamp);
     }
 
-    private static class ShutdownProcessor extends 
org.apache.kafka.streams.processor.AbstractProcessor<String, String> {
-        final List<String> valueList;
-
-        ShutdownProcessor(final List<String> valueList) {
-            this.valueList = valueList;
-        }
-
-        @Override
-        public void process(final String key, final String value) {
-            valueList.add(value + " " + context.taskId());
-            if (throwError.get()) {
-                throw new StreamsException(Thread.currentThread().getName());
-            }
-            throwError.set(true);
-        }
-    }
-
     private void testShutdownApplication(final int numThreads) throws 
Exception {
         properties.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, numThreads);
 
diff --git 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
index 9430a511f71..25f5ce339b0 100644
--- 
a/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
+++ 
b/streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/StreamsBuilder.scala
@@ -20,7 +20,7 @@ import java.util.Properties
 import java.util.regex.Pattern
 
 import org.apache.kafka.streams.kstream.GlobalKTable
-import org.apache.kafka.streams.processor.{ProcessorSupplier, StateStore}
+import org.apache.kafka.streams.processor.StateStore
 import org.apache.kafka.streams.state.StoreBuilder
 import org.apache.kafka.streams.{StreamsBuilder => StreamsBuilderJ, Topology}
 import org.apache.kafka.streams.scala.kstream.{Consumed, KStream, KTable, 
Materialized}
@@ -164,27 +164,6 @@ class StreamsBuilder(inner: StreamsBuilderJ = new 
StreamsBuilderJ) {
    */
   def addStateStore(builder: StoreBuilder[_ <: StateStore]): StreamsBuilderJ = 
inner.addStateStore(builder)
 
-  /**
-   * Adds a global `StateStore` to the topology. Global stores should not be 
added to `Processor`, `Transformer`,
-   * or `ValueTransformer` (in contrast to regular stores).
-   * <p>
-   * It is not required to connect a global store to `Processor`, 
`Transformer`, or `ValueTransformer`;
-   * those have read-only access to all global stores by default.
-   *
-   * @see `org.apache.kafka.streams.StreamsBuilder#addGlobalStore`
-   */
-  @deprecated(
-    "Use #addGlobalStore(StoreBuilder, String, Consumed, 
org.apache.kafka.streams.processor.api.ProcessorSupplier) instead.",
-    "2.7.0"
-  )
-  def addGlobalStore[K, V](
-    storeBuilder: StoreBuilder[_ <: StateStore],
-    topic: String,
-    consumed: Consumed[K, V],
-    stateUpdateSupplier: ProcessorSupplier[K, V]
-  ): StreamsBuilderJ =
-    inner.addGlobalStore(storeBuilder, topic, consumed, stateUpdateSupplier)
-
   /**
    * Adds a global `StateStore` to the topology. Global stores should not be 
added to `Processor`, `Transformer`,
    * or `ValueTransformer` (in contrast to regular stores).

Reply via email to