[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1260704672 ## streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java: ## @@ -308,23 +309,30 @@ public void shouldFetchLagsDuringRestoration() throws Exception { }, WAIT_TIMEOUT_MS, "Eventually should reach zero lag."); // Kill instance, delete state to force restoration. -assertThat("Streams instance did not close within timeout", streams.close(Duration.ofSeconds(60))); +assertThat("Streams instance did not close within timeout", streams.get().close(Duration.ofSeconds(60))); IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration); Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder()) .map(Path::toFile) .forEach(f -> assertTrue(f.delete(), "Some state " + f + " could not be deleted")); +} finally { +streams.get().close(); Review Comment: Look at line 284. There, `streams` is used in a lambda expression. A lambda expression can only use local variables that are final or effectively final. There are two ways to make `streams` final: either we use a final `AtomicReference`, or we assign `streams` to an effectively final variable and use that in the lambda. I chose the `AtomicReference` option. These are the two options because we need to initialize the variable outside of the `try`-clause to have an initialized variable in the `finally`-clause. However, I now realized that we can create the Streams client outside the `try`-clause, because if the constructor fails we do not need to call close and clean up in the `finally`-clause. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
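The two approaches discussed in this comment can be sketched in plain Java. This is a minimal illustration, not the test code from the PR: `Client` is a hypothetical stand-in for a closeable Streams client, and the method names are made up for the example.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

final class LambdaCaptureSketch {

    // Hypothetical stand-in for a closeable client such as a Streams client.
    static final class Client implements AutoCloseable {
        private boolean closed = false;

        String state() {
            return closed ? "NOT_RUNNING" : "RUNNING";
        }

        boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    // Option 1: a final AtomicReference that is filled inside the try-clause.
    // The reference itself is final, so the lambda may capture it.
    static Client withAtomicReference() {
        final AtomicReference<Client> client = new AtomicReference<>();
        try {
            client.set(new Client());
            final Supplier<String> state = () -> client.get().state();
            state.get();
        } finally {
            // The reference may still be empty if the constructor threw.
            final Client created = client.get();
            if (created != null) {
                created.close();
            }
        }
        return client.get();
    }

    // Option 2: create the client before the try-clause. If the constructor
    // throws, there is nothing to close, so the finally-clause is not needed yet.
    static Client createdOutsideTry() {
        final Client client = new Client();
        try {
            final Supplier<String> state = () -> client.state();
            state.get();
        } finally {
            client.close();
        }
        return client;
    }

    public static void main(final String[] args) {
        System.out.println(withAtomicReference().isClosed());
        System.out.println(createdOutsideTry().isClosed());
    }
}
```

The second variant needs no wrapper type, which is presumably why the comment ends by preferring creation outside the `try`-clause.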
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1258012878 ## streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java: ## @@ -173,9 +173,9 @@ public static void closeCluster() { private static final List> STANDARD_INPUT_DATA = asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L)); private static final List> COUNT_OUTPUT_DATA = -asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching Review Comment: @wcarlson5 Could you elaborate what is worrying you here? I ran the tests with disabled state updater and the disabled cache and all tests pass. That tells me that the state updater does not change any results. Additionally, the results with enabled and disabled cache are semantically equivalent. ``` pair("B", 1L), pair("A", 2L), pair("C", 2L) ``` is equivalent to ``` pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), pair("C", 2L) ``` The latter just produces more intermediate results. In general, if a test verifies the correctness of results, disabling the cache makes the test more robust, because the results do not depend on commit time intervals or production rates. Let me know if I missed something here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
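The equivalence claimed above can be checked mechanically: collapsing the uncached output to the latest value per key should yield the same table as the cached output. The sketch below uses plain `Map.Entry` values in place of the test's `pair(...)` helper; `latestPerKey` is a hypothetical name introduced only for this illustration.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class CacheEquivalenceSketch {

    // Collapses a changelog-style stream of updates to the latest value per key.
    static Map<String, Long> latestPerKey(final List<Map.Entry<String, Long>> updates) {
        final Map<String, Long> latest = new LinkedHashMap<>();
        for (final Map.Entry<String, Long> update : updates) {
            latest.put(update.getKey(), update.getValue());
        }
        return latest;
    }

    public static void main(final String[] args) {
        // Output of the count operation with caching (intermediate results suppressed).
        final List<Map.Entry<String, Long>> cached = List.of(
            Map.entry("B", 1L), Map.entry("A", 2L), Map.entry("C", 2L));
        // Output of the count operation without caching (every update is emitted).
        final List<Map.Entry<String, Long>> uncached = List.of(
            Map.entry("A", 1L), Map.entry("B", 1L), Map.entry("A", 2L),
            Map.entry("C", 1L), Map.entry("C", 2L));

        // Map equality ignores insertion order, so only the final counts matter.
        System.out.println(latestPerKey(cached).equals(latestPerKey(uncached)));
    }
}
```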
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1257992420 ## streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java: ## @@ -207,11 +207,12 @@ public void shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMulti }); } -startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120)); +for (final KafkaStreams stream: kafkaStreamsList) { +stream.start(); +} Review Comment: I am not sure I understand what you are asking, so bear with me if my reply is not the one you expected. Adding ``` waitForApplicationState(Arrays.asList(streams, streamsTwo, streamsThree), KafkaStreams.State.RUNNING, ofSeconds(60)) ``` after starting all Streams clients makes the test flaky, because with the state updater NOT all clients need to be in `RUNNING` before a Streams client transitions to `ERROR`. Not even the Streams client that throws the exception needs to be in `RUNNING` before transitioning to `ERROR`. Actually, now that I have looked more closely at this code, I am wondering why this test was not flaky before my changes. Each Streams client is started independently. That means there is no guarantee that all three Streams clients are `RUNNING` at the same time. In the version of the test before my changes, I added a `Thread.sleep()` after each client was started to simulate processing delays, and the test was indeed flaky.
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1257910205 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -571,7 +571,9 @@ private Map> pendingTasksToCreate(final Map> entry = iter.next(); final TaskId taskId = entry.getKey(); -if (taskId.topologyName() != null && !topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) { +final boolean taskIsOwned = tasks.allTaskIds().contains(taskId) Review Comment: The idea was to make the code more readable since the condition is quite complex but it actually just verifies that a task is owned. I prefer to use an additional variable or method rather than use complex logical conditions that give the reader a hard time. So, I would rather keep the variable. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1254284586 ## streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java: ## @@ -87,7 +88,7 @@ public static void closeCluster() { } -private final Time time = CLUSTER.time; +private final Time time = new MockTime(1); Review Comment: I tried to expose the `autoTickMs` parameter in embedded kafka, but I ran into an infinite loop. Regarding your question: > Are we not worried that broker time and our time might diverge? In almost all our integration tests, broker time diverges from Streams time, since the broker time does not advance but we usually use system time in `KafkaStreams`. This is not something this PR introduces.
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1254072786 ## streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java: ## @@ -277,6 +277,7 @@ public void shouldFetchLagsDuringRestoration() throws Exception { t1.toStream().to(outputTopicName); final KafkaStreams streams = new KafkaStreams(builder.build(), props); Review Comment: Actually the Streams clients are closed in the `finally`-clause. The thing that was missing was that the Streams clients were not created in the `try`-clause. I needed to use `AtomicReference` for that, otherwise I could not use the Streams clients in lambdas. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1246717245 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ## @@ -368,8 +368,11 @@ public static StreamThread create(final TopologyMetadata topologyMetadata, final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics); -final boolean stateUpdaterEnabled = -InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false); +final boolean stateUpdaterEnabled = InternalConfig.getBoolean( +config.originals(), +InternalConfig.STATE_UPDATER_ENABLED, +InternalConfig.STATE_UPDATER_ENABLED_DEFAULT +); Review Comment: Good idea! Done! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
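The change in this hunk replaces a hard-coded `false` with a named default constant. A minimal sketch of that lookup pattern follows; it is not the real `InternalConfig` class. The key string and the default value `true` are assumptions made for illustration (the PR title suggests the default becomes enabled), and `getBoolean` here is a simplified stand-in.

```java
import java.util.HashMap;
import java.util.Map;

final class InternalConfigSketch {

    // Placeholder key and default; both are assumptions for this sketch.
    static final String STATE_UPDATER_ENABLED = "sketch.state.updater.enabled";
    static final boolean STATE_UPDATER_ENABLED_DEFAULT = true;

    // Reads a boolean from a raw config map and falls back to the given default
    // when the key is absent or has an unsupported type.
    static boolean getBoolean(final Map<String, Object> configs,
                              final String key,
                              final boolean defaultValue) {
        final Object value = configs.getOrDefault(key, defaultValue);
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof String) {
            return Boolean.parseBoolean((String) value);
        }
        return defaultValue;
    }

    public static void main(final String[] args) {
        final Map<String, Object> originals = new HashMap<>();
        // No explicit setting: the named default applies.
        System.out.println(getBoolean(originals, STATE_UPDATER_ENABLED, STATE_UPDATER_ENABLED_DEFAULT));
        // A test that needs the old code path opts out explicitly.
        originals.put(STATE_UPDATER_ENABLED, false);
        System.out.println(getBoolean(originals, STATE_UPDATER_ENABLED, STATE_UPDATER_ENABLED_DEFAULT));
    }
}
```

Keeping the default in one constant means that flipping it later touches a single line, while tests that depend on the old behavior set the flag explicitly.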
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1246699178 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ## @@ -1123,7 +1138,10 @@ public void shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabl public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress() throws InterruptedException { internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); -final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true); +final Properties props = configProps(true); +props.put(InternalConfig.STATE_UPDATER_ENABLED, false); Review Comment: Done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1246696265 ## streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java: ## @@ -3038,6 +3149,7 @@ public void shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater() { @Test public void shouldNotBlockWhenPollingInPartitionsAssignedStateWithoutStateUpdater() { final Properties streamsConfigProps = StreamsTestUtils.getStreamsConfig(); +streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, false); Review Comment: I thought the code would be more readable if I left it in the corresponding with-state-updater test.
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1246652052 ## streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java: ## @@ -843,15 +846,19 @@ private static List waitUntilFinalKeyValueRecordsReceived(final Pro // still need to check that for each key, the ordering is expected final Map> finalAccumData = new HashMap<>(); for (final T kv : accumulatedActual) { -finalAccumData.computeIfAbsent( -withTimestamp ? ((KeyValueTimestamp) kv).key() : ((KeyValue) kv).key, -key -> new ArrayList<>()).add(kv); +final K key = withTimestamp ? ((KeyValueTimestamp) kv).key() : ((KeyValue) kv).key; +final List records = finalAccumData.computeIfAbsent(key, k -> new ArrayList<>()); +if (!records.contains(kv)) { +records.add(kv); +} } final Map> finalExpected = new HashMap<>(); for (final T kv : expectedRecords) { -finalExpected.computeIfAbsent( -withTimestamp ? ((KeyValueTimestamp) kv).key() : ((KeyValue) kv).key, -key -> new ArrayList<>()).add(kv); +final K key = withTimestamp ? ((KeyValueTimestamp) kv).key() : ((KeyValue) kv).key; +final List records = finalExpected.computeIfAbsent(key, k -> new ArrayList<>()); +if (!records.contains(kv)) { +records.add(kv); +} Review Comment: These changes ignore duplicate records during the comparison. A test that uses this verification triggers a failure and then verifies that the expected records are in the output topic. Without the state updater, no records were written to the output topic before the failure. With the state updater, some records were written to the output topic before the failure. From a correctness point of view, both behaviors are correct, since no commit happens before the simulated failure and so Streams reads all input records again after the failover (under at-least-once semantics, ALOS).
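The duplicate-tolerant grouping described in this comment can be sketched outside the test utilities. The example below assumes records are plain `Map.Entry<String, Long>` values rather than `KeyValue` or `KeyValueTimestamp`, and `groupWithoutDuplicates` is a hypothetical helper name; it mirrors the `computeIfAbsent` plus `contains` pattern from the diff.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class DedupPerKeySketch {

    // Groups records by key, keeping the first occurrence of each record and
    // preserving per-key order, so replayed duplicates do not affect the result.
    static Map<String, List<Map.Entry<String, Long>>> groupWithoutDuplicates(
            final List<Map.Entry<String, Long>> records) {
        final Map<String, List<Map.Entry<String, Long>>> grouped = new HashMap<>();
        for (final Map.Entry<String, Long> record : records) {
            final List<Map.Entry<String, Long>> perKey =
                grouped.computeIfAbsent(record.getKey(), k -> new ArrayList<>());
            if (!perKey.contains(record)) {
                perKey.add(record);
            }
        }
        return grouped;
    }

    public static void main(final String[] args) {
        final List<Map.Entry<String, Long>> expected = List.of(
            Map.entry("A", 1L), Map.entry("B", 1L), Map.entry("A", 2L));
        // Under at-least-once semantics, records written before an uncommitted
        // failure may be written again after the failover.
        final List<Map.Entry<String, Long>> actualWithReplay = List.of(
            Map.entry("A", 1L), Map.entry("B", 1L),
            Map.entry("A", 1L), Map.entry("B", 1L), Map.entry("A", 2L));

        System.out.println(
            groupWithoutDuplicates(expected).equals(groupWithoutDuplicates(actualWithReplay)));
    }
}
```

One trade-off of this approach is that it also hides duplicates that would be genuine errors, so it fits tests that only assert at-least-once delivery.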
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1246640445 ## streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java: ## @@ -87,7 +88,7 @@ public static void closeCluster() { } -private final Time time = CLUSTER.time; +private final Time time = new MockTime(1); Review Comment: The time needs to progress when the state updater is shut down, otherwise the shutdown is blocked. `CLUSTER` provides a mock time that does not progress.
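Why a non-advancing clock can block a timeout-driven wait can be shown with a toy model. The sketch below is an assumption about the general mechanism, not the state updater's actual shutdown code: `AutoTickClock` is a hypothetical clock that advances by a fixed tick on every read, loosely modeled on a mock time with an auto-tick parameter, and `pollsUntilTimeout` is an illustrative wait loop.

```java
import java.util.concurrent.atomic.AtomicLong;

final class AutoTickClockSketch {

    // Hypothetical mock clock: every read returns the current time and then
    // advances it by autoTickMs. With autoTickMs == 0 the time never moves.
    static final class AutoTickClock {
        private final AtomicLong nowMs = new AtomicLong(0);
        private final long autoTickMs;

        AutoTickClock(final long autoTickMs) {
            this.autoTickMs = autoTickMs;
        }

        long milliseconds() {
            return nowMs.getAndAdd(autoTickMs);
        }
    }

    // Models a wait loop that gives up once a deadline has passed.
    // Returns the number of polls needed, or -1 if maxPolls is exhausted
    // because the clock never reached the deadline.
    static int pollsUntilTimeout(final AutoTickClock clock, final long timeoutMs, final int maxPolls) {
        final long deadlineMs = clock.milliseconds() + timeoutMs;
        for (int polls = 1; polls <= maxPolls; polls++) {
            if (clock.milliseconds() >= deadlineMs) {
                return polls;
            }
        }
        return -1;
    }

    public static void main(final String[] args) {
        // Auto-ticking clock: the deadline is eventually reached.
        System.out.println(pollsUntilTimeout(new AutoTickClock(1), 5, 1_000));
        // Frozen clock: the loop would spin forever without the maxPolls guard.
        System.out.println(pollsUntilTimeout(new AutoTickClock(0), 5, 1_000));
    }
}
```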
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1246594593 ## streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java: ## @@ -173,9 +173,9 @@ public static void closeCluster() { private static final List> STANDARD_INPUT_DATA = asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L)); private static final List> COUNT_OUTPUT_DATA = -asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching Review Comment: I turned off the caching because with the state updater the results were different than without the state updater. The differences were only there due to caching. I think the processing and commit times differ when the state updater is on or off. If caching is not needed, I think turning it off makes tests more robust. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1246594593 ## streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java: ## @@ -173,9 +173,9 @@ public static void closeCluster() { private static final List> STANDARD_INPUT_DATA = asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L)); private static final List> COUNT_OUTPUT_DATA = -asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching Review Comment: I turned off the caching because with the state updater the commits happen with different contents of the cache than without the state updater. If caching is not needed, I think turning it off makes tests more robust. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1246621516 ## streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java: ## @@ -173,9 +173,9 @@ public static void closeCluster() { private static final List> STANDARD_INPUT_DATA = asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L)); private static final List> COUNT_OUTPUT_DATA = -asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching +asList(pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), pair("C", 2L)); Review Comment: See https://github.com/apache/kafka/pull/13927/files#r1246594593 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1246594593 ## streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java: ## @@ -173,9 +173,9 @@ public static void closeCluster() { private static final List> STANDARD_INPUT_DATA = asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L)); private static final List> COUNT_OUTPUT_DATA = -asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching Review Comment: I turned off the caching because with the state updater the commits happen at different points in time than without the state updater. Thus, caching behaves differently.
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1246587948 ## streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java: ## @@ -173,9 +173,9 @@ public static void closeCluster() { private static final List> STANDARD_INPUT_DATA = asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L)); private static final List> COUNT_OUTPUT_DATA = -asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching Review Comment: @wcarlson5 @ableegoldman Could you review these changes and tell me whether I changed something I should not have?
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1246577358 ## streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java: ## @@ -207,11 +207,12 @@ public void shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMulti }); } -startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120)); +for (final KafkaStreams stream: kafkaStreamsList) { +stream.start(); +} Review Comment: With the state updater enabled, the Streams clients might still be in `REBALANCING` when the task that throws the `IllegalStateException` is processing. Tasks are processed and restored in parallel, but the stream thread transitions to `RUNNING` only when all tasks are restored.
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1246572219 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -1881,15 +1915,11 @@ public static void executeAndMaybeSwallow(final boolean clean, } boolean needsInitializationOrRestoration() { -return activeTaskIterable().stream().anyMatch(Task::needsInitializationOrRestoration); +return activeTaskStream().anyMatch(Task::needsInitializationOrRestoration); } // for testing only void addTask(final Task task) { tasks.addTask(task); } - -TasksRegistry tasks() { -return tasks; -} Review Comment: Never used. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927: URL: https://github.com/apache/kafka/pull/13927#discussion_r1246556477 ## streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java: ## @@ -1138,28 +1140,35 @@ public void signalResume() { public Map getTaskOffsetSums() { final Map taskOffsetSums = new HashMap<>(); -// Not all tasks will create directories, and there may be directories for tasks we don't currently own, -// so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should -// just have an empty changelogOffsets map. -for (final TaskId id : union(HashSet::new, lockedTaskDirectories, tasks.allTaskIds())) { -final Task task = tasks.contains(id) ? tasks.task(id) : null; -// Closed and uninitialized tasks don't have any offsets so we should read directly from the checkpoint -if (task != null && task.state() != State.CREATED && task.state() != State.CLOSED) { +final Map tasks = allTasks(); +final Set createdAndClosedTasks = new HashSet<>(); +for (final Task task : tasks.values()) { +if (task.state() != State.CREATED && task.state() != State.CLOSED) { final Map changelogOffsets = task.changelogOffsets(); if (changelogOffsets.isEmpty()) { -log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", id); +log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", +task.id()); } else { -taskOffsetSums.put(id, sumOfChangelogOffsets(id, changelogOffsets)); +taskOffsetSums.put(task.id(), sumOfChangelogOffsets(task.id(), changelogOffsets)); } } else { -final File checkpointFile = stateDirectory.checkpointFileFor(id); -try { -if (checkpointFile.exists()) { -taskOffsetSums.put(id, sumOfChangelogOffsets(id, new OffsetCheckpoint(checkpointFile).read())); -} -} catch (final IOException e) { -log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e); +createdAndClosedTasks.add(task.id()); +} +} + +// Not all tasks 
will create directories, and there may be directories for tasks we don't currently own, +// so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should +// just have an empty changelogOffsets map. +final Set lockedTaskDirectoriesOfNonOwnedTasks = new HashSet<>(lockedTaskDirectories); +lockedTaskDirectoriesOfNonOwnedTasks.removeAll(tasks.keySet()); +for (final TaskId id : union(HashSet::new, lockedTaskDirectoriesOfNonOwnedTasks, createdAndClosedTasks)) { +final File checkpointFile = stateDirectory.checkpointFileFor(id); +try { +if (checkpointFile.exists()) { +taskOffsetSums.put(id, sumOfChangelogOffsets(id, new OffsetCheckpoint(checkpointFile).read())); } +} catch (final IOException e) { +log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e); } } Review Comment: This is the fix from https://github.com/apache/kafka/pull/13925. This will disappear once the fix is merged and this PR rebased. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246555106

## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ##
@@ -846,7 +853,7 @@ void runOnce() {
 if (log.isDebugEnabled()) {
     log.debug("Committed all active tasks {} and standby tasks {} in {}ms",
-        taskManager.activeTaskIds(), taskManager.standbyTaskIds(), commitLatency);
+        taskManager.activeRunningTaskIds(), taskManager.standbyTaskIds(), commitLatency);

Review Comment:
This is basically a renaming to make the code clearer. With the state updater, restoring tasks are owned by the state updater, so the new name reflects that this method covers only running active tasks.
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246547471

## streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java: ##
@@ -201,7 +201,7 @@ public boolean commitNeeded() {
 @Override
 public StateStore getStore(final String name) {
-    throw new UnsupportedOperationException("This task is read-only");
+    return task.getStore(name);

Review Comment:
Needed in some tests.
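As an illustration of the change above, the following sketch shows a read-only wrapper that forwards a read accessor to the wrapped task while still rejecting a mutating call. The `Task` interface, `RealTask`, and the `suspend()` method are hypothetical simplifications for this example, not the Kafka Streams types.

```java
import java.util.HashMap;
import java.util.Map;

class ReadOnlyWrapperSketch {

    // Hypothetical, heavily reduced task interface.
    interface Task {
        Object getStore(String name);
        void suspend();
    }

    static class RealTask implements Task {
        private final Map<String, Object> stores = new HashMap<>();
        boolean suspended = false;

        RealTask(final String storeName, final Object store) {
            stores.put(storeName, store);
        }

        @Override
        public Object getStore(final String name) {
            return stores.get(name);
        }

        @Override
        public void suspend() {
            suspended = true;
        }
    }

    static class ReadOnlyTask implements Task {
        private final Task task;

        ReadOnlyTask(final Task task) {
            this.task = task;
        }

        // Read access is delegated, as in the diff above.
        @Override
        public Object getStore(final String name) {
            return task.getStore(name);
        }

        // Mutating calls are still rejected (an assumption of this sketch).
        @Override
        public void suspend() {
            throw new UnsupportedOperationException("This task is read-only");
        }
    }

    public static void main(final String[] args) {
        final Object store = new Object();
        final Task readOnly = new ReadOnlyTask(new RealTask("counts", store));
        System.out.println(readOnly.getStore("counts") == store);
    }
}
```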
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246546435

## streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java: ##
@@ -230,8 +230,11 @@ public StoreChangelogReader(final Time time,
 this.restoreConsumer = restoreConsumer;
 this.stateRestoreListener = stateRestoreListener;
-this.stateUpdaterEnabled =
-    InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false);
+this.stateUpdaterEnabled = InternalConfig.getBoolean(
+    config.originals(),
+    InternalConfig.STATE_UPDATER_ENABLED,
+    InternalConfig.STATE_UPDATER_ENABLED_DEFAULT
+);

Review Comment:
Enabling the state updater by default.
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246546045

## streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java: ##
@@ -1171,6 +1171,7 @@ public static class InternalConfig {
 // Private API to enable the state updater (i.e. state updating on a dedicated thread)
 public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__";
+public static final boolean STATE_UPDATER_ENABLED_DEFAULT = true;

Review Comment:
Enabling the state updater by default.
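For readers unfamiliar with the pattern, the sketch below shows how a flag lookup with a shared default constant could behave. The key string and the default value are taken from the diff; the `getBoolean` helper here is a hypothetical re-implementation written for this example (including its handling of string values), not the actual `InternalConfig.getBoolean`.

```java
import java.util.HashMap;
import java.util.Map;

class StateUpdaterFlagSketch {

    // Key and default as they appear in the diff above.
    static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__";
    static final boolean STATE_UPDATER_ENABLED_DEFAULT = true;

    // Hypothetical helper: returns the configured value, or the default when the
    // key is absent or the value is neither a Boolean nor a String.
    static boolean getBoolean(final Map<String, Object> configs,
                              final String key,
                              final boolean defaultValue) {
        final Object value = configs.getOrDefault(key, defaultValue);
        if (value instanceof Boolean) {
            return (Boolean) value;
        }
        if (value instanceof String) {
            return Boolean.parseBoolean((String) value);
        }
        return defaultValue;
    }

    public static void main(final String[] args) {
        final Map<String, Object> configs = new HashMap<>();
        // No explicit setting: the default applies.
        System.out.println(getBoolean(configs, STATE_UPDATER_ENABLED, STATE_UPDATER_ENABLED_DEFAULT));

        // Explicit opt-out.
        configs.put(STATE_UPDATER_ENABLED, false);
        System.out.println(getBoolean(configs, STATE_UPDATER_ENABLED, STATE_UPDATER_ENABLED_DEFAULT));
    }
}
```

Keeping the default in one constant means that every call site picks up the same value, so flipping the default is a one-line change.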
[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default
cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246546787

## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ##
@@ -368,8 +368,11 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
 final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
-final boolean stateUpdaterEnabled =
-    InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false);
+final boolean stateUpdaterEnabled = InternalConfig.getBoolean(
+    config.originals(),
+    InternalConfig.STATE_UPDATER_ENABLED,
+    InternalConfig.STATE_UPDATER_ENABLED_DEFAULT
+);

Review Comment:
Enabling the state updater by default.

## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java: ##
@@ -551,7 +554,11 @@ public StreamThread(final Time time,
 this.numIterations = 1;
 this.eosEnabled = eosEnabled(config);
-this.stateUpdaterEnabled = InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false);
+this.stateUpdaterEnabled = InternalConfig.getBoolean(
+    config.originals(),
+    InternalConfig.STATE_UPDATER_ENABLED,
+    InternalConfig.STATE_UPDATER_ENABLED_DEFAULT
+);

Review Comment:
Enabling the state updater by default.