This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 9f22628b8fe KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL
(1/N) (#21572)
9f22628b8fe is described below
commit 9f22628b8fe3b9c06e2b90f994d2e2dc2c3302d5
Author: Alieh Saeedi <[email protected]>
AuthorDate: Fri Feb 27 14:40:37 2026 +0100
KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (1/N) (#21572)
This PR introduces the DslStoreFormat enum and extends DslKeyValueParams
to enable headers-aware key-value stores in the Kafka Streams DSL,
implementing the foundational infrastructure for
[KIP-1285](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1285%3A+DSL+Opt-in+Support+for+Headers-Aware+State+Stores).
Reviewers: Matthias J. Sax <[email protected]>, TengYao Chi
<[email protected]>
---
.../org/apache/kafka/streams/DslStoreFormat.java | 47 +++++++++++++
.../org/apache/kafka/streams/StreamsConfig.java | 14 ++++
.../AbstractConfigurableStoreFactory.java | 11 ++++
.../internals/KeyValueStoreMaterializer.java | 19 ++++--
.../internals/OuterStreamJoinStoreFactory.java | 4 +-
.../internals/SubscriptionStoreFactory.java | 4 +-
.../streams/state/BuiltInDslStoreSuppliers.java | 16 ++++-
.../kafka/streams/state/DslKeyValueParams.java | 37 ++++++++++-
.../apache/kafka/streams/StreamsConfigTest.java | 59 +++++++++++++++--
.../internals/KeyValueStoreMaterializerTest.java | 76 ++++++++++++++++++++++
10 files changed, 271 insertions(+), 16 deletions(-)
diff --git a/streams/src/main/java/org/apache/kafka/streams/DslStoreFormat.java
b/streams/src/main/java/org/apache/kafka/streams/DslStoreFormat.java
new file mode 100644
index 00000000000..02bfdfad743
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/DslStoreFormat.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Locale;
+
+public enum DslStoreFormat {
+
+ /** The non-timestamped state stores */
+ PLAIN("PLAIN"),
+
+ /** The timestamped state stores */
+ TIMESTAMPED("TIMESTAMPED"),
+
+ /** The headers-aware state stores */
+ HEADERS("HEADERS");
+
+ /**
+ * String representation of the DSL store format.
+ */
+ public final String name;
+
+ DslStoreFormat(final String name) {
+ this.name = name;
+ }
+
+ /**
+ * Case-insensitive DSL store format lookup by string name.
+ */
+ public static DslStoreFormat of(final String name) {
+ return DslStoreFormat.valueOf(name.toUpperCase(Locale.ROOT));
+ }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 8f9e326b114..91f34111abd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -537,6 +537,14 @@ public class StreamsConfig extends AbstractConfig {
static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store
implementations to plug in to DSL operators. Must implement the
<code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.";
static final Class<?> DSL_STORE_SUPPLIERS_CLASS_DEFAULT =
BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class;
+ /** {@code dsl.store.format} */
+ public static final String DSL_STORE_FORMAT_CONFIG = "dsl.store.format";
+ public static final String DSL_STORE_FORMAT_DEFAULT = "DEFAULT";
+ public static final String DSL_STORE_FORMAT_HEADERS = "HEADERS";
+ private static final String DSL_STORE_FORMAT_DOC = "Specifies the state
store format for DSL operators. " +
+ "'DEFAULT' creates either timestamped or plain state stores, depending
on context. " +
+ "'HEADERS' creates headers-aware stores that preserve record headers.";
+
/** {@code default key.serde} */
@SuppressWarnings("WeakerAccess")
public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG =
"default.key.serde";
@@ -1112,6 +1120,12 @@ public class StreamsConfig extends AbstractConfig {
DSL_STORE_SUPPLIERS_CLASS_DEFAULT,
Importance.LOW,
DSL_STORE_SUPPLIERS_CLASS_DOC)
+ .define(DSL_STORE_FORMAT_CONFIG,
+ Type.STRING,
+ DSL_STORE_FORMAT_DEFAULT,
+
ConfigDef.CaseInsensitiveValidString.in(DSL_STORE_FORMAT_DEFAULT,
DSL_STORE_FORMAT_HEADERS),
+ Importance.LOW,
+ DSL_STORE_FORMAT_DOC)
.define(DEFAULT_CLIENT_SUPPLIER_CONFIG,
Type.CLASS,
DefaultKafkaClientSupplier.class.getName(),
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
index 8fd766a93c6..6dca8141945 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.kstream.internals;
+import org.apache.kafka.streams.DslStoreFormat;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.DslStoreSuppliers;
@@ -26,6 +27,7 @@ import java.util.Set;
public abstract class AbstractConfigurableStoreFactory implements StoreFactory
{
private final Set<String> connectedProcessorNames = new HashSet<>();
private DslStoreSuppliers dslStoreSuppliers;
+ private DslStoreFormat dslStoreFormat;
public AbstractConfigurableStoreFactory(final DslStoreSuppliers
initialStoreSuppliers) {
this.dslStoreSuppliers = initialStoreSuppliers;
@@ -40,6 +42,11 @@ public abstract class AbstractConfigurableStoreFactory
implements StoreFactory {
config.originals()
);
}
+ final String dslStoreFormatValue =
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
+ if
(dslStoreFormatValue.equalsIgnoreCase(StreamsConfig.DSL_STORE_FORMAT_HEADERS)) {
+ dslStoreFormat = DslStoreFormat.HEADERS;
+ }
+ // else dslStoreFormat remains null and the lower layers decide
between PLAIN and TIMESTAMPED
}
@Override
@@ -47,6 +54,10 @@ public abstract class AbstractConfigurableStoreFactory
implements StoreFactory {
return connectedProcessorNames;
}
+ public DslStoreFormat dslStoreFormat() {
+ return dslStoreFormat;
+ }
+
protected DslStoreSuppliers dslStoreSuppliers() {
if (dslStoreSuppliers == null) {
throw new IllegalStateException("Expected configure() to be called
before using dslStoreSuppliers");
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index d59d34e0e90..434b23e7871 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.DslStoreFormat;
import org.apache.kafka.streams.state.DslKeyValueParams;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -44,8 +45,9 @@ public class KeyValueStoreMaterializer<K, V> extends
MaterializedStoreFactory<K,
@Override
public StoreBuilder<?> builder() {
+ final DslStoreFormat storeFormat = dslStoreFormat() == null ?
DslStoreFormat.TIMESTAMPED : DslStoreFormat.HEADERS;
final KeyValueBytesStoreSupplier supplier =
materialized.storeSupplier() == null
- ? dslStoreSuppliers().keyValueStore(new
DslKeyValueParams(materialized.storeName(), true))
+ ? dslStoreSuppliers().keyValueStore(new
DslKeyValueParams(materialized.storeName(), storeFormat))
: (KeyValueBytesStoreSupplier) materialized.storeSupplier();
final StoreBuilder<?> builder;
@@ -55,10 +57,17 @@ public class KeyValueStoreMaterializer<K, V> extends
MaterializedStoreFactory<K,
materialized.keySerde(),
materialized.valueSerde());
} else {
- builder = Stores.timestampedKeyValueStoreBuilder(
+ if (storeFormat == DslStoreFormat.HEADERS) {
+ builder = Stores.timestampedKeyValueStoreBuilderWithHeaders(
supplier,
materialized.keySerde(),
materialized.valueSerde());
+ } else {
+ builder = Stores.timestampedKeyValueStoreBuilder(
+ supplier,
+ materialized.keySerde(),
+ materialized.valueSerde());
+ }
}
if (materialized.loggingEnabled()) {
@@ -68,10 +77,10 @@ public class KeyValueStoreMaterializer<K, V> extends
MaterializedStoreFactory<K,
}
if (materialized.cachingEnabled()) {
- if (!(builder instanceof VersionedKeyValueStoreBuilder)) {
- builder.withCachingEnabled();
- } else {
+ if (builder instanceof VersionedKeyValueStoreBuilder) {
LOG.info("Not enabling caching for store '{}' as versioned
stores do not support caching.", supplier.name());
+ } else {
+ builder.withCachingEnabled();
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
index 645858d1a65..e7194ff89ea 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.DslStoreFormat;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.DslKeyValueParams;
@@ -95,7 +96,8 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends
AbstractConfigurable
final TimestampedKeyAndJoinSideSerde<K> timestampedKeyAndJoinSideSerde
= new TimestampedKeyAndJoinSideSerde<>(streamJoined.keySerde());
final LeftOrRightValueSerde<V1, V2> leftOrRightValueSerde = new
LeftOrRightValueSerde<>(streamJoined.valueSerde(),
streamJoined.otherValueSerde());
- final DslKeyValueParams dslKeyValueParams = new
DslKeyValueParams(name, false);
+ final DslStoreFormat storeFormat = dslStoreFormat() == null ?
DslStoreFormat.PLAIN : DslStoreFormat.HEADERS;
+ final DslKeyValueParams dslKeyValueParams = new
DslKeyValueParams(name, storeFormat);
final KeyValueBytesStoreSupplier supplier;
if (passedInDslStoreSuppliers != null) {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
index 10c8a5e110c..dbaa008cf0f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.DslStoreFormat;
import
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
import org.apache.kafka.streams.processor.internals.StoreFactory;
import org.apache.kafka.streams.state.DslKeyValueParams;
@@ -46,8 +47,9 @@ public class SubscriptionStoreFactory<K> extends
AbstractConfigurableStoreFactor
@Override
public StoreBuilder<?> builder() {
StoreBuilder<?> builder;
+ final DslStoreFormat storeFormat = dslStoreFormat() == null ?
DslStoreFormat.TIMESTAMPED : DslStoreFormat.HEADERS;
builder = Stores.timestampedKeyValueStoreBuilder(
- dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name,
true)),
+ dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name,
storeFormat)),
new Serdes.BytesSerde(),
subscriptionWrapperSerde
);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
index eedcafd7c79..cfe95d515e7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state;
+import org.apache.kafka.streams.DslStoreFormat;
import org.apache.kafka.streams.kstream.EmitStrategy;
import
org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
import
org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
@@ -36,9 +37,18 @@ public class BuiltInDslStoreSuppliers {
@Override
public KeyValueBytesStoreSupplier keyValueStore(final
DslKeyValueParams params) {
- return params.isTimestamped()
- ? Stores.persistentTimestampedKeyValueStore(params.name())
- : Stores.persistentKeyValueStore(params.name());
+ final DslStoreFormat storeFormat = params.dslStoreFormat();
+ switch (storeFormat) {
+ case HEADERS:
+ return
Stores.persistentTimestampedKeyValueStoreWithHeaders(params.name());
+ case TIMESTAMPED:
+ return
Stores.persistentTimestampedKeyValueStore(params.name());
+ case PLAIN:
+ return Stores.persistentKeyValueStore(params.name());
+ default:
+ throw new IllegalArgumentException("Unsupported
DslStoreFormat: " + storeFormat +
+ ". Expected one of: HEADERS, TIMESTAMPED, or PLAIN");
+ }
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java
b/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java
index 7447d6c711f..73258fe59b4 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.state;
+import org.apache.kafka.streams.DslStoreFormat;
+
import java.util.Objects;
/**
@@ -26,23 +28,52 @@ public class DslKeyValueParams {
private final String name;
private final boolean isTimestamped;
+ private final DslStoreFormat dslStoreFormat;
/**
+ * @deprecated Since 4.3. Use {@link #DslKeyValueParams(String,
DslStoreFormat)} instead.
* @param name the name of the store (cannot be {@code null})
* @param isTimestamped whether the returned stores should be timestamped,
see ({@link TimestampedKeyValueStore}
*/
+ @Deprecated
public DslKeyValueParams(final String name, final boolean isTimestamped) {
Objects.requireNonNull(name);
this.name = name;
this.isTimestamped = isTimestamped;
+ // If isTimestamped is false and the user is still calling the old
deprecated constructor, we should assume they mean plain.
+ this.dslStoreFormat = isTimestamped ? DslStoreFormat.TIMESTAMPED :
DslStoreFormat.PLAIN;
+ }
+
+ /**
+ * @param name the name of the store (cannot be {@code null})
+ * @param dslStoreFormat the format of the state store, see {@link
DslStoreFormat}
+ */
+ public DslKeyValueParams(final String name, final DslStoreFormat
dslStoreFormat) {
+ this.name = Objects.requireNonNull(name);
+ this.dslStoreFormat = Objects.requireNonNull(dslStoreFormat);
+ this.isTimestamped = dslStoreFormat == DslStoreFormat.TIMESTAMPED;
}
public String name() {
return name;
}
+ /**
+ * @deprecated Since 4.3. Use {@link #dslStoreFormat()} instead to check
the store format.
+ * @return {@code true} if the store format is {@link
DslStoreFormat#TIMESTAMPED}, {@code false} otherwise
+ */
+ @Deprecated
public boolean isTimestamped() {
- return isTimestamped;
+ return dslStoreFormat == DslStoreFormat.TIMESTAMPED;
+ }
+
+ /**
+ * Returns the store format for this key-value store.
+ *
+ * @return the {@link DslStoreFormat} specifying whether to use plain,
timestamped, or headers-aware stores
+ */
+ public DslStoreFormat dslStoreFormat() {
+ return dslStoreFormat;
}
@Override
@@ -55,12 +86,13 @@ public class DslKeyValueParams {
}
final DslKeyValueParams that = (DslKeyValueParams) o;
return isTimestamped == that.isTimestamped
+ && dslStoreFormat == that.dslStoreFormat
&& Objects.equals(name, that.name);
}
@Override
public int hashCode() {
- return Objects.hash(name, isTimestamped);
+ return Objects.hash(name, isTimestamped, dslStoreFormat);
}
@Override
@@ -68,6 +100,7 @@ public class DslKeyValueParams {
return "DslKeyValueParams{" +
"name='" + name + '\'' +
"isTimestamped=" + isTimestamped +
+ "dslStoreFormat=" + dslStoreFormat +
'}';
}
}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index d9fbef48929..d58909094cf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -966,7 +966,7 @@ public class StreamsConfigTest {
@Test
public void shouldSpecifyRocksdbWhenNotExplicitlyAddedToConfigs() {
final String expectedDefaultStoreType = StreamsConfig.ROCKS_DB;
- final String actualDefaultStoreType =
streamsConfig.getString(org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG);
+ final String actualDefaultStoreType =
streamsConfig.getString(StreamsConfig.DEFAULT_DSL_STORE_CONFIG);
assertEquals(expectedDefaultStoreType, actualDefaultStoreType,
"default.dsl.store should be \"rocksDB\"");
}
@@ -974,19 +974,70 @@ public class StreamsConfigTest {
@Test
public void shouldSpecifyInMemoryWhenExplicitlyAddedToConfigs() {
final String expectedDefaultStoreType = StreamsConfig.IN_MEMORY;
-
props.put(org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG,
expectedDefaultStoreType);
+ props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG,
expectedDefaultStoreType);
final StreamsConfig config = new StreamsConfig(props);
- final String actualDefaultStoreType =
config.getString(org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG);
+ final String actualDefaultStoreType =
config.getString(StreamsConfig.DEFAULT_DSL_STORE_CONFIG);
assertEquals(expectedDefaultStoreType, actualDefaultStoreType,
"default.dsl.store should be \"in_memory\"");
}
@Deprecated
@Test
public void shouldThrowConfigExceptionWhenStoreTypeConfigNotValueInRange()
{
-
props.put(org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG,
"bad_config");
+ props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, "bad_config");
assertThrows(ConfigException.class, () -> new StreamsConfig(props));
}
+ @Test
+ public void shouldUseDefaultStoreFormatWhenNotSpecified() {
+ final StreamsConfig config = new StreamsConfig(props);
+ final String actualFormat =
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
+ assertEquals("DEFAULT", actualFormat, "dsl.store.format should default
to 'default'");
+ }
+
+ @Test
+ public void shouldAcceptValidDslStoreFormatDefault() {
+ props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, "DEFAULT");
+ final StreamsConfig config = new StreamsConfig(props);
+ assertEquals("DEFAULT",
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG));
+ }
+
+ @Test
+ public void shouldAcceptValidDslStoreFormatHeaders() {
+ props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, "HEADERS");
+ final StreamsConfig config = new StreamsConfig(props);
+ assertEquals("HEADERS",
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG));
+ }
+
+ @Test
+ public void shouldThrowConfigExceptionForInvalidDslStoreFormat() {
+ props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, "invalid_format");
+ final ConfigException exception = assertThrows(ConfigException.class,
() -> new StreamsConfig(props));
+ assertTrue(exception.getMessage().contains("Invalid value
invalid_format for configuration dsl.store.format"));
+ }
+
+ @Test
+ public void shouldAcceptDslStoreFormatCaseInsensitively() {
+ props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, "default");
+ StreamsConfig config = new StreamsConfig(props);
+ assertEquals("default",
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG));
+
+ props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, "DEFAULT");
+ config = new StreamsConfig(props);
+ assertEquals("DEFAULT",
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG));
+
+ props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, "headers");
+ config = new StreamsConfig(props);
+ assertEquals("headers",
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG));
+
+ props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, "HEADERS");
+ config = new StreamsConfig(props);
+ assertEquals("HEADERS",
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG));
+
+ props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, "HeAdErS");
+ config = new StreamsConfig(props);
+ assertEquals("HeAdErS",
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG));
+ }
+
@Test
public void
shouldSpecifyRocksdbDslSupplierWhenNotExplicitlyAddedToConfigs() {
final Class<?> expectedDefaultStoreType =
BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
index 7228496dd36..5b0aa745133 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
@@ -29,15 +29,18 @@ import org.apache.kafka.streams.state.DslStoreSuppliers;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.VersionedBytesStore;
import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.internals.CachingKeyValueStore;
import
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore;
import
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore;
+import
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders;
import
org.apache.kafka.streams.state.internals.ChangeLoggingVersionedKeyValueBytesStore;
import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
import
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore;
+import
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.internals.MeteredVersionedKeyValueStore;
import org.apache.kafka.streams.state.internals.WrappedStateStore;
@@ -55,6 +58,7 @@ import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.core.IsInstanceOf.instanceOf;
import static org.hamcrest.core.IsNot.not;
import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.lenient;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
@@ -87,6 +91,8 @@ public class KeyValueStoreMaterializerTest {
DslStoreSuppliers.class,
emptyMap()
);
+ lenient().doReturn("default")
+
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
}
private void mockInnerVersionedStore() {
@@ -258,6 +264,68 @@ public class KeyValueStoreMaterializerTest {
assertThat(innerVersionedStore, equalTo(inner));
}
+ @Test
+ public void shouldCreateHeadersAwareStoreWithLoggingEnabledByDefault() {
+ doReturn("headers")
+
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
+
+ final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized =
+ new MaterializedInternal<>(Materialized.<String, String,
KeyValueStore<Bytes, byte[]>>as("store")
+ .withCachingDisabled(), nameProvider, STORE_PREFIX);
+
+ final TimestampedKeyValueStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
+
+ final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
+ assertThat(store,
instanceOf(MeteredTimestampedKeyValueStoreWithHeaders.class));
+ assertThat(logging,
instanceOf(ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.class));
+ }
+
+ @Test
+ public void shouldCreateHeadersAwareStoreWithLoggingDisabled() {
+ doReturn("headers")
+
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
+
+ final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized = new MaterializedInternal<>(
+ Materialized.<String, String, KeyValueStore<Bytes,
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
+ );
+
+ final TimestampedKeyValueStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
+
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ assertThat(wrapped,
not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
+ }
+
+ @Test
+ public void shouldBuildHeadersAwareStoreWithCachingEnabledByDefault() {
+ doReturn("headers")
+
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
+
+ final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized =
+ new MaterializedInternal<>(Materialized.as("store"), nameProvider,
STORE_PREFIX);
+
+ final TimestampedKeyValueStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
+
+ final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+ assertThat(wrapped, instanceOf(CachingKeyValueStore.class));
+ }
+
+ @Test
+ public void
shouldCreateHeadersAwareStoreWithProvidedSupplierAndLoggingEnabled() {
+ mockKeyValueStoreSupplier();
+ doReturn("headers")
+
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
+
+ final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized =
+ new MaterializedInternal<>(Materialized.<String,
String>as(keyValueStoreSupplier).withCachingDisabled(), nameProvider,
STORE_PREFIX);
+
+ final TimestampedKeyValueStoreWithHeaders<String, String> store =
getHeadersAwareStore(materialized);
+
+ final WrappedStateStore logging = (WrappedStateStore)
((WrappedStateStore) store).wrapped();
+ assertThat(innerKeyValueStore.name(), equalTo(store.name()));
+ assertThat(store,
instanceOf(MeteredTimestampedKeyValueStoreWithHeaders.class));
+ assertThat(logging,
instanceOf(ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.class));
+ }
+
@SuppressWarnings("unchecked")
private TimestampedKeyValueStore<String, String> getTimestampedStore(
final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized) {
@@ -273,4 +341,12 @@ public class KeyValueStoreMaterializerTest {
materializer.configure(streamsConfig);
return (VersionedKeyValueStore<String, String>)
materializer.builder().build();
}
+
+ @SuppressWarnings("unchecked")
+ private TimestampedKeyValueStoreWithHeaders<String, String>
getHeadersAwareStore(
+ final MaterializedInternal<String, String, KeyValueStore<Bytes,
byte[]>> materialized) {
+ final KeyValueStoreMaterializer<String, String> materializer = new
KeyValueStoreMaterializer<>(materialized);
+ materializer.configure(streamsConfig);
+ return (TimestampedKeyValueStoreWithHeaders<String, String>)
materializer.builder().build();
+ }
}
\ No newline at end of file