This is an automated email from the ASF dual-hosted git repository. ableegoldman pushed a commit to branch 3.8 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.8 by this push: new 45027f3d33f KAFKA-15774: use the default dsl store supplier for fkj subscriptions (#16380) 45027f3d33f is described below commit 45027f3d33fcc67b0dccfe8e1fab903f48f37129 Author: Rohan <desai.p.ro...@gmail.com> AuthorDate: Wed Jun 19 00:32:59 2024 -0700 KAFKA-15774: use the default dsl store supplier for fkj subscriptions (#16380) Foreign key joins have an additional "internal" state store used for subscriptions, which is not exposed for configuration via Materialized or StoreBuilder, which means there is no way to plug in a different store implementation via the DSL operator. However, we should respect the default DSL store supplier if one is configured, to allow these stores to be customized and conform to the store type selection logic used for other DSL operator stores. Reviewers: Anna Sophie Blee-Goldman <ableegold...@apache.org> --- .../streams/kstream/internals/KTableImpl.java | 37 +++---- .../internals/SubscriptionStoreFactory.java | 116 +++++++++++++++++++++ .../ForeignTableJoinProcessorSupplier.java | 9 +- .../SubscriptionReceiveProcessorSupplier.java | 9 +- .../internals/graph/ForeignTableJoinNode.java | 8 +- .../ForeignTableJoinProcessorSupplierTests.java | 2 +- .../SubscriptionReceiveProcessorSupplierTest.java | 2 +- 7 files changed, 143 insertions(+), 40 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index b81f9c1918e..bb124dfcc45 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -17,7 +17,6 @@ package org.apache.kafka.streams.kstream.internals; import org.apache.kafka.common.serialization.Serde; -import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.utils.Bytes; import 
org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.errors.TopologyException; @@ -73,8 +72,6 @@ import org.apache.kafka.streams.processor.internals.StoreBuilderWrapper; import org.apache.kafka.streams.processor.internals.StoreFactory; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.StoreBuilder; -import org.apache.kafka.streams.state.Stores; -import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.apache.kafka.streams.state.VersionedBytesStoreSupplier; import org.apache.kafka.streams.state.internals.InMemoryTimeOrderedKeyValueChangeBuffer; @@ -1184,25 +1181,20 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< copartitionedRepartitionSources.add(subscriptionSource.nodeName()); builder.internalTopologyBuilder.copartitionSources(copartitionedRepartitionSources); + final String subscriptionStoreName = renamed + .suffixWithOrElseGet("-subscription-store", builder, FK_JOIN_STATE_STORE_NAME); + builder.addStateStore( + new SubscriptionStoreFactory<>(subscriptionStoreName, subscriptionWrapperSerde)); - final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> subscriptionStore = - Stores.timestampedKeyValueStoreBuilder( - Stores.persistentTimestampedKeyValueStore( - renamed.suffixWithOrElseGet("-subscription-store", builder, FK_JOIN_STATE_STORE_NAME) - ), - new Serdes.BytesSerde(), - subscriptionWrapperSerde - ); - builder.addStateStore(new StoreBuilderWrapper(subscriptionStore)); - + final String subscriptionReceiveName = renamed.suffixWithOrElseGet( + "-subscription-receive", builder, SUBSCRIPTION_PROCESSOR); final StatefulProcessorNode<KO, SubscriptionWrapper<K>> subscriptionReceiveNode = new StatefulProcessorNode<>( + subscriptionReceiveName, new ProcessorParameters<>( - new SubscriptionReceiveProcessorSupplier<>(subscriptionStore, combinedKeySchema), - 
renamed.suffixWithOrElseGet("-subscription-receive", builder, SUBSCRIPTION_PROCESSOR) - ), - Collections.singleton(subscriptionStore), - Collections.emptySet() + new SubscriptionReceiveProcessorSupplier<>(subscriptionStoreName, combinedKeySchema), + subscriptionReceiveName), + new String[]{subscriptionStoreName} ); builder.addGraphNode(subscriptionSource, subscriptionReceiveNode); @@ -1220,13 +1212,14 @@ public class KTableImpl<K, S, V> extends AbstractStream<K, V> implements KTable< ); builder.addGraphNode(subscriptionReceiveNode, subscriptionJoinNode); + final String foreignTableJoinName = renamed + .suffixWithOrElseGet("-foreign-join-subscription", builder, SUBSCRIPTION_PROCESSOR); final StatefulProcessorNode<KO, Change<VO>> foreignTableJoinNode = new ForeignTableJoinNode<>( new ProcessorParameters<>( - new ForeignTableJoinProcessorSupplier<>(subscriptionStore, combinedKeySchema), - renamed.suffixWithOrElseGet("-foreign-join-subscription", builder, SUBSCRIPTION_PROCESSOR) + new ForeignTableJoinProcessorSupplier<>(subscriptionStoreName, combinedKeySchema), + foreignTableJoinName ), - Collections.singleton(subscriptionStore), - Collections.emptySet() + new String[]{subscriptionStoreName} ); builder.addGraphNode(((KTableImpl<KO, VO, ?>) foreignKeyTable).graphNode, foreignTableJoinNode); diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java new file mode 100644 index 00000000000..b20d6063917 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import java.util.HashMap; +import java.util.Map; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.internals.StoreFactory; +import org.apache.kafka.streams.state.DslKeyValueParams; +import org.apache.kafka.streams.state.StoreBuilder; +import org.apache.kafka.streams.state.Stores; + +public class SubscriptionStoreFactory<K> extends AbstractConfigurableStoreFactory { + private final String name; + private final Serde<SubscriptionWrapper<K>> subscriptionWrapperSerde; + private final Map<String, String> logConfig = new HashMap<>(); + private boolean loggingEnabled = true; + + public SubscriptionStoreFactory( + final String name, + final Serde<SubscriptionWrapper<K>> subscriptionWrapperSerde + ) { + super(null); + this.name = name; + this.subscriptionWrapperSerde = subscriptionWrapperSerde; + } + + @Override + public StateStore build() { + StoreBuilder<?> builder; + builder = Stores.timestampedKeyValueStoreBuilder( + dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name, true)), + new Serdes.BytesSerde(), + subscriptionWrapperSerde + ); + if (loggingEnabled) { + builder = 
builder.withLoggingEnabled(logConfig); + } else { + builder = builder.withLoggingDisabled(); + } + builder = builder.withCachingDisabled(); + return builder.build(); + } + + @Override + public long retentionPeriod() { + throw new IllegalStateException("retentionPeriod is not supported when not a window store"); + } + + @Override + public long historyRetention() { + throw new IllegalStateException( + "historyRetention is not supported when not a versioned store"); + } + + @Override + public boolean loggingEnabled() { + return loggingEnabled; + } + + @Override + public String name() { + return name; + } + + @Override + public boolean isWindowStore() { + return false; + } + + @Override + public boolean isVersionedStore() { + return false; + } + + @Override + public Map<String, String> logConfig() { + return logConfig; + } + + @Override + public StoreFactory withCachingDisabled() { + // caching is always disabled + return this; + } + + @Override + public StoreFactory withLoggingDisabled() { + loggingEnabled = false; + return this; + } + + @Override + public boolean isCompatibleWith(final StoreFactory other) { + return other instanceof SubscriptionStoreFactory + && ((SubscriptionStoreFactory<?>) other).name.equals(name); + } +} \ No newline at end of file diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java index 314ed7cdfd3..7950fa5ff71 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplier.java @@ -30,7 +30,6 @@ import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import 
org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.slf4j.Logger; @@ -41,15 +40,15 @@ import java.nio.ByteBuffer; public class ForeignTableJoinProcessorSupplier<K, KO, VO> implements ProcessorSupplier<KO, Change<VO>, K, SubscriptionResponseWrapper<VO>> { private static final Logger LOG = LoggerFactory.getLogger(ForeignTableJoinProcessorSupplier.class); - private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder; + private final String storeName; private final CombinedKeySchema<KO, K> keySchema; private boolean useVersionedSemantics = false; public ForeignTableJoinProcessorSupplier( - final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder, + final String storeName, final CombinedKeySchema<KO, K> keySchema) { - this.storeBuilder = storeBuilder; + this.storeName = storeName; this.keySchema = keySchema; } @@ -80,7 +79,7 @@ public class ForeignTableJoinProcessorSupplier<K, KO, VO> implements internalProcessorContext.taskId().toString(), internalProcessorContext.metrics() ); - subscriptionStore = internalProcessorContext.getStateStore(storeBuilder); + subscriptionStore = internalProcessorContext.getStateStore(storeName); } @Override diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java index 90d70bdf330..3050f8bdd70 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java +++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplier.java @@ -29,7 +29,6 @@ import org.apache.kafka.streams.processor.api.Record; import org.apache.kafka.streams.processor.api.RecordMetadata; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics; -import org.apache.kafka.streams.state.StoreBuilder; import org.apache.kafka.streams.state.TimestampedKeyValueStore; import org.apache.kafka.streams.state.ValueAndTimestamp; import org.slf4j.Logger; @@ -39,14 +38,14 @@ public class SubscriptionReceiveProcessorSupplier<K, KO> implements ProcessorSupplier<KO, SubscriptionWrapper<K>, CombinedKey<KO, K>, Change<ValueAndTimestamp<SubscriptionWrapper<K>>>> { private static final Logger LOG = LoggerFactory.getLogger(SubscriptionReceiveProcessorSupplier.class); - private final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder; + private final String storeName; private final CombinedKeySchema<KO, K> keySchema; public SubscriptionReceiveProcessorSupplier( - final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<K>>> storeBuilder, + final String storeName, final CombinedKeySchema<KO, K> keySchema) { - this.storeBuilder = storeBuilder; + this.storeName = storeName; this.keySchema = keySchema; } @@ -68,7 +67,7 @@ public class SubscriptionReceiveProcessorSupplier<K, KO> internalProcessorContext.taskId().toString(), internalProcessorContext.metrics() ); - store = internalProcessorContext.getStateStore(storeBuilder); + store = internalProcessorContext.getStateStore(storeName); keySchema.init(context); } diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignTableJoinNode.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignTableJoinNode.java index 82d9d6486ad..16f096e8208 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignTableJoinNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/graph/ForeignTableJoinNode.java @@ -16,18 +16,14 @@ */ package org.apache.kafka.streams.kstream.internals.graph; -import java.util.Set; -import org.apache.kafka.streams.kstream.internals.KTableValueGetterSupplier; import org.apache.kafka.streams.kstream.internals.foreignkeyjoin.ForeignTableJoinProcessorSupplier; import org.apache.kafka.streams.processor.api.ProcessorSupplier; -import org.apache.kafka.streams.state.StoreBuilder; public class ForeignTableJoinNode<K, V> extends StatefulProcessorNode<K, V> implements VersionedSemanticsGraphNode { public ForeignTableJoinNode(final ProcessorParameters<K, V, ?, ?> processorParameters, - final Set<StoreBuilder<?>> preRegisteredStores, - final Set<KTableValueGetterSupplier<?, ?>> valueGetterSuppliers) { - super(processorParameters, preRegisteredStores, valueGetterSuppliers); + final String[] storeNames) { + super(processorParameters.processorName(), processorParameters, storeNames); } @SuppressWarnings("unchecked") diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java index e7f18b0b71e..01545514d69 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTests.java @@ -70,7 +70,7 @@ public class ForeignTableJoinProcessorSupplierTests { context = new MockInternalNewProcessorContext<>(props, new TaskId(0, 0), stateDir); final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder = storeBuilder(); - processor = new 
ForeignTableJoinProcessorSupplier<String, String, String>(storeBuilder(), COMBINED_KEY_SCHEMA).get(); + processor = new ForeignTableJoinProcessorSupplier<String, String, String>(storeBuilder().name(), COMBINED_KEY_SCHEMA).get(); stateStore = storeBuilder.build(); context.addStateStore(stateStore); stateStore.init((StateStoreContext) context, stateStore); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java index dc04d560ed0..b8bb186f83c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionReceiveProcessorSupplierTest.java @@ -503,7 +503,7 @@ public class SubscriptionReceiveProcessorSupplierTest { private SubscriptionReceiveProcessorSupplier<String, String> supplier( final StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder) { - return new SubscriptionReceiveProcessorSupplier<>(storeBuilder, COMBINED_KEY_SCHEMA); + return new SubscriptionReceiveProcessorSupplier<>(storeBuilder.name(), COMBINED_KEY_SCHEMA); } private StoreBuilder<TimestampedKeyValueStore<Bytes, SubscriptionWrapper<String>>> storeBuilder() {