vvcephei commented on a change in pull request #8818:
URL: https://github.com/apache/kafka/pull/8818#discussion_r436758351



##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java
##########
@@ -1084,12 +1088,15 @@ private boolean 
populateActiveTaskAndPartitionsLists(final List<TopicPartition>
                 // If the partition is new to this consumer but is still owned 
by another, remove from the assignment
                 // until it has been revoked and can safely be reassigned 
according to the COOPERATIVE protocol
                 if (newPartitionForConsumer && 
allOwnedPartitions.contains(partition)) {
-                    log.info("Removing task {} from assignment until it is 
safely revoked in followup rebalance", taskId);
-                    clientState.unassignActive(taskId);

Review comment:
       Ah, I see. I'll put it back with a comment that we're just using it for 
the internal assertions.

##########
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##########
@@ -514,17 +515,24 @@ void handleLostAll() {
 
     /**
      * Compute the offset total summed across all stores in a task. Includes 
offset sum for any tasks we own the
-     * lock for, which includes assigned and unassigned tasks we locked in 
{@link #tryToLockAllNonEmptyTaskDirectories()}
-     *
-     * @return Map from task id to its total offset summed across all state 
stores
+     * lock for, which includes assigned and unassigned tasks we locked in 
{@link #tryToLockAllNonEmptyTaskDirectories()}.
+     * Does not include stateless or non-logged tasks.
      */
     public Map<TaskId, Long> getTaskOffsetSums() {
         final Map<TaskId, Long> taskOffsetSums = new HashMap<>();
 
-        for (final TaskId id : lockedTaskDirectories) {
+        // Not all tasks will create directories, and there may be directories 
for tasks we don't currently own,
+        // so we consider all tasks that are either owned or on disk. This 
includes stateless tasks, which should
+        // just have an empty changelogOffsets map.
+        for (final TaskId id : union(HashSet::new, lockedTaskDirectories, 
tasks.keySet())) {
             final Task task = tasks.get(id);
             if (task != null) {
-                taskOffsetSums.put(id, sumOfChangelogOffsets(id, 
task.changelogOffsets()));
+                final Map<TopicPartition, Long> changelogOffsets = 
task.changelogOffsets();
+                if (changelogOffsets.isEmpty()) {
+                    log.debug("Skipping to encode apparently stateless (or 
non-logged) offset sum for task {}", id);

Review comment:
       Yep! We don't know that it's stateless, just that it didn't report any 
changelogs.

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
##########
@@ -142,4 +169,184 @@ public void shouldProperlyConfigureTheAssignor() throws 
NoSuchFieldException, Il
             assertThat(taskAssignor, instanceOf(MyTaskAssignor.class));
         }
     }
+
+    @Test
+    public void shouldScaleOutWithWarmupTasksAndInMemoryStores() throws 
InterruptedException {
+        // NB: this test takes at least a minute to run, because it needs a 
probing rebalance, and the minimum
+        // value is one minute
+        shouldScaleOutWithWarmupTasks(storeName -> 
Materialized.as(Stores.inMemoryKeyValueStore(storeName)));
+    }
+
+    @Test
+    public void shouldScaleOutWithWarmupTasksAndPersistentStores() throws 
InterruptedException {
+        // NB: this test takes at least a minute to run, because it needs a 
probing rebalance, and the minimum

Review comment:
       "NB" is short for "nota bene" (Latin for "note well"), used to flag an important caveat: https://en.wikipedia.org/wiki/Nota_bene :) 

##########
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java
##########
@@ -142,4 +169,184 @@ public void shouldProperlyConfigureTheAssignor() throws 
NoSuchFieldException, Il
             assertThat(taskAssignor, instanceOf(MyTaskAssignor.class));
         }
     }
+
+    @Test
+    public void shouldScaleOutWithWarmupTasksAndInMemoryStores() throws 
InterruptedException {
+        // NB: this test takes at least a minute to run, because it needs a 
probing rebalance, and the minimum
+        // value is one minute
+        shouldScaleOutWithWarmupTasks(storeName -> 
Materialized.as(Stores.inMemoryKeyValueStore(storeName)));
+    }
+
+    @Test
+    public void shouldScaleOutWithWarmupTasksAndPersistentStores() throws 
InterruptedException {
+        // NB: this test takes at least a minute to run, because it needs a 
probing rebalance, and the minimum
+        // value is one minute
+        shouldScaleOutWithWarmupTasks(storeName -> 
Materialized.as(Stores.persistentKeyValueStore(storeName)));
+    }
+
+    private void shouldScaleOutWithWarmupTasks(final Function<String, 
Materialized<Object, Object, KeyValueStore<Bytes, byte[]>>> 
materializedFunction) throws InterruptedException {
+        final String testId = safeUniqueTestName(getClass(), testName);
+        final String appId = "appId_" + System.currentTimeMillis() + "_" + 
testId;
+        final String inputTopic = "input" + testId;
+        final String storeName = "store" + testId;
+        final String storeChangelog = appId + "-store" + testId + "-changelog";
+        final Set<TopicPartition> changelogTopicPartitions = mkSet(
+            new TopicPartition(storeChangelog, 0),
+            new TopicPartition(storeChangelog, 1)
+        );
+
+        IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 2, inputTopic, 
storeChangelog);
+
+        final ReentrantLock assignmentLock = new ReentrantLock();
+        final AtomicInteger assignmentsCompleted = new AtomicInteger(0);
+        final AtomicBoolean assignmentStable = new AtomicBoolean(false);
+        final AssignmentListener assignmentListener =
+            stable -> {
+                assignmentLock.lock();
+                try {
+                    assignmentsCompleted.incrementAndGet();
+                    assignmentStable.set(stable);
+                } finally {
+                    assignmentLock.unlock();
+                }
+            };
+
+        final StreamsBuilder builder = new StreamsBuilder();
+        builder.table(inputTopic, materializedFunction.apply(storeName));
+        final Topology topology = builder.build();
+
+        final Properties producerProperties = mkProperties(
+            mkMap(
+                mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()),
+                mkEntry(ProducerConfig.ACKS_CONFIG, "all"),
+                mkEntry(ProducerConfig.RETRIES_CONFIG, "0"),
+                mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName()),
+                mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
StringSerializer.class.getName())
+            )
+        );
+
+        final StringBuilder kiloBuilder = new StringBuilder(1000);
+        for (int i = 0; i < 1000; i++) {
+            kiloBuilder.append('0');
+        }
+        final String kilo = kiloBuilder.toString();
+
+        try (final Producer<String, String> producer = new 
KafkaProducer<>(producerProperties)) {
+            for (int i = 0; i < 1000; i++) {
+                producer.send(new ProducerRecord<>(inputTopic, 
String.valueOf(i), kilo));
+            }
+        }
+
+        final Properties consumerProperties = mkProperties(
+            mkMap(
+                mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers()),
+                mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName()),
+                mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
StringDeserializer.class.getName())
+            )
+        );
+
+
+        try (final KafkaStreams kafkaStreams0 = new KafkaStreams(topology, 
streamsProperties(appId, assignmentListener));
+             final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, 
streamsProperties(appId, assignmentListener));
+             final Consumer<String, String> consumer = new 
KafkaConsumer<>(consumerProperties)) {
+            kafkaStreams0.start();
+
+            // wait until all the input records are in the changelog
+            TestUtils.waitForCondition(
+                () -> getChangelogOffsetSum(changelogTopicPartitions, 
consumer) == 1000,
+                120_000L,
+                () -> "Input records haven't all been written to the 
changelog: " + getChangelogOffsetSum(changelogTopicPartitions, consumer)
+            );
+
+            final AtomicLong instance1TotalRestored = new AtomicLong(-1);
+            final CountDownLatch restoreCompleteLatch = new CountDownLatch(1);
+            kafkaStreams1.setGlobalStateRestoreListener(new 
StateRestoreListener() {
+                @Override
+                public void onRestoreStart(final TopicPartition topicPartition,
+                                           final String storeName,
+                                           final long startingOffset,
+                                           final long endingOffset) {
+                }
+
+                @Override
+                public void onBatchRestored(final TopicPartition 
topicPartition,
+                                            final String storeName,
+                                            final long batchEndOffset,
+                                            final long numRestored) {
+                }

Review comment:
       Good idea.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to