This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch 4.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/4.1 by this push:
new 35e59427434 Revert "KAFKA-13722: remove usage of old ProcessorContext (#18292)" (#20398)
35e59427434 is described below
commit 35e594274342eb10035eeec39bf82620637a6825
Author: Matthias J. Sax <[email protected]>
AuthorDate: Mon Aug 25 03:43:39 2025 -0700
Revert "KAFKA-13722: remove usage of old ProcessorContext (#18292)" (#20398)
This reverts commit f13a22af0b3a48a4ca1bf2ece5b58f31e3b26b7d.
Reviewers: Chia-Ping Tsai <[email protected]>, Eduwer Camacaro <[email protected]>, Mickael Maison <[email protected]>
---
.../kstream/internals/KTableRepartitionMap.java | 2 +-
.../internals/GlobalProcessorContextImpl.java | 2 +-
.../processor/internals/GlobalStateUpdateTask.java | 4 ++--
.../processor/internals/ProcessorContextImpl.java | 8 ++++----
.../streams/processor/internals/ProcessorNode.java | 10 +++++-----
.../kafka/streams/processor/internals/SinkNode.java | 6 +++---
.../kafka/streams/processor/internals/StreamTask.java | 4 ++--
.../streams/state/internals/CachingKeyValueStore.java | 10 +++++-----
.../streams/state/internals/CachingSessionStore.java | 10 +++++-----
.../streams/state/internals/CachingWindowStore.java | 10 +++++-----
.../internals/ChangeLoggingKeyValueBytesStore.java | 10 +++++-----
.../internals/ChangeLoggingListValueBytesStore.java | 4 ++--
.../internals/ChangeLoggingSessionBytesStore.java | 4 ++--
.../ChangeLoggingTimestampedKeyValueBytesStore.java | 6 +++---
.../ChangeLoggingTimestampedWindowBytesStore.java | 2 +-
.../internals/ChangeLoggingWindowBytesStore.java | 2 +-
.../streams/state/internals/MeteredKeyValueStore.java | 2 +-
.../streams/state/internals/MeteredSessionStore.java | 2 +-
.../streams/state/internals/MeteredWindowStore.java | 2 +-
.../internals/TimeOrderedCachingWindowStore.java | 19 +++++++++----------
.../processor/internals/ProcessorContextImplTest.java | 4 ----
.../processor/internals/ProcessorNodeTest.java | 2 +-
.../internals/ChangeLoggingSessionBytesStoreTest.java | 5 -----
.../ChangeLoggingTimestampedWindowBytesStoreTest.java | 13 ++++---------
.../internals/ChangeLoggingWindowBytesStoreTest.java | 13 ++++---------
.../state/internals/MeteredWindowStoreTest.java | 4 ++--
.../TimeOrderedCachingPersistentWindowStoreTest.java | 6 +++---
.../state/internals/TimeOrderedWindowStoreTest.java | 6 +++---
28 files changed, 76 insertions(+), 96 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
index a686692b40a..567d9a2947f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableRepartitionMap.java
@@ -213,7 +213,7 @@ public class KTableRepartitionMap<K, V, K1, V1> implements
KTableRepartitionMapS
private ValueAndTimestamp<KeyValue<? extends K1, ? extends V1>>
mapValue(final K key, final ValueAndTimestamp<V> valueAndTimestamp) {
return ValueAndTimestamp.make(
mapper.apply(key, getValueOrNull(valueAndTimestamp)),
- valueAndTimestamp == null ?
context.recordContext().timestamp() : valueAndTimestamp.timestamp()
+ valueAndTimestamp == null ? context.timestamp() :
valueAndTimestamp.timestamp()
);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
index 01b694863fd..828ae3a0a79 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
@@ -84,7 +84,7 @@ public class GlobalProcessorContextImpl extends
AbstractProcessorContext<Object,
@Override
public <KIn, VIn> void forward(final KIn key, final VIn value) {
- forward(new Record<>(key, value, recordContext().timestamp(),
headers()));
+ forward(new Record<>(key, value, timestamp(), headers()));
}
/**
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
index bfab9a770f6..1d8f18d326d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateUpdateTask.java
@@ -120,8 +120,8 @@ public class GlobalStateUpdateTask implements
GlobalStateMaintainer {
final Record<Object, Object> toProcess = new Record<>(
deserialized.key(),
deserialized.value(),
- processorContext.recordContext().timestamp(),
- processorContext.recordContext().headers()
+ processorContext.timestamp(),
+ processorContext.headers()
);
((SourceNode<Object, Object>)
sourceNodeAndDeserializer.sourceNode()).process(toProcess);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
index 93961daf97b..b5e0515522a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorContextImpl.java
@@ -190,7 +190,7 @@ public final class ProcessorContextImpl extends
AbstractProcessorContext<Object,
final Record<K, V> toForward = new Record<>(
key,
value,
- recordContext.timestamp(),
+ timestamp(),
headers()
);
forward(toForward);
@@ -204,7 +204,7 @@ public final class ProcessorContextImpl extends
AbstractProcessorContext<Object,
final Record<K, V> toForward = new Record<>(
key,
value,
- toInternal.hasTimestamp() ? toInternal.timestamp() :
recordContext.timestamp(),
+ toInternal.hasTimestamp() ? toInternal.timestamp() : timestamp(),
headers()
);
forward(toForward, toInternal.child());
@@ -250,11 +250,11 @@ public final class ProcessorContextImpl extends
AbstractProcessorContext<Object,
// old API processors wouldn't see the timestamps or headers of
upstream
// new API processors. But then again, from the perspective of
those old-API
// processors, even consulting the timestamp or headers when the
record context
- // is undefined is itself not well-defined. Plus, I don't think we
need to worry
+ // is undefined is itself not well defined. Plus, I don't think we
need to worry
// too much about heterogeneous applications, in which the
upstream processor is
// implementing the new API and the downstream one is implementing
the old API.
// So, this seems like a fine compromise for now.
- if (recordContext != null && (record.timestamp() !=
recordContext.timestamp() || record.headers() != recordContext.headers())) {
+ if (recordContext != null && (record.timestamp() != timestamp() ||
record.headers() != headers())) {
recordContext = new ProcessorRecordContext(
record.timestamp(),
recordContext.offset(),
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
index 1dddc55ca3c..62173e807fd 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java
@@ -209,13 +209,13 @@ public class ProcessorNode<KIn, VIn, KOut, VOut> {
// (instead of `RuntimeException`) to work well with those
languages
final ErrorHandlerContext errorHandlerContext = new
DefaultErrorHandlerContext(
null, // only required to pass for
DeserializationExceptionHandler
- internalProcessorContext.recordContext().topic(),
- internalProcessorContext.recordContext().partition(),
- internalProcessorContext.recordContext().offset(),
- internalProcessorContext.recordContext().headers(),
+ internalProcessorContext.topic(),
+ internalProcessorContext.partition(),
+ internalProcessorContext.offset(),
+ internalProcessorContext.headers(),
internalProcessorContext.currentNode().name(),
internalProcessorContext.taskId(),
- internalProcessorContext.recordContext().timestamp(),
+ internalProcessorContext.timestamp(),
internalProcessorContext.recordContext().sourceRawKey(),
internalProcessorContext.recordContext().sourceRawValue()
);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
index d32cf2523e0..8cb82a6bfa5 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java
@@ -85,9 +85,9 @@ public class SinkNode<KIn, VIn> extends ProcessorNode<KIn,
VIn, Void, Void> {
final ProcessorRecordContext contextForExtraction =
new ProcessorRecordContext(
timestamp,
- context.recordContext().offset(),
- context.recordContext().partition(),
- context.recordContext().topic(),
+ context.offset(),
+ context.partition(),
+ context.topic(),
record.headers()
);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 82e9c8d7fb1..8c3b6cc506b 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -866,8 +866,8 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
final Record<Object, Object> toProcess = new Record<>(
record.key(),
record.value(),
- processorContext.recordContext().timestamp(),
- processorContext.recordContext().headers()
+ processorContext.timestamp(),
+ processorContext.headers()
);
maybeMeasureLatency(() -> currNode.process(toProcess), time,
processLatencySensor);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
index 83343d04494..389cf688f4a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingKeyValueStore.java
@@ -272,12 +272,12 @@ public class CachingKeyValueStore
key,
new LRUCacheEntry(
value,
- internalContext.recordContext().headers(),
+ internalContext.headers(),
true,
- internalContext.recordContext().offset(),
- internalContext.recordContext().timestamp(),
- internalContext.recordContext().partition(),
- internalContext.recordContext().topic(),
+ internalContext.offset(),
+ internalContext.timestamp(),
+ internalContext.partition(),
+ internalContext.topic(),
internalContext.recordContext().sourceRawKey(),
internalContext.recordContext().sourceRawValue()
)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
index ec0c1bd077d..7bb615ea4ee 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingSessionStore.java
@@ -135,12 +135,12 @@ class CachingSessionStore
final LRUCacheEntry entry =
new LRUCacheEntry(
value,
- internalContext.recordContext().headers(),
+ internalContext.headers(),
true,
- internalContext.recordContext().offset(),
- internalContext.recordContext().timestamp(),
- internalContext.recordContext().partition(),
- internalContext.recordContext().topic(),
+ internalContext.offset(),
+ internalContext.timestamp(),
+ internalContext.partition(),
+ internalContext.topic(),
internalContext.recordContext().sourceRawKey(),
internalContext.recordContext().sourceRawValue()
);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
index 0432c1726cb..38d98b58d7e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/CachingWindowStore.java
@@ -153,12 +153,12 @@ class CachingWindowStore
final LRUCacheEntry entry =
new LRUCacheEntry(
value,
- internalContext.recordContext().headers(),
+ internalContext.headers(),
true,
- internalContext.recordContext().offset(),
- internalContext.recordContext().timestamp(),
- internalContext.recordContext().partition(),
- internalContext.recordContext().topic(),
+ internalContext.offset(),
+ internalContext.timestamp(),
+ internalContext.partition(),
+ internalContext.topic(),
internalContext.recordContext().sourceRawKey(),
internalContext.recordContext().sourceRawValue()
);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 9c1c3f9ae76..b21b102cdfc 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -52,7 +52,7 @@ public class ChangeLoggingKeyValueBytesStore
if (wrapped() instanceof MemoryLRUCache) {
((MemoryLRUCache) wrapped()).setWhenEldestRemoved((key, value) -> {
// pass null to indicate removal
- log(key, null, internalContext.recordContext().timestamp());
+ log(key, null, internalContext.timestamp());
});
}
}
@@ -66,7 +66,7 @@ public class ChangeLoggingKeyValueBytesStore
public void put(final Bytes key,
final byte[] value) {
wrapped().put(key, value);
- log(key, value, internalContext.recordContext().timestamp());
+ log(key, value, internalContext.timestamp());
}
@Override
@@ -75,7 +75,7 @@ public class ChangeLoggingKeyValueBytesStore
final byte[] previous = wrapped().putIfAbsent(key, value);
if (previous == null) {
// then it was absent
- log(key, value, internalContext.recordContext().timestamp());
+ log(key, value, internalContext.timestamp());
}
return previous;
}
@@ -84,7 +84,7 @@ public class ChangeLoggingKeyValueBytesStore
public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
wrapped().putAll(entries);
for (final KeyValue<Bytes, byte[]> entry : entries) {
- log(entry.key, entry.value,
internalContext.recordContext().timestamp());
+ log(entry.key, entry.value, internalContext.timestamp());
}
}
@@ -97,7 +97,7 @@ public class ChangeLoggingKeyValueBytesStore
@Override
public byte[] delete(final Bytes key) {
final byte[] oldValue = wrapped().delete(key);
- log(key, null, internalContext.recordContext().timestamp());
+ log(key, null, internalContext.timestamp());
return oldValue;
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
index ba43ba30b17..9070fc8da5f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingListValueBytesStore.java
@@ -32,9 +32,9 @@ public class ChangeLoggingListValueBytesStore extends
ChangeLoggingKeyValueBytes
// we need to log the full new list and thus call get() on the inner
store below
// if the value is a tombstone, we delete the whole list and thus can
save the get call
if (value == null) {
- log(key, null, internalContext.recordContext().timestamp());
+ log(key, null, internalContext.timestamp());
} else {
- log(key, wrapped().get(key),
internalContext.recordContext().timestamp());
+ log(key, wrapped().get(key), internalContext.timestamp());
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
index 248889211c3..06097aa7a71 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStore.java
@@ -73,13 +73,13 @@ public class ChangeLoggingSessionBytesStore
@Override
public void remove(final Windowed<Bytes> sessionKey) {
wrapped().remove(sessionKey);
- internalContext.logChange(name(),
SessionKeySchema.toBinary(sessionKey), null,
internalContext.recordContext().timestamp(), wrapped().getPosition());
+ internalContext.logChange(name(),
SessionKeySchema.toBinary(sessionKey), null, internalContext.timestamp(),
wrapped().getPosition());
}
@Override
public void put(final Windowed<Bytes> sessionKey, final byte[] aggregate) {
wrapped().put(sessionKey, aggregate);
- internalContext.logChange(name(),
SessionKeySchema.toBinary(sessionKey), aggregate,
internalContext.recordContext().timestamp(), wrapped().getPosition());
+ internalContext.logChange(name(),
SessionKeySchema.toBinary(sessionKey), aggregate, internalContext.timestamp(),
wrapped().getPosition());
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
index b95ede1bba8..916c9547184 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedKeyValueBytesStore.java
@@ -35,7 +35,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore
extends ChangeLoggingKey
public void put(final Bytes key,
final byte[] valueAndTimestamp) {
wrapped().put(key, valueAndTimestamp);
- log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ?
internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp));
+ log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ?
internalContext.timestamp() : timestamp(valueAndTimestamp));
}
@Override
@@ -44,7 +44,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore
extends ChangeLoggingKey
final byte[] previous = wrapped().putIfAbsent(key, valueAndTimestamp);
if (previous == null) {
// then it was absent
- log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ?
internalContext.recordContext().timestamp() : timestamp(valueAndTimestamp));
+ log(key, rawValue(valueAndTimestamp), valueAndTimestamp == null ?
internalContext.timestamp() : timestamp(valueAndTimestamp));
}
return previous;
}
@@ -54,7 +54,7 @@ public class ChangeLoggingTimestampedKeyValueBytesStore
extends ChangeLoggingKey
wrapped().putAll(entries);
for (final KeyValue<Bytes, byte[]> entry : entries) {
final byte[] valueAndTimestamp = entry.value;
- log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp ==
null ? internalContext.recordContext().timestamp() :
timestamp(valueAndTimestamp));
+ log(entry.key, rawValue(valueAndTimestamp), valueAndTimestamp ==
null ? internalContext.timestamp() : timestamp(valueAndTimestamp));
}
}
}
\ No newline at end of file
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
index 5ae334f95cc..2bf87f9d2a8 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStore.java
@@ -36,7 +36,7 @@ class ChangeLoggingTimestampedWindowBytesStore extends
ChangeLoggingWindowBytesS
name(),
key,
rawValue(valueAndTimestamp),
- valueAndTimestamp != null ? timestamp(valueAndTimestamp) :
internalContext.recordContext().timestamp(),
+ valueAndTimestamp != null ? timestamp(valueAndTimestamp) :
internalContext.timestamp(),
wrapped().getPosition()
);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
index 0d0f378af75..d5857d0456e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStore.java
@@ -129,7 +129,7 @@ class ChangeLoggingWindowBytesStore
}
void log(final Bytes key, final byte[] value) {
- internalContext.logChange(name(), key, value,
internalContext.recordContext().timestamp(), wrapped().getPosition());
+ internalContext.logChange(name(), key, value,
internalContext.timestamp(), wrapped().getPosition());
}
private int maybeUpdateSeqnumForDups() {
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
index 0962033b7ef..8678111f989 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
@@ -417,7 +417,7 @@ public class MeteredKeyValueStore<K, V>
// In that case, we _can't_ get the current timestamp, so we don't
record anything.
if (e2eLatencySensor.shouldRecord() && internalContext != null) {
final long currentTime = time.milliseconds();
- final long e2eLatency = currentTime -
internalContext.recordContext().timestamp();
+ final long e2eLatency = currentTime - internalContext.timestamp();
e2eLatencySensor.record(e2eLatency, currentTime);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
index 234ac1220f7..546959a9269 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
@@ -494,7 +494,7 @@ public class MeteredSessionStore<K, V>
// In that case, we _can't_ get the current timestamp, so we don't
record anything.
if (e2eLatencySensor.shouldRecord() && internalContext != null) {
final long currentTime = time.milliseconds();
- final long e2eLatency = currentTime -
internalContext.recordContext().timestamp();
+ final long e2eLatency = currentTime - internalContext.timestamp();
e2eLatencySensor.record(e2eLatency, currentTime);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
index 2da877453ce..783c16b2f4f 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredWindowStore.java
@@ -510,7 +510,7 @@ public class MeteredWindowStore<K, V>
// In that case, we _can't_ get the current timestamp, so we don't
record anything.
if (e2eLatencySensor.shouldRecord() && internalContext != null) {
final long currentTime = time.milliseconds();
- final long e2eLatency = currentTime -
internalContext.recordContext().timestamp();
+ final long e2eLatency = currentTime - internalContext.timestamp();
e2eLatencySensor.record(e2eLatency, currentTime);
}
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
index 646cbf2ca35..15a728ba0d0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
+++
b/streams/src/main/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingWindowStore.java
@@ -97,13 +97,12 @@ class TimeOrderedCachingWindowStore
hasIndex = timeOrderedWindowStore.hasIndex();
}
- @SuppressWarnings("unchecked")
private RocksDBTimeOrderedWindowStore getWrappedStore(final StateStore
wrapped) {
if (wrapped instanceof RocksDBTimeOrderedWindowStore) {
return (RocksDBTimeOrderedWindowStore) wrapped;
}
if (wrapped instanceof WrappedStateStore) {
- return getWrappedStore(((WrappedStateStore<?, Bytes, byte[]>)
wrapped).wrapped());
+ return getWrappedStore(((WrappedStateStore<?, ?, ?>)
wrapped).wrapped());
}
return null;
}
@@ -256,12 +255,12 @@ class TimeOrderedCachingWindowStore
final LRUCacheEntry entry =
new LRUCacheEntry(
value,
- internalContext.recordContext().headers(),
+ internalContext.headers(),
true,
- internalContext.recordContext().offset(),
- internalContext.recordContext().timestamp(),
- internalContext.recordContext().partition(),
- internalContext.recordContext().topic(),
+ internalContext.offset(),
+ internalContext.timestamp(),
+ internalContext.partition(),
+ internalContext.topic(),
internalContext.recordContext().sourceRawKey(),
internalContext.recordContext().sourceRawValue()
);
@@ -278,9 +277,9 @@ class TimeOrderedCachingWindowStore
new byte[0],
new RecordHeaders(),
true,
- internalContext.recordContext().offset(),
- internalContext.recordContext().timestamp(),
- internalContext.recordContext().partition(),
+ internalContext.offset(),
+ internalContext.timestamp(),
+ internalContext.partition(),
"",
internalContext.recordContext().sourceRawKey(),
internalContext.recordContext().sourceRawValue()
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
index 9410ca5a978..42c466c2120 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorContextImplTest.java
@@ -593,8 +593,6 @@ public class ProcessorContextImplTest {
@Test
public void shouldThrowUnsupportedOperationExceptionOnForward() {
context = getStandbyContext();
- context.recordContext = mock(ProcessorRecordContext.class);
-
assertThrows(
UnsupportedOperationException.class,
() -> context.forward("key", "value")
@@ -604,8 +602,6 @@ public class ProcessorContextImplTest {
@Test
public void shouldThrowUnsupportedOperationExceptionOnForwardWithTo() {
context = getStandbyContext();
- context.recordContext = mock(ProcessorRecordContext.class);
-
assertThrows(
UnsupportedOperationException.class,
() -> context.forward("key", "value", To.child("child-name"))
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
index 5341cd25f0d..ce5fddb870a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java
@@ -362,7 +362,7 @@ public class ProcessorNodeTest {
assertEquals(internalProcessorContext.offset(), context.offset());
assertEquals(internalProcessorContext.currentNode().name(),
context.processorNodeId());
assertEquals(internalProcessorContext.taskId(), context.taskId());
- assertEquals(internalProcessorContext.recordContext().timestamp(),
context.timestamp());
+ assertEquals(internalProcessorContext.timestamp(),
context.timestamp());
assertEquals(internalProcessorContext.recordContext().sourceRawKey(),
context.sourceRawKey());
assertEquals(internalProcessorContext.recordContext().sourceRawValue(),
context.sourceRawValue());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
index 9a23e657600..d3243ef2fc6 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingSessionBytesStoreTest.java
@@ -16,12 +16,10 @@
*/
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.SessionWindow;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.SessionStore;
@@ -77,7 +75,6 @@ public class ChangeLoggingSessionBytesStoreTest {
public void shouldLogPuts() {
final Bytes binaryKey = SessionKeySchema.toBinary(key1);
when(inner.getPosition()).thenReturn(Position.emptyPosition());
- when(context.recordContext()).thenReturn(new ProcessorRecordContext(0,
0, 0, "topic", new RecordHeaders()));
store.put(key1, value1);
@@ -89,7 +86,6 @@ public class ChangeLoggingSessionBytesStoreTest {
public void shouldLogPutsWithPosition() {
final Bytes binaryKey = SessionKeySchema.toBinary(key1);
when(inner.getPosition()).thenReturn(POSITION);
- when(context.recordContext()).thenReturn(new ProcessorRecordContext(0,
0, 0, "topic", new RecordHeaders()));
store.put(key1, value1);
@@ -101,7 +97,6 @@ public class ChangeLoggingSessionBytesStoreTest {
public void shouldLogRemoves() {
final Bytes binaryKey = SessionKeySchema.toBinary(key1);
when(inner.getPosition()).thenReturn(Position.emptyPosition());
- when(context.recordContext()).thenReturn(new ProcessorRecordContext(0,
0, 0, "topic", new RecordHeaders()));
store.remove(key1);
store.remove(key1);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
index 1c1b713ce21..03701bdcb00 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingTimestampedWindowBytesStoreTest.java
@@ -17,11 +17,9 @@
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
@@ -79,9 +77,8 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
public void shouldLogPuts() {
final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
when(inner.getPosition()).thenReturn(Position.emptyPosition());
- when(context.recordContext()).thenReturn(new ProcessorRecordContext(0,
0, 0, "topic", new RecordHeaders()));
- store.put(bytesKey, valueAndTimestamp,
context.recordContext().timestamp());
+ store.put(bytesKey, valueAndTimestamp, context.timestamp());
verify(inner).put(bytesKey, valueAndTimestamp, 0);
verify(context).logChange(store.name(), key, value, 42,
Position.emptyPosition());
@@ -91,9 +88,8 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
public void shouldLogPutsWithPosition() {
final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
when(inner.getPosition()).thenReturn(POSITION);
- when(context.recordContext()).thenReturn(new ProcessorRecordContext(0,
0, 0, "topic", new RecordHeaders()));
- store.put(bytesKey, valueAndTimestamp,
context.recordContext().timestamp());
+ store.put(bytesKey, valueAndTimestamp, context.timestamp());
verify(inner).put(bytesKey, valueAndTimestamp, 0);
verify(context).logChange(store.name(), key, value, 42, POSITION);
@@ -122,10 +118,9 @@ public class ChangeLoggingTimestampedWindowBytesStoreTest {
final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1);
final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2);
when(inner.getPosition()).thenReturn(Position.emptyPosition());
- when(context.recordContext()).thenReturn(new ProcessorRecordContext(0,
0, 0, "topic", new RecordHeaders()));
- store.put(bytesKey, valueAndTimestamp,
context.recordContext().timestamp());
- store.put(bytesKey, valueAndTimestamp,
context.recordContext().timestamp());
+ store.put(bytesKey, valueAndTimestamp, context.timestamp());
+ store.put(bytesKey, valueAndTimestamp, context.timestamp());
verify(inner, times(2)).put(bytesKey, valueAndTimestamp, 0);
verify(context).logChange(store.name(), key1, value, 42L,
Position.emptyPosition());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
index e80a2325a2a..2607e56ad9f 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/ChangeLoggingWindowBytesStoreTest.java
@@ -17,11 +17,9 @@
package org.apache.kafka.streams.state.internals;
-import org.apache.kafka.common.header.internals.RecordHeaders;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.internals.ProcessorContextImpl;
-import org.apache.kafka.streams.processor.internals.ProcessorRecordContext;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.WindowStore;
@@ -78,9 +76,8 @@ public class ChangeLoggingWindowBytesStoreTest {
public void shouldLogPuts() {
final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
when(inner.getPosition()).thenReturn(Position.emptyPosition());
- when(context.recordContext()).thenReturn(new ProcessorRecordContext(0,
0, 0, "topic", new RecordHeaders()));
- store.put(bytesKey, value, context.recordContext().timestamp());
+ store.put(bytesKey, value, context.timestamp());
verify(inner).put(bytesKey, value, 0);
verify(context).logChange(store.name(), key, value, 0L,
Position.emptyPosition());
@@ -90,9 +87,8 @@ public class ChangeLoggingWindowBytesStoreTest {
public void shouldLogPutsWithPosition() {
final Bytes key = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 0);
when(inner.getPosition()).thenReturn(POSITION);
- when(context.recordContext()).thenReturn(new ProcessorRecordContext(0,
0, 0, "topic", new RecordHeaders()));
- store.put(bytesKey, value, context.recordContext().timestamp());
+ store.put(bytesKey, value, context.timestamp());
verify(inner).put(bytesKey, value, 0);
verify(context).logChange(store.name(), key, value, 0L, POSITION);
@@ -135,13 +131,12 @@ public class ChangeLoggingWindowBytesStoreTest {
store = new ChangeLoggingWindowBytesStore(inner, true,
WindowKeySchema::toStoreKeyBinary);
store.init(context, store);
when(inner.getPosition()).thenReturn(Position.emptyPosition());
- when(context.recordContext()).thenReturn(new ProcessorRecordContext(0,
0, 0, "topic", new RecordHeaders()));
final Bytes key1 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 1);
final Bytes key2 = WindowKeySchema.toStoreKeyBinary(bytesKey, 0, 2);
- store.put(bytesKey, value, context.recordContext().timestamp());
- store.put(bytesKey, value, context.recordContext().timestamp());
+ store.put(bytesKey, value, context.timestamp());
+ store.put(bytesKey, value, context.timestamp());
verify(inner, times(2)).put(bytesKey, value, 0);
verify(context).logChange(store.name(), key1, value, 0L,
Position.emptyPosition());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
index 1c8935d1e1c..ba557104ebd 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/MeteredWindowStoreTest.java
@@ -223,10 +223,10 @@ public class MeteredWindowStoreTest {
@Test
public void shouldPutToInnerStoreAndRecordPutMetrics() {
final byte[] bytes = "a".getBytes();
- doNothing().when(innerStoreMock).put(eq(Bytes.wrap(bytes)), any(),
eq(context.recordContext().timestamp()));
+ doNothing().when(innerStoreMock).put(eq(Bytes.wrap(bytes)), any(),
eq(context.timestamp()));
store.init(context, store);
- store.put("a", "a", context.recordContext().timestamp());
+ store.put("a", "a", context.timestamp());
// it suffices to verify one put metric since all put metrics are
recorded by the same sensor
// and the sensor is tested elsewhere
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
index ffa509d5188..21d16b09be4 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedCachingPersistentWindowStoreTest.java
@@ -935,9 +935,9 @@ public class TimeOrderedCachingPersistentWindowStoreTest {
new byte[0],
new RecordHeaders(),
true,
- context.recordContext().offset(),
- context.recordContext().timestamp(),
- context.recordContext().partition(),
+ context.offset(),
+ context.timestamp(),
+ context.partition(),
"",
context.recordContext().sourceRawKey(),
context.recordContext().sourceRawValue()
diff --git
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
index 9d0db9bae0f..f4ff30002ae 100644
---
a/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/state/internals/TimeOrderedWindowStoreTest.java
@@ -941,9 +941,9 @@ public class TimeOrderedWindowStoreTest {
new byte[0],
new RecordHeaders(),
true,
- context.recordContext().offset(),
- context.recordContext().timestamp(),
- context.recordContext().partition(),
+ context.offset(),
+ context.timestamp(),
+ context.partition(),
"",
context.recordContext().sourceRawKey(),
context.recordContext().sourceRawValue()