This is an automated email from the ASF dual-hosted git repository. bbejeck pushed a commit to branch 2.2 in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.2 by this push: new ed659fe KAFKA-8187: Add wait time for other thread in the same jvm to free the locks (#6818) ed659fe is described below commit ed659fe73dd652eeb57bba4618ce0054fcadf390 Author: Lifei Chen <lifei.c...@allseeingsecurity.com> AuthorDate: Thu May 30 23:33:37 2019 +0800 KAFKA-8187: Add wait time for other thread in the same jvm to free the locks (#6818) Fix KAFKA-8187: State store record loss across multiple reassignments when using standby tasks. Do not let the thread to transit to RUNNING until all tasks (including standby tasks) are ready. Reviewers: Guozhang Wang <wangg...@gmail.com>, Bill Bejeck <bbej...@gmail.com> --- .../streams/processor/internals/TaskManager.java | 2 +- .../processor/internals/TaskManagerTest.java | 108 ++++++++++++--------- 2 files changed, 64 insertions(+), 46 deletions(-) diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java index 455d226..86a8e49 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java @@ -334,7 +334,7 @@ public class TaskManager { log.trace("Resuming partitions {}", assignment); consumer.resume(assignment); assignStandbyPartitions(); - return true; + return standby.allTasksRunning(); } return false; } diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index f71d7a1..fcf275b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -263,7 +263,7 @@ public class TaskManagerTest { @Test public void shouldAddNonResumedActiveTasks() { mockSingleActiveTask(); - EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false); + expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false); active.addNewTask(EasyMock.same(streamTask)); replay(); @@ -276,7 +276,7 @@ public class TaskManagerTest { @Test public void shouldNotAddResumedActiveTasks() { checkOrder(active, true); - EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true); + expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true); replay(); taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap()); @@ -289,7 +289,7 @@ public class TaskManagerTest { @Test public void shouldAddNonResumedStandbyTasks() { mockStandbyTaskExpectations(); - EasyMock.expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false); + expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(false); standby.addNewTask(EasyMock.same(standbyTask)); replay(); @@ -302,7 +302,7 @@ public class TaskManagerTest { @Test public void shouldNotAddResumedStandbyTasks() { checkOrder(active, true); - EasyMock.expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true); + expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true); replay(); taskManager.setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), taskId0Assignment); @@ -316,7 +316,7 @@ public class TaskManagerTest { public void shouldPauseActivePartitions() { mockSingleActiveTask(); consumer.pause(taskId0Partitions); - EasyMock.expectLastCall(); + expectLastCall(); replay(); taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap()); @@ -326,7 +326,7 @@ public class TaskManagerTest { @Test public void shouldSuspendActiveTasks() { - EasyMock.expect(active.suspend()).andReturn(null); + expect(active.suspend()).andReturn(null); replay(); taskManager.suspendTasksAndState(); @@ -335,7 +335,7 @@ public class TaskManagerTest { @Test public void shouldSuspendStandbyTasks() { - EasyMock.expect(standby.suspend()).andReturn(null); + expect(standby.suspend()).andReturn(null); replay(); taskManager.suspendTasksAndState(); @@ -345,7 +345,7 @@ public class TaskManagerTest { @Test public void shouldUnassignChangelogPartitionsOnSuspend() { restoreConsumer.unsubscribe(); - EasyMock.expectLastCall(); + expectLastCall(); replay(); taskManager.suspendTasksAndState(); @@ -354,9 +354,9 @@ public class TaskManagerTest { @Test public void shouldThrowStreamsExceptionAtEndIfExceptionDuringSuspend() { - EasyMock.expect(active.suspend()).andReturn(new RuntimeException("")); - EasyMock.expect(standby.suspend()).andReturn(new RuntimeException("")); - EasyMock.expectLastCall(); + expect(active.suspend()).andReturn(new RuntimeException("")); + expect(standby.suspend()).andReturn(new RuntimeException("")); + expectLastCall(); restoreConsumer.unsubscribe(); replay(); @@ -372,7 +372,7 @@ public class TaskManagerTest { @Test public void shouldCloseActiveTasksOnShutdown() { active.close(true); - EasyMock.expectLastCall(); + expectLastCall(); replay(); taskManager.shutdown(true); @@ -382,7 +382,7 @@ public class TaskManagerTest { @Test public void shouldCloseStandbyTasksOnShutdown() { standby.close(false); - EasyMock.expectLastCall(); + expectLastCall(); replay(); taskManager.shutdown(false); @@ -392,7 +392,7 @@ public class TaskManagerTest { @Test public void shouldUnassignChangelogPartitionsOnShutdown() { restoreConsumer.unsubscribe(); - EasyMock.expectLastCall(); + expectLastCall(); replay(); taskManager.shutdown(true); @@ -402,7 +402,7 @@ public class TaskManagerTest { @Test public void shouldInitializeNewActiveTasks() { active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject()); - EasyMock.expectLastCall(); + expectLastCall(); replay(); taskManager.updateNewAndRestoringTasks(); @@ -412,7 +412,7 @@ public class TaskManagerTest { @Test public void shouldInitializeNewStandbyTasks() { active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject()); - EasyMock.expectLastCall(); + expectLastCall(); replay(); taskManager.updateNewAndRestoringTasks(); @@ -421,9 +421,9 @@ public class TaskManagerTest { @Test public void shouldRestoreStateFromChangeLogReader() { - EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions); + expect(changeLogReader.restore(active)).andReturn(taskId0Partitions); active.updateRestored(taskId0Partitions); - EasyMock.expectLastCall(); + expectLastCall(); replay(); taskManager.updateNewAndRestoringTasks(); @@ -432,13 +432,13 @@ public class TaskManagerTest { @Test public void shouldResumeRestoredPartitions() { - EasyMock.expect(changeLogReader.restore(active)).andReturn(taskId0Partitions); - EasyMock.expect(active.allTasksRunning()).andReturn(true); - EasyMock.expect(consumer.assignment()).andReturn(taskId0Partitions); - EasyMock.expect(standby.running()).andReturn(Collections.<StandbyTask>emptySet()); + expect(changeLogReader.restore(active)).andReturn(taskId0Partitions); + expect(active.allTasksRunning()).andReturn(true); + expect(consumer.assignment()).andReturn(taskId0Partitions); + expect(standby.running()).andReturn(Collections.<StandbyTask>emptySet()); consumer.resume(taskId0Partitions); - EasyMock.expectLastCall(); + expectLastCall(); replay(); taskManager.updateNewAndRestoringTasks(); @@ -450,13 +450,31 @@ public class TaskManagerTest { mockAssignStandbyPartitions(1L); replay(); - assertTrue(taskManager.updateNewAndRestoringTasks()); + taskManager.updateNewAndRestoringTasks(); verify(restoreConsumer); } @Test + public void shouldReturnTrueWhenActiveAndStandbyTasksAreRunning() { + mockAssignStandbyPartitions(1L); + expect(standby.allTasksRunning()).andReturn(true); + replay(); + + assertTrue(taskManager.updateNewAndRestoringTasks()); + } + + @Test + public void shouldReturnFalseWhenOnlyActiveTasksAreRunning() { + mockAssignStandbyPartitions(1L); + expect(standby.allTasksRunning()).andReturn(false); + replay(); + + assertFalse(taskManager.updateNewAndRestoringTasks()); + } + + @Test public void shouldReturnFalseWhenThereAreStillNonRunningTasks() { - EasyMock.expect(active.allTasksRunning()).andReturn(false); + expect(active.allTasksRunning()).andReturn(false); replay(); assertFalse(taskManager.updateNewAndRestoringTasks()); @@ -466,7 +484,7 @@ public class TaskManagerTest { public void shouldSeekToCheckpointedOffsetOnStandbyPartitionsWhenOffsetGreaterThanEqualTo0() { mockAssignStandbyPartitions(1L); restoreConsumer.seek(t1p0, 1L); - EasyMock.expectLastCall(); + expectLastCall(); replay(); taskManager.updateNewAndRestoringTasks(); @@ -477,7 +495,7 @@ public class TaskManagerTest { public void shouldSeekToBeginningIfOffsetIsLessThan0() { mockAssignStandbyPartitions(-1L); restoreConsumer.seekToBeginning(taskId0Partitions); - EasyMock.expectLastCall(); + expectLastCall(); replay(); taskManager.updateNewAndRestoringTasks(); @@ -486,8 +504,8 @@ public class TaskManagerTest { @Test public void shouldCommitActiveAndStandbyTasks() { - EasyMock.expect(active.commit()).andReturn(1); - EasyMock.expect(standby.commit()).andReturn(2); + expect(active.commit()).andReturn(1); + expect(standby.commit()).andReturn(2); replay(); @@ -500,7 +518,7 @@ public class TaskManagerTest { // upgrade to strict mock to ensure no calls checkOrder(standby, true); active.commit(); - EasyMock.expectLastCall().andThrow(new RuntimeException("")); + expectLastCall().andThrow(new RuntimeException("")); replay(); try { @@ -514,7 +532,7 @@ public class TaskManagerTest { @Test public void shouldPropagateExceptionFromStandbyCommit() { - EasyMock.expect(standby.commit()).andThrow(new RuntimeException("")); + expect(standby.commit()).andThrow(new RuntimeException("")); replay(); try { @@ -534,8 +552,8 @@ public class TaskManagerTest { futureDeletedRecords.complete(null); - EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2); - EasyMock.expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2); + expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2); + expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2); replay(); taskManager.maybePurgeCommitedRecords(); @@ -549,8 +567,8 @@ public class TaskManagerTest { final Map<TopicPartition, RecordsToDelete> recordsToDelete = Collections.singletonMap(t1p1, RecordsToDelete.beforeOffset(5L)); final DeleteRecordsResult deleteRecordsResult = new DeleteRecordsResult(Collections.singletonMap(t1p1, futureDeletedRecords)); - EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).once(); - EasyMock.expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).once(); + expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).once(); + expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).once(); replay(); taskManager.maybePurgeCommitedRecords(); @@ -567,8 +585,8 @@ public class TaskManagerTest { futureDeletedRecords.completeExceptionally(new Exception("KABOOM!")); - EasyMock.expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2); - EasyMock.expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2); + expect(active.recordsToDelete()).andReturn(Collections.singletonMap(t1p1, 5L)).times(2); + expect(adminClient.deleteRecords(recordsToDelete)).andReturn(deleteRecordsResult).times(2); replay(); taskManager.maybePurgeCommitedRecords(); @@ -578,7 +596,7 @@ public class TaskManagerTest { @Test public void shouldMaybeCommitActiveTasks() { - EasyMock.expect(active.maybeCommitPerUserRequested()).andReturn(5); + expect(active.maybeCommitPerUserRequested()).andReturn(5); replay(); assertThat(taskManager.maybeCommitActiveTasksPerUserRequested(), equalTo(5)); @@ -587,7 +605,7 @@ public class TaskManagerTest { @Test public void shouldProcessActiveTasks() { - EasyMock.expect(active.process(0L)).andReturn(10); + expect(active.process(0L)).andReturn(10); replay(); assertThat(taskManager.process(0L), equalTo(10)); @@ -596,7 +614,7 @@ public class TaskManagerTest { @Test public void shouldPunctuateActiveTasks() { - EasyMock.expect(active.punctuate()).andReturn(20); + expect(active.punctuate()).andReturn(20); replay(); assertThat(taskManager.punctuate(), equalTo(20)); @@ -605,7 +623,7 @@ public class TaskManagerTest { @Test public void shouldNotResumeConsumptionUntilAllStoresRestored() { - EasyMock.expect(active.allTasksRunning()).andReturn(false); + expect(active.allTasksRunning()).andReturn(false); final Consumer<byte[], byte[]> consumer = EasyMock.createStrictMock(Consumer.class); taskManager.setConsumer(consumer); EasyMock.replay(active, consumer); @@ -637,12 +655,12 @@ public class TaskManagerTest { private void mockAssignStandbyPartitions(final long offset) { final StandbyTask task = EasyMock.createNiceMock(StandbyTask.class); - EasyMock.expect(active.allTasksRunning()).andReturn(true); - EasyMock.expect(standby.running()).andReturn(Collections.singletonList(task)); - EasyMock.expect(task.checkpointedOffsets()).andReturn(Collections.singletonMap(t1p0, offset)); + expect(active.allTasksRunning()).andReturn(true); + expect(standby.running()).andReturn(Collections.singletonList(task)); + expect(task.checkpointedOffsets()).andReturn(Collections.singletonMap(t1p0, offset)); restoreConsumer.assign(taskId0Partitions); - EasyMock.expectLastCall(); + expectLastCall(); EasyMock.replay(task); }