This is an automated email from the ASF dual-hosted git repository.

bbejeck pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 95291432fa3 KAFKA-14412: Use shared cache for Task offset sums (#20954)
95291432fa3 is described below

commit 95291432fa3f31262a07e58af2247c73a92af8d6
Author: Nick Telford <[email protected]>
AuthorDate: Wed Feb 25 19:00:36 2026 +0000

    KAFKA-14412: Use shared cache for Task offset sums (#20954)
    
    Instead of reading Task state offsets for non-open Tasks from the
    `.checkpoint` file, we now maintain an in-memory cache of the latest
    changelog offsets for every Task on the instance.
    
    On start-up, this cache is seeded with the changelog offsets for every
    on-disk StateStore. Running Active and Standby Tasks then update this
    cache on every checkpoint to ensure it always reflects the offsets
    on-disk.
    
    This breaks the tight coupling between `TaskManager` and `.checkpoint`
    files, which will enable us to remove `.checkpoint` files in a later
    commit as part of KIP-1035.
    
    Reviewers: Eduwer Camacaro <[email protected]>, Bill Bejeck
     <[email protected]>
---
 .../processor/internals/ProcessorStateManager.java |  9 ++++
 .../processor/internals/StateDirectory.java        | 49 +++++++++++++++++
 .../processor/internals/StateManagerUtil.java      |  1 +
 .../streams/processor/internals/TaskManager.java   | 61 +++-------------------
 .../processor/internals/StateManagerUtilTest.java  |  2 +
 .../processor/internals/TaskManagerTest.java       | 57 +++++++++++---------
 6 files changed, 102 insertions(+), 77 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
index f77a1f9632b..86725805ce5 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorStateManager.java
@@ -176,6 +176,7 @@ public class ProcessorStateManager implements StateManager {
     private final FixedOrderMap<String, StateStoreMetadata> stores = new 
FixedOrderMap<>();
     private final FixedOrderMap<String, StateStore> globalStores = new 
FixedOrderMap<>();
 
+    private final StateDirectory stateDirectory;
     private final File baseDir;
     private final OffsetCheckpoint checkpointFile;
 
@@ -211,6 +212,7 @@ public class ProcessorStateManager implements StateManager {
 
         this.baseDir = stateDirectory.getOrCreateDirectoryForTask(taskId);
         this.checkpointFile = new 
OffsetCheckpoint(stateDirectory.checkpointFileFor(taskId));
+        this.stateDirectory = stateDirectory;
 
         log.debug("Created state store manager for task {}", taskId);
     }
@@ -300,6 +302,8 @@ public class ProcessorStateManager implements StateManager {
                 }
             }
 
+            stateDirectory.updateTaskOffsets(taskId, changelogOffsets());
+
             if (!loadedCheckpoints.isEmpty()) {
                 log.warn("Some loaded checkpoint offsets cannot find their 
corresponding state stores: {}", loadedCheckpoints);
             }
@@ -462,10 +466,13 @@ public class ProcessorStateManager implements 
StateManager {
             }
 
             storeMetadata.setOffset(batchEndOffset);
+
             // If null means the lag for this partition is not known yet
             if (optionalLag.isPresent()) {
                 storeMetadata.setEndOffset(optionalLag.getAsLong() + 
batchEndOffset);
             }
+
+            stateDirectory.updateTaskOffsets(taskId, changelogOffsets());
         }
     }
 
@@ -647,6 +654,8 @@ public class ProcessorStateManager implements StateManager {
                         store.stateStore.name(), store.offset, 
store.changelogPartition);
             }
         }
+
+        stateDirectory.updateTaskOffsets(taskId, changelogOffsets());
     }
 
     @Override
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
index 6c4e97c101e..701ac8195d7 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateDirectory.java
@@ -38,6 +38,7 @@ import org.apache.kafka.streams.processor.api.FixedKeyRecord;
 import org.apache.kafka.streams.processor.api.Record;
 import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
 import org.apache.kafka.streams.query.Position;
+import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 import org.apache.kafka.streams.state.internals.ThreadCache;
 
 import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
@@ -65,6 +66,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.UUID;
@@ -124,6 +126,7 @@ public class StateDirectory implements AutoCloseable {
 
     private final StreamsConfig config;
     private final Set<TaskId> tasksInLocalState = new 
ConcurrentSkipListSet<>();
+    private final Map<TaskId, Long> taskOffsetSums = new ConcurrentHashMap<>();
 
     /**
      * Ensures that the state base directory as well as the application's 
sub-directory are created.
@@ -295,6 +298,44 @@ public class StateDirectory implements AutoCloseable {
         }
     }
 
+    public Map<TaskId, Long> taskOffsetSums(final Set<TaskId> tasks) {
+        return taskOffsetSums.entrySet().stream()
+                .filter(e -> tasks.contains(e.getKey()))
+                .collect(Collectors.toMap(Map.Entry::getKey, 
Map.Entry::getValue));
+    }
+
+    public void updateTaskOffsets(final TaskId taskId, final 
Map<TopicPartition, Long> changelogOffsets) {
+        if (!changelogOffsets.isEmpty()) {
+            taskOffsetSums.put(taskId, sumOfChangelogOffsets(taskId, 
changelogOffsets));
+        }
+    }
+
+    public void removeTaskOffsets(final TaskId taskId) {
+        taskOffsetSums.remove(taskId);
+    }
+
+    private long sumOfChangelogOffsets(final TaskId taskId, final 
Map<TopicPartition, Long> changelogOffsets) {
+        long offsetSum = 0L;
+        for (final Map.Entry<TopicPartition, Long> changelogEntry : 
changelogOffsets.entrySet()) {
+            final long offset = changelogEntry.getValue();
+
+            if (offset != OffsetCheckpoint.OFFSET_UNKNOWN) {
+                if (offset < 0) {
+                    throw new StreamsException(
+                            new IllegalStateException("Expected not to get a 
sentinel offset, but got: " + changelogEntry),
+                            taskId);
+                }
+                offsetSum += offset;
+                if (offsetSum < 0) {
+                    log.warn("Sum of changelog offsets for task {} overflowed, 
pinning to Long.MAX_VALUE", taskId);
+                    return Long.MAX_VALUE;
+                }
+            }
+        }
+
+        return offsetSum;
+    }
+
     public UUID initializeProcessId() {
         if (!hasPersistentStores) {
             final UUID processId = UUID.randomUUID();
@@ -502,6 +543,7 @@ public class StateDirectory implements AutoCloseable {
     public void close() {
         if (hasPersistentStores) {
             unlockStartupStores();
+            taskOffsetSums.clear();
             try {
                 stateDirLock.release();
                 stateDirLockChannel.close();
@@ -582,6 +624,7 @@ public class StateDirectory implements AutoCloseable {
                         final long now = time.milliseconds();
                         final long lastModifiedMs = 
taskDir.file().lastModified();
                         if (now - cleanupDelayMs > lastModifiedMs) {
+                            removeTaskOffsets(id);
                             log.info("{} Deleting obsolete state directory {} 
for task {} as {}ms has elapsed (cleanup delay is {}ms).",
                                 logPrefix(), dirName, id, now - 
lastModifiedMs, cleanupDelayMs);
                             removeStartupState(id);
@@ -620,6 +663,9 @@ public class StateDirectory implements AutoCloseable {
         );
         if (namedTopologyDirs != null) {
             for (final File namedTopologyDir : namedTopologyDirs) {
+                final String topologyName = 
parseNamedTopologyFromDirectory(namedTopologyDir.getName());
+                final Set<TaskId> taskKeys = taskOffsetSums.keySet();
+                taskKeys.removeIf(taskId -> 
taskId.topologyName().equals(topologyName));
                 final File[] contents = namedTopologyDir.listFiles();
                 if (contents != null && contents.length == 0) {
                     try {
@@ -657,6 +703,8 @@ public class StateDirectory implements AutoCloseable {
             log.debug("Tried to clear out the local state for NamedTopology {} 
but none was found", topologyName);
         }
         try {
+            final Set<TaskId> taskKeys = taskOffsetSums.keySet();
+            taskKeys.removeIf(taskId -> 
taskId.topologyName().equals(topologyName));
             Utils.delete(namedTopologyDir);
         } catch (final IOException e) {
             log.error("Hit an unexpected error while clearing local state for 
topology " + topologyName, e);
@@ -670,6 +718,7 @@ public class StateDirectory implements AutoCloseable {
             log.warn("Found some still-locked task directories when user 
requested to cleaning up the state, "
                 + "since Streams is not running any more these will be ignored 
to complete the cleanup");
         }
+        taskOffsetSums.clear();
         final AtomicReference<Exception> firstException = new 
AtomicReference<>();
         for (final TaskDirectory taskDir : listAllTaskDirectories()) {
             final String dirName = taskDir.file().getName();
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
index 6706ed89543..e2b92352bf2 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StateManagerUtil.java
@@ -144,6 +144,7 @@ final class StateManagerUtil {
                     try {
                         if (wipeStateStore) {
                             log.debug("Wiping state stores for {} task {}", 
taskType, id);
+                            stateDirectory.removeTaskOffsets(id);
                             // we can just delete the whole dir of the task, 
including the state store images and the checkpoint files,
                             // and then we write an empty checkpoint file 
indicating that the previous close is graceful and we just
                             // need to re-bootstrap the restoration from the 
beginning
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index 3888e4384ba..1657b3a414b 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -40,15 +40,12 @@ import 
org.apache.kafka.streams.internals.StreamsConfigUtils.ProcessingMode;
 import org.apache.kafka.streams.processor.StandbyUpdateListener;
 import org.apache.kafka.streams.processor.TaskId;
 import org.apache.kafka.streams.processor.assignment.ProcessId;
-import 
org.apache.kafka.streams.processor.internals.StateDirectory.TaskDirectory;
 import org.apache.kafka.streams.processor.internals.Task.State;
 import org.apache.kafka.streams.processor.internals.tasks.DefaultTaskManager;
-import org.apache.kafka.streams.state.internals.OffsetCheckpoint;
 
 import org.slf4j.Logger;
 
 import java.io.File;
-import java.io.IOException;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -1242,7 +1239,6 @@ public class TaskManager {
      * Does not include stateless or non-logged tasks.
      */
     public Map<TaskId, Long> taskOffsetSums() {
-        final Map<TaskId, Long> taskOffsetSums = new HashMap<>();
 
         // Not all tasks will create directories, and there may be directories 
for tasks we don't currently own,
         // so we consider all tasks that are either owned or on disk. This 
includes stateless tasks, which should
@@ -1250,27 +1246,14 @@ public class TaskManager {
         final Map<TaskId, Task> tasks = allTasks();
         final Set<TaskId> 
lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks =
             union(HashSet::new, lockedTaskDirectories, tasks.keySet());
-        for (final Task task : tasks.values()) {
-            if (task.state() != State.CREATED && task.state() != State.CLOSED) 
{
-                final Map<TopicPartition, Long> changelogOffsets = 
task.changelogOffsets();
-                if (changelogOffsets.isEmpty()) {
-                    log.debug("Skipping to encode apparently stateless (or 
non-logged) offset sum for task {}",
-                        task.id());
-                } else {
-                    taskOffsetSums.put(task.id(), 
sumOfChangelogOffsets(task.id(), changelogOffsets));
-                }
-                
lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks.remove(task.id());
-            }
-        }
 
-        for (final TaskId id : 
lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks) {
-            final File checkpointFile = stateDirectory.checkpointFileFor(id);
-            try {
-                if (checkpointFile.exists()) {
-                    taskOffsetSums.put(id, sumOfChangelogOffsets(id, new 
OffsetCheckpoint(checkpointFile).read()));
-                }
-            } catch (final IOException e) {
-                log.warn(String.format("Exception caught while trying to read 
checkpoint for task %s:", id), e);
+        final Map<TaskId, Long> taskOffsetSums = 
stateDirectory.taskOffsetSums(lockedTaskDirectoriesOfNonOwnedTasksAndClosedAndCreatedTasks);
+
+        // overlay latest offsets from assigned tasks
+        for (final Task task : tasks.values()) {
+            // exclude stateless and non-logged tasks
+            if (task.isActive() && task.state() == State.RUNNING && 
!task.changelogPartitions().isEmpty()) {
+                taskOffsetSums.put(task.id(), Task.LATEST_OFFSET);
             }
         }
 
@@ -1289,7 +1272,7 @@ public class TaskManager {
         lockedTaskDirectories.clear();
 
         final Map<TaskId, Task> allTasks = allTasks();
-        for (final TaskDirectory taskDir : 
stateDirectory.listNonEmptyTaskDirectories()) {
+        for (final StateDirectory.TaskDirectory taskDir : 
stateDirectory.listNonEmptyTaskDirectories()) {
             final File dir = taskDir.file();
             final String namedTopology = taskDir.namedTopology();
             try {
@@ -1343,34 +1326,6 @@ public class TaskManager {
         }
     }
 
-    private long sumOfChangelogOffsets(final TaskId id, final 
Map<TopicPartition, Long> changelogOffsets) {
-        long offsetSum = 0L;
-        for (final Map.Entry<TopicPartition, Long> changelogEntry : 
changelogOffsets.entrySet()) {
-            final long offset = changelogEntry.getValue();
-
-
-            if (offset == Task.LATEST_OFFSET) {
-                // this condition can only be true for active tasks; never for 
standby
-                // for this case, the offset of all partitions is set to 
`LATEST_OFFSET`
-                // and we "forward" the sentinel value directly
-                return Task.LATEST_OFFSET;
-            } else if (offset != OffsetCheckpoint.OFFSET_UNKNOWN) {
-                if (offset < 0) {
-                    throw new StreamsException(
-                        new IllegalStateException("Expected not to get a 
sentinel offset, but got: " + changelogEntry),
-                        id);
-                }
-                offsetSum += offset;
-                if (offsetSum < 0) {
-                    log.warn("Sum of changelog offsets for task {} overflowed, 
pinning to Long.MAX_VALUE", id);
-                    return Long.MAX_VALUE;
-                }
-            }
-        }
-
-        return offsetSum;
-    }
-
     private void closeTaskDirty(final Task task, final boolean 
removeFromTasksRegistry) {
         try {
             // we call this function only to flush the case if necessary
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
index 5d0acf46bd5..1ee6594d119 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StateManagerUtilTest.java
@@ -171,6 +171,7 @@ public class StateManagerUtilTest {
             "logPrefix:", false, true, stateManager, stateDirectory, 
TaskType.ACTIVE);
 
         inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).removeTaskOffsets(taskId);
         inOrder.verify(stateDirectory).unlock(taskId);
         verifyNoMoreInteractions(stateManager, stateDirectory);
     }
@@ -211,6 +212,7 @@ public class StateManagerUtilTest {
         }
 
         inOrder.verify(stateManager).close();
+        inOrder.verify(stateDirectory).removeTaskOffsets(taskId);
         inOrder.verify(stateDirectory).unlock(taskId);
         verifyNoMoreInteractions(stateManager, stateDirectory);
     }
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index ff2b76c5f5d..8f9d228c273 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -95,6 +95,7 @@ import static org.apache.kafka.common.utils.Utils.union;
 import static 
org.apache.kafka.streams.processor.internals.TopologyMetadata.UNNAMED_TOPOLOGY;
 import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.standbyTask;
 import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statefulTask;
+import static org.apache.kafka.test.StreamsTestUtils.TaskBuilder.statelessTask;
 import static org.hamcrest.CoreMatchers.hasItem;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.instanceOf;
@@ -1836,6 +1837,19 @@ public class TaskManagerTest {
         );
     }
 
+    @Test
+    public void shouldNotComputeOffsetSumForRunningStatelessTask() {
+        final StreamTask runningStatelessTask = 
statelessTask(taskId00).inState(State.RUNNING).build();
+        final TasksRegistry tasks = mock(TasksRegistry.class);
+        final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
+        
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatelessTask)));
+
+        assertThat(
+                taskManager.taskOffsetSums(),
+                is(emptyMap())
+        );
+    }
+
     @Test
     public void shouldComputeOffsetSumForNonRunningActiveTask() throws 
Exception {
         final StreamTask restoringStatefulTask = statefulTask(taskId00, 
taskId00ChangelogPartitions)
@@ -1852,6 +1866,7 @@ public class TaskManagerTest {
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
+        
when(stateDirectory.taskOffsetSums(expectedOffsetSums.keySet())).thenReturn(expectedOffsetSums);
 
         assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums));
     }
@@ -1874,6 +1889,7 @@ public class TaskManagerTest {
         when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
         taskManager.handleRebalanceStart(singleton("topic"));
 
+        
when(stateDirectory.taskOffsetSums(expectedOffsetSums.keySet())).thenReturn(expectedOffsetSums);
         assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums));
     }
 
@@ -1885,12 +1901,11 @@ public class TaskManagerTest {
         
when(restoringStandbyTask.changelogOffsets()).thenReturn(mkMap(mkEntry(t1p0changelog,
 changelogOffset)));
         expectLockObtainedFor(taskId00);
         makeTaskFolders(taskId00.toString());
-        final Map<TopicPartition, Long> changelogOffsetInCheckpoint = 
mkMap(mkEntry(t1p0changelog, 24L));
-        writeCheckpointFile(taskId00, changelogOffsetInCheckpoint);
         final TasksRegistry tasks = mock(TasksRegistry.class);
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask));
         taskManager.handleRebalanceStart(singleton("topic"));
+        
when(stateDirectory.taskOffsetSums(Collections.singleton(taskId00))).thenReturn(mkMap(mkEntry(taskId00,
 changelogOffset)));
 
         assertThat(taskManager.taskOffsetSums(), is(mkMap(mkEntry(taskId00, 
changelogOffset))));
     }
@@ -1916,6 +1931,12 @@ public class TaskManagerTest {
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
         
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
runningStatefulTask)));
         when(stateUpdater.tasks()).thenReturn(Set.of(restoringStandbyTask, 
restoringStatefulTask));
+        when(stateDirectory.taskOffsetSums(Set.of(taskId00, taskId01, 
taskId02)))
+                .thenReturn(mkMap(
+                        mkEntry(taskId00, changelogOffsetOfRunningTask),
+                        mkEntry(taskId01, 
changelogOffsetOfRestoringStatefulTask),
+                        mkEntry(taskId02, 
changelogOffsetOfRestoringStandbyTask)
+                ));
 
         assertThat(
             taskManager.taskOffsetSums(),
@@ -1942,12 +1963,12 @@ public class TaskManagerTest {
         
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId01, 
restoringStatefulTask)));
         when(stateUpdater.tasks()).thenReturn(Set.of(restoringStatefulTask));
 
-        assertThat(
-            taskManager.taskOffsetSums(),
-            is(mkMap(
+        final Map<TaskId, Long> expectedOffsetSums = mkMap(
                 mkEntry(taskId01, changelogOffsetOfRestoringStandbyTask)
-            ))
         );
+
+        
when(stateDirectory.taskOffsetSums(expectedOffsetSums.keySet())).thenReturn(expectedOffsetSums);
+        assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums));
     }
 
     @Test
@@ -1976,20 +1997,17 @@ public class TaskManagerTest {
         taskManager.handleRebalanceStart(singleton("topic"));
         taskManager.handleAssignment(emptyMap(), taskId00Assignment);
 
+        
when(stateDirectory.taskOffsetSums(any())).thenReturn(expectedOffsetSums);
         assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums));
     }
 
     @Test
     public void shouldComputeOffsetSumForUnassignedTaskWeCanLock() throws 
Exception {
-        final Map<TopicPartition, Long> changelogOffsets = mkMap(
-            mkEntry(new TopicPartition("changelog", 0), 5L),
-            mkEntry(new TopicPartition("changelog", 1), 10L)
-        );
         final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 
15L));
 
         expectLockObtainedFor(taskId00);
         makeTaskFolders(taskId00.toString());
-        writeCheckpointFile(taskId00, changelogOffsets);
+        
when(stateDirectory.taskOffsetSums(expectedOffsetSums.keySet())).thenReturn(expectedOffsetSums);
 
         taskManager.handleRebalanceStart(singleton("topic"));
 
@@ -1999,10 +2017,6 @@ public class TaskManagerTest {
     @ParameterizedTest
     @EnumSource(value = State.class, names = {"CREATED", "CLOSED"})
     public void 
shouldComputeOffsetSumFromCheckpointFileForCreatedAndClosedTasks(final State 
state) throws Exception {
-        final Map<TopicPartition, Long> changelogOffsets = mkMap(
-            mkEntry(new TopicPartition("changelog", 0), 5L),
-            mkEntry(new TopicPartition("changelog", 1), 10L)
-        );
         final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 
15L));
 
         final StreamTask task = statefulTask(taskId00, 
taskId00ChangelogPartitions)
@@ -2012,15 +2026,17 @@ public class TaskManagerTest {
 
         final TasksRegistry tasks = mock(TasksRegistry.class);
         
when(tasks.allInitializedTasksPerId()).thenReturn(mkMap(mkEntry(taskId00, 
task)));
+        
when(stateDirectory.taskOffsetSums(expectedOffsetSums.keySet())).thenReturn(expectedOffsetSums);
 
         final TaskManager taskManager = 
setUpTaskManager(ProcessingMode.AT_LEAST_ONCE, tasks);
 
         expectLockObtainedFor(taskId00);
         makeTaskFolders(taskId00.toString());
-        writeCheckpointFile(taskId00, changelogOffsets);
+        
when(stateDirectory.taskOffsetSums(expectedOffsetSums.keySet())).thenReturn(expectedOffsetSums);
 
         taskManager.handleRebalanceStart(singleton("topic"));
 
+        
when(stateDirectory.taskOffsetSums(Collections.singleton(taskId00))).thenReturn(expectedOffsetSums);
         assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums));
     }
     
@@ -2039,7 +2055,6 @@ public class TaskManagerTest {
         expectLockObtainedFor(taskId00);
         makeTaskFolders(taskId00.toString());
         expectDirectoryNotEmpty(taskId00);
-        
when(stateDirectory.checkpointFileFor(taskId00)).thenReturn(getCheckpointFile(taskId00));
         taskManager.handleRebalanceStart(singleton("topic"));
 
         assertTrue(taskManager.taskOffsetSums().isEmpty());
@@ -2047,17 +2062,11 @@ public class TaskManagerTest {
 
     @Test
     public void shouldPinOffsetSumToLongMaxValueInCaseOfOverflow() throws 
Exception {
-        final long largeOffset = Long.MAX_VALUE / 2;
-        final Map<TopicPartition, Long> changelogOffsets = mkMap(
-            mkEntry(new TopicPartition("changelog", 1), largeOffset),
-            mkEntry(new TopicPartition("changelog", 2), largeOffset),
-            mkEntry(new TopicPartition("changelog", 3), largeOffset)
-        );
         final Map<TaskId, Long> expectedOffsetSums = mkMap(mkEntry(taskId00, 
Long.MAX_VALUE));
 
         expectLockObtainedFor(taskId00);
         makeTaskFolders(taskId00.toString());
-        writeCheckpointFile(taskId00, changelogOffsets);
+        
when(stateDirectory.taskOffsetSums(expectedOffsetSums.keySet())).thenReturn(expectedOffsetSums);
         taskManager.handleRebalanceStart(singleton("topic"));
 
         assertThat(taskManager.taskOffsetSums(), is(expectedOffsetSums));

Reply via email to