vvcephei commented on a change in pull request #8818: URL: https://github.com/apache/kafka/pull/8818#discussion_r436758351
########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamsPartitionAssignor.java ########## @@ -1084,12 +1088,15 @@ private boolean populateActiveTaskAndPartitionsLists(final List<TopicPartition> // If the partition is new to this consumer but is still owned by another, remove from the assignment // until it has been revoked and can safely be reassigned according to the COOPERATIVE protocol if (newPartitionForConsumer && allOwnedPartitions.contains(partition)) { - log.info("Removing task {} from assignment until it is safely revoked in followup rebalance", taskId); - clientState.unassignActive(taskId); Review comment: Ah, I see. I'll put it back with a comment that we're just using it for the internal assertions. ########## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java ########## @@ -514,17 +515,24 @@ void handleLostAll() { /** * Compute the offset total summed across all stores in a task. Includes offset sum for any tasks we own the - * lock for, which includes assigned and unassigned tasks we locked in {@link #tryToLockAllNonEmptyTaskDirectories()} - * - * @return Map from task id to its total offset summed across all state stores + * lock for, which includes assigned and unassigned tasks we locked in {@link #tryToLockAllNonEmptyTaskDirectories()}. + * Does not include stateless or non-logged tasks. */ public Map<TaskId, Long> getTaskOffsetSums() { final Map<TaskId, Long> taskOffsetSums = new HashMap<>(); - for (final TaskId id : lockedTaskDirectories) { + // Not all tasks will create directories, and there may be directories for tasks we don't currently own, + // so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should + // just have an empty changelogOffsets map. + for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.keySet())) { final Task task = tasks.get(id); if (task != null) { - taskOffsetSums.put(id, sumOfChangelogOffsets(id, task.changelogOffsets())); + final Map<TopicPartition, Long> changelogOffsets = task.changelogOffsets(); + if (changelogOffsets.isEmpty()) { + log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", id); Review comment: Yep! We don't know that it's stateless, just that it didn't report any changelogs. ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java ########## @@ -142,4 +169,184 @@ public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, Il assertThat(taskAssignor, instanceOf(MyTaskAssignor.class)); } } + + @Test + public void shouldScaleOutWithWarmupTasksAndInMemoryStores() throws InterruptedException { + // NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum + // value is one minute + shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName))); + } + + @Test + public void shouldScaleOutWithWarmupTasksAndPersistentStores() throws InterruptedException { + // NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum Review comment: https://en.wikipedia.org/wiki/Nota_bene :) ########## File path: streams/src/test/java/org/apache/kafka/streams/integration/TaskAssignorIntegrationTest.java ########## @@ -142,4 +169,184 @@ public void shouldProperlyConfigureTheAssignor() throws NoSuchFieldException, Il assertThat(taskAssignor, instanceOf(MyTaskAssignor.class)); } } + + @Test + public void shouldScaleOutWithWarmupTasksAndInMemoryStores() throws InterruptedException { + // NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum + // value is one minute + shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.inMemoryKeyValueStore(storeName))); + } + + @Test + public void shouldScaleOutWithWarmupTasksAndPersistentStores() throws InterruptedException { + // NB: this test takes at least a minute to run, because it needs a probing rebalance, and the minimum + // value is one minute + shouldScaleOutWithWarmupTasks(storeName -> Materialized.as(Stores.persistentKeyValueStore(storeName))); + } + + private void shouldScaleOutWithWarmupTasks(final Function<String, Materialized<Object, Object, KeyValueStore<Bytes, byte[]>>> materializedFunction) throws InterruptedException { + final String testId = safeUniqueTestName(getClass(), testName); + final String appId = "appId_" + System.currentTimeMillis() + "_" + testId; + final String inputTopic = "input" + testId; + final String storeName = "store" + testId; + final String storeChangelog = appId + "-store" + testId + "-changelog"; + final Set<TopicPartition> changelogTopicPartitions = mkSet( + new TopicPartition(storeChangelog, 0), + new TopicPartition(storeChangelog, 1) + ); + + IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, 2, inputTopic, storeChangelog); + + final ReentrantLock assignmentLock = new ReentrantLock(); + final AtomicInteger assignmentsCompleted = new AtomicInteger(0); + final AtomicBoolean assignmentStable = new AtomicBoolean(false); + final AssignmentListener assignmentListener = + stable -> { + assignmentLock.lock(); + try { + assignmentsCompleted.incrementAndGet(); + assignmentStable.set(stable); + } finally { + assignmentLock.unlock(); + } + }; + + final StreamsBuilder builder = new StreamsBuilder(); + builder.table(inputTopic, materializedFunction.apply(storeName)); + final Topology topology = builder.build(); + + final Properties producerProperties = mkProperties( + mkMap( + mkEntry(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), + mkEntry(ProducerConfig.ACKS_CONFIG, "all"), + mkEntry(ProducerConfig.RETRIES_CONFIG, "0"), + mkEntry(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()), + mkEntry(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()) + ) + ); + + final StringBuilder kiloBuilder = new StringBuilder(1000); + for (int i = 0; i < 1000; i++) { + kiloBuilder.append('0'); + } + final String kilo = kiloBuilder.toString(); + + try (final Producer<String, String> producer = new KafkaProducer<>(producerProperties)) { + for (int i = 0; i < 1000; i++) { + producer.send(new ProducerRecord<>(inputTopic, String.valueOf(i), kilo)); + } + } + + final Properties consumerProperties = mkProperties( + mkMap( + mkEntry(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()), + mkEntry(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()), + mkEntry(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()) + ) + ); + + + try (final KafkaStreams kafkaStreams0 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener)); + final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, streamsProperties(appId, assignmentListener)); + final Consumer<String, String> consumer = new KafkaConsumer<>(consumerProperties)) { + kafkaStreams0.start(); + + // wait until all the input records are in the changelog + TestUtils.waitForCondition( + () -> getChangelogOffsetSum(changelogTopicPartitions, consumer) == 1000, + 120_000L, + () -> "Input records haven't all been written to the changelog: " + getChangelogOffsetSum(changelogTopicPartitions, consumer) + ); + + final AtomicLong instance1TotalRestored = new AtomicLong(-1); + final CountDownLatch restoreCompleteLatch = new CountDownLatch(1); + kafkaStreams1.setGlobalStateRestoreListener(new StateRestoreListener() { + @Override + public void onRestoreStart(final TopicPartition topicPartition, + final String storeName, + final long startingOffset, + final long endingOffset) { + } + + @Override + public void onBatchRestored(final TopicPartition topicPartition, + final String storeName, + final long batchEndOffset, + final long numRestored) { + } Review comment: Good idea. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org