This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9f22628b8fe KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL 
(1/N) (#21572)
9f22628b8fe is described below

commit 9f22628b8fe3b9c06e2b90f994d2e2dc2c3302d5
Author: Alieh Saeedi <[email protected]>
AuthorDate: Fri Feb 27 14:40:37 2026 +0100

    KAFKA-20220: Enable TimestampedKVStoreWithHeaders in DSL (1/N) (#21572)
    
    This PR introduces the DslStoreFormat enum and extends DslKeyValueParams
    to enable headers-aware key-value stores in the Kafka Streams DSL,
    implementing the foundational infrastructure for  [
    
    
KIP-1285](https://cwiki.apache.org/confluence/display/KAFKA/KIP-1285%3A+DSL+Opt-in+Support+for+Headers-Aware+State+Stores).
    
    Reviewers: Matthias J. Sax <[email protected]>, TengYao Chi
     <[email protected]>
---
 .../org/apache/kafka/streams/DslStoreFormat.java   | 47 +++++++++++++
 .../org/apache/kafka/streams/StreamsConfig.java    | 14 ++++
 .../AbstractConfigurableStoreFactory.java          | 11 ++++
 .../internals/KeyValueStoreMaterializer.java       | 19 ++++--
 .../internals/OuterStreamJoinStoreFactory.java     |  4 +-
 .../internals/SubscriptionStoreFactory.java        |  4 +-
 .../streams/state/BuiltInDslStoreSuppliers.java    | 16 ++++-
 .../kafka/streams/state/DslKeyValueParams.java     | 37 ++++++++++-
 .../apache/kafka/streams/StreamsConfigTest.java    | 59 +++++++++++++++--
 .../internals/KeyValueStoreMaterializerTest.java   | 76 ++++++++++++++++++++++
 10 files changed, 271 insertions(+), 16 deletions(-)

diff --git a/streams/src/main/java/org/apache/kafka/streams/DslStoreFormat.java 
b/streams/src/main/java/org/apache/kafka/streams/DslStoreFormat.java
new file mode 100644
index 00000000000..02bfdfad743
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/DslStoreFormat.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams;
+
+import java.util.Locale;
+
+public enum DslStoreFormat {
+
+    /** The non-timestamped state stores */
+    PLAIN("PLAIN"),
+
+    /** The timestamped state stores */
+    TIMESTAMPED("TIMESTAMPED"),
+
+    /** The headers-aware state stores */
+    HEADERS("HEADERS");
+
+    /**
+     * String representation of the DSL store format.
+     */
+    public final String name;
+
+    DslStoreFormat(final String name) {
+        this.name = name;
+    }
+
+    /**
+     * Case-insensitive DSL store format lookup by string name.
+     */
+    public static DslStoreFormat of(final String name) {
+        return DslStoreFormat.valueOf(name.toUpperCase(Locale.ROOT));
+    }
+}
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java 
b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
index 8f9e326b114..91f34111abd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java
@@ -537,6 +537,14 @@ public class StreamsConfig extends AbstractConfig {
     static final String DSL_STORE_SUPPLIERS_CLASS_DOC = "Defines which store 
implementations to plug in to DSL operators. Must implement the 
<code>org.apache.kafka.streams.state.DslStoreSuppliers</code> interface.";
     static final Class<?> DSL_STORE_SUPPLIERS_CLASS_DEFAULT = 
BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class;
 
+    /** {@code dsl.store.suppliers.class } */
+    public static final String DSL_STORE_FORMAT_CONFIG = "dsl.store.format";
+    public static final String DSL_STORE_FORMAT_DEFAULT = "DEFAULT";
+    public static final String DSL_STORE_FORMAT_HEADERS = "HEADERS";
+    private static final String DSL_STORE_FORMAT_DOC = "Specifies the state 
store format for DSL operators. " +
+        "'DEFAULT' creates either timestamped or plain state stores, depending 
on context. " +
+        "'HEADERS' creates headers-aware stores that preserve record headers.";
+
     /** {@code default key.serde} */
     @SuppressWarnings("WeakerAccess")
     public static final String DEFAULT_KEY_SERDE_CLASS_CONFIG = 
"default.key.serde";
@@ -1112,6 +1120,12 @@ public class StreamsConfig extends AbstractConfig {
                     DSL_STORE_SUPPLIERS_CLASS_DEFAULT,
                     Importance.LOW,
                     DSL_STORE_SUPPLIERS_CLASS_DOC)
+            .define(DSL_STORE_FORMAT_CONFIG,
+                    Type.STRING,
+                    DSL_STORE_FORMAT_DEFAULT,
+                    
ConfigDef.CaseInsensitiveValidString.in(DSL_STORE_FORMAT_DEFAULT, 
DSL_STORE_FORMAT_HEADERS),
+                    Importance.LOW,
+                    DSL_STORE_FORMAT_DOC)
             .define(DEFAULT_CLIENT_SUPPLIER_CONFIG,
                     Type.CLASS,
                     DefaultKafkaClientSupplier.class.getName(),
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
index 8fd766a93c6..6dca8141945 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/AbstractConfigurableStoreFactory.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.streams.DslStoreFormat;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.processor.internals.StoreFactory;
 import org.apache.kafka.streams.state.DslStoreSuppliers;
@@ -26,6 +27,7 @@ import java.util.Set;
 public abstract class AbstractConfigurableStoreFactory implements StoreFactory 
{
     private final Set<String> connectedProcessorNames = new HashSet<>();
     private DslStoreSuppliers dslStoreSuppliers;
+    private DslStoreFormat dslStoreFormat;
 
     public AbstractConfigurableStoreFactory(final DslStoreSuppliers 
initialStoreSuppliers) {
         this.dslStoreSuppliers = initialStoreSuppliers;
@@ -40,6 +42,11 @@ public abstract class AbstractConfigurableStoreFactory 
implements StoreFactory {
                 config.originals()
             );
         }
+        final String dslStoreFormatValue = 
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
+        if 
(dslStoreFormatValue.equalsIgnoreCase(StreamsConfig.DSL_STORE_FORMAT_HEADERS)) {
+            dslStoreFormat = DslStoreFormat.HEADERS;
+        }
+        // else dslStoreFormat remains null and the lower layers decide 
between PLAIN and TIMESTAMPED
     }
 
     @Override
@@ -47,6 +54,10 @@ public abstract class AbstractConfigurableStoreFactory 
implements StoreFactory {
         return connectedProcessorNames;
     }
 
+    public DslStoreFormat dslStoreFormat() {
+        return dslStoreFormat;
+    }
+
     protected DslStoreSuppliers dslStoreSuppliers() {
         if (dslStoreSuppliers == null) {
             throw new IllegalStateException("Expected configure() to be called 
before using dslStoreSuppliers");
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
index d59d34e0e90..434b23e7871 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KeyValueStoreMaterializer.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.DslStoreFormat;
 import org.apache.kafka.streams.state.DslKeyValueParams;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
@@ -44,8 +45,9 @@ public class KeyValueStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K,
 
     @Override
     public StoreBuilder<?> builder() {
+        final DslStoreFormat storeFormat = dslStoreFormat() == null ? 
DslStoreFormat.TIMESTAMPED : DslStoreFormat.HEADERS;
         final KeyValueBytesStoreSupplier supplier = 
materialized.storeSupplier() == null
-                ? dslStoreSuppliers().keyValueStore(new 
DslKeyValueParams(materialized.storeName(), true))
+                ? dslStoreSuppliers().keyValueStore(new 
DslKeyValueParams(materialized.storeName(), storeFormat))
                 : (KeyValueBytesStoreSupplier) materialized.storeSupplier();
 
         final StoreBuilder<?> builder;
@@ -55,10 +57,17 @@ public class KeyValueStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K,
                     materialized.keySerde(),
                     materialized.valueSerde());
         } else {
-            builder = Stores.timestampedKeyValueStoreBuilder(
+            if (storeFormat == DslStoreFormat.HEADERS) {
+                builder = Stores.timestampedKeyValueStoreBuilderWithHeaders(
                     supplier,
                     materialized.keySerde(),
                     materialized.valueSerde());
+            } else {
+                builder = Stores.timestampedKeyValueStoreBuilder(
+                    supplier,
+                    materialized.keySerde(),
+                    materialized.valueSerde());
+            }
         }
 
         if (materialized.loggingEnabled()) {
@@ -68,10 +77,10 @@ public class KeyValueStoreMaterializer<K, V> extends 
MaterializedStoreFactory<K,
         }
 
         if (materialized.cachingEnabled()) {
-            if (!(builder instanceof VersionedKeyValueStoreBuilder)) {
-                builder.withCachingEnabled();
-            } else {
+            if (builder instanceof VersionedKeyValueStoreBuilder) {
                 LOG.info("Not enabling caching for store '{}' as versioned 
stores do not support caching.", supplier.name());
+            } else {
+                builder.withCachingEnabled();
             }
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
index 645858d1a65..e7194ff89ea 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/OuterStreamJoinStoreFactory.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.streams.DslStoreFormat;
 import org.apache.kafka.streams.kstream.JoinWindows;
 import org.apache.kafka.streams.processor.internals.StoreFactory;
 import org.apache.kafka.streams.state.DslKeyValueParams;
@@ -95,7 +96,8 @@ public class OuterStreamJoinStoreFactory<K, V1, V2> extends 
AbstractConfigurable
         final TimestampedKeyAndJoinSideSerde<K> timestampedKeyAndJoinSideSerde 
= new TimestampedKeyAndJoinSideSerde<>(streamJoined.keySerde());
         final LeftOrRightValueSerde<V1, V2> leftOrRightValueSerde = new 
LeftOrRightValueSerde<>(streamJoined.valueSerde(), 
streamJoined.otherValueSerde());
 
-        final DslKeyValueParams dslKeyValueParams = new 
DslKeyValueParams(name, false);
+        final DslStoreFormat storeFormat = dslStoreFormat() == null ? 
DslStoreFormat.PLAIN : DslStoreFormat.HEADERS;
+        final DslKeyValueParams dslKeyValueParams = new 
DslKeyValueParams(name, storeFormat);
         final KeyValueBytesStoreSupplier supplier;
 
         if (passedInDslStoreSuppliers != null) {
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
index 10c8a5e110c..dbaa008cf0f 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/SubscriptionStoreFactory.java
@@ -19,6 +19,7 @@ package org.apache.kafka.streams.kstream.internals;
 
 import org.apache.kafka.common.serialization.Serde;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.DslStoreFormat;
 import 
org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapper;
 import org.apache.kafka.streams.processor.internals.StoreFactory;
 import org.apache.kafka.streams.state.DslKeyValueParams;
@@ -46,8 +47,9 @@ public class SubscriptionStoreFactory<K> extends 
AbstractConfigurableStoreFactor
     @Override
     public StoreBuilder<?> builder() {
         StoreBuilder<?> builder;
+        final DslStoreFormat storeFormat = dslStoreFormat() == null ? 
DslStoreFormat.TIMESTAMPED : DslStoreFormat.HEADERS;
         builder = Stores.timestampedKeyValueStoreBuilder(
-            dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name, 
true)),
+            dslStoreSuppliers().keyValueStore(new DslKeyValueParams(name, 
storeFormat)),
             new Serdes.BytesSerde(),
             subscriptionWrapperSerde
         );
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
 
b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
index eedcafd7c79..cfe95d515e7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/BuiltInDslStoreSuppliers.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.streams.DslStoreFormat;
 import org.apache.kafka.streams.kstream.EmitStrategy;
 import 
org.apache.kafka.streams.state.internals.RocksDbIndexedTimeOrderedWindowBytesStoreSupplier;
 import 
org.apache.kafka.streams.state.internals.RocksDbTimeOrderedSessionBytesStoreSupplier;
@@ -36,9 +37,18 @@ public class BuiltInDslStoreSuppliers {
 
         @Override
         public KeyValueBytesStoreSupplier keyValueStore(final 
DslKeyValueParams params) {
-            return params.isTimestamped()
-                    ? Stores.persistentTimestampedKeyValueStore(params.name())
-                    : Stores.persistentKeyValueStore(params.name());
+            final DslStoreFormat storeFormat = params.dslStoreFormat();
+            switch (storeFormat) {
+                case HEADERS:
+                    return 
Stores.persistentTimestampedKeyValueStoreWithHeaders(params.name());
+                case TIMESTAMPED:
+                    return 
Stores.persistentTimestampedKeyValueStore(params.name());
+                case PLAIN:
+                    return Stores.persistentKeyValueStore(params.name());
+                default:
+                    throw new IllegalArgumentException("Unsupported 
DslStoreFormat: " + storeFormat +
+                        ". Expected one of: HEADERS, TIMESTAMPED, or PLAIN");
+            }
         }
 
         @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java 
b/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java
index 7447d6c711f..73258fe59b4 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/state/DslKeyValueParams.java
@@ -16,6 +16,8 @@
  */
 package org.apache.kafka.streams.state;
 
+import org.apache.kafka.streams.DslStoreFormat;
+
 import java.util.Objects;
 
 /**
@@ -26,23 +28,52 @@ public class DslKeyValueParams {
 
     private final String name;
     private final boolean isTimestamped;
+    private final DslStoreFormat dslStoreFormat;
 
     /**
+     * @deprecated Since 4.3. Use {@link #DslKeyValueParams(String, 
DslStoreFormat)} instead.
      * @param name          the name of the store (cannot be {@code null})
      * @param isTimestamped whether the returned stores should be timestamped, 
see ({@link TimestampedKeyValueStore}
      */
+    @Deprecated
     public DslKeyValueParams(final String name, final boolean isTimestamped) {
         Objects.requireNonNull(name);
         this.name = name;
         this.isTimestamped = isTimestamped;
+        // If isTimestamped is false and the user is still calling the old 
deprecated constructor, we should assume they mean plain.
+        this.dslStoreFormat = isTimestamped ? DslStoreFormat.TIMESTAMPED : 
DslStoreFormat.PLAIN;
+    }
+
+    /**
+     * @param name           the name of the store (cannot be {@code null})
+     * @param dslStoreFormat the format of the state store, see ({@link 
DslStoreFormat}
+     */
+    public DslKeyValueParams(final String name, final DslStoreFormat 
dslStoreFormat) {
+        this.name =  Objects.requireNonNull(name);
+        this.dslStoreFormat = Objects.requireNonNull(dslStoreFormat);
+        this.isTimestamped = dslStoreFormat == DslStoreFormat.TIMESTAMPED;
     }
 
     public String name() {
         return name;
     }
 
+    /**
+     * @deprecated Since 4.3. Use {@link #dslStoreFormat()} instead to check 
the store format.
+     * @return {@code true} if the store format is {@link 
DslStoreFormat#TIMESTAMPED}, {@code false} otherwise
+     */
+    @Deprecated
     public boolean isTimestamped() {
-        return isTimestamped;
+        return dslStoreFormat == DslStoreFormat.TIMESTAMPED;
+    }
+
+    /**
+     * Returns the store format for this key-value store.
+     *
+     * @return the {@link DslStoreFormat} specifying whether to use plain, 
timestamped, or headers-aware stores
+     */
+    public DslStoreFormat dslStoreFormat() {
+        return dslStoreFormat;
     }
 
     @Override
@@ -55,12 +86,13 @@ public class DslKeyValueParams {
         }
         final DslKeyValueParams that = (DslKeyValueParams) o;
         return isTimestamped == that.isTimestamped
+                && dslStoreFormat == that.dslStoreFormat
                 && Objects.equals(name, that.name);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(name, isTimestamped);
+        return Objects.hash(name, isTimestamped, dslStoreFormat);
     }
 
     @Override
@@ -68,6 +100,7 @@ public class DslKeyValueParams {
         return "DslKeyValueParams{" +
                 "name='" + name + '\'' +
                 "isTimestamped=" + isTimestamped +
+                "dslStoreFormat=" + dslStoreFormat +
                 '}';
     }
 }
\ No newline at end of file
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java 
b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
index d9fbef48929..d58909094cf 100644
--- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java
@@ -966,7 +966,7 @@ public class StreamsConfigTest {
     @Test
     public void shouldSpecifyRocksdbWhenNotExplicitlyAddedToConfigs() {
         final String expectedDefaultStoreType = StreamsConfig.ROCKS_DB;
-        final String actualDefaultStoreType = 
streamsConfig.getString(org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG);
+        final String actualDefaultStoreType = 
streamsConfig.getString(StreamsConfig.DEFAULT_DSL_STORE_CONFIG);
         assertEquals(expectedDefaultStoreType, actualDefaultStoreType, 
"default.dsl.store should be \"rocksDB\"");
     }
 
@@ -974,19 +974,70 @@ public class StreamsConfigTest {
     @Test
     public void shouldSpecifyInMemoryWhenExplicitlyAddedToConfigs() {
         final String expectedDefaultStoreType = StreamsConfig.IN_MEMORY;
-        
props.put(org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG, 
expectedDefaultStoreType);
+        props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, 
expectedDefaultStoreType);
         final StreamsConfig config = new StreamsConfig(props);
-        final String actualDefaultStoreType = 
config.getString(org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG);
+        final String actualDefaultStoreType = 
config.getString(StreamsConfig.DEFAULT_DSL_STORE_CONFIG);
         assertEquals(expectedDefaultStoreType, actualDefaultStoreType, 
"default.dsl.store should be \"in_memory\"");
     }
 
     @Deprecated
     @Test
     public void shouldThrowConfigExceptionWhenStoreTypeConfigNotValueInRange() 
{
-        
props.put(org.apache.kafka.streams.StreamsConfig.DEFAULT_DSL_STORE_CONFIG, 
"bad_config");
+        props.put(StreamsConfig.DEFAULT_DSL_STORE_CONFIG, "bad_config");
         assertThrows(ConfigException.class, () -> new StreamsConfig(props));
     }
 
+    @Test
+    public void shouldUseDefaultStoreFormatWhenNotSpecified() {
+        final StreamsConfig config = new StreamsConfig(props);
+        final String actualFormat = 
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
+        assertEquals("DEFAULT", actualFormat, "dsl.store.format should default 
to 'default'");
+    }
+
+    @Test
+    public void shouldAcceptValidDslStoreFormatDefault() {
+        props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, "DEFAULT");
+        final StreamsConfig config = new StreamsConfig(props);
+        assertEquals("DEFAULT", 
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG));
+    }
+
+    @Test
+    public void shouldAcceptValidDslStoreFormatHeaders() {
+        props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, "HEADERS");
+        final StreamsConfig config = new StreamsConfig(props);
+        assertEquals("HEADERS", 
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG));
+    }
+
+    @Test
+    public void shouldThrowConfigExceptionForInvalidDslStoreFormat() {
+        props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, "invalid_format");
+        final ConfigException exception = assertThrows(ConfigException.class, 
() -> new StreamsConfig(props));
+        assertTrue(exception.getMessage().contains("Invalid value 
invalid_format for configuration dsl.store.format"));
+    }
+
+    @Test
+    public void shouldAcceptDslStoreFormatCaseInsensitively() {
+        props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, "default");
+        StreamsConfig config = new StreamsConfig(props);
+        assertEquals("default", 
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG));
+
+        props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, "DEFAULT");
+        config = new StreamsConfig(props);
+        assertEquals("DEFAULT", 
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG));
+
+        props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, "headers");
+        config = new StreamsConfig(props);
+        assertEquals("headers", 
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG));
+
+        props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, "HEADERS");
+        config = new StreamsConfig(props);
+        assertEquals("HEADERS", 
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG));
+
+        props.put(StreamsConfig.DSL_STORE_FORMAT_CONFIG, "HeAdErS");
+        config = new StreamsConfig(props);
+        assertEquals("HeAdErS", 
config.getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG));
+    }
+
     @Test
     public void 
shouldSpecifyRocksdbDslSupplierWhenNotExplicitlyAddedToConfigs() {
         final Class<?> expectedDefaultStoreType = 
BuiltInDslStoreSuppliers.RocksDBDslStoreSuppliers.class;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
index 7228496dd36..5b0aa745133 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/KeyValueStoreMaterializerTest.java
@@ -29,15 +29,18 @@ import org.apache.kafka.streams.state.DslStoreSuppliers;
 import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
 import org.apache.kafka.streams.state.KeyValueStore;
 import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.VersionedBytesStore;
 import org.apache.kafka.streams.state.VersionedBytesStoreSupplier;
 import org.apache.kafka.streams.state.VersionedKeyValueStore;
 import org.apache.kafka.streams.state.internals.CachingKeyValueStore;
 import 
org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore;
 import 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore;
+import 
org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders;
 import 
org.apache.kafka.streams.state.internals.ChangeLoggingVersionedKeyValueBytesStore;
 import org.apache.kafka.streams.state.internals.InMemoryKeyValueStore;
 import 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStore;
+import 
org.apache.kafka.streams.state.internals.MeteredTimestampedKeyValueStoreWithHeaders;
 import org.apache.kafka.streams.state.internals.MeteredVersionedKeyValueStore;
 import org.apache.kafka.streams.state.internals.WrappedStateStore;
 
@@ -55,6 +58,7 @@ import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.core.IsInstanceOf.instanceOf;
 import static org.hamcrest.core.IsNot.not;
 import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.lenient;
 import static org.mockito.Mockito.when;
 
 @ExtendWith(MockitoExtension.class)
@@ -87,6 +91,8 @@ public class KeyValueStoreMaterializerTest {
                     DslStoreSuppliers.class,
                     emptyMap()
             );
+        lenient().doReturn("default")
+                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
     }
 
     private void mockInnerVersionedStore() {
@@ -258,6 +264,68 @@ public class KeyValueStoreMaterializerTest {
         assertThat(innerVersionedStore, equalTo(inner));
     }
 
+    @Test
+    public void shouldCreateHeadersAwareStoreWithLoggingEnabledByDefault() {
+        doReturn("headers")
+                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
+
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized =
+            new MaterializedInternal<>(Materialized.<String, String, 
KeyValueStore<Bytes, byte[]>>as("store")
+                .withCachingDisabled(), nameProvider, STORE_PREFIX);
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
getHeadersAwareStore(materialized);
+
+        final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
+        assertThat(store, 
instanceOf(MeteredTimestampedKeyValueStoreWithHeaders.class));
+        assertThat(logging, 
instanceOf(ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.class));
+    }
+
+    @Test
+    public void shouldCreateHeadersAwareStoreWithLoggingDisabled() {
+        doReturn("headers")
+                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
+
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized = new MaterializedInternal<>(
+            Materialized.<String, String, KeyValueStore<Bytes, 
byte[]>>as("store").withLoggingDisabled(), nameProvider, STORE_PREFIX
+        );
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
getHeadersAwareStore(materialized);
+
+        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+        assertThat(wrapped, 
not(instanceOf(ChangeLoggingKeyValueBytesStore.class)));
+    }
+
+    @Test
+    public void shouldBuildHeadersAwareStoreWithCachingEnabledByDefault() {
+        doReturn("headers")
+                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
+
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized =
+            new MaterializedInternal<>(Materialized.as("store"), nameProvider, 
STORE_PREFIX);
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
getHeadersAwareStore(materialized);
+
+        final StateStore wrapped = ((WrappedStateStore) store).wrapped();
+        assertThat(wrapped, instanceOf(CachingKeyValueStore.class));
+    }
+
+    @Test
+    public void 
shouldCreateHeadersAwareStoreWithProvidedSupplierAndLoggingEnabled() {
+        mockKeyValueStoreSupplier();
+        doReturn("headers")
+                
.when(streamsConfig).getString(StreamsConfig.DSL_STORE_FORMAT_CONFIG);
+
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized =
+            new MaterializedInternal<>(Materialized.<String, 
String>as(keyValueStoreSupplier).withCachingDisabled(), nameProvider, 
STORE_PREFIX);
+
+        final TimestampedKeyValueStoreWithHeaders<String, String> store = 
getHeadersAwareStore(materialized);
+
+        final WrappedStateStore logging = (WrappedStateStore) 
((WrappedStateStore) store).wrapped();
+        assertThat(innerKeyValueStore.name(), equalTo(store.name()));
+        assertThat(store, 
instanceOf(MeteredTimestampedKeyValueStoreWithHeaders.class));
+        assertThat(logging, 
instanceOf(ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.class));
+    }
+
     @SuppressWarnings("unchecked")
     private TimestampedKeyValueStore<String, String> getTimestampedStore(
         final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized) {
@@ -273,4 +341,12 @@ public class KeyValueStoreMaterializerTest {
         materializer.configure(streamsConfig);
         return (VersionedKeyValueStore<String, String>) 
materializer.builder().build();
     }
+
+    @SuppressWarnings("unchecked")
+    private TimestampedKeyValueStoreWithHeaders<String, String> 
getHeadersAwareStore(
+        final MaterializedInternal<String, String, KeyValueStore<Bytes, 
byte[]>> materialized) {
+        final KeyValueStoreMaterializer<String, String> materializer = new 
KeyValueStoreMaterializer<>(materialized);
+        materializer.configure(streamsConfig);
+        return (TimestampedKeyValueStoreWithHeaders<String, String>) 
materializer.builder().build();
+    }
 }
\ No newline at end of file

Reply via email to