This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 312c1dbe894 HOTFIX: Add headers to log change method, some variables 
final, update… (#21527)
312c1dbe894 is described below

commit 312c1dbe894b7907c48117dd3fa47ca484b3653e
Author: Bill Bejeck <[email protected]>
AuthorDate: Fri Feb 20 11:04:32 2026 -0500

    HOTFIX: Add headers to log change method, some variables final, update… 
(#21527)
    
    1. `StateDirectory` - Added missing Headers parameter to the
    logChange method
    2. `TaskManager` - Fixed type conversion issues for both active and
    standby task creation
    3. Fixed Checkstyle violations by making variables final
    
    Reviewers: Lucas Brutschy <[email protected]>, TengYao
     Chi <[email protected]>
---
 .../org/apache/kafka/streams/processor/internals/StateDirectory.java | 5 +++--
 .../org/apache/kafka/streams/processor/internals/TaskManager.java    | 4 ++--
 .../apache/kafka/streams/processor/internals/StateDirectoryTest.java | 2 +-
 .../apache/kafka/streams/processor/internals/TaskManagerTest.java    | 4 ++--
 4 files changed, 8 insertions(+), 7 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index ea2bb7a2f28..6c4e97c101e 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.utils.Bytes;
 import org.apache.kafka.common.utils.LogContext;
 import org.apache.kafka.common.utils.Time;
@@ -805,7 +806,7 @@ public class StateDirectory implements AutoCloseable {
         private final StateManager stateManager;
         final StreamsMetricsImpl metricsImpl;
 
-        public StartupContext(final TaskId taskId, final StreamsConfig config, 
final StateManager stateManager, final StreamsMetricsImpl metricsImpl, 
ThreadCache cache) {
+        public StartupContext(final TaskId taskId, final StreamsConfig config, 
final StateManager stateManager, final StreamsMetricsImpl metricsImpl, final 
ThreadCache cache) {
             super(taskId, config, metricsImpl, cache);
             this.stateManager = stateManager;
             this.metricsImpl = metricsImpl;
@@ -831,7 +832,7 @@ public class StateDirectory implements AutoCloseable {
         }
 
         @Override
-        public void logChange(final String storeName, final Bytes key, final 
byte[] value, final long timestamp, final Position position) {
+        public void logChange(final String storeName, final Bytes key, final 
byte[] value, final long timestamp, final Headers headers, final Position 
position) {
             throw new IllegalStateException("Should not be called");
         }
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index d4e0baef1ef..f94060d45e0 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -327,7 +327,7 @@ public class TaskManager {
                     assignedTasks.put(taskId, entry.getValue());
                 }
             }
-            return activeTaskCreator.createTasks(mainConsumer, assignedTasks);
+            return new ArrayList<>(activeTaskCreator.createTasks(mainConsumer, 
assignedTasks));
         } else {
             return Collections.emptySet();
         }
@@ -343,7 +343,7 @@ public class TaskManager {
                     assignedTasks.put(taskId, inputPartitions);
                 }
             }
-            return standbyTaskCreator.createTasks(assignedTasks);
+            return new 
ArrayList<>(standbyTaskCreator.createTasks(assignedTasks));
         } else {
             return Collections.emptySet();
         }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
index e2f57d83422..b89628d225b 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateDirectoryTest.java
@@ -892,7 +892,7 @@ public class StateDirectoryTest {
         // we need to set this because the auto-cleanup uses the last-modified 
time from the filesystem,
         // which can't be mocked
         time.setCurrentTimeMs(System.currentTimeMillis());
-        TaskId taskId = new TaskId(0, 0);
+        final TaskId taskId = new TaskId(0, 0);
 
         final StateStore store = initializeStartupStores(taskId, true);
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 69e071dd2b3..ff2b76c5f5d 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -4902,7 +4902,7 @@ public class TaskManagerTest {
         
when(stateUpdater.tasks()).thenReturn(Collections.singleton(activeTask));
         when(stateUpdater.standbyTasks()).thenReturn(Collections.emptySet());
 
-        InOrder inOrder = inOrder(activeTaskCreator);
+        final InOrder inOrder = inOrder(activeTaskCreator);
         inOrder.verify(activeTaskCreator).createTasks(same(consumer), 
eq(Map.of(taskId00, taskId00Partitions)));
         inOrder.verify(activeTaskCreator).createTasks(consumer, emptyMap());
 
@@ -4941,7 +4941,7 @@ public class TaskManagerTest {
 
         // ensure we didn't construct any new Tasks, or recycle an existing 
Task; we only used the one we already have
         verify(activeTaskCreator, times(2)).createTasks(any(), 
eq(Collections.emptyMap()));
-        InOrder inOrder = inOrder(standbyTaskCreator);
+        final InOrder inOrder = inOrder(standbyTaskCreator);
         inOrder.verify(standbyTaskCreator).createTasks(Map.of(taskId00, 
taskId00Partitions));
         inOrder.verify(standbyTaskCreator).createTasks(Collections.emptyMap());
         verifyNoMoreInteractions(activeTaskCreator);

Reply via email to