jnh5y commented on code in PR #12161:
URL: https://github.com/apache/kafka/pull/12161#discussion_r887273657


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -1761,6 +1761,96 @@ public void shouldUpdateStandbyTask() throws Exception {
         thread.taskManager().shutdown(true);
     }
 
+    @SuppressWarnings("unchecked")
+    @Test
+    public void shouldNotUpdateStandbyTaskWhenPaused() throws Exception {
+        final String storeName1 = "count-one";
+        final String storeName2 = "table-two";
+        final String changelogName1 = APPLICATION_ID + "-" + storeName1 + "-changelog";
+        final String changelogName2 = APPLICATION_ID + "-" + storeName2 + "-changelog";
+        final TopicPartition partition1 = new TopicPartition(changelogName1, 1);
+        final TopicPartition partition2 = new TopicPartition(changelogName2, 1);
+        internalStreamsBuilder
+            .stream(Collections.singleton(topic1), consumed)
+            .groupByKey()
+            .count(Materialized.as(storeName1));
+        final MaterializedInternal<Object, Object, KeyValueStore<Bytes, byte[]>> materialized
+            = new MaterializedInternal<>(Materialized.as(storeName2), internalStreamsBuilder, "");
+        internalStreamsBuilder.table(topic2, new ConsumedInternal<>(), materialized);
+
+        internalStreamsBuilder.buildAndOptimizeTopology();
+        final StreamThread thread = createStreamThread(CLIENT_ID, config, false);
+        final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer;
+        restoreConsumer.updatePartitions(changelogName1,
+            Collections.singletonList(new PartitionInfo(changelogName1, 1, null, new Node[0], new Node[0]))
+        );
+
+        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition1, 10L));
+        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition1, 0L));
+        restoreConsumer.updateEndOffsets(Collections.singletonMap(partition2, 10L));
+        restoreConsumer.updateBeginningOffsets(Collections.singletonMap(partition2, 0L));
+        final OffsetCheckpoint checkpoint
+            = new OffsetCheckpoint(new File(stateDirectory.getOrCreateDirectoryForTask(task3), CHECKPOINT_FILE_NAME));
+        checkpoint.write(Collections.singletonMap(partition2, 5L));
+
+        thread.setState(StreamThread.State.STARTING);
+        thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
+
+        final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>();
+
+        // assign single partition
+        standbyTasks.put(task1, Collections.singleton(t1p1));
+        standbyTasks.put(task3, Collections.singleton(t2p1));
+
+        thread.taskManager().handleAssignment(emptyMap(), standbyTasks);
+        thread.taskManager().tryToCompleteRestoration(mockTime.milliseconds(), null);
+
+        thread.rebalanceListener().onPartitionsAssigned(Collections.emptyList());
+
+        thread.runOnce();
+
+        final StandbyTask standbyTask1 = standbyTask(thread.taskManager(), t1p1);
+        final StandbyTask standbyTask2 = standbyTask(thread.taskManager(), t2p1);
+        assertEquals(task1, standbyTask1.id());
+        assertEquals(task3, standbyTask2.id());
+
+        final KeyValueStore<Object, Long> store1 = (KeyValueStore<Object, Long>) standbyTask1.getStore(storeName1);
+        final KeyValueStore<Object, Long> store2 = (KeyValueStore<Object, Long>) standbyTask2.getStore(storeName2);
+        assertEquals(0L, store1.approximateNumEntries());
+        assertEquals(0L, store2.approximateNumEntries());
+
+        // let the store1 be restored from 0 to 10; store2 be restored from 5 (checkpointed) to 10
+        for (long i = 0L; i < 10L; i++) {
+            restoreConsumer.addRecord(new ConsumerRecord<>(
+                changelogName1,
+                1,
+                i,
+                ("K" + i).getBytes(),
+                ("V" + i).getBytes()));
+            restoreConsumer.addRecord(new ConsumerRecord<>(
+                changelogName2,
+                1,
+                i,
+                ("K" + i).getBytes(),
+                ("V" + i).getBytes()));
+        }
+
+        // Simulate pause
+        thread.taskManager().topologyMetadata().pauseTopology(TopologyMetadata.UNNAMED_TOPOLOGY);
+        thread.runOnce();
+        assertEquals(0L, store1.approximateNumEntries());
+        assertEquals(0L, store2.approximateNumEntries());
+
+        // Simulate resume
+        thread.taskManager().topologyMetadata().resumeTopology(TopologyMetadata.UNNAMED_TOPOLOGY);
+        thread.runOnce();
+
+        assertEquals(10L, store1.approximateNumEntries());
+        assertEquals(4L, store2.approximateNumEntries());

Review Comment:
   For the refactoring, I've taken a pass at it here: 
https://github.com/apache/kafka/pull/12161/commits/a3bd8aec8990e2e72aec1401340c6f457db4f878.
   
   I tend to prefer to add new things consistently with existing code, so I'm hesitant to change the check. I'm fine either way.
   
   I'll add something for active tasks if that's the way we go.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to