This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b603b5306e4 KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders
(N/N) (#21529)
b603b5306e4 is described below
commit b603b5306e4d1cc837c709886a35f652a7d337b1
Author: Alieh Saeedi <[email protected]>
AuthorDate: Wed Feb 25 14:38:17 2026 +0100
KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (N/N) (#21529)
This PR implements the upgrade integration tests for
`TimestampedKeyValueStoreWithHeaders`, introduced in KIP-1271.
---
.../HeadersStoreUpgradeIntegrationTest.java | 352 +++++++++++++++++++++
.../TimestampedKeyValueStoreWithHeadersTest.java | 6 +-
...=> TimestampedStoreUpgradeIntegrationTest.java} | 2 +-
.../processor/internals/StateManagerUtil.java | 13 +-
.../state/internals/CachingKeyValueStore.java | 3 +
.../streams/state/internals/RecordConverters.java | 69 ++++
...TimestampedKeyValueStoreBuilderWithHeaders.java | 2 +-
.../streams/state/internals/WrappedStateStore.java | 11 +
.../state/internals/RecordConvertersTest.java | 21 ++
...stampedKeyValueStoreBuilderWithHeadersTest.java | 33 --
10 files changed, 471 insertions(+), 41 deletions(-)
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
new file mode 100644
index 00000000000..9b6371f5a19
--- /dev/null
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HeadersStoreUpgradeIntegrationTest.java
@@ -0,0 +1,352 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.apache.kafka.streams.KafkaStreams;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.StreamsBuilder;
+import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
+import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.state.QueryableStoreTypes;
+import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
+import org.apache.kafka.streams.state.Stores;
+import org.apache.kafka.streams.state.TimestampedKeyValueStore;
+import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
+import org.apache.kafka.streams.state.ValueAndTimestamp;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Tag;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestInfo;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Properties;
+
+import static java.util.Collections.singletonList;
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
+
+@Tag("integration")
+public class HeadersStoreUpgradeIntegrationTest {
+ private static final String STORE_NAME = "store";
+ private String inputStream;
+
+ private KafkaStreams kafkaStreams;
+
+ public static final EmbeddedKafkaCluster CLUSTER = new
EmbeddedKafkaCluster(1);
+
+ @BeforeAll
+ public static void startCluster() throws IOException {
+ CLUSTER.start();
+ }
+
+ @AfterAll
+ public static void closeCluster() {
+ CLUSTER.stop();
+ }
+
+ public String safeTestName;
+
+ @BeforeEach
+ public void createTopics(final TestInfo testInfo) throws Exception {
+ safeTestName = safeUniqueTestName(testInfo);
+ inputStream = "input-stream-" + safeTestName;
+ CLUSTER.createTopic(inputStream);
+ }
+
+ private Properties props() {
+ final Properties streamsConfiguration = new Properties();
+ streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "app-" +
safeTestName);
+ streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
CLUSTER.bootstrapServers());
+
streamsConfiguration.put(StreamsConfig.STATESTORE_CACHE_MAX_BYTES_CONFIG, 0);
+ streamsConfiguration.put(StreamsConfig.STATE_DIR_CONFIG,
TestUtils.tempDirectory().getPath());
+ streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
Serdes.String().getClass());
+ streamsConfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG,
1000L);
+ streamsConfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
"earliest");
+ return streamsConfiguration;
+ }
+
+ @AfterEach
+ public void shutdown() {
+ if (kafkaStreams != null) {
+ kafkaStreams.close(Duration.ofSeconds(30L));
+ kafkaStreams.cleanUp();
+ }
+ }
+
+ @Test
+ public void
shouldMigrateInMemoryTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi()
throws Exception {
+
shouldMigrateTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(false);
+ }
+
+ @Test
+ public void
shouldMigratePersistentTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi()
throws Exception {
+
shouldMigrateTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(true);
+ }
+
+ private void
shouldMigrateTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi(final
boolean persistentStore) throws Exception {
+ final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
+
+ streamsBuilderForOldStore.addStateStore(
+ Stores.timestampedKeyValueStoreBuilder(
+ persistentStore ?
Stores.persistentTimestampedKeyValueStore(STORE_NAME) :
Stores.inMemoryKeyValueStore(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedKeyValueProcessor::new, STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(),
props);
+ kafkaStreams.start();
+
+ processKeyValueAndVerifyTimestampedValue("key1", "value1", 11L);
+ processKeyValueAndVerifyTimestampedValue("key2", "value2", 22L);
+ processKeyValueAndVerifyTimestampedValue("key3", "value3", 33L);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+ final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
+
+ streamsBuilderForNewStore.addStateStore(
+ Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ persistentStore ?
Stores.persistentTimestampedKeyValueStoreWithHeaders(STORE_NAME) :
Stores.inMemoryKeyValueStore(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(),
props);
+ kafkaStreams.start();
+
+ // Verify legacy data can be read with empty headers
+ verifyLegacyValuesWithEmptyHeaders("key1", "value1", 11L);
+ verifyLegacyValuesWithEmptyHeaders("key2", "value2", 22L);
+ verifyLegacyValuesWithEmptyHeaders("key3", "value3", 33L);
+
+ // Process new records with headers
+ final Headers headers = new RecordHeaders();
+ headers.add("source", "test".getBytes());
+
+ processKeyValueWithTimestampAndHeadersAndVerify("key3", "value3",
333L, headers, headers);
+ processKeyValueWithTimestampAndHeadersAndVerify("key4new", "value4",
444L, headers, headers);
+
+ kafkaStreams.close();
+ }
+
+ @Test
+ public void
shouldProxyTimestampedKeyValueStoreToTimestampedKeyValueStoreWithHeadersUsingPapi()
throws Exception {
+ final StreamsBuilder streamsBuilderForOldStore = new StreamsBuilder();
+
+ streamsBuilderForOldStore.addStateStore(
+ Stores.timestampedKeyValueStoreBuilder(
+ Stores.persistentTimestampedKeyValueStore(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedKeyValueProcessor::new, STORE_NAME);
+
+ final Properties props = props();
+ kafkaStreams = new KafkaStreams(streamsBuilderForOldStore.build(),
props);
+ kafkaStreams.start();
+
+ processKeyValueAndVerifyTimestampedValue("key1", "value1", 11L);
+ processKeyValueAndVerifyTimestampedValue("key2", "value2", 22L);
+ processKeyValueAndVerifyTimestampedValue("key3", "value3", 33L);
+
+ kafkaStreams.close();
+ kafkaStreams = null;
+
+
+
+ final StreamsBuilder streamsBuilderForNewStore = new StreamsBuilder();
+
+ streamsBuilderForNewStore.addStateStore(
+ Stores.timestampedKeyValueStoreBuilderWithHeaders(
+ Stores.persistentTimestampedKeyValueStore(STORE_NAME),
+ Serdes.String(),
+ Serdes.String()))
+ .stream(inputStream, Consumed.with(Serdes.String(),
Serdes.String()))
+ .process(TimestampedKeyValueWithHeadersProcessor::new, STORE_NAME);
+
+ kafkaStreams = new KafkaStreams(streamsBuilderForNewStore.build(),
props);
+ kafkaStreams.start();
+
+ // Verify legacy data can be read with empty headers
+ verifyLegacyValuesWithEmptyHeaders("key1", "value1", 11L);
+ verifyLegacyValuesWithEmptyHeaders("key2", "value2", 22L);
+ verifyLegacyValuesWithEmptyHeaders("key3", "value3", 33L);
+
+ // Process new records with headers
+ final RecordHeaders headers = new RecordHeaders();
+ headers.add("source", "proxy-test".getBytes());
+ final Headers expectedHeaders = new RecordHeaders();
+
+ processKeyValueWithTimestampAndHeadersAndVerify("key3", "value3",
333L, headers, expectedHeaders);
+ processKeyValueWithTimestampAndHeadersAndVerify("key4new", "value4",
444L, headers, expectedHeaders);
+
+ kafkaStreams.close();
+ }
+
+ private <K, V> void processKeyValueAndVerifyTimestampedValue(final K key,
+ final V value,
+ final long
timestamp)
+ throws Exception {
+
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ final ReadOnlyKeyValueStore<K, ValueAndTimestamp<V>> store
=
+ IntegrationTestUtils.getStore(STORE_NAME,
kafkaStreams, QueryableStoreTypes.timestampedKeyValueStore());
+
+ if (store == null) {
+ return false;
+ }
+
+ final ValueAndTimestamp<V> result = store.get(key);
+ return result != null && result.value().equals(value) &&
result.timestamp() == timestamp;
+ } catch (final Exception swallow) {
+ swallow.printStackTrace();
+ System.err.println(swallow.getMessage());
+ return false;
+ }
+ },
+ 60_000L,
+ "Could not get expected result in time.");
+ }
+
+ private <K, V> void processKeyValueWithTimestampAndHeadersAndVerify(final
K key,
+ final
V value,
+ final
long timestamp,
+ final
Headers headers,
+ final
Headers expectedHeaders)
+ throws Exception {
+
+ IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
+ inputStream,
+ singletonList(KeyValue.pair(key, value)),
+ TestUtils.producerConfig(CLUSTER.bootstrapServers(),
+ StringSerializer.class,
+ StringSerializer.class),
+ headers,
+ timestamp,
+ false);
+
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>>
store = IntegrationTestUtils
+ .getStore(STORE_NAME, kafkaStreams,
QueryableStoreTypes.keyValueStore());
+
+ if (store == null)
+ return false;
+
+ final ValueTimestampHeaders<V> result = store.get(key);
+ return result != null
+ && result.value().equals(value)
+ && result.timestamp() == timestamp
+ && result.headers().equals(expectedHeaders);
+ } catch (final Exception swallow) {
+ swallow.printStackTrace();
+ System.err.println(swallow.getMessage());
+ return false;
+ }
+ },
+ 60_000L,
+ "Could not get expected result in time.");
+ }
+
+ private <K, V> void verifyLegacyValuesWithEmptyHeaders(final K key,
+ final V value,
+ final long
timestamp) throws Exception {
+ TestUtils.waitForCondition(
+ () -> {
+ try {
+ final ReadOnlyKeyValueStore<K, ValueTimestampHeaders<V>>
store = IntegrationTestUtils
+ .getStore(STORE_NAME, kafkaStreams,
QueryableStoreTypes.keyValueStore());
+
+ if (store == null)
+ return false;
+
+ final ValueTimestampHeaders<V> result = store.get(key);
+ return result != null
+ && result.value().equals(value)
+ && result.timestamp() == timestamp
+ && result.headers().toArray().length == 0;
+ } catch (final Exception swallow) {
+ swallow.printStackTrace();
+ System.err.println(swallow.getMessage());
+ return false;
+ }
+ },
+ 60_000L,
+ "Could not get expected result in time.");
+ }
+
+ private static class TimestampedKeyValueProcessor implements
Processor<String, String, Void, Void> {
+ private TimestampedKeyValueStore<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ store.put(record.key(), ValueAndTimestamp.make(record.value(),
record.timestamp()));
+ }
+ }
+
+ private static class TimestampedKeyValueWithHeadersProcessor implements
Processor<String, String, Void, Void> {
+ private TimestampedKeyValueStoreWithHeaders<String, String> store;
+
+ @Override
+ public void init(final ProcessorContext<Void, Void> context) {
+ store = context.getStateStore(STORE_NAME);
+ }
+
+ @Override
+ public void process(final Record<String, String> record) {
+ store.put(record.key(), ValueTimestampHeaders.make(record.value(),
record.timestamp(), record.headers()));
+ }
+ }
+}
\ No newline at end of file
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
index 767615bd105..b7cbdb2d82d 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedKeyValueStoreWithHeadersTest.java
@@ -428,9 +428,9 @@ public class TimestampedKeyValueStoreWithHeadersTest {
@SuppressWarnings("varargs")
@SafeVarargs
private final int produceDataToTopicWithHeaders(final String topic,
- final long timestamp,
- final Headers headers,
- final KeyValue<Integer,
String>... keyValues) {
+ final long timestamp,
+ final Headers headers,
+ final KeyValue<Integer,
String>... keyValues) {
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
topic,
Arrays.asList(keyValues),
diff --git
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedStoreUpgradeIntegrationTest.java
similarity index 99%
rename from
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
rename to
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedStoreUpgradeIntegrationTest.java
index cb248138d0e..dbeaa628d4c 100644
---
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/StoreUpgradeIntegrationTest.java
+++
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/TimestampedStoreUpgradeIntegrationTest.java
@@ -62,7 +62,7 @@ import static java.util.Collections.singletonList;
import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
@Tag("integration")
-public class StoreUpgradeIntegrationTest {
+public class TimestampedStoreUpgradeIntegrationTest {
private static final String STORE_NAME = "store";
private String inputStream;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
index 6990bd29029..6706ed89543 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
@@ -35,7 +35,9 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import static
org.apache.kafka.streams.state.internals.RecordConverters.identity;
+import static
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToHeadersValue;
import static
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue;
+import static
org.apache.kafka.streams.state.internals.WrappedStateStore.isHeadersAware;
import static
org.apache.kafka.streams.state.internals.WrappedStateStore.isTimestamped;
import static
org.apache.kafka.streams.state.internals.WrappedStateStore.isVersioned;
@@ -50,9 +52,14 @@ final class StateManagerUtil {
private StateManagerUtil() {}
static RecordConverter converterForStore(final StateStore store) {
- // should not prepend timestamp when restoring records for versioned
store, as
- // timestamp is used separately during put() process for restore of
versioned stores
- return (isTimestamped(store) && !isVersioned(store)) ?
rawValueToTimestampedValue() : identity();
+ if (isHeadersAware(store)) {
+ return rawValueToHeadersValue();
+ } else if (isTimestamped(store) && !isVersioned(store)) {
+ // should not prepend timestamp when restoring records for
versioned store, as
+ // timestamp is used separately during put() process for restore
of versioned stores
+ return rawValueToTimestampedValue();
+ }
+ return identity();
}
static boolean checkpointNeeded(final boolean enforceCheckpoint,
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index e8588d9146a..0e31ef87fa7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -177,6 +177,9 @@ public class CachingKeyValueStore
final Position mergedPosition,
final PositionBound positionBound,
final QueryConfig config) {
+ if (cacheType == CacheType.TIMESTAMPED_KEY_VALUE_STORE_WITH_HEADERS) {
+ throw new UnsupportedOperationException("Queries (IQv2) are not
supported for timestamped key-value stores with headers yet.");
+ }
QueryResult<R> result = null;
final KeyQuery<Bytes, byte[]> keyQuery = (KeyQuery<Bytes, byte[]>)
query;
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
index ad3c91e8073..8eb7be4f5de 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/RecordConverters.java
@@ -17,7 +17,14 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.utils.ByteUtils;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.nio.ByteBuffer;
public final class RecordConverters {
@@ -46,6 +53,35 @@ public final class RecordConverters {
);
};
+ private static final RecordConverter RAW_TO_WITH_HEADERS_INSTANCE = record
-> {
+ final byte[] rawValue = record.value();
+
+ // Format: [headersSize(varint)][headersBytes][timestamp(8)][value]
+ final byte[] recordValue = reconstructFromRaw(
+ rawValue,
+ record.timestamp(),
+ record.headers()
+ );
+
+ return new ConsumerRecord<>(
+ record.topic(),
+ record.partition(),
+ record.offset(),
+ record.timestamp(),
+ record.timestampType(),
+ record.serializedKeySize(),
+ record.serializedValueSize(),
+ record.key(),
+ recordValue,
+ record.headers(),
+ record.leaderEpoch()
+ );
+ };
+
+ public static RecordConverter rawValueToHeadersValue() {
+ return RAW_TO_WITH_HEADERS_INSTANCE;
+ }
+
// privatize the constructor so the class cannot be instantiated (only
used for its static members)
private RecordConverters() {}
@@ -56,4 +92,37 @@ public final class RecordConverters {
public static RecordConverter identity() {
return IDENTITY_INSTANCE;
}
+
+ /**
+ * Reconstructs the ValueTimestampHeaders format from raw value bytes,
timestamp, and headers.
+ * Used during state restoration from changelog topics.
+ *
+ * @param rawValue the raw value bytes
+ * @param timestamp the timestamp
+ * @param headers the headers
+ * @return the serialized ValueTimestampHeaders format
+ */
+ static byte[] reconstructFromRaw(final byte[] rawValue, final long
timestamp, final Headers headers) {
+ if (rawValue == null) {
+ return null;
+ }
+ final byte[] rawTimestamp;
+ try (LongSerializer timestampSerializer = new LongSerializer()) {
+ rawTimestamp = timestampSerializer.serialize("", timestamp);
+ }
+ final byte[] rawHeaders = HeadersSerializer.serialize(headers);
+
+ try (final ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ final DataOutputStream out = new DataOutputStream(baos)) {
+
+ ByteUtils.writeVarint(rawHeaders.length, out);
+ out.write(rawHeaders);
+ out.write(rawTimestamp);
+ out.write(rawValue);
+
+ return baos.toByteArray();
+ } catch (final IOException e) {
+ throw new SerializationException("Failed to reconstruct
ValueTimestampHeaders", e);
+ }
+ }
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
index 0c09eb2027d..be9ecb38bc9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeaders.java
@@ -179,7 +179,7 @@ public class TimestampedKeyValueStoreBuilderWithHeaders<K,
V>
@Override
public Position getPosition() {
- throw new UnsupportedOperationException("Position is not supported
by timestamped key-value stores with headers yet.");
+ return wrapped().getPosition();
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
index 0f0f7ee62ee..d1ce967bcc8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/WrappedStateStore.java
@@ -25,6 +25,7 @@ import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
+import org.apache.kafka.streams.state.HeadersBytesStore;
import org.apache.kafka.streams.state.TimestampedBytesStore;
import org.apache.kafka.streams.state.VersionedBytesStore;
@@ -55,6 +56,16 @@ public abstract class WrappedStateStore<S extends
StateStore, K, V> implements S
}
}
+ public static boolean isHeadersAware(final StateStore stateStore) {
+ if (stateStore instanceof HeadersBytesStore) {
+ return true;
+ } else if (stateStore instanceof WrappedStateStore) {
+ return isHeadersAware(((WrappedStateStore<?, ?, ?>)
stateStore).wrapped());
+ } else {
+ return false;
+ }
+ }
+
private final S wrapped;
public WrappedStateStore(final S wrapped) {
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
index 2f222ed6a8f..93d08c00293 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/RecordConvertersTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.record.TimestampType;
@@ -25,6 +26,7 @@ import org.junit.jupiter.api.Test;
import java.nio.ByteBuffer;
import java.util.Optional;
+import static
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToHeadersValue;
import static
org.apache.kafka.streams.state.internals.RecordConverters.rawValueToTimestampedValue;
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
@@ -32,11 +34,14 @@ import static org.junit.jupiter.api.Assertions.assertNull;
public class RecordConvertersTest {
private final RecordConverter timestampedValueConverter =
rawValueToTimestampedValue();
+ private final RecordConverter headersValueConverter =
rawValueToHeadersValue();
+
@Test
public void shouldPreserveNullValueOnConversion() {
final ConsumerRecord<byte[], byte[]> nullValueRecord = new
ConsumerRecord<>("", 0, 0L, new byte[0], null);
assertNull(timestampedValueConverter.convert(nullValueRecord).value());
+ assertNull(headersValueConverter.convert(nullValueRecord).value());
}
@Test
@@ -50,4 +55,20 @@ public class RecordConvertersTest {
final byte[] actualValue =
timestampedValueConverter.convert(inputRecord).value();
assertArrayEquals(expectedValue, actualValue);
}
+
+ @Test
+ public void
shouldAddTimestampAndHeadersToValueOnConversionWhenValueIsNotNull() {
+ final long timestamp = 10L;
+ final byte[] value = new byte[1];
+ final Headers headers = new RecordHeaders().add("header-key",
"header-value".getBytes());
+ final ConsumerRecord<byte[], byte[]> inputRecord = new
ConsumerRecord<>(
+ "topic", 1, 0, timestamp, TimestampType.CREATE_TIME, 0, 0, new
byte[0], value,
+ headers, Optional.empty());
+ // Expected format:
[headersSize(varint)][headersBytes][timestamp(8)][value]
+ final byte[] expectedValue =
+ {50, 2, 20, 104, 101, 97, 100, 101, 114, 45, 107, 101, 121, 24,
104, 101, 97, 100, 101,
+ 114, 45, 118, 97, 108, 117, 101, 0, 0, 0, 0, 0, 0, 0, 10, 0};
+ final byte[] actualValue =
headersValueConverter.convert(inputRecord).value();
+ assertArrayEquals(expectedValue, actualValue);
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
index 3929a11667a..a49e85bbb1a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimestampedKeyValueStoreBuilderWithHeadersTest.java
@@ -23,9 +23,7 @@ import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.QueryConfig;
-import org.apache.kafka.streams.state.HeadersBytesStore;
import org.apache.kafka.streams.state.KeyValueBytesStoreSupplier;
-import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.TimestampedKeyValueStoreWithHeaders;
import org.apache.kafka.streams.state.ValueTimestampHeaders;
@@ -336,37 +334,6 @@ public class
TimestampedKeyValueStoreBuilderWithHeadersTest {
assertTrue(exception.getMessage().contains("Position is not supported
by timestamped key-value stores with headers yet."));
}
- @Test
- public void shouldThrowOnGetPositionForInMemoryStoreMarker() {
- when(supplier.name()).thenReturn("test-store");
- when(supplier.metricsScope()).thenReturn("metricScope");
- when(supplier.get()).thenReturn(new
InMemoryKeyValueStore("test-store"));
-
- builder = new TimestampedKeyValueStoreBuilderWithHeaders<>(
- supplier,
- Serdes.String(),
- Serdes.String(),
- new MockTime()
- );
-
- final TimestampedKeyValueStoreWithHeaders<String, String> store =
builder
- .withLoggingDisabled()
- .withCachingDisabled()
- .build();
-
- // Unwrap to get directly to the
InMemoryTimestampedKeyValueStoreWithHeadersMarker
- final StateStore wrapped = ((WrappedStateStore) store).wrapped();
- assertInstanceOf(KeyValueStore.class, wrapped);
- assertInstanceOf(HeadersBytesStore.class, wrapped);
-
- final UnsupportedOperationException exception = assertThrows(
- UnsupportedOperationException.class,
- wrapped::getPosition
- );
-
- assertTrue(exception.getMessage().contains("Position is not supported
by timestamped key-value stores with headers yet."));
- }
-
@Test
public void shouldThrowOnGetPositionForHeadersStoreAdapter() {
when(supplier.name()).thenReturn("test-store");