This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new ee4debb9f0d KAFKA-19128: Kafka Streams should not get offsets when
close dirty (#19450)
ee4debb9f0d is described below
commit ee4debb9f0dc2749b317654f2e5bcf7bdab35884
Author: Alieh Saeedi <[email protected]>
AuthorDate: Fri Apr 25 06:23:07 2025 +0200
KAFKA-19128: Kafka Streams should not get offsets when close dirty (#19450)
Kafka Streams calls `prepareCommit()` in `Taskmanager#closeTaskDirty()`.
However, the dirty task must not get committed and therefore,
prepare-commit tasks such as getting offsets should not be needed as
well. The only thing needed before closing a task dirty is flushing.
Therefore, separating `flush` and `prepareCommit` could be a good fix.
Reviewers: Bill Bejeck <[email protected]>, Matthias J. Sax
<[email protected]>
---
.../streams/processor/internals/ReadOnlyTask.java | 2 +-
.../streams/processor/internals/StandbyTask.java | 10 ++-
.../streams/processor/internals/StreamTask.java | 7 +-
.../kafka/streams/processor/internals/Task.java | 2 +-
.../streams/processor/internals/TaskExecutor.java | 2 +-
.../streams/processor/internals/TaskManager.java | 12 ++--
.../processor/internals/StandbyTaskTest.java | 14 ++--
.../processor/internals/StreamTaskTest.java | 82 +++++++++++++---------
.../processor/internals/TaskManagerTest.java | 49 +++++++------
.../internals/tasks/DefaultTaskExecutorTest.java | 2 +-
.../apache/kafka/streams/TopologyTestDriver.java | 6 +-
11 files changed, 107 insertions(+), 81 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
index a895b71e4e9..dd5a2c6e1d7 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java
@@ -180,7 +180,7 @@ public class ReadOnlyTask implements Task {
}
@Override
- public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+ public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean
clean) {
throw new UnsupportedOperationException("This task is read-only");
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
index 417f754ca2c..4c6e6674bdb 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StandbyTask.java
@@ -179,7 +179,7 @@ public class StandbyTask extends AbstractTask implements
Task {
* or flushing state store get IO errors; such
error should cause the thread to die
*/
@Override
- public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+ public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean
clean) {
switch (state()) {
case CREATED:
log.debug("Skipped preparing created task for commit");
@@ -189,7 +189,11 @@ public class StandbyTask extends AbstractTask implements
Task {
case RUNNING:
case SUSPENDED:
// do not need to flush state store caches in pre-commit since
nothing would be sent for standby tasks
- log.debug("Prepared {} task for committing", state());
+ if (!clean) {
+ log.debug("Skipped preparing {} standby task with id {}
for commit since the task is getting closed dirty.", state(), id);
+ } else {
+ log.debug("Prepared {} task for committing", state());
+ }
break;
@@ -197,7 +201,7 @@ public class StandbyTask extends AbstractTask implements
Task {
throw new IllegalStateException("Illegal state " + state() + "
while preparing standby task " + id + " for committing ");
}
- return Collections.emptyMap();
+ return clean ? Collections.emptyMap() : null;
}
@Override
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 424d6f7af61..93737d82289 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -417,7 +417,6 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
timeCurrentIdlingStarted = Optional.empty();
}
-
public void flush() {
stateMgr.flushCache();
recordCollector.flush();
@@ -429,7 +428,7 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
* @return offsets that should be committed for this task
*/
@Override
- public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+ public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean
clean) {
switch (state()) {
case CREATED:
case RESTORING:
@@ -444,6 +443,10 @@ public class StreamTask extends AbstractTask implements
ProcessorNodePunctuator,
//
// TODO: this should be removed after we decouple caching
with emitting
flush();
+ if (!clean) {
+ log.debug("Skipped preparing {} task with id {} for
commit since the task is getting closed dirty.", state(), id);
+ return null;
+ }
hasPendingTxCommit = eosEnabled;
log.debug("Prepared {} task for committing", state());
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
index 484c1ca574b..ba09700af8a 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/Task.java
@@ -201,7 +201,7 @@ public interface Task {
/**
* @throws StreamsException fatal error, should close the thread
*/
- Map<TopicPartition, OffsetAndMetadata> prepareCommit();
+ Map<TopicPartition, OffsetAndMetadata> prepareCommit(final boolean clean);
void postCommit(boolean enforceCheckpoint);
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
index c993787503e..91deab0dd9d 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskExecutor.java
@@ -142,7 +142,7 @@ public class TaskExecutor {
for (final Task task : tasksToCommit) {
// we need to call commitNeeded first since we need to update
committable offsets
if (task.commitNeeded()) {
- final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata
= task.prepareCommit();
+ final Map<TopicPartition, OffsetAndMetadata> offsetAndMetadata
= task.prepareCommit(true);
if (!offsetAndMetadata.isEmpty()) {
consumedOffsetsAndMetadata.put(task, offsetAndMetadata);
}
diff --git
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
index eccf0c8f33d..9376e6887f3 100644
---
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
+++
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
@@ -278,7 +278,7 @@ public class TaskManager {
// we do not need to take the returned offsets since we
are not going to commit anyways;
// this call is only used for active tasks to flush the
cache before suspending and
// closing the topology
- task.prepareCommit();
+ task.prepareCommit(false);
} catch (final RuntimeException swallow) {
log.warn("Error flushing cache for corrupted task {}. " +
"Since the task is closing dirty, the following
exception is swallowed: {}",
@@ -812,7 +812,7 @@ public class TaskManager {
// and their changelog positions should not change at all
postCommit would not write the checkpoint again.
// 2) for standby tasks prepareCommit should always return
empty, and then in postCommit we would probably
// write the checkpoint file.
- final Map<TopicPartition, OffsetAndMetadata> offsets =
task.prepareCommit();
+ final Map<TopicPartition, OffsetAndMetadata> offsets =
task.prepareCommit(true);
if (!offsets.isEmpty()) {
log.error("Task {} should have been committed when it was
suspended, but it reports non-empty " +
"offsets {} to commit; this means it
failed during last commit and hence should be closed dirty",
@@ -1264,7 +1264,7 @@ public class TaskManager {
final Map<Task,
Map<TopicPartition, OffsetAndMetadata>> consumedOffsetsPerTask) {
for (final Task task : tasksToPrepare) {
try {
- final Map<TopicPartition, OffsetAndMetadata>
committableOffsets = task.prepareCommit();
+ final Map<TopicPartition, OffsetAndMetadata>
committableOffsets = task.prepareCommit(true);
if (!committableOffsets.isEmpty()) {
consumedOffsetsPerTask.put(task, committableOffsets);
}
@@ -1479,7 +1479,7 @@ public class TaskManager {
try {
// we call this function only to flush the case if necessary
// before suspending and closing the topology
- task.prepareCommit();
+ task.prepareCommit(false);
} catch (final RuntimeException swallow) {
log.warn("Error flushing cache of dirty task {}. " +
"Since the task is closing dirty, the following exception is
swallowed: {}",
@@ -1630,7 +1630,7 @@ public class TaskManager {
// first committing all tasks and then suspend and close them clean
for (final Task task : activeTasksToClose) {
try {
- final Map<TopicPartition, OffsetAndMetadata>
committableOffsets = task.prepareCommit();
+ final Map<TopicPartition, OffsetAndMetadata>
committableOffsets = task.prepareCommit(true);
tasksToCommit.add(task);
if (!committableOffsets.isEmpty()) {
consumedOffsetsAndMetadataPerTask.put(task,
committableOffsets);
@@ -1719,7 +1719,7 @@ public class TaskManager {
// first committing and then suspend / close clean
for (final Task task : standbyTasksToClose) {
try {
- task.prepareCommit();
+ task.prepareCommit(true);
task.postCommit(true);
task.suspend();
closeTaskClean(task);
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
index e953a61fc1f..768f3787d0b 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java
@@ -213,7 +213,7 @@ public class StandbyTaskTest {
task.suspend();
task.closeClean();
- assertThrows(IllegalStateException.class, task::prepareCommit);
+ assertThrows(IllegalStateException.class, () ->
task.prepareCommit(true));
}
@Test
@@ -261,13 +261,13 @@ public class StandbyTaskTest {
task = createStandbyTask();
task.initializeIfNeeded();
- task.prepareCommit();
+ task.prepareCommit(true);
task.postCommit(false); // this should not checkpoint
- task.prepareCommit();
+ task.prepareCommit(true);
task.postCommit(false); // this should checkpoint
- task.prepareCommit();
+ task.prepareCommit(true);
task.postCommit(false); // this should not checkpoint
verify(stateManager).checkpoint();
@@ -322,7 +322,7 @@ public class StandbyTaskTest {
task = createStandbyTask();
task.initializeIfNeeded();
task.suspend();
- task.prepareCommit();
+ task.prepareCommit(true);
task.postCommit(true);
task.closeClean();
@@ -360,7 +360,7 @@ public class StandbyTaskTest {
// could commit if the offset advanced beyond threshold
assertTrue(task.commitNeeded());
- task.prepareCommit();
+ task.prepareCommit(true);
task.postCommit(true);
}
@@ -389,7 +389,7 @@ public class StandbyTaskTest {
task = createStandbyTask();
task.initializeIfNeeded();
- task.prepareCommit();
+ task.prepareCommit(true);
assertThrows(RuntimeException.class, () -> task.postCommit(true));
assertEquals(RUNNING, task.state());
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
index bcf24ee7df8..98807cd6342 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
@@ -645,6 +645,22 @@ public class StreamTaskTest {
assertEquals(asList(201, 202, 203), source2.values);
}
+ @Test
+ public void shouldNotGetOffsetsIfPrepareCommitDirty() {
+ when(stateManager.taskId()).thenReturn(taskId);
+ when(stateManager.taskType()).thenReturn(TaskType.ACTIVE);
+ task = createStatefulTask(createConfig("100"), false);
+
+ task.addRecords(partition1,
List.of(getConsumerRecordWithOffsetAsTimestamp(partition1, 0)));
+ task.addRecords(partition2,
List.of(getConsumerRecordWithOffsetAsTimestamp(partition2, 0)));
+
+ assertTrue(task.process(0L));
+ assertTrue(task.commitNeeded());
+
+ // committableOffsetsAndMetadata() has not been called, otherwise
prepareCommit() would have returned a map
+ assertNull(task.prepareCommit(false));
+ }
+
@Test
public void shouldProcessRecordsAfterPrepareCommitWhenEosDisabled() {
when(stateManager.taskId()).thenReturn(taskId);
@@ -660,7 +676,7 @@ public class StreamTaskTest {
));
assertTrue(task.process(time.milliseconds()));
- task.prepareCommit();
+ task.prepareCommit(true);
assertTrue(task.process(time.milliseconds()));
task.postCommit(false);
assertTrue(task.process(time.milliseconds()));
@@ -683,7 +699,7 @@ public class StreamTaskTest {
));
assertTrue(task.process(time.milliseconds()));
- task.prepareCommit();
+ task.prepareCommit(true);
assertFalse(task.process(time.milliseconds()));
task.postCommit(false);
assertTrue(task.process(time.milliseconds()));
@@ -1328,7 +1344,7 @@ public class StreamTaskTest {
assertTrue(task.process(0L));
assertTrue(task.commitNeeded());
- task.prepareCommit();
+ task.prepareCommit(true);
assertTrue(task.commitNeeded());
task.postCommit(true);
@@ -1338,7 +1354,7 @@ public class StreamTaskTest {
assertTrue(task.maybePunctuateStreamTime());
assertTrue(task.commitNeeded());
- task.prepareCommit();
+ task.prepareCommit(true);
assertTrue(task.commitNeeded());
task.postCommit(true);
@@ -1349,7 +1365,7 @@ public class StreamTaskTest {
assertTrue(task.maybePunctuateSystemTime());
assertTrue(task.commitNeeded());
- task.prepareCommit();
+ task.prepareCommit(true);
assertTrue(task.commitNeeded());
task.postCommit(true);
@@ -1374,7 +1390,7 @@ public class StreamTaskTest {
task.process(0L);
processorSystemTime.mockProcessor.addProcessorMetadata("key2", 200L);
- final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata =
task.prepareCommit();
+ final Map<TopicPartition, OffsetAndMetadata> offsetsAndMetadata =
task.prepareCommit(true);
final TopicPartitionMetadata expected = new TopicPartitionMetadata(3L,
new ProcessorMetadata(
mkMap(
@@ -1413,7 +1429,7 @@ public class StreamTaskTest {
final TopicPartitionMetadata metadata = new TopicPartitionMetadata(0,
new ProcessorMetadata());
assertTrue(task.commitNeeded());
- assertThat(task.prepareCommit(), equalTo(
+ assertThat(task.prepareCommit(true), equalTo(
mkMap(
mkEntry(partition1, new OffsetAndMetadata(3L,
Optional.of(2), metadata.encode()))
)
@@ -1430,7 +1446,7 @@ public class StreamTaskTest {
task.process(0L);
assertTrue(task.commitNeeded());
- assertThat(task.prepareCommit(), equalTo(
+ assertThat(task.prepareCommit(true), equalTo(
mkMap(
mkEntry(partition1, new OffsetAndMetadata(3L, Optional.of(2),
metadata.encode())),
mkEntry(partition2, new OffsetAndMetadata(1L, Optional.of(0),
metadata.encode()))
@@ -1486,7 +1502,7 @@ public class StreamTaskTest {
assertTrue(task.commitNeeded());
- assertThat(task.prepareCommit(), equalTo(
+ assertThat(task.prepareCommit(true), equalTo(
mkMap(
mkEntry(partition1, new OffsetAndMetadata(1L, Optional.of(1),
expectedMetadata1.encode())),
mkEntry(partition2, new OffsetAndMetadata(2L, Optional.of(1),
expectedMetadata2.encode()))
@@ -1509,7 +1525,7 @@ public class StreamTaskTest {
assertTrue(task.commitNeeded());
// Processor metadata not updated, we just need to commit to
partition1 again with new offset
- assertThat(task.prepareCommit(), equalTo(
+ assertThat(task.prepareCommit(true), equalTo(
mkMap(mkEntry(partition1, new OffsetAndMetadata(2L,
Optional.of(1), expectedMetadata3.encode())))
));
task.postCommit(false);
@@ -1526,7 +1542,7 @@ public class StreamTaskTest {
final IllegalStateException thrown = assertThrows(
IllegalStateException.class,
- task::prepareCommit
+ () -> task.prepareCommit(true)
);
assertThat(thrown.getMessage(), is("Illegal state CLOSED while
preparing active task 0_0 for committing"));
@@ -1820,10 +1836,10 @@ public class StreamTaskTest {
task.initializeIfNeeded();
task.completeRestoration(noOpResetter -> { }); // should checkpoint
- task.prepareCommit();
+ task.prepareCommit(true);
task.postCommit(true); // should checkpoint
- task.prepareCommit();
+ task.prepareCommit(true);
task.postCommit(false); // should not checkpoint
assertThat("Map was empty", task.highWaterMark().size() == 2);
@@ -1847,10 +1863,10 @@ public class StreamTaskTest {
task.initializeIfNeeded();
task.completeRestoration(noOpResetter -> { }); // should checkpoint
- task.prepareCommit();
+ task.prepareCommit(true);
task.postCommit(true); // should checkpoint
- task.prepareCommit();
+ task.prepareCommit(true);
task.postCommit(false); // should checkpoint since the offset delta is
greater than the threshold
assertThat("Map was empty", task.highWaterMark().size() == 2);
@@ -1866,7 +1882,7 @@ public class StreamTaskTest {
task.initializeIfNeeded();
task.completeRestoration(noOpResetter -> { });
- task.prepareCommit();
+ task.prepareCommit(true);
task.postCommit(false);
final File checkpointFile = new File(
stateDirectory.getOrCreateDirectoryForTask(taskId),
@@ -2011,7 +2027,7 @@ public class StreamTaskTest {
assertTrue(task.process(0L));
assertTrue(task.process(0L));
- task.prepareCommit();
+ task.prepareCommit(true);
if (doCommit) {
task.updateCommittedOffsets(repartition, 10L);
}
@@ -2050,7 +2066,7 @@ public class StreamTaskTest {
task.transitionTo(SUSPENDED);
task.transitionTo(Task.State.CLOSED);
- assertThrows(IllegalStateException.class, task::prepareCommit);
+ assertThrows(IllegalStateException.class, () ->
task.prepareCommit(true));
}
@Test
@@ -2101,7 +2117,7 @@ public class StreamTaskTest {
task.initializeIfNeeded();
task.completeRestoration(noOpResetter -> { });
- task.prepareCommit();
+ task.prepareCommit(true);
task.postCommit(false);
task.suspend();
@@ -2123,7 +2139,7 @@ public class StreamTaskTest {
task.initializeIfNeeded();
task.completeRestoration(noOpResetter -> { }); // should checkpoint
- task.prepareCommit();
+ task.prepareCommit(true);
task.postCommit(false); // should checkpoint since the offset delta is
greater than the threshold
task.suspend();
@@ -2207,7 +2223,7 @@ public class StreamTaskTest {
task.initializeIfNeeded();
task.completeRestoration(noOpResetter -> { }); // should flush and
checkpoint
task.suspend();
- task.prepareCommit();
+ task.prepareCommit(true);
task.postCommit(true); // should flush and checkpoint
task.closeClean();
@@ -2277,7 +2293,7 @@ public class StreamTaskTest {
assertTrue(task.commitNeeded());
task.suspend();
- task.prepareCommit();
+ task.prepareCommit(true);
task.postCommit(false);
assertEquals(SUSPENDED, task.state());
@@ -2307,7 +2323,7 @@ public class StreamTaskTest {
assertTrue(task.commitNeeded());
task.suspend();
- task.prepareCommit();
+ task.prepareCommit(true);
task.postCommit(true); // should checkpoint
assertThrows(ProcessorStateException.class, () -> task.closeClean());
@@ -2336,7 +2352,7 @@ public class StreamTaskTest {
task.addRecords(partition1,
singletonList(getConsumerRecordWithOffsetAsTimestamp(partition1, offset)));
task.process(100L);
- assertThrows(ProcessorStateException.class, task::prepareCommit);
+ assertThrows(ProcessorStateException.class, () ->
task.prepareCommit(true));
assertEquals(RUNNING, task.state());
@@ -2369,7 +2385,7 @@ public class StreamTaskTest {
assertTrue(task.commitNeeded());
task.suspend();
- task.prepareCommit();
+ task.prepareCommit(true);
assertThrows(ProcessorStateException.class, () ->
task.postCommit(true));
assertEquals(Task.State.SUSPENDED, task.state());
@@ -2672,7 +2688,7 @@ public class StreamTaskTest {
assertTrue(task.process(offset));
assertTrue(task.commitNeeded());
assertThat(
- task.prepareCommit(),
+ task.prepareCommit(true),
equalTo(mkMap(mkEntry(partition1,
new OffsetAndMetadata(offset + 1,
new TopicPartitionMetadata(RecordQueue.UNKNOWN, new
ProcessorMetadata()).encode()))))
@@ -2704,7 +2720,7 @@ public class StreamTaskTest {
assertTrue(task.process(offset));
assertTrue(task.commitNeeded());
assertThat(
- task.prepareCommit(),
+ task.prepareCommit(true),
equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(offset +
1, new TopicPartitionMetadata(offset, new ProcessorMetadata()).encode()))))
);
}
@@ -2734,14 +2750,14 @@ public class StreamTaskTest {
assertTrue(task.process(offset));
assertTrue(task.commitNeeded());
assertThat(
- task.prepareCommit(),
+ task.prepareCommit(true),
equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(1, new
TopicPartitionMetadata(0, new ProcessorMetadata()).encode()))))
);
assertTrue(task.process(offset));
assertTrue(task.commitNeeded());
assertThat(
- task.prepareCommit(),
+ task.prepareCommit(true),
equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(2, new
TopicPartitionMetadata(0, new ProcessorMetadata()).encode()))))
);
}
@@ -2771,7 +2787,7 @@ public class StreamTaskTest {
assertTrue(task.process(offset));
assertTrue(task.commitNeeded());
assertThat(
- task.prepareCommit(),
+ task.prepareCommit(true),
equalTo(mkMap(mkEntry(partition1,
new OffsetAndMetadata(offset + 1,
new TopicPartitionMetadata(RecordQueue.UNKNOWN, new
ProcessorMetadata()).encode()))))
@@ -2803,7 +2819,7 @@ public class StreamTaskTest {
assertTrue(task.process(offset));
assertTrue(task.commitNeeded());
assertThat(
- task.prepareCommit(),
+ task.prepareCommit(true),
equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(offset +
1, new TopicPartitionMetadata(offset, new ProcessorMetadata()).encode()))))
);
}
@@ -2834,14 +2850,14 @@ public class StreamTaskTest {
assertTrue(task.process(offset));
assertTrue(task.commitNeeded());
assertThat(
- task.prepareCommit(),
+ task.prepareCommit(true),
equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(1, new
TopicPartitionMetadata(0, new ProcessorMetadata()).encode()))))
);
assertTrue(task.process(offset));
assertTrue(task.commitNeeded());
assertThat(
- task.prepareCommit(),
+ task.prepareCommit(true),
equalTo(mkMap(mkEntry(partition1, new OffsetAndMetadata(2, new
TopicPartitionMetadata(0, new ProcessorMetadata()).encode()))))
);
}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
index 9d7df53adbe..d8bb35c000a 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java
@@ -459,7 +459,7 @@ public class TaskManagerTest {
taskManager.handleAssignment(Collections.emptyMap(),
Collections.emptyMap());
- verify(activeTaskToClose).prepareCommit();
+ verify(activeTaskToClose).prepareCommit(false);
verify(activeTaskToClose).suspend();
verify(activeTaskToClose).closeDirty();
verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
@@ -500,7 +500,7 @@ public class TaskManagerTest {
taskManager.handleAssignment(Collections.emptyMap(),
Collections.emptyMap());
- verify(standbyTaskToClose).prepareCommit();
+ verify(standbyTaskToClose).prepareCommit(false);
verify(standbyTaskToClose).suspend();
verify(standbyTaskToClose).closeDirty();
verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
@@ -996,7 +996,7 @@ public class TaskManagerTest {
taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01,
taskId01Partitions)));
- verify(activeTaskToRecycle).prepareCommit();
+ verify(activeTaskToRecycle).prepareCommit(true);
verify(tasks).addPendingTasksToInit(Set.of(standbyTask));
verify(tasks).removeTask(activeTaskToRecycle);
verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
@@ -1019,7 +1019,7 @@ public class TaskManagerTest {
taskManager.handleAssignment(emptyMap(), mkMap(mkEntry(taskId01,
taskId01Partitions)));
- verify(activeTaskToRecycle).prepareCommit();
+ verify(activeTaskToRecycle).prepareCommit(true);
verify(tasks).replaceActiveWithStandby(standbyTask);
verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
@@ -1059,7 +1059,7 @@ public class TaskManagerTest {
taskManager.handleAssignment(Collections.emptyMap(),
Collections.emptyMap());
verify(activeTaskCreator).createTasks(consumer,
Collections.emptyMap());
- verify(activeTaskToClose).prepareCommit();
+ verify(activeTaskToClose).prepareCommit(true);
verify(activeTaskToClose).closeClean();
verify(tasks).removeTask(activeTaskToClose);
verify(standbyTaskCreator).createTasks(Collections.emptyMap());
@@ -1536,10 +1536,10 @@ public class TaskManagerTest {
taskManager.handleLostAll();
- verify(task1).prepareCommit();
+ verify(task1).prepareCommit(false);
verify(task1).suspend();
verify(task1).closeDirty();
- verify(task2).prepareCommit();
+ verify(task2).prepareCommit(false);
verify(task2).suspend();
verify(task2).closeDirty();
}
@@ -1569,7 +1569,7 @@ public class TaskManagerTest {
verify(task1).suspend();
verify(task1).closeClean();
- verify(task2).prepareCommit();
+ verify(task2).prepareCommit(false);
verify(task2).suspend();
verify(task2).closeDirty();
verify(task3).suspend();
@@ -2386,10 +2386,10 @@ public class TaskManagerTest {
taskManager.handleCorruption(Set.of(taskId02));
verify(activeRestoringTask, never()).commitNeeded();
- verify(activeRestoringTask, never()).prepareCommit();
+ verify(activeRestoringTask, never()).prepareCommit(true);
verify(activeRestoringTask, never()).postCommit(anyBoolean());
verify(standbyTask, never()).commitNeeded();
- verify(standbyTask, never()).prepareCommit();
+ verify(standbyTask, never()).prepareCommit(true);
verify(standbyTask, never()).postCommit(anyBoolean());
}
@@ -2418,9 +2418,9 @@ public class TaskManagerTest {
taskManager.handleCorruption(Set.of(taskId02));
verify(activeRestoringTask, never()).commitNeeded();
- verify(activeRestoringTask, never()).prepareCommit();
+ verify(activeRestoringTask, never()).prepareCommit(true);
verify(activeRestoringTask, never()).postCommit(anyBoolean());
- verify(standbyTask).prepareCommit();
+ verify(standbyTask).prepareCommit(true);
verify(standbyTask).postCommit(anyBoolean());
}
@@ -2431,7 +2431,7 @@ public class TaskManagerTest {
final StateMachineTask corruptedStandby = new
StateMachineTask(taskId00, taskId00Partitions, false, stateManager);
final StateMachineTask runningNonCorruptedActive = new
StateMachineTask(taskId01, taskId01Partitions, true, stateManager) {
@Override
- public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+ public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final
boolean clean) {
throw new TaskMigratedException("You dropped out of the
group!", new RuntimeException());
}
};
@@ -3394,7 +3394,7 @@ public class TaskManagerTest {
final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, false, stateManager) {
@Override
- public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+ public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final
boolean clean) {
throw new RuntimeException("task 0_1 prepare commit boom!");
}
};
@@ -3560,7 +3560,7 @@ public class TaskManagerTest {
verify(activeTaskCreator).close();
verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
- verify(failedStatefulTask).prepareCommit();
+ verify(failedStatefulTask).prepareCommit(false);
verify(failedStatefulTask).suspend();
verify(failedStatefulTask).closeDirty();
}
@@ -3634,16 +3634,16 @@ public class TaskManagerTest {
verify(stateUpdater).shutdown(Duration.ofMillis(Long.MAX_VALUE));
verify(tasks).addTask(removedStatefulTask);
verify(tasks).addTask(removedStandbyTask);
- verify(removedFailedStatefulTask).prepareCommit();
+ verify(removedFailedStatefulTask).prepareCommit(false);
verify(removedFailedStatefulTask).suspend();
verify(removedFailedStatefulTask).closeDirty();
- verify(removedFailedStandbyTask).prepareCommit();
+ verify(removedFailedStandbyTask).prepareCommit(false);
verify(removedFailedStandbyTask).suspend();
verify(removedFailedStandbyTask).closeDirty();
- verify(removedFailedStatefulTaskDuringRemoval).prepareCommit();
+ verify(removedFailedStatefulTaskDuringRemoval).prepareCommit(false);
verify(removedFailedStatefulTaskDuringRemoval).suspend();
verify(removedFailedStatefulTaskDuringRemoval).closeDirty();
- verify(removedFailedStandbyTaskDuringRemoval).prepareCommit();
+ verify(removedFailedStandbyTaskDuringRemoval).prepareCommit(false);
verify(removedFailedStandbyTaskDuringRemoval).suspend();
verify(removedFailedStandbyTaskDuringRemoval).closeDirty();
}
@@ -3869,7 +3869,7 @@ public class TaskManagerTest {
public void shouldPropagateExceptionFromActiveCommit() {
final StateMachineTask task00 = new StateMachineTask(taskId00,
taskId00Partitions, true, stateManager) {
@Override
- public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+ public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final
boolean clean) {
throw new RuntimeException("opsh.");
}
};
@@ -3893,7 +3893,7 @@ public class TaskManagerTest {
public void shouldPropagateExceptionFromStandbyCommit() {
final StateMachineTask task01 = new StateMachineTask(taskId01,
taskId01Partitions, false, stateManager) {
@Override
- public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+ public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final
boolean clean) {
throw new RuntimeException("opsh.");
}
};
@@ -4689,7 +4689,7 @@ public class TaskManagerTest {
final StandbyTask standbyTask = mock(StandbyTask.class);
when(standbyTask.id()).thenReturn(taskId00);
when(standbyTask.isActive()).thenReturn(false);
- when(standbyTask.prepareCommit()).thenReturn(Collections.emptyMap());
+
when(standbyTask.prepareCommit(true)).thenReturn(Collections.emptyMap());
final StreamTask activeTask = mock(StreamTask.class);
when(activeTask.id()).thenReturn(taskId00);
@@ -4939,10 +4939,13 @@ public class TaskManagerTest {
}
@Override
- public Map<TopicPartition, OffsetAndMetadata> prepareCommit() {
+ public Map<TopicPartition, OffsetAndMetadata> prepareCommit(final
boolean clean) {
commitPrepared = true;
if (commitNeeded) {
+ if (!clean) {
+ return null;
+ }
return committableOffsets;
} else {
return Collections.emptyMap();
diff --git
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
index aac2dd36b49..d43670429b1 100644
---
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/tasks/DefaultTaskExecutorTest.java
@@ -66,7 +66,7 @@ public class DefaultTaskExecutorTest {
when(task.isProcessable(anyLong())).thenReturn(true);
when(task.id()).thenReturn(new TaskId(0, 0, "A"));
when(task.process(anyLong())).thenReturn(true);
- when(task.prepareCommit()).thenReturn(Collections.emptyMap());
+ when(task.prepareCommit(true)).thenReturn(Collections.emptyMap());
}
@AfterEach
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index a4cee67ad5f..81c90d043ce 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -591,7 +591,7 @@ public class TopologyTestDriver implements Closeable {
// Process the record ...
task.process(mockWallClockTime.milliseconds());
task.maybePunctuateStreamTime();
- commit(task.prepareCommit());
+ commit(task.prepareCommit(true));
task.postCommit(true);
captureOutputsAndReEnqueueInternalResults();
}
@@ -709,7 +709,7 @@ public class TopologyTestDriver implements Closeable {
mockWallClockTime.sleep(advance.toMillis());
if (task != null) {
task.maybePunctuateSystemTime();
- commit(task.prepareCommit());
+ commit(task.prepareCommit(true));
task.postCommit(true);
}
completeAllProcessableWork();
@@ -1130,7 +1130,7 @@ public class TopologyTestDriver implements Closeable {
public void close() {
if (task != null) {
task.suspend();
- task.prepareCommit();
+ task.prepareCommit(true);
task.postCommit(true);
task.closeClean();
}