cadonna commented on code in PR #13269:
URL: https://github.com/apache/kafka/pull/13269#discussion_r1155775071


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1692,43 +1699,45 @@ public void 
shouldReturnOffsetsForRepartitionTopicsForPurging() {
         final TopicPartition repartition = new TopicPartition("repartition", 
1);
 
         final ProcessorTopology topology = withRepartitionTopics(
-            asList(source1, source2),
-            mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), 
source2)),
-            singleton(repartition.topic())
+                asList(source1, source2),
+                mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), 
source2)),
+                singleton(repartition.topic())
         );
         consumer.assign(asList(partition1, repartition));
         consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L)));
 
-        
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
 // restoration checkpoint
+        
EasyMock.expect(stateManager.changelogPartitions()).andStubReturn(Collections.emptySet());
         
EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
         final StreamsConfig config = createConfig();
         final InternalProcessorContext context = new ProcessorContextImpl(
-            taskId,
-            config,
-            stateManager,
-            streamsMetrics,
-            null
+                taskId,
+                config,
+                stateManager,
+                streamsMetrics,
+                null

Review Comment:
   Indentation of 4 is fine. Could you revert this change, please?



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1546,8 +1551,9 @@ public void 
shouldNotCheckpointOffsetsAgainOnCommitIfSnapshotNotChangedMuch() {
         stateManager.checkpoint();
         EasyMock.expectLastCall().once();
         EasyMock.expect(stateManager.changelogOffsets())
-            .andReturn(singletonMap(changelogPartition, 10L))
-            .andReturn(singletonMap(changelogPartition, 20L));
+                .andReturn(singletonMap(changelogPartition, 10L)) // 
restoration checkpoint
+                .andReturn(singletonMap(changelogPartition, 10L))
+                .andReturn(singletonMap(changelogPartition, 20L));

Review Comment:
   ```suggestion
               .andReturn(singletonMap(changelogPartition, 10L)) // restoration 
checkpoint
               .andReturn(singletonMap(changelogPartition, 10L))
               .andReturn(singletonMap(changelogPartition, 20L));
   ```



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1692,43 +1699,45 @@ public void 
shouldReturnOffsetsForRepartitionTopicsForPurging() {
         final TopicPartition repartition = new TopicPartition("repartition", 
1);
 
         final ProcessorTopology topology = withRepartitionTopics(
-            asList(source1, source2),
-            mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), 
source2)),
-            singleton(repartition.topic())
+                asList(source1, source2),
+                mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), 
source2)),
+                singleton(repartition.topic())

Review Comment:
   Could you revert this change, please? An indentation of 4 is fine.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2505,34 @@ public void 
shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void testRestoration_CheckpointWrittenWhenEOSDisabled() {

Review Comment:
   We usually use the form `should...` for test names. My proposal here is to 
use `shouldCheckpointAfterRestorationWhenAtLeastOnceEnabled`.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2505,34 @@ public void 
shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void testRestoration_CheckpointWrittenWhenEOSDisabled() {
+        EasyMock.expect(stateManager.changelogOffsets())
+                .andReturn(singletonMap(partition1, 0L)); // restoration 
checkpoint
+        
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap());
+        stateManager.checkpoint();
+        EasyMock.expectLastCall().once(); // checkpoint should only be called 
once
+        EasyMock.replay(stateManager, recordCollector);
+
+        task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        EasyMock.verify(stateManager);
+    }
+
+    @Test
+    public void testRestoration_CheckpointNotWrittenWhenEOSEnabled() {
+        stateManager.checkpoint();
+        EasyMock.expectLastCall().andStubThrow(new AssertionError("should not 
checkpoint on EOS"));
+        stateManager.changelogOffsets();
+        EasyMock.expectLastCall().andStubThrow(new AssertionError("should not 
checkpoint on EOS"));
+        EasyMock.replay(stateManager, recordCollector);
+
+        task = createStatefulTask(createConfig(EXACTLY_ONCE_V2, "100"), true);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });

Review Comment:
   Could you please use Mockito instead of EasyMock here? We are migrating to 
Mockito and we would like to have new tests using Mockito. See 
`TaskManagerTest` for an example where a test class uses Mockito and EasyMock. 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1692,43 +1699,45 @@ public void 
shouldReturnOffsetsForRepartitionTopicsForPurging() {
         final TopicPartition repartition = new TopicPartition("repartition", 
1);
 
         final ProcessorTopology topology = withRepartitionTopics(
-            asList(source1, source2),
-            mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), 
source2)),
-            singleton(repartition.topic())
+                asList(source1, source2),
+                mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), 
source2)),
+                singleton(repartition.topic())
         );
         consumer.assign(asList(partition1, repartition));
         consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L)));
 
-        
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
 // restoration checkpoint
+        
EasyMock.expect(stateManager.changelogPartitions()).andStubReturn(Collections.emptySet());
         
EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
         final StreamsConfig config = createConfig();
         final InternalProcessorContext context = new ProcessorContextImpl(
-            taskId,
-            config,
-            stateManager,
-            streamsMetrics,
-            null
+                taskId,
+                config,
+                stateManager,
+                streamsMetrics,
+                null
         );
 
         task = new StreamTask(
-            taskId,
-            mkSet(partition1, repartition),
-            topology,
-            consumer,
-            new TopologyConfig(null,  config, new 
Properties()).getTaskConfig(),
-            streamsMetrics,
-            stateDirectory,
-            cache,
-            time,
-            stateManager,
-            recordCollector,
-            context,
-            logContext);
+                taskId,
+                mkSet(partition1, repartition),
+                topology,
+                consumer,
+                new TopologyConfig(null, config, new 
Properties()).getTaskConfig(),
+                streamsMetrics,
+                stateDirectory,
+                cache,
+                time,
+                stateManager,
+                recordCollector,
+                context,
+                logContext);
 
         task.initializeIfNeeded();
-        task.completeRestoration(noOpResetter -> { });
+        task.completeRestoration(noOpResetter -> {
+        });

Review Comment:
   This change is not needed for the purpose of this PR.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2505,34 @@ public void 
shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void testRestoration_CheckpointWrittenWhenEOSDisabled() {
+        EasyMock.expect(stateManager.changelogOffsets())
+                .andReturn(singletonMap(partition1, 0L)); // restoration 
checkpoint
+        
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap());
+        stateManager.checkpoint();
+        EasyMock.expectLastCall().once(); // checkpoint should only be called 
once
+        EasyMock.replay(stateManager, recordCollector);
+
+        task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        EasyMock.verify(stateManager);

Review Comment:
   Could you please use Mockito instead of EasyMock here? We are migrating to 
Mockito and we would like to have new tests using Mockito. See 
`TaskManagerTest` for an example where a test class uses Mockito and EasyMock. 



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -2484,6 +2505,34 @@ public void 
shouldUpdateOffsetIfCorruptedRecordFollowsValid() {
         );
     }
 
+    @Test
+    public void testRestoration_CheckpointWrittenWhenEOSDisabled() {
+        EasyMock.expect(stateManager.changelogOffsets())
+                .andReturn(singletonMap(partition1, 0L)); // restoration 
checkpoint
+        
EasyMock.expect(recordCollector.offsets()).andReturn(Collections.emptyMap());
+        stateManager.checkpoint();
+        EasyMock.expectLastCall().once(); // checkpoint should only be called 
once
+        EasyMock.replay(stateManager, recordCollector);
+
+        task = createStatefulTask(createConfig(AT_LEAST_ONCE, "100"), true);
+        task.initializeIfNeeded();
+        task.completeRestoration(noOpResetter -> { });
+        EasyMock.verify(stateManager);
+    }
+
+    @Test
+    public void testRestoration_CheckpointNotWrittenWhenEOSEnabled() {

Review Comment:
   See aboveWe usually use the form `should...` for test names. My proposal 
here is to use `shouldNotCheckpointAfterRestorationWhenExactlyOnceEnabled`.



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java:
##########
@@ -1692,43 +1699,45 @@ public void 
shouldReturnOffsetsForRepartitionTopicsForPurging() {
         final TopicPartition repartition = new TopicPartition("repartition", 
1);
 
         final ProcessorTopology topology = withRepartitionTopics(
-            asList(source1, source2),
-            mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), 
source2)),
-            singleton(repartition.topic())
+                asList(source1, source2),
+                mkMap(mkEntry(topic1, source1), mkEntry(repartition.topic(), 
source2)),
+                singleton(repartition.topic())
         );
         consumer.assign(asList(partition1, repartition));
         consumer.updateBeginningOffsets(mkMap(mkEntry(repartition, 0L)));
 
-        
EasyMock.expect(stateManager.changelogPartitions()).andReturn(Collections.emptySet());
+        
EasyMock.expect(stateManager.changelogOffsets()).andStubReturn(Collections.emptyMap());
 // restoration checkpoint
+        
EasyMock.expect(stateManager.changelogPartitions()).andStubReturn(Collections.emptySet());
         
EasyMock.expect(recordCollector.offsets()).andReturn(emptyMap()).anyTimes();
         EasyMock.replay(stateManager, recordCollector);
 
         final StreamsConfig config = createConfig();
         final InternalProcessorContext context = new ProcessorContextImpl(
-            taskId,
-            config,
-            stateManager,
-            streamsMetrics,
-            null
+                taskId,
+                config,
+                stateManager,
+                streamsMetrics,
+                null
         );
 
         task = new StreamTask(
-            taskId,
-            mkSet(partition1, repartition),
-            topology,
-            consumer,
-            new TopologyConfig(null,  config, new 
Properties()).getTaskConfig(),
-            streamsMetrics,
-            stateDirectory,
-            cache,
-            time,
-            stateManager,
-            recordCollector,
-            context,
-            logContext);
+                taskId,
+                mkSet(partition1, repartition),
+                topology,
+                consumer,
+                new TopologyConfig(null, config, new 
Properties()).getTaskConfig(),
+                streamsMetrics,
+                stateDirectory,
+                cache,
+                time,
+                stateManager,
+                recordCollector,
+                context,
+                logContext);

Review Comment:
   Thank you for removing the superfluous space! However, the indentation of 4 
is fine. Could you revert the indentation, please?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -255,6 +255,9 @@ public void completeRestoration(final 
java.util.function.Consumer<Set<TopicParti
                 resetOffsetsIfNeededAndInitializeMetadata(offsetResetter);
                 initializeTopology();
                 processorContext.initialize();
+                if (!eosEnabled) {
+                    maybeCheckpoint(true); // enforce checkpoint upon 
completing restoration

Review Comment:
   Could you please remove the inline comment? I do not think it adds any 
information that is not already contained in the call to 
`maybeCheckpoint(true)`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to