This is an automated email from the ASF dual-hosted git repository.
bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 312c1dbe894 HOTFIX: Add headers to log change method, some variables
final, update… (#21527)
312c1dbe894 is described below
commit 312c1dbe894b7907c48117dd3fa47ca484b3653e
Author: Bill Bejeck <[email protected]>
AuthorDate: Fri Feb 20 11:04:32 2026 -0500
HOTFIX: Add headers to log change method, some variables final, update…
(#21527)
1. `StateDirectory` - Added missing Headers parameter to the
logChange method
2. `TaskManager` Fixed type conversion issues for both active and
standby task creation
3. Checkstyle violations by making variables final
Reviewers: Lucas Brutschy <[email protected]>, TengYao
Chi<[email protected]>
---
.../org/apache/kafka/streams/processor/internals/StateDirectory.java | 5 +++--
.../org/apache/kafka/streams/processor/internals/TaskManager.java | 4 ++--
.../apache/kafka/streams/processor/internals/StateDirectoryTest.java | 2 +-
.../apache/kafka/streams/processor/internals/TaskManagerTest.java | 4 ++--
4 files changed, 8 insertions(+), 7 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index ea2bb7a2f28..6c4e97c101e 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.Time;
@@ -805,7 +806,7 @@ public class StateDirectory implements AutoCloseable {
private final StateManager stateManager;
final StreamsMetricsImpl metricsImpl;
- public StartupContext(final TaskId taskId, final StreamsConfig config,
final StateManager stateManager, final StreamsMetricsImpl metricsImpl,
ThreadCache cache) {
+ public StartupContext(final TaskId taskId, final StreamsConfig config,
final StateManager stateManager, final StreamsMetricsImpl metricsImpl, final
ThreadCache cache) {
super(taskId, config, metricsImpl, cache);
this.stateManager = stateManager;
this.metricsImpl = metricsImpl;
@@ -831,7 +832,7 @@ public class StateDirectory implements AutoCloseable {
}
@Override
- public void logChange(final String storeName, final Bytes key, final
byte[] value, final long timestamp, final Position position) {
+ public void logChange(final String storeName, final Bytes key, final
byte[] value, final long timestamp, final Headers headers, final Position
position) {
throw new IllegalStateException("Should not be called");
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index d4e0baef1ef..f94060d45e0 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -327,7 +327,7 @@ public class TaskManager {
assignedTasks.put(taskId, entry.getValue());
}
}
- return activeTaskCreator.createTasks(mainConsumer, assignedTasks);
+ return new ArrayList<>(activeTaskCreator.createTasks(mainConsumer,
assignedTasks));
} else {
return Collections.emptySet();
}
@@ -343,7 +343,7 @@ public class TaskManager {
assignedTasks.put(taskId, inputPartitions);
}
}
- return standbyTaskCreator.createTasks(assignedTasks);
+ return new
ArrayList<>(standbyTaskCreator.createTasks(assignedTasks));
} else {
return Collections.emptySet();
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index e2f57d83422..b89628d225b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -892,7 +892,7 @@ public class StateDirectoryTest {
// we need to set this because the auto-cleanup uses the last-modified
time from the filesystem,
// which can't be mocked
time.setCurrentTimeMs(System.currentTimeMillis());
- TaskId taskId = new TaskId(0, 0);
+ final TaskId taskId = new TaskId(0, 0);
final StateStore store = initializeStartupStores(taskId, true);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 69e071dd2b3..ff2b76c5f5d 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -4902,7 +4902,7 @@ public class TaskManagerTest {
when(stateUpdater.tasks()).thenReturn(Collections.singleton(activeTask));
when(stateUpdater.standbyTasks()).thenReturn(Collections.emptySet());
- InOrder inOrder = inOrder(activeTaskCreator);
+ final InOrder inOrder = inOrder(activeTaskCreator);
inOrder.verify(activeTaskCreator).createTasks(same(consumer),
eq(Map.of(taskId00, taskId00Partitions)));
inOrder.verify(activeTaskCreator).createTasks(consumer, emptyMap());
@@ -4941,7 +4941,7 @@ public class TaskManagerTest {
// ensure we didn't construct any new Tasks, or recycle an existing
Task; we only used the one we already have
verify(activeTaskCreator, times(2)).createTasks(any(),
eq(Collections.emptyMap()));
- InOrder inOrder = inOrder(standbyTaskCreator);
+ final InOrder inOrder = inOrder(standbyTaskCreator);
inOrder.verify(standbyTaskCreator).createTasks(Map.of(taskId00,
taskId00Partitions));
inOrder.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
verifyNoMoreInteractions(activeTaskCreator);