This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 85c6313be0f KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders
(4/N) (#21454)
85c6313be0f is described below
commit 85c6313be0f1c1e666a428d0614dcd71ef0106d2
Author: Alieh Saeedi <[email protected]>
AuthorDate: Wed Feb 18 16:42:43 2026 +0100
KAFKA-20132: Implement TimestampedKeyValueStoreWithHeaders (4/N) (#21454)
This PR implements the changelogging layer of the
`TimestampedKeyValueStoreWithHeaders` introduced in KIP-1271.
Reviewers: TengYao Chi <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../internals/GlobalProcessorContextImpl.java | 2 +
.../internals/InternalProcessorContext.java | 2 +
.../processor/internals/ProcessorContextImpl.java | 19 +-
.../internals/ChangeLoggingKeyValueBytesStore.java | 16 +-
.../ChangeLoggingListValueBytesStore.java | 5 +-
.../internals/ChangeLoggingSessionBytesStore.java | 5 +-
...ChangeLoggingTimestampedKeyValueBytesStore.java | 7 +-
...ngTimestampedKeyValueBytesStoreWithHeaders.java | 100 ++++++
.../ChangeLoggingTimestampedWindowBytesStore.java | 2 +
.../ChangeLoggingVersionedKeyValueBytesStore.java | 10 +-
.../internals/ChangeLoggingWindowBytesStore.java | 3 +-
.../ValueTimestampHeadersDeserializer.java | 13 +
.../internals/AbstractProcessorContextTest.java | 1 +
.../internals/ProcessorContextImplTest.java | 9 +-
.../internals/WriteConsistencyVectorTest.java | 2 +-
.../ChangeLoggingSessionBytesStoreTest.java | 6 +-
...mestampedKeyValueBytesStoreWithHeadersTest.java | 343 +++++++++++++++++++++
...angeLoggingTimestampedWindowBytesStoreTest.java | 8 +-
.../ChangeLoggingWindowBytesStoreTest.java | 8 +-
.../kafka/test/InternalMockProcessorContext.java | 19 +-
.../kafka/test/MockInternalProcessorContext.java | 1 +
.../apache/kafka/test/NoOpProcessorContext.java | 2 +
22 files changed, 529 insertions(+), 54 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 4456b6b5814..2e7344cd508 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.StreamsConfig;
@@ -144,6 +145,7 @@ public class GlobalProcessorContextImpl extends
AbstractProcessorContext<Object,
final Bytes key,
final byte[] value,
final long timestamp,
+ final Headers headers,
final Position position) {
throw new UnsupportedOperationException("this should not happen:
logChange() not supported in global processor context.");
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
index 368cefe5e7e..7cc236578b8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalProcessorContext.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.processor.internals;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.BytesSerializer;
import org.apache.kafka.common.utils.Bytes;
@@ -120,6 +121,7 @@ public interface InternalProcessorContext<KOut, VOut>
final Bytes key,
final byte[] value,
final long timestamp,
+ final Headers headers,
final Position position);
String changelogFor(final String storeName);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 26f724b75f4..a0ca6875310 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -19,7 +19,6 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.header.internals.RecordHeader;
-import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.StreamsException;
@@ -124,20 +123,14 @@ public final class ProcessorContextImpl extends
AbstractProcessorContext<Object,
final Bytes key,
final byte[] value,
final long timestamp,
+ final Headers headers,
final Position position) {
throwUnsupportedOperationExceptionIfStandby("logChange");
final TopicPartition changelogPartition =
stateManager().registeredChangelogPartitionFor(storeName);
- final Headers headers;
- if (!consistencyEnabled) {
- headers = null;
- } else {
- // Add the vector clock to the header part of every record
- headers = new RecordHeaders();
-
headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
- headers.add(new
RecordHeader(ChangelogRecordDeserializationHelper.CHANGELOG_POSITION_HEADER_KEY,
- PositionSerde.serialize(position).array()));
+ if (consistencyEnabled) {
+ addVectorClockToHeaders(headers, position);
}
collector.send(
@@ -153,6 +146,12 @@ public final class ProcessorContextImpl extends
AbstractProcessorContext<Object,
null);
}
+ private void addVectorClockToHeaders(final Headers headers, final Position
position) {
+
headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
+ headers.add(new
RecordHeader(ChangelogRecordDeserializationHelper.CHANGELOG_POSITION_HEADER_KEY,
+ PositionSerde.serialize(position).array()));
+ }
+
/**
* @throws StreamsException if an attempt is made to access this state
store from an unknown node
* @throws UnsupportedOperationException if the current streamTask type is
standby
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 9c1c3f9ae76..d6f31629c50 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
@@ -52,7 +54,7 @@ public class ChangeLoggingKeyValueBytesStore
if (wrapped() instanceof MemoryLRUCache) {
((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> {
// pass null to indicate removal
- log(key, null, internalContext.recordContext().timestamp());
+ log(key, null, internalContext.recordContext().timestamp(),
new RecordHeaders());
});
}
}
@@ -66,7 +68,7 @@ public class ChangeLoggingKeyValueBytesStore
public void put(final Bytes key,
final byte[] value) {
wrapped().put(key, value);
- log(key, value, internalContext.recordContext().timestamp());
+ log(key, value, internalContext.recordContext().timestamp(), new
RecordHeaders());
}
@Override
@@ -75,7 +77,7 @@ public class ChangeLoggingKeyValueBytesStore
final byte[] previous = wrapped().putIfAbsent(key, value);
if (previous == null) {
// then it was absent
- log(key, value, internalContext.recordContext().timestamp());
+ log(key, value, internalContext.recordContext().timestamp(), new
RecordHeaders());
}
return previous;
}
@@ -84,7 +86,7 @@ public class ChangeLoggingKeyValueBytesStore
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
wrapped().putAll(entries);
for (final KeyValue<Bytes, byte[]> entry : entries) {
- log(entry.key, entry.value,
internalContext.recordContext().timestamp());
+ log(entry.key, entry.value,
internalContext.recordContext().timestamp(), new RecordHeaders());
}
}
@@ -97,7 +99,7 @@ public class ChangeLoggingKeyValueBytesStore
@Override
public byte[] delete(final Bytes key) {
final byte[] oldValue = wrapped().delete(key);
- log(key, null, internalContext.recordContext().timestamp());
+ log(key, null, internalContext.recordContext().timestamp(), new
RecordHeaders());
return oldValue;
}
@@ -128,7 +130,7 @@ public class ChangeLoggingKeyValueBytesStore
return wrapped().reverseAll();
}
- void log(final Bytes key, final byte[] value, final long timestamp) {
- internalContext.logChange(name(), key, value, timestamp,
wrapped().getPosition());
+ void log(final Bytes key, final byte[] value, final long timestamp, final
Headers headers) {
+ internalContext.logChange(name(), key, value, timestamp, headers,
wrapped().getPosition());
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
index ba43ba30b17..8d3b38391cb 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -32,9 +33,9 @@ public class ChangeLoggingListValueBytesStore extends
ChangeLoggingKeyValueBytes
// we need to log the full new list and thus call get() on the inner
store below
// if the value is a tombstone, we delete the whole list and thus can
save the get call
if (value == null) {
- log(key, null, internalContext.recordContext().timestamp());
+ log(key, null, internalContext.recordContext().timestamp(), new
RecordHeaders());
} else {
- log(key, wrapped().get(key),
internalContext.recordContext().timestamp());
+ log(key, wrapped().get(key),
internalContext.recordContext().timestamp(), new RecordHeaders());
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
index 248889211c3..8c24d428abc 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;
@@ -73,13 +74,13 @@ public class ChangeLoggingSessionBytesStore
@Override
public void remove(final Windowed<Bytes> sessionKey) {
wrapped().remove(sessionKey);
- internalContext.logChange(name(),
SessionKeySchema.toBinary(sessionKey), null,
internalContext.recordContext().timestamp(), wrapped().getPosition());
+ internalContext.logChange(name(),
SessionKeySchema.toBinary(sessionKey), null,
internalContext.recordContext().timestamp(), new RecordHeaders(),
wrapped().getPosition());
}
@Override
public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
wrapped().put(sessionKey, aggregate);
- internalContext.logChange(name(),
SessionKeySchema.toBinary(sessionKey), aggregate,
internalContext.recordContext().timestamp(), wrapped().getPosition());
+ internalContext.logChange(name(),
SessionKeySchema.toBinary(sessionKey), aggregate,
internalContext.recordContext().timestamp(), new RecordHeaders(),
wrapped().getPosition());
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
index b95ede1bba8..4aab5d2673d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.state.KeyValueStore;
@@ -35,7 +36,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore
extends ChangeLoggingKey
public void put(final Bytes key,
final byte[] valueAndTimestamp) {
wrapped().put(key, valueAndTimestamp);
- log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ?
internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp));
+ log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ?
internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp), new
RecordHeaders());
}
@Override
@@ -44,7 +45,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore
extends ChangeLoggingKey
final byte[] previous = wrapped().putIfAbsent(key, valueAndTimestamp);
if (previous == null) {
// then it was absent
- log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ?
internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp));
+ log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ?
internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp), new
RecordHeaders());
}
return previous;
}
@@ -54,7 +55,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore
extends ChangeLoggingKey
wrapped().putAll(entries);
for (final KeyValue<Bytes, byte[]> entry : entries) {
final byte[] valueAndTimestamp = entry.value;
- log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp ==
null ? internalContext.recordContext().timestamp() :
timestamp(valueAndTimestamp));
+ log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp ==
null ? internalContext.recordContext().timestamp() :
timestamp(valueAndTimestamp), new RecordHeaders());
}
}
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java
new file mode 100644
index 00000000000..fe96a6acd16
--- /dev/null
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueStore;
+
+import java.util.List;
+
+import static
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.headers;
+import static
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.rawValue;
+import static
org.apache.kafka.streams.state.internals.ValueTimestampHeadersDeserializer.timestamp;
+
+/**
+ * Change-logging wrapper for a timestamped key-value bytes store whose values
also carry headers.
+ * <p>
+ * This store uses the header-aware serialized value format produced by {@link
ValueTimestampHeadersSerializer}.
+ * <p>
+ * Semantics:
+ * - The inner store value format is:
+ * [ varint header_length ][ header_bytes ][ 8-byte timestamp ][
value_bytes ]
+ * - The changelog record value logged via {@code log(...)} remains just
{@code value_bytes}
+ * (no timestamp, no headers), and the timestamp is logged separately.
+ */
+public class ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders
+ extends ChangeLoggingKeyValueBytesStore {
+
+ ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders(final
KeyValueStore<Bytes, byte[]> inner) {
+ super(inner);
+ }
+
+ @Override
+ public void put(final Bytes key,
+ final byte[] valueTimestampHeaders) {
+ wrapped().put(key, valueTimestampHeaders);
+ log(
+ key,
+ rawValue(valueTimestampHeaders),
+ valueTimestampHeaders == null
+ ? internalContext.recordContext().timestamp()
+ : timestamp(valueTimestampHeaders),
+ valueTimestampHeaders == null
+ ? internalContext.recordContext().headers()
+ : headers(valueTimestampHeaders)
+ );
+ }
+
+ @Override
+ public byte[] putIfAbsent(final Bytes key,
+ final byte[] valueTimestampHeaders) {
+ final byte[] previous = wrapped().putIfAbsent(key,
valueTimestampHeaders);
+ if (previous == null) {
+ // then it was absent
+ log(
+ key,
+ rawValue(valueTimestampHeaders),
+ valueTimestampHeaders == null
+ ? internalContext.recordContext().timestamp()
+ : timestamp(valueTimestampHeaders),
+ valueTimestampHeaders == null
+ ? internalContext.recordContext().headers()
+ : headers(valueTimestampHeaders)
+ );
+ }
+ return previous;
+ }
+
+ @Override
+ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+ wrapped().putAll(entries);
+ for (final KeyValue<Bytes, byte[]> entry : entries) {
+ final byte[] valueTimestampHeaders = entry.value;
+ log(
+ entry.key,
+ rawValue(valueTimestampHeaders),
+ valueTimestampHeaders == null
+ ? internalContext.recordContext().timestamp()
+ : timestamp(valueTimestampHeaders),
+ valueTimestampHeaders == null
+ ? internalContext.recordContext().headers()
+ : headers(valueTimestampHeaders)
+ );
+ }
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
index 5ae334f95cc..9f9a4b05c93 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.WindowStore;
@@ -37,6 +38,7 @@ class ChangeLoggingTimestampedWindowBytesStore extends
ChangeLoggingWindowBytesS
key,
rawValue(valueAndTimestamp),
valueAndTimestamp != null ? timestamp(valueAndTimestamp) :
internalContext.recordContext().timestamp(),
+ new RecordHeaders(),
wrapped().getPosition()
);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java
index 94bfdc7ee79..bd35648210d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingVersionedKeyValueBytesStore.java
@@ -16,6 +16,8 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.VersionedBytesStore;
@@ -39,7 +41,7 @@ public class ChangeLoggingVersionedKeyValueBytesStore extends
ChangeLoggingKeyVa
@Override
public long put(final Bytes key, final byte[] value, final long timestamp)
{
final long validTo = inner.put(key, value, timestamp);
- log(key, value, timestamp);
+ log(key, value, timestamp, new RecordHeaders());
return validTo;
}
@@ -51,17 +53,17 @@ public class ChangeLoggingVersionedKeyValueBytesStore
extends ChangeLoggingKeyVa
@Override
public byte[] delete(final Bytes key, final long timestamp) {
final byte[] oldValue = inner.delete(key, timestamp);
- log(key, null, timestamp);
+ log(key, null, timestamp, new RecordHeaders());
return oldValue;
}
- @Override
- public void log(final Bytes key, final byte[] value, final long timestamp)
{
+ @Override public void log(final Bytes key, final byte[] value, final long
timestamp, final Headers headers) {
internalContext.logChange(
name(),
key,
value,
timestamp,
+ headers,
wrapped().getPosition()
);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index 0d0f378af75..74a213f2ec1 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams.state.internals;
+import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.StateStore;
@@ -129,7 +130,7 @@ class ChangeLoggingWindowBytesStore
}
void log(final Bytes key, final byte[] value) {
- internalContext.logChange(name(), key, value,
internalContext.recordContext().timestamp(), wrapped().getPosition());
+ internalContext.logChange(name(), key, value,
internalContext.recordContext().timestamp(), new RecordHeaders(),
wrapped().getPosition());
}
private int maybeUpdateSeqnumForDups() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
index bb47ff1e148..c268e6f0276 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ValueTimestampHeadersDeserializer.java
@@ -164,4 +164,17 @@ class ValueTimestampHeadersDeserializer<V> implements
WrappingNullableDeserializ
final byte[] rawHeaders = readBytes(buffer, headersSize);
return HEADERS_DESERIALIZER.deserialize("", rawHeaders);
}
+ /**
+ * Extract raw value from serialized ValueTimestampHeaders.
+ */
+ static byte[] rawValue(final byte[] rawValueTimestampHeaders) {
+ if (rawValueTimestampHeaders == null) {
+ return null;
+ }
+
+ final ByteBuffer buffer = ByteBuffer.wrap(rawValueTimestampHeaders);
+ final int headersSize = ByteUtils.readVarint(buffer);
+ buffer.position(buffer.position() + headersSize + Long.BYTES);
+ return readBytes(buffer, buffer.remaining());
+ }
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
index 8c0a2717d32..ce45c608a46 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/AbstractProcessorContextTest.java
@@ -255,6 +255,7 @@ public class AbstractProcessorContextTest {
final Bytes key,
final byte[] value,
final long timestamp,
+ final Headers headers,
final Position position) {
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index 960ca160ef7..2c757a20aba 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -513,15 +513,16 @@ public class ProcessorContextImplTest {
mockProcessorNodeWithLocalKeyValueStore();
final StreamTask task1 = mock(StreamTask.class);
+ final Headers headers = new RecordHeaders();
context.transitionToActive(task1, recordCollector, null);
- context.logChange(REGISTERED_STORE_NAME, KEY_BYTES, VALUE_BYTES,
TIMESTAMP, Position.emptyPosition());
+ context.logChange(REGISTERED_STORE_NAME, KEY_BYTES, VALUE_BYTES,
TIMESTAMP, headers, Position.emptyPosition());
verify(recordCollector).send(
CHANGELOG_PARTITION.topic(),
KEY_BYTES,
VALUE_BYTES,
- null,
+ headers,
CHANGELOG_PARTITION.partition(),
TIMESTAMP,
BYTES_KEY_SERIALIZER,
@@ -546,7 +547,7 @@ public class ProcessorContextImplTest {
context =
buildProcessorContextImpl(streamsConfigWithConsistencyMock(), stateManager);
context.transitionToActive(task1, recordCollector, null);
- context.logChange(REGISTERED_STORE_NAME, KEY_BYTES, VALUE_BYTES,
TIMESTAMP, position);
+ context.logChange(REGISTERED_STORE_NAME, KEY_BYTES, VALUE_BYTES,
TIMESTAMP, headers, position);
verify(recordCollector).send(
CHANGELOG_PARTITION.topic(),
@@ -566,7 +567,7 @@ public class ProcessorContextImplTest {
context = getStandbyContext();
assertThrows(
UnsupportedOperationException.class,
- () -> context.logChange("Store", Bytes.wrap("k".getBytes()), null,
0L, Position.emptyPosition())
+ () -> context.logChange("Store", Bytes.wrap("k".getBytes()), null,
0L, new RecordHeaders(), Position.emptyPosition())
);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WriteConsistencyVectorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WriteConsistencyVectorTest.java
index 75c2444e4fb..d0d2c123622 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/WriteConsistencyVectorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/WriteConsistencyVectorTest.java
@@ -128,7 +128,7 @@ public class WriteConsistencyVectorTest {
context.transitionToActive(task, recordCollector, null);
- context.logChange(REGISTERED_STORE_NAME, KEY_BYTES, VALUE_BYTES,
TIMESTAMP, position);
+ context.logChange(REGISTERED_STORE_NAME, KEY_BYTES, VALUE_BYTES,
TIMESTAMP, headers, position);
verify(recordCollector).send(
CHANGELOG_PARTITION.topic(),
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index 2e7f8519fab..b2695f766dc 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -84,7 +84,7 @@ public class ChangeLoggingSessionBytesStoreTest {
store.put(key1, value1);
verify(inner).put(key1, value1);
- verify(context).logChange(store.name(), binaryKey, value1, 0L,
Position.emptyPosition());
+ verify(context).logChange(store.name(), binaryKey, value1, 0L, new
RecordHeaders(), Position.emptyPosition());
}
@Test
@@ -96,7 +96,7 @@ public class ChangeLoggingSessionBytesStoreTest {
store.put(key1, value1);
verify(inner).put(key1, value1);
- verify(context).logChange(store.name(), binaryKey, value1, 0L,
POSITION);
+ verify(context).logChange(store.name(), binaryKey, value1, 0L, new
RecordHeaders(), POSITION);
}
@Test
@@ -109,7 +109,7 @@ public class ChangeLoggingSessionBytesStoreTest {
store.remove(key1);
verify(inner, times(2)).remove(key1);
- verify(context, times(2)).logChange(store.name(), binaryKey, null, 0L,
Position.emptyPosition());
+ verify(context, times(2)).logChange(store.name(), binaryKey, null, 0L,
new RecordHeaders(), Position.emptyPosition());
}
@SuppressWarnings({"resource", "unused"})
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeadersTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeadersTest.java
new file mode 100644
index 00000000000..a9b1bdd5ca2
--- /dev/null
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStoreWithHeadersTest.java
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.internals.MockStreamsMetrics;
+import org.apache.kafka.streams.state.KeyValueStore;
+import org.apache.kafka.streams.state.ValueTimestampHeaders;
+import org.apache.kafka.test.InternalMockProcessorContext;
+import org.apache.kafka.test.MockRecordCollector;
+import org.apache.kafka.test.TestUtils;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.junit.jupiter.MockitoExtension;
+import org.mockito.junit.jupiter.MockitoSettings;
+import org.mockito.quality.Strictness;
+
+import java.util.Arrays;
+
+import static org.junit.jupiter.api.Assertions.assertArrayEquals;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+@ExtendWith(MockitoExtension.class)
+@MockitoSettings(strictness = Strictness.STRICT_STUBS)
+public class ChangeLoggingTimestampedKeyValueBytesStoreWithHeadersTest {
+
+ private final MockRecordCollector collector = new MockRecordCollector();
+ private final InMemoryKeyValueStore root = new InMemoryKeyValueStore("kv");
+ private final ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders store =
+ new ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders(root);
+ private final Bytes hi = Bytes.wrap("hi".getBytes());
+ private final Bytes hello = Bytes.wrap("hello".getBytes());
+
+ private final RecordHeaders thereHeaders = new RecordHeaders();
+ private final RecordHeaders worldHeaders = new RecordHeaders();
+
+ private final ValueTimestampHeaders<byte[]> there =
ValueTimestampHeaders.make("there".getBytes(), 97L, thereHeaders);
+ private final ValueTimestampHeaders<byte[]> world =
ValueTimestampHeaders.make("world".getBytes(), 98L, worldHeaders);
+
+ private byte[] rawThere;
+ private byte[] rawWorld;
+
+ @BeforeEach
+ public void before() {
+ thereHeaders.add("key1", "value1".getBytes());
+ worldHeaders.add("key2", "value2".getBytes());
+
+ final ValueTimestampHeadersSerializer<byte[]> serializer =
+ new
ValueTimestampHeadersSerializer<>(Serdes.ByteArray().serializer());
+ rawThere = serializer.serialize("topic", there);
+ rawWorld = serializer.serialize("topic", world);
+
+ final InternalMockProcessorContext<String, Long> context =
mockContext();
+ context.setTime(0);
+ store.init(context, store);
+ }
+
+ private InternalMockProcessorContext<String, Long> mockContext() {
+ return new InternalMockProcessorContext<>(
+ TestUtils.tempDirectory(),
+ Serdes.String(),
+ Serdes.Long(),
+ collector,
+ new ThreadCache(new LogContext("testCache "), 0, new
MockStreamsMetrics(new Metrics()))
+ );
+ }
+
+ @AfterEach
+ public void after() {
+ store.close();
+ }
+
+ @Test
+ public void shouldDelegateInit() {
+ final InternalMockProcessorContext<String, Long> context =
mockContext();
+ final KeyValueStore<Bytes, byte[]> inner =
mock(InMemoryKeyValueStore.class);
+ final StateStore outer = new
ChangeLoggingTimestampedKeyValueBytesStoreWithHeaders(inner);
+
+ outer.init(context, outer);
+ verify(inner).init(context, outer);
+ }
+
+ @Test
+ public void shouldWriteKeyValueBytesToInnerStoreOnPut() {
+ store.put(hi, rawThere);
+
+ assertEquals(rawThere, root.get(hi));
+ assertEquals(1, collector.collected().size());
+ assertEquals(hi, collector.collected().get(0).key());
+ assertArrayEquals(there.value(), (byte[])
collector.collected().get(0).value());
+ assertEquals(97L, collector.collected().get(0).timestamp());
+
+ // Verify headers are logged
+ final Headers loggedHeaders = collector.collected().get(0).headers();
+ assertEquals(1, loggedHeaders.toArray().length);
+ assertEquals("value1", new
String(loggedHeaders.lastHeader("key1").value()));
+ }
+
+ @Test
+ public void shouldWriteAllKeyValueToInnerStoreOnPutAll() {
+ store.putAll(Arrays.asList(KeyValue.pair(hi, rawThere),
+ KeyValue.pair(hello, rawWorld)));
+ assertEquals(rawThere, root.get(hi));
+ assertEquals(rawWorld, root.get(hello));
+ }
+
+ @Test
+ public void shouldLogChangesOnPutAll() {
+ store.putAll(Arrays.asList(KeyValue.pair(hi, rawThere),
+ KeyValue.pair(hello, rawWorld)));
+
+ assertEquals(2, collector.collected().size());
+
+ // First entry
+ assertEquals(hi, collector.collected().get(0).key());
+ assertArrayEquals(there.value(), (byte[])
collector.collected().get(0).value());
+ assertEquals(97L, collector.collected().get(0).timestamp());
+ final Headers headers0 = collector.collected().get(0).headers();
+ assertEquals(1, headers0.toArray().length);
+ assertEquals("value1", new
String(headers0.lastHeader("key1").value()));
+
+ // Second entry
+ assertEquals(hello, collector.collected().get(1).key());
+ assertArrayEquals(world.value(), (byte[])
collector.collected().get(1).value());
+ assertEquals(98L, collector.collected().get(1).timestamp());
+ final Headers headers1 = collector.collected().get(1).headers();
+ assertEquals(1, headers1.toArray().length);
+ assertEquals("value2", new
String(headers1.lastHeader("key2").value()));
+ }
+
+ @Test
+ public void shouldPropagateDelete() {
+ store.put(hi, rawThere);
+ store.delete(hi);
+ assertEquals(0L, root.approximateNumEntries());
+ assertNull(root.get(hi));
+ }
+
+ @Test
+ public void shouldReturnOldValueOnDelete() {
+ store.put(hi, rawThere);
+ assertEquals(rawThere, store.delete(hi));
+ }
+
+ @Test
+ public void shouldLogKeyNullOnDelete() {
+ store.put(hi, rawThere);
+ store.delete(hi);
+
+ assertEquals(2, collector.collected().size());
+
+ // First record is the put
+ assertEquals(hi, collector.collected().get(0).key());
+ assertArrayEquals(there.value(), (byte[])
collector.collected().get(0).value());
+ assertEquals(97L, collector.collected().get(0).timestamp());
+ final Headers headers0 = collector.collected().get(0).headers();
+ assertEquals(1, headers0.toArray().length);
+ assertEquals("value1", new
String(headers0.lastHeader("key1").value()));
+
+ // Second record is the delete
+ assertEquals(hi, collector.collected().get(1).key());
+ assertNull(collector.collected().get(1).value());
+ assertEquals(0L, collector.collected().get(1).timestamp());
+ assertEquals(0,
collector.collected().get(1).headers().toArray().length);
+ }
+
+ @Test
+ public void shouldWriteToInnerOnPutIfAbsentNoPreviousValue() {
+ store.putIfAbsent(hi, rawThere);
+ assertEquals(rawThere, root.get(hi));
+ }
+
+ @Test
+ public void shouldNotWriteToInnerOnPutIfAbsentWhenValueForKeyExists() {
+ store.put(hi, rawThere);
+ store.putIfAbsent(hi, rawWorld);
+ assertEquals(rawThere, root.get(hi));
+ }
+
+ @Test
+ public void shouldWriteToChangelogOnPutIfAbsentWhenNoPreviousValue() {
+ store.putIfAbsent(hi, rawThere);
+
+ assertEquals(1, collector.collected().size());
+ assertEquals(hi, collector.collected().get(0).key());
+ assertArrayEquals(there.value(), (byte[])
collector.collected().get(0).value());
+ assertEquals(97L, collector.collected().get(0).timestamp());
+
+ final Headers headers = collector.collected().get(0).headers();
+ assertEquals(1, headers.toArray().length);
+ assertEquals("value1", new String(headers.lastHeader("key1").value()));
+ }
+
+ @Test
+ public void shouldNotWriteToChangeLogOnPutIfAbsentWhenValueForKeyExists() {
+ store.put(hi, rawThere);
+ store.putIfAbsent(hi, rawWorld);
+
+ assertEquals(1, collector.collected().size());
+ assertEquals(hi, collector.collected().get(0).key());
+ assertArrayEquals(there.value(), (byte[])
collector.collected().get(0).value());
+ assertEquals(97L, collector.collected().get(0).timestamp());
+ final Headers headers0 = collector.collected().get(0).headers();
+ assertEquals(1, headers0.toArray().length);
+ assertEquals("value1", new
String(headers0.lastHeader("key1").value()));
+ }
+
+ @Test
+ public void shouldReturnCurrentValueOnPutIfAbsent() {
+ store.put(hi, rawThere);
+ assertEquals(rawThere, store.putIfAbsent(hi, rawWorld));
+ }
+
+ @Test
+ public void shouldReturnNullOnPutIfAbsentWhenNoPreviousValue() {
+ assertNull(store.putIfAbsent(hi, rawThere));
+ }
+
+ @Test
+ public void shouldReturnValueOnGetWhenExists() {
+ store.put(hello, rawWorld);
+ assertEquals(rawWorld, store.get(hello));
+ }
+
+ @Test
+ public void shouldReturnNullOnGetWhenDoesntExist() {
+ assertNull(store.get(hello));
+ }
+
+ @Test
+ public void shouldHandleNullValueInPut() {
+ final InternalMockProcessorContext<String, Long> context =
mockContext();
+ context.setTime(42L);
+ context.headers().add("headerKey", "headerValue".getBytes());
+ store.init(context, store);
+
+ store.put(hi, null);
+
+ assertEquals(1, collector.collected().size());
+ assertEquals(hi, collector.collected().get(0).key());
+ assertNull(collector.collected().get(0).value());
+ assertEquals(42L, collector.collected().get(0).timestamp());
+ assertEquals(1,
collector.collected().get(0).headers().toArray().length);
+ assertEquals("headerKey",
collector.collected().get(0).headers().toArray()[0].key());
+ assertArrayEquals("headerValue".getBytes(),
collector.collected().get(0).headers().toArray()[0].value());
+ }
+
+ @Test
+ public void shouldHandleNullValueInPutIfAbsent() {
+ final InternalMockProcessorContext<String, Long> context =
mockContext();
+ context.setTime(50L);
+ context.headers().add("headerKey", "headerValue".getBytes());
+ store.init(context, store);
+
+ store.putIfAbsent(hi, null);
+
+ assertEquals(1, collector.collected().size());
+ assertEquals(hi, collector.collected().get(0).key());
+ assertNull(collector.collected().get(0).value());
+ assertEquals(50L, collector.collected().get(0).timestamp());
+ assertEquals(1,
collector.collected().get(0).headers().toArray().length);
+ assertEquals("headerKey",
collector.collected().get(0).headers().toArray()[0].key());
+ assertArrayEquals("headerValue".getBytes(),
collector.collected().get(0).headers().toArray()[0].value());
+ }
+
+ @Test
+ public void shouldHandleEmptyHeaders() {
+ final RecordHeaders emptyHeaders = new RecordHeaders();
+ final ValueTimestampHeaders<byte[]> valueWithEmptyHeaders =
+ ValueTimestampHeaders.make("test".getBytes(), 100L, emptyHeaders);
+
+ final ValueTimestampHeadersSerializer<byte[]> serializer =
+ new
ValueTimestampHeadersSerializer<>(Serdes.ByteArray().serializer());
+ final byte[] rawValueWithEmptyHeaders = serializer.serialize("topic",
valueWithEmptyHeaders);
+
+ store.put(hi, rawValueWithEmptyHeaders);
+
+ assertEquals(1, collector.collected().size());
+ assertEquals(hi, collector.collected().get(0).key());
+ assertArrayEquals(valueWithEmptyHeaders.value(), (byte[])
collector.collected().get(0).value());
+ assertEquals(100L, collector.collected().get(0).timestamp());
+
+ // Verify empty headers
+ final Headers loggedHeaders = collector.collected().get(0).headers();
+ assertEquals(0, loggedHeaders.toArray().length);
+ }
+
+ @Test
+ public void shouldHandleMultipleHeadersInSingleRecord() {
+ final RecordHeaders multiHeaders = new RecordHeaders();
+ multiHeaders.add("header1", "value1".getBytes());
+ multiHeaders.add("header2", "value2".getBytes());
+ multiHeaders.add("header3", "value3".getBytes());
+
+ final ValueTimestampHeaders<byte[]> valueWithMultiHeaders =
+ ValueTimestampHeaders.make("multi".getBytes(), 123L, multiHeaders);
+
+ final ValueTimestampHeadersSerializer<byte[]> serializer =
+ new
ValueTimestampHeadersSerializer<>(Serdes.ByteArray().serializer());
+ final byte[] rawValueWithMultiHeaders = serializer.serialize("topic",
valueWithMultiHeaders);
+
+ store.put(hello, rawValueWithMultiHeaders);
+
+ assertEquals(1, collector.collected().size());
+ assertEquals(hello, collector.collected().get(0).key());
+ assertArrayEquals(valueWithMultiHeaders.value(), (byte[])
collector.collected().get(0).value());
+ assertEquals(123L, collector.collected().get(0).timestamp());
+
+ // Verify multiple headers
+ final Headers loggedHeaders = collector.collected().get(0).headers();
+ assertEquals(3, loggedHeaders.toArray().length);
+ assertEquals("value1", new
String(loggedHeaders.lastHeader("header1").value()));
+ assertEquals("value2", new
String(loggedHeaders.lastHeader("header2").value()));
+ assertEquals("value3", new
String(loggedHeaders.lastHeader("header3").value()));
+ }
+}
\ No newline at end of file
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index 1c1b713ce21..fae6a7fe379 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -84,7 +84,7 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
store.put(bytesKey, valueAndTimestamp,
context.recordContext().timestamp());
verify(inner).put(bytesKey, valueAndTimestamp, 0);
- verify(context).logChange(store.name(), key, value, 42,
Position.emptyPosition());
+ verify(context).logChange(store.name(), key, value, 42, new
RecordHeaders(), Position.emptyPosition());
}
@Test
@@ -96,7 +96,7 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
store.put(bytesKey, valueAndTimestamp,
context.recordContext().timestamp());
verify(inner).put(bytesKey, valueAndTimestamp, 0);
- verify(context).logChange(store.name(), key, value, 42, POSITION);
+ verify(context).logChange(store.name(), key, value, 42, new
RecordHeaders(), POSITION);
}
@SuppressWarnings({"resource", "unused"})
@@ -128,8 +128,8 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
store.put(bytesKey, valueAndTimestamp,
context.recordContext().timestamp());
verify(inner, times(2)).put(bytesKey, valueAndTimestamp, 0);
- verify(context).logChange(store.name(), key1, value, 42L,
Position.emptyPosition());
- verify(context).logChange(store.name(), key2, value, 42L,
Position.emptyPosition());
+ verify(context).logChange(store.name(), key1, value, 42L, new
RecordHeaders(), Position.emptyPosition());
+ verify(context).logChange(store.name(), key2, value, 42L, new
RecordHeaders(), Position.emptyPosition());
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index e80a2325a2a..39c93b86fe7 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -83,7 +83,7 @@ public class ChangeLoggingWindowBytesStoreTest {
store.put(bytesKey, value, context.recordContext().timestamp());
verify(inner).put(bytesKey, value, 0);
- verify(context).logChange(store.name(), key, value, 0L,
Position.emptyPosition());
+ verify(context).logChange(store.name(), key, value, 0L, new
RecordHeaders(), Position.emptyPosition());
}
@Test
@@ -95,7 +95,7 @@ public class ChangeLoggingWindowBytesStoreTest {
store.put(bytesKey, value, context.recordContext().timestamp());
verify(inner).put(bytesKey, value, 0);
- verify(context).logChange(store.name(), key, value, 0L, POSITION);
+ verify(context).logChange(store.name(), key, value, 0L, new
RecordHeaders(), POSITION);
}
@SuppressWarnings({"resource", "unused"})
@@ -144,8 +144,8 @@ public class ChangeLoggingWindowBytesStoreTest {
store.put(bytesKey, value, context.recordContext().timestamp());
verify(inner, times(2)).put(bytesKey, value, 0);
- verify(context).logChange(store.name(), key1, value, 0L,
Position.emptyPosition());
- verify(context).logChange(store.name(), key2, value, 0L,
Position.emptyPosition());
+ verify(context).logChange(store.name(), key1, value, 0L, new
RecordHeaders(), Position.emptyPosition());
+ verify(context).logChange(store.name(), key2, value, 0L, new
RecordHeaders(), Position.emptyPosition());
}
}
diff --git
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
index 318be5cd86a..2fa2cee1f6c 100644
---
a/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
+++
b/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
@@ -457,17 +457,11 @@ public class InternalMockProcessorContext<KOut, VOut>
final Bytes key,
final byte[] value,
final long timestamp,
+ final Headers headers,
final Position position) {
- Headers headers = new RecordHeaders();
- if (!consistencyEnabled) {
- headers = null;
- } else {
- // Add the vector clock to the header part of every record
-
headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
- headers.add(new RecordHeader(
-
ChangelogRecordDeserializationHelper.CHANGELOG_POSITION_HEADER_KEY,
- PositionSerde.serialize(position).array()));
+ if (consistencyEnabled) {
+ addVectorClockToHeaders(headers, position);
}
recordCollector().send(
@@ -483,6 +477,13 @@ public class InternalMockProcessorContext<KOut, VOut>
null);
}
+ private void addVectorClockToHeaders(Headers headers, Position position) {
+
headers.add(ChangelogRecordDeserializationHelper.CHANGELOG_VERSION_HEADER_RECORD_CONSISTENCY);
+ headers.add(new RecordHeader(
+ ChangelogRecordDeserializationHelper.CHANGELOG_POSITION_HEADER_KEY,
+ PositionSerde.serialize(position).array()));
+ }
+
@Override
public void transitionToActive(final StreamTask streamTask, final
RecordCollector recordCollector, final ThreadCache newCache) {
taskType = TaskType.ACTIVE;
diff --git
a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
index 019410642fc..e245ef93e34 100644
---
a/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
+++
b/streams/src/test/java/org/apache/kafka/test/MockInternalProcessorContext.java
@@ -208,6 +208,7 @@ public class MockInternalProcessorContext<KOut, VOut>
extends MockProcessorConte
final Bytes key,
final byte[] value,
final long timestamp,
+ final Headers headers,
final Position position) {
}
diff --git
a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
index 38059cb93d7..ff1fc62f209 100644
--- a/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
+++ b/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.test;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsConfig;
@@ -142,6 +143,7 @@ public class NoOpProcessorContext extends
AbstractProcessorContext<Object, Objec
final Bytes key,
final byte[] value,
final long timestamp,
+ final Headers headers,
final Position position) {
}