This is an automated email from the ASF dual-hosted git repository.

ableegoldman pushed a commit to branch 3.8
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.8 by this push:
     new 45027f3d33f KAFKA-15774: use the default dsl store supplier for fkj 
subscriptions (#16380)
45027f3d33f is described below

commit 45027f3d33fcc67b0dccfe8e1fab903f48f37129
Author: Rohan <desai.p.ro...@gmail.com>
AuthorDate: Wed Jun 19 00:32:59 2024 -0700

    KAFKA-15774: use the default dsl store supplier for fkj subscriptions 
(#16380)
    
    Foreign key joins have an additional "internal" state store used for 
subscriptions, which is not exposed for configuration via Materialized or 
StoreBuilder which means there is no way to plug in a different store 
implementation via the DSL operator. However, we should respect the configured 
default dsl store supplier if one is configured, to allow these stores to be 
customized and conform to the store type selection logic used for other DSL 
operator stores
    
    Reviewers: Anna Sophie Blee-Goldman <ableegold...@apache.org>
---
 .../streams/kstream/internals/KTableImpl.java      |  37 +++----
 .../internals/SubscriptionStoreFactory.java        | 116 +++++++++++++++++++++
 .../ForeignTableJoinProcessorSupplier.java         |   9 +-
 .../SubscriptionReceiveProcessorSupplier.java      |   9 +-
 .../internals/graph/ForeignTableJoinNode.java      |   8 +-
 .../ForeignTableJoinProcessorSupplierTests.java    |   2 +-
 .../SubscriptionReceiveProcessorSupplierTest.java  |   2 +-
 7 files changed, 143 insertions(+), 40 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
index b81f9c1918e..bb124dfcc45 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java
@@ -17,7 +17,6 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
-import org.apache.kafka.common.serialization.Serdes;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.streams.KeyValue;
 import org.apache.kafka.streams.errors.TopologyException;
@@ -73,8 +72,6 @@ import 
org.apache.kafka.streams.processor.internals.StoreBuilderWrapper;
 import org.apache.kafka.streams.processor.internals.StoreFactory;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.StoreBuilder;
-import org.apache.kafka.streams.state.Stores;
-import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
 import 
org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueChangeBuffer;
@@ -1184,25 +1181,20 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
         copartitionedRepartitionSources.add(subscriptionSource.nodeName());
         
builder.internalTopologyBuilder.copartitionSources(copartitionedRepartitionSources);
 
+        final String subscriptionStoreName = renamed
+            .suffixWithOrElseGet("-subscription-store", builder, 
FK_JOIN_STATE_STORE_NAME);
+        builder.addStateStore(
+            new SubscriptionStoreFactory<>(subscriptionStoreName, 
subscriptionWrapperSerde));
 
-        final StoreBuilder<TimestampedKeyValueStore<Bytes, 
SubscriptionWrapper<K>>> subscriptionStore =
-            Stores.timestampedKeyValueStoreBuilder(
-                Stores.persistentTimestampedKeyValueStore(
-                    renamed.suffixWithOrElseGet("-subscription-store", 
builder, FK_JOIN_STATE_STORE_NAME)
-                ),
-                new Serdes.BytesSerde(),
-                subscriptionWrapperSerde
-            );
-        builder.addStateStore(new StoreBuilderWrapper(subscriptionStore));
-
+        final String subscriptionReceiveName = renamed.suffixWithOrElseGet(
+            "-subscription-receive", builder, SUBSCRIPTION_PROCESSOR);
         final StatefulProcessorNode<KO, SubscriptionWrapper<K>> 
subscriptionReceiveNode =
             new StatefulProcessorNode<>(
+                subscriptionReceiveName,
                 new ProcessorParameters<>(
-                    new 
SubscriptionReceiveProcessorSupplier<>(subscriptionStore, combinedKeySchema),
-                    renamed.suffixWithOrElseGet("-subscription-receive", 
builder, SUBSCRIPTION_PROCESSOR)
-                ),
-                Collections.singleton(subscriptionStore),
-                Collections.emptySet()
+                    new 
SubscriptionReceiveProcessorSupplier<>(subscriptionStoreName, 
combinedKeySchema),
+                    subscriptionReceiveName),
+                new String[]{subscriptionStoreName}
             );
         builder.addGraphNode(subscriptionSource, subscriptionReceiveNode);
 
@@ -1220,13 +1212,14 @@ public class KTableImpl<K, S, V> extends 
AbstractStream<K, V> implements KTable<
             );
         builder.addGraphNode(subscriptionReceiveNode, subscriptionJoinNode);
 
+        final String foreignTableJoinName = renamed
+            .suffixWithOrElseGet("-foreign-join-subscription", builder, 
SUBSCRIPTION_PROCESSOR);
         final StatefulProcessorNode<KO, Change<VO>> foreignTableJoinNode = new 
ForeignTableJoinNode<>(
             new ProcessorParameters<>(
-                new ForeignTableJoinProcessorSupplier<>(subscriptionStore, 
combinedKeySchema),
-                renamed.suffixWithOrElseGet("-foreign-join-subscription", 
builder, SUBSCRIPTION_PROCESSOR)
+                new ForeignTableJoinProcessorSupplier<>(subscriptionStoreName, 
combinedKeySchema),
+                foreignTableJoinName
             ),
-            Collections.singleton(subscriptionStore),
-            Collections.emptySet()
+            new String[]{subscriptionStoreName}
         );
         builder.addGraphNode(((KTableImpl<KO, VO, ?>) 
foreignKeyTable).graphNode, foreignTableJoinNode);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
new file mode 100644
index 00000000000..b20d6063917
--- /dev/null
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.streams.kstream.internals;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.common.serialization.Serdes;
+import 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.StoreFactory;
+import org.apache.kafka.streams.state.DslKeyValueParams;
+import org.apache.kafka.streams.state.StoreBuilder;
+import org.apache.kafka.streams.state.Stores;
+
+public class SubscriptionStoreFactory<K> extends 
AbstractConfigurableStoreFactory {
+    private final String name;
+    private final Serde<SubscriptionWrapper<K>> subscriptionWrapperSerde;
+    private final Map<String, String> logConfig = new HashMap<>();
+    private boolean loggingEnabled = true;
+
+    public SubscriptionStoreFactory(
+        final String name,
+        final Serde<SubscriptionWrapper<K>> subscriptionWrapperSerde
+    ) {
+        super(null);
+        this.name = name;
+        this.subscriptionWrapperSerde = subscriptionWrapperSerde;
+    }
+
+    @Override
+    public StateStore build() {
+        StoreBuilder<?> builder;
+        builder = Stores.timestampedKeyValueStoreBuilder(
+            dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name, 
true)),
+            new Serdes.BytesSerde(),
+            subscriptionWrapperSerde
+        );
+        if (loggingEnabled) {
+            builder = builder.withLoggingEnabled(logConfig);
+        } else {
+            builder = builder.withLoggingDisabled();
+        }
+        builder = builder.withCachingDisabled();
+        return builder.build();
+    }
+
+    @Override
+    public long retentionPeriod() {
+        throw new IllegalStateException("retentionPeriod is not supported when 
not a window store");
+    }
+
+    @Override
+    public long historyRetention() {
+        throw new IllegalStateException(
+            "historyRetention is not supported when not a versioned store");
+    }
+
+    @Override
+    public boolean loggingEnabled() {
+        return loggingEnabled;
+    }
+
+    @Override
+    public String name() {
+        return name;
+    }
+
+    @Override
+    public boolean isWindowStore() {
+        return false;
+    }
+
+    @Override
+    public boolean isVersionedStore() {
+        return false;
+    }
+
+    @Override
+    public Map<String, String> logConfig() {
+        return logConfig;
+    }
+
+    @Override
+    public StoreFactory withCachingDisabled() {
+        // caching is always disabled
+        return this;
+    }
+
+    @Override
+    public StoreFactory withLoggingDisabled() {
+        loggingEnabled = false;
+        return this;
+    }
+
+    @Override
+    public boolean isCompatibleWith(final StoreFactory other) {
+        return other instanceof SubscriptionStoreFactory
+            && ((SubscriptionStoreFactory<?>) other).name.equals(name);
+    }
+}
\ No newline at end of file
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
index 314ed7cdfd3..7950fa5ff71 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java
@@ -30,7 +30,6 @@ import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.state.KeyValueIterator;
-import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
@@ -41,15 +40,15 @@ import java.nio.ByteBuffer;
 public class ForeignTableJoinProcessorSupplier<K, KO, VO> implements
     ProcessorSupplier<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> {
     private static final Logger LOG = 
LoggerFactory.getLogger(ForeignTableJoinProcessorSupplier.class);
-    private final StoreBuilder<TimestampedKeyValueStore<Bytes, 
SubscriptionWrapper<K>>> storeBuilder;
+    private final String storeName;
     private final CombinedKeySchema<KO, K> keySchema;
     private boolean useVersionedSemantics = false;
 
     public ForeignTableJoinProcessorSupplier(
-        final StoreBuilder<TimestampedKeyValueStore<Bytes, 
SubscriptionWrapper<K>>> storeBuilder,
+        final String storeName,
         final CombinedKeySchema<KO, K> keySchema) {
 
-        this.storeBuilder = storeBuilder;
+        this.storeName = storeName;
         this.keySchema = keySchema;
     }
 
@@ -80,7 +79,7 @@ public class ForeignTableJoinProcessorSupplier<K, KO, VO> 
implements
                 internalProcessorContext.taskId().toString(),
                 internalProcessorContext.metrics()
             );
-            subscriptionStore = 
internalProcessorContext.getStateStore(storeBuilder);
+            subscriptionStore = 
internalProcessorContext.getStateStore(storeName);
         }
 
         @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
index 90d70bdf330..3050f8bdd70 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java
@@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.api.RecordMetadata;
 import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
-import org.apache.kafka.streams.state.StoreBuilder;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.slf4j.Logger;
@@ -39,14 +38,14 @@ public class SubscriptionReceiveProcessorSupplier<K, KO>
     implements ProcessorSupplier<KO, SubscriptionWrapper<K>, CombinedKey<KO, 
K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> {
     private static final Logger LOG = 
LoggerFactory.getLogger(SubscriptionReceiveProcessorSupplier.class);
 
-    private final StoreBuilder<TimestampedKeyValueStore<Bytes, 
SubscriptionWrapper<K>>> storeBuilder;
+    private final String storeName;
     private final CombinedKeySchema<KO, K> keySchema;
 
     public SubscriptionReceiveProcessorSupplier(
-        final StoreBuilder<TimestampedKeyValueStore<Bytes, 
SubscriptionWrapper<K>>> storeBuilder,
+        final String storeName,
         final CombinedKeySchema<KO, K> keySchema) {
 
-        this.storeBuilder = storeBuilder;
+        this.storeName = storeName;
         this.keySchema = keySchema;
     }
 
@@ -68,7 +67,7 @@ public class SubscriptionReceiveProcessorSupplier<K, KO>
                     internalProcessorContext.taskId().toString(),
                     internalProcessorContext.metrics()
                 );
-                store = internalProcessorContext.getStateStore(storeBuilder);
+                store = internalProcessorContext.getStateStore(storeName);
 
                 keySchema.init(context);
             }
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignTableJoinNode.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignTableJoinNode.java
index 82d9d6486ad..16f096e8208 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignTableJoinNode.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignTableJoinNode.java
@@ -16,18 +16,14 @@
  */
 package org.apache.kafka.streams.kstream.internals.graph;
 
-import java.util.Set;
-import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier;
 import 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignTableJoinProcessorSupplier;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
-import org.apache.kafka.streams.state.StoreBuilder;
 
 public class ForeignTableJoinNode<K, V> extends StatefulProcessorNode<K, V> 
implements VersionedSemanticsGraphNode {
 
     public ForeignTableJoinNode(final ProcessorParameters<K, V, ?, ?> 
processorParameters,
-                                final Set<StoreBuilder<?>> preRegisteredStores,
-                                final Set<KTableValueGetterSupplier<?, ?>> 
valueGetterSuppliers) {
-        super(processorParameters, preRegisteredStores, valueGetterSuppliers);
+                                final String[] storeNames) {
+        super(processorParameters.processorName(), processorParameters, 
storeNames);
     }
 
     @SuppressWarnings("unchecked")
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
index e7f18b0b71e..01545514d69 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java
@@ -70,7 +70,7 @@ public class ForeignTableJoinProcessorSupplierTests {
         context = new MockInternalNewProcessorContext<>(props, new TaskId(0, 
0), stateDir);
 
         final StoreBuilder<TimestampedKeyValueStore<Bytes, 
SubscriptionWrapper<String>>> storeBuilder = storeBuilder();
-        processor = new ForeignTableJoinProcessorSupplier<String, String, 
String>(storeBuilder(), COMBINED_KEY_SCHEMA).get();
+        processor = new ForeignTableJoinProcessorSupplier<String, String, 
String>(storeBuilder().name(), COMBINED_KEY_SCHEMA).get();
         stateStore = storeBuilder.build();
         context.addStateStore(stateStore);
         stateStore.init((StateStoreContext) context, stateStore);
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
index dc04d560ed0..b8bb186f83c 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java
@@ -503,7 +503,7 @@ public class SubscriptionReceiveProcessorSupplierTest {
     private SubscriptionReceiveProcessorSupplier<String, String> supplier(
         final StoreBuilder<TimestampedKeyValueStore<Bytes, 
SubscriptionWrapper<String>>> storeBuilder) {
 
-        return new SubscriptionReceiveProcessorSupplier<>(storeBuilder, 
COMBINED_KEY_SCHEMA);
+        return new SubscriptionReceiveProcessorSupplier<>(storeBuilder.name(), 
COMBINED_KEY_SCHEMA);
     }
 
     private StoreBuilder<TimestampedKeyValueStore<Bytes, 
SubscriptionWrapper<String>>> storeBuilder() {

Reply via email to