[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-07-12 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1260704672


##
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java:
##
@@ -308,23 +309,30 @@ public void shouldFetchLagsDuringRestoration() throws Exception {
 }, WAIT_TIMEOUT_MS, "Eventually should reach zero lag.");
 
 // Kill instance, delete state to force restoration.
-assertThat("Streams instance did not close within timeout", streams.close(Duration.ofSeconds(60)));
+assertThat("Streams instance did not close within timeout", streams.get().close(Duration.ofSeconds(60)));
 IntegrationTestUtils.purgeLocalStreamsState(streamsConfiguration);
 Files.walk(stateDir.toPath()).sorted(Comparator.reverseOrder())
 .map(Path::toFile)
 .forEach(f -> assertTrue(f.delete(), "Some state " + f + " could not be deleted"));
+} finally {
+streams.get().close();

Review Comment:
   Look at line 284. There, `streams` is used in a lambda expression, and a 
lambda expression can only use variables that are final or effectively final. 
There are two ways to satisfy this: either we use a final `AtomicReference`, or 
we assign `streams` to an effectively final variable and use that in the lambda. 
I chose the `AtomicReference` option. These are the only two options because we 
need to initialize the variable outside of the `try`-clause to have an 
initialized variable in the `finally`-clause. 
   However, I now realized that we can create the Streams client outside the 
`try`-clause, because if the constructor fails, we do not need to call close and 
clean up in the `finally`-clause.
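The two options above can be sketched in plain Java. This is a minimal, self-contained illustration, not code from the PR: `FakeClient` is a hypothetical stand-in for a `KafkaStreams` client, and `waitFor` is a hypothetical stand-in for a wait-for-condition test helper (it checks the condition only once).

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BooleanSupplier;

// Hypothetical stand-in for a Streams client; it only tracks whether it was started and closed.
class FakeClient implements AutoCloseable {
    boolean running = false;
    boolean closed = false;

    void start() {
        running = true;
    }

    @Override
    public void close() {
        running = false;
        closed = true;
    }
}

class LambdaCaptureSketch {
    // Hypothetical stand-in for a test helper that waits for a condition; here it checks once.
    static boolean waitFor(final BooleanSupplier condition) {
        return condition.getAsBoolean();
    }

    // Option 1: the AtomicReference itself is final, so lambdas may capture it,
    // even though the client is only created inside the try-clause.
    static FakeClient withAtomicReference() {
        final AtomicReference<FakeClient> client = new AtomicReference<>();
        try {
            client.set(new FakeClient());
            client.get().start();
            waitFor(() -> client.get().running);
        } finally {
            if (client.get() != null) {
                client.get().close();
            }
        }
        return client.get();
    }

    // Option 2: create the client before the try-clause. The variable is effectively final,
    // and if the constructor throws, there is nothing to close in the finally-clause.
    static FakeClient createdBeforeTry() {
        final FakeClient client = new FakeClient();
        try {
            client.start();
            waitFor(() -> client.running);
        } finally {
            client.close();
        }
        return client;
    }
}
```

Option 2 is the one suggested in the last paragraph. It needs neither the `AtomicReference` nor a null check in the `finally`-clause.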



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-07-10 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1258012878


##
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##
@@ -173,9 +173,9 @@ public static void closeCluster() {
 private static final List> STANDARD_INPUT_DATA =
 asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L));
 private static final List> COUNT_OUTPUT_DATA =
-asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching

Review Comment:
   @wcarlson5 Could you elaborate on what is worrying you here?
   
   I ran the tests with the state updater disabled and the cache disabled, and 
all tests pass. That tells me that the state updater does not change any results.
   Additionally, the results with the cache enabled and with the cache disabled 
are semantically equivalent.
   ```
   pair("B", 1L), pair("A", 2L), pair("C", 2L)
   ```
   is equivalent to
   ```
   pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), pair("C", 2L)
   ```
   The latter just produces more intermediate results.
   
   In general, if a test verifies the correctness of results, disabling the 
cache makes the test more robust, because the results do not depend on commit 
time intervals or production rates.
   
   Let me know if I missed something here.  
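The equivalence argument can be checked mechanically by reducing both sequences to the latest count per key, which is what a table view of the result exposes. This is a minimal sketch: `pair` is a hypothetical stand-in for the test helper of the same name, and `latestPerKey` is illustrative only.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CountOutputEquivalence {
    // Hypothetical stand-in for the pair() helper used in the test.
    static Map.Entry<String, Long> pair(final String key, final long value) {
        return new SimpleEntry<>(key, value);
    }

    // Keeps only the latest value per key, i.e. what a table view of the output exposes.
    static Map<String, Long> latestPerKey(final List<Map.Entry<String, Long>> records) {
        final Map<String, Long> latest = new LinkedHashMap<>();
        for (final Map.Entry<String, Long> record : records) {
            latest.put(record.getKey(), record.getValue());
        }
        return latest;
    }

    static final List<Map.Entry<String, Long>> WITH_CACHING =
        Arrays.asList(pair("B", 1L), pair("A", 2L), pair("C", 2L));

    static final List<Map.Entry<String, Long>> WITHOUT_CACHING =
        Arrays.asList(pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), pair("C", 2L));
}
```

Comparing the full sequences, rather than the reduced maps, still distinguishes the two. That is why the expected output in the test has to match the cache setting.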
   






[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-07-10 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1257992420


##
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java:
##
@@ -207,11 +207,12 @@ public void shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMulti
 });
 }
 
-startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120));
+for (final KafkaStreams stream: kafkaStreamsList) {
+stream.start();
+}

Review Comment:
   I am not sure I understand what you are asking, so bear with me if my reply 
is not the one you expected. 
   
   Adding
   ```
   waitForApplicationState(Arrays.asList(streams, streamsTwo, streamsThree), KafkaStreams.State.RUNNING, ofSeconds(60))
   ```
   after starting all Streams clients makes the test flaky, because with the 
state updater NOT all clients need to be in `RUNNING` before a Streams client 
transitions to `ERROR`. Not even the Streams client that throws the exception 
needs to be in `RUNNING` before transitioning to `ERROR`. 
   Actually, now that I have looked more closely at this code, I am wondering why 
this test was not flaky before my changes. Each Streams client is started 
independently. That means there is no guarantee that all three Streams clients 
are `RUNNING` at the same time. I did some testing with `Thread.sleep()` after 
each client was started to simulate processing delays in the version of the test 
before my changes, and the test was indeed flaky.
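The following toy model shows why waiting for `RUNNING` is fragile. If a client fails while it is still rebalancing, a wait for `RUNNING` can never succeed, whereas a wait for `ERROR` does. The enum is a simplified, hypothetical subset of `KafkaStreams.State`, and `scripted` and `waitForState` are illustrative helpers, not Kafka test utilities.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

class StateWaitSketch {
    // Simplified subset of client states, for illustration only.
    enum State { CREATED, REBALANCING, RUNNING, PENDING_ERROR, ERROR }

    // Replays a scripted sequence of states, then stays in the last one.
    static Supplier<State> scripted(final List<State> transitions) {
        final Iterator<State> remaining = transitions.iterator();
        final State[] current = {transitions.get(0)};
        return () -> {
            if (remaining.hasNext()) {
                current[0] = remaining.next();
            }
            return current[0];
        };
    }

    // Polls the state a bounded number of times, like a wait-with-timeout helper would.
    static boolean waitForState(final Supplier<State> state, final State target, final int maxPolls) {
        for (int i = 0; i < maxPolls; i++) {
            if (state.get() == target) {
                return true;
            }
        }
        return false;
    }

    // A client whose task fails while it is still rebalancing never reports RUNNING.
    static final List<State> FAILS_DURING_REBALANCE =
        Arrays.asList(State.CREATED, State.REBALANCING, State.PENDING_ERROR, State.ERROR);
}
```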






[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-07-10 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1257910205


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -571,7 +571,9 @@ private Map> 
pendingTasksToCreate(final Map> entry = iter.next();
 final TaskId taskId = entry.getKey();
-if (taskId.topologyName() != null && !topologyMetadata.namedTopologiesView().contains(taskId.topologyName())) {
+final boolean taskIsOwned = tasks.allTaskIds().contains(taskId)

Review Comment:
   The idea was to make the code more readable: the condition is quite complex, 
but all it does is verify that a task is owned. I prefer an additional variable 
or method over complex logical conditions that give the reader a hard time, so 
I would rather keep the variable.






[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-07-06 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1254284586


##
streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java:
##
@@ -87,7 +88,7 @@ public static void closeCluster() {
 }
 
 
-private final Time time = CLUSTER.time;
+private final Time time = new MockTime(1);

Review Comment:
   I tried to expose the `autoTickMs` parameter in the embedded Kafka cluster, 
but I ran into an infinite loop.
   
   Regarding your question:
   
   > Are we not worried that broker time and our time might diverge?
   
   In almost all of our integration tests, broker time diverges from Streams 
time, since the broker time does not advance while we usually use system time in 
`KafkaStreams`. This divergence is not something this PR introduces.






[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-07-06 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1254072786


##
streams/src/test/java/org/apache/kafka/streams/integration/LagFetchIntegrationTest.java:
##
@@ -277,6 +277,7 @@ public void shouldFetchLagsDuringRestoration() throws Exception {
 t1.toStream().to(outputTopicName);
 final KafkaStreams streams = new KafkaStreams(builder.build(), props);

Review Comment:
   Actually, the Streams clients are closed in the `finally`-clause. What was 
missing was that the Streams clients were not created in the `try`-clause. I 
needed to use an `AtomicReference` for that; otherwise, I could not use the 
Streams clients in lambdas.






[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-06-29 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246717245


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -368,8 +368,11 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
 
 final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
 
-final boolean stateUpdaterEnabled =
-InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false);
+final boolean stateUpdaterEnabled = InternalConfig.getBoolean(
+config.originals(),
+InternalConfig.STATE_UPDATER_ENABLED,
+InternalConfig.STATE_UPDATER_ENABLED_DEFAULT
+);

Review Comment:
   Good idea!
   Done!
   






[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-06-29 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246699178


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##
@@ -1123,7 +1138,10 @@ public void shouldInjectProducerPerThreadUsingClientSupplierOnCreateIfEosV2Enabl
 public void shouldOnlyCompleteShutdownAfterRebalanceNotInProgress() throws InterruptedException {
 internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1);
 
-final StreamThread thread = createStreamThread(CLIENT_ID, new StreamsConfig(configProps(true)), true);
+final Properties props = configProps(true);
+props.put(InternalConfig.STATE_UPDATER_ENABLED, false);

Review Comment:
   Done






[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-06-29 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246696265


##
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##
@@ -3038,6 +3149,7 @@ public void shouldRespectPollTimeInPartitionsAssignedStateWithStateUpdater() {
 @Test
 public void shouldNotBlockWhenPollingInPartitionsAssignedStateWithoutStateUpdater() {
 final Properties streamsConfigProps = StreamsTestUtils.getStreamsConfig();
+streamsConfigProps.put(InternalConfig.STATE_UPDATER_ENABLED, false);

Review Comment:
   I thought the code is more readable if I leave it in the corresponding 
with-state-updater test.






[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-06-29 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246652052


##
streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java:
##
@@ -843,15 +846,19 @@ private static  List waitUntilFinalKeyValueRecordsReceived(final Pro
 // still need to check that for each key, the ordering is expected
 final Map> finalAccumData = new HashMap<>();
 for (final T kv : accumulatedActual) {
-finalAccumData.computeIfAbsent(
-withTimestamp ? ((KeyValueTimestamp) kv).key() : ((KeyValue) kv).key,
-key -> new ArrayList<>()).add(kv);
+final K key = withTimestamp ? ((KeyValueTimestamp) kv).key() : ((KeyValue) kv).key;
+final List records = finalAccumData.computeIfAbsent(key, k -> new ArrayList<>());
+if (!records.contains(kv)) {
+records.add(kv);
+}
 }
 final Map> finalExpected = new HashMap<>();
 for (final T kv : expectedRecords) {
-finalExpected.computeIfAbsent(
-withTimestamp ? ((KeyValueTimestamp) kv).key() : ((KeyValue) kv).key,
-key -> new ArrayList<>()).add(kv);
+final K key = withTimestamp ? ((KeyValueTimestamp) kv).key() : ((KeyValue) kv).key;
+final List records = finalExpected.computeIfAbsent(key, k -> new ArrayList<>());
+if (!records.contains(kv)) {
+records.add(kv);
+}

Review Comment:
   These changes ignore duplicate records during the comparison. A test that 
used this verification triggered a failure and then verified that the expected 
records were in the output topic. Without the state updater, no records were 
written to the output topic before the failure; with the state updater, some 
records were.
   From a correctness point of view, both behaviors are correct: no commit 
happens before the simulated failure, so after the failover Streams reads all 
input records again (under ALOS, i.e., at-least-once semantics), and records 
already written before the failure can appear in the output a second time.
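Below is a simplified, standalone version of the per-key grouping in the diff above. It is applied to a scenario where a record written before the failure is written again after the failover. Encoding records as `key:value` strings, and the helper `keyOf`, are hypothetical choices made for the sketch.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

class DedupPerKeyComparison {
    // Groups records by key and drops exact duplicates, keeping the order of first occurrence,
    // in the spirit of the change to waitUntilFinalKeyValueRecordsReceived.
    static <K, T> Map<K, List<T>> groupDistinctByKey(final List<T> records, final Function<T, K> keyOf) {
        final Map<K, List<T>> grouped = new HashMap<>();
        for (final T record : records) {
            final List<T> perKey = grouped.computeIfAbsent(keyOf.apply(record), k -> new ArrayList<>());
            if (!perKey.contains(record)) {
                perKey.add(record);
            }
        }
        return grouped;
    }

    // Hypothetical key extractor for records encoded as "key:value" strings.
    static String keyOf(final String record) {
        return record.substring(0, record.indexOf(':'));
    }
}
```

The trade-off is that a key that legitimately emits the same record twice is also collapsed, so this comparison is weaker than an exact one.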







[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-06-29 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246640445


##
streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java:
##
@@ -87,7 +88,7 @@ public static void closeCluster() {
 }
 
 
-private final Time time = CLUSTER.time;
+private final Time time = new MockTime(1);

Review Comment:
   Time needs to progress while the state updater shuts down; otherwise, the 
shutdown is blocked. `CLUSTER` provides a mock time that does not progress. 
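Here is a minimal sketch of why a clock that never advances can block a deadline-based wait. `AutoTickClock` is hypothetical. As I understand it, it only loosely mimics an auto-ticking mock time such as the `new MockTime(1)` in the diff. `awaitShutdown` is also a hypothetical loop, not the state updater's actual shutdown code.

```java
import java.util.function.BooleanSupplier;

class AutoTickSketch {
    enum Outcome { DONE, TIMED_OUT, STILL_WAITING }

    // Hypothetical clock: every read advances time by autoTickMs.
    // With autoTickMs = 0 it behaves like a mock time that never progresses.
    static final class AutoTickClock {
        private final long autoTickMs;
        private long nowMs = 0L;

        AutoTickClock(final long autoTickMs) {
            this.autoTickMs = autoTickMs;
        }

        long milliseconds() {
            nowMs += autoTickMs;
            return nowMs;
        }
    }

    // Hypothetical shutdown wait: returns once the condition holds or the deadline passes.
    // The iteration cap only keeps this sketch from hanging; an uncapped loop would spin
    // forever if the clock never advanced and the condition never became true.
    static Outcome awaitShutdown(final AutoTickClock clock, final long timeoutMs,
                                 final BooleanSupplier done, final int maxIterations) {
        final long deadline = clock.milliseconds() + timeoutMs;
        for (int i = 0; i < maxIterations; i++) {
            if (done.getAsBoolean()) {
                return Outcome.DONE;
            }
            if (clock.milliseconds() >= deadline) {
                return Outcome.TIMED_OUT;
            }
        }
        return Outcome.STILL_WAITING;
    }
}
```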






[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-06-29 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246594593


##
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##
@@ -173,9 +173,9 @@ public static void closeCluster() {
 private static final List> STANDARD_INPUT_DATA =
 asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L));
 private static final List> COUNT_OUTPUT_DATA =
-asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching

Review Comment:
   I turned off caching because the results with the state updater differed 
from the results without it. The differences were due only to caching. I think 
the processing and commit times differ depending on whether the state updater is 
on or off. 
   If caching is not needed, I think turning it off makes tests more robust.
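The caching effect described above can be shown with a toy model. Counts are held per key and only forwarded when the cache is flushed on commit, so the emitted sequence depends on when commits happen. This is a simplification, not Kafka's `ThreadCache`. The keys are those of `STANDARD_INPUT_DATA`, and the commit points are hypothetical inputs.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class CacheFlushSketch {
    // Toy model of a caching count: updates are held per key and only forwarded
    // downstream when the cache is flushed on commit (and once more at the end).
    static List<String> countWithCommits(final List<String> keys, final Set<Integer> commitAfterIndex) {
        final Map<String, Long> counts = new HashMap<>();
        final Map<String, Long> dirty = new LinkedHashMap<>();
        final List<String> output = new ArrayList<>();
        for (int i = 0; i < keys.size(); i++) {
            final String key = keys.get(i);
            final long count = counts.merge(key, 1L, Long::sum);
            dirty.remove(key);      // move the key to the end: it is now the most recently updated
            dirty.put(key, count);  // only the latest count per key survives until the next flush
            if (commitAfterIndex.contains(i)) {
                dirty.forEach((k, v) -> output.add(k + "=" + v));
                dirty.clear();
            }
        }
        dirty.forEach((k, v) -> output.add(k + "=" + v));
        return output;
    }
}
```

For the keys `A, B, A, C, C`, this model emits `B=1, A=2, C=2` when there is no commit until the end. With a commit after every record, it emits `A=1, B=1, A=2, C=1, C=2`, which is the uncached result. A single commit after the second record yields yet another sequence.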






[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-06-29 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246594593


##
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##
@@ -173,9 +173,9 @@ public static void closeCluster() {
 private static final List> STANDARD_INPUT_DATA =
 asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L));
 private static final List> COUNT_OUTPUT_DATA =
-asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching

Review Comment:
   I turned off caching because, with the state updater, commits happen with 
different cache contents than without the state updater.
   If caching is not needed, I think turning it off makes tests more robust.







[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-06-29 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246621516


##
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##
@@ -173,9 +173,9 @@ public static void closeCluster() {
 private static final List> STANDARD_INPUT_DATA =
 asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L));
 private static final List> COUNT_OUTPUT_DATA =
-asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching
+asList(pair("A", 1L), pair("B", 1L), pair("A", 2L), pair("C", 1L), pair("C", 2L));

Review Comment:
   See https://github.com/apache/kafka/pull/13927/files#r1246594593






[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-06-29 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246594593


##
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##
@@ -173,9 +173,9 @@ public static void closeCluster() {
 private static final List> STANDARD_INPUT_DATA =
 asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L));
 private static final List> COUNT_OUTPUT_DATA =
-asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching

Review Comment:
   I turned off caching because, with the state updater, commits happen at 
different points in time than without the state updater. Thus, caching behaves 
differently.






[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-06-29 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246587948


##
streams/src/test/java/org/apache/kafka/streams/integration/NamedTopologyIntegrationTest.java:
##
@@ -173,9 +173,9 @@ public static void closeCluster() {
 private static final List> STANDARD_INPUT_DATA =
 asList(pair("A", 100L), pair("B", 200L), pair("A", 300L), pair("C", 400L), pair("C", -50L));
 private static final List> COUNT_OUTPUT_DATA =
-asList(pair("B", 1L), pair("A", 2L), pair("C", 2L)); // output of count operation with caching

Review Comment:
   @wcarlson5 @ableegoldman Could you review these changes to check whether I 
changed something I should not have?






[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-06-29 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246577358


##
streams/src/test/java/org/apache/kafka/streams/integration/KTableKTableForeignKeyInnerJoinCustomPartitionerIntegrationTest.java:
##
@@ -207,11 +207,12 @@ public void shouldThrowIllegalArgumentExceptionWhenCustomPartitionerReturnsMulti
 });
 }
 
-startApplicationAndWaitUntilRunning(kafkaStreamsList, ofSeconds(120));
+for (final KafkaStreams stream: kafkaStreamsList) {
+stream.start();
+}

Review Comment:
   With the state updater enabled, the Streams clients might still be in 
`REBALANCING` when the task that throws the `IllegalStateException` is 
processing. Tasks are processed and restored in parallel, but the stream thread 
transitions to `RUNNING` only when all tasks are restored.
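The comment above can be illustrated with a toy model. A task that has finished restoring can already be processed, and can therefore already throw, while the client still reports `REBALANCING` because another task is restoring. The task ids, the `Map` of restoration flags, and the two-value client state are hypothetical simplifications; the real Streams client and stream thread have more states.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

class RestoreWhileProcessingSketch {
    enum ClientState { REBALANCING, RUNNING }

    // Toy model: with a state updater, a task that finished restoring can be processed
    // right away, even while other tasks are still restoring.
    static Set<String> processableTasks(final Map<String, Boolean> restoredByTask) {
        final Set<String> processable = new TreeSet<>();
        restoredByTask.forEach((task, restored) -> {
            if (restored) {
                processable.add(task);
            }
        });
        return processable;
    }

    // The client only reports RUNNING once every task has been restored.
    static ClientState clientState(final Map<String, Boolean> restoredByTask) {
        return restoredByTask.values().stream().allMatch(Boolean::booleanValue)
            ? ClientState.RUNNING
            : ClientState.REBALANCING;
    }
}
```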






[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-06-29 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246572219


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1881,15 +1915,11 @@ public static void executeAndMaybeSwallow(final boolean clean,
 }
 
 boolean needsInitializationOrRestoration() {
-return activeTaskIterable().stream().anyMatch(Task::needsInitializationOrRestoration);
+return activeTaskStream().anyMatch(Task::needsInitializationOrRestoration);
 }
 
 // for testing only
 void addTask(final Task task) {
 tasks.addTask(task);
 }
-
-TasksRegistry tasks() {
-return tasks;
-}

Review Comment:
   Never used.






[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-06-29 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246556477


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java:
##
@@ -1138,28 +1140,35 @@ public void signalResume() {
 public Map getTaskOffsetSums() {
 final Map taskOffsetSums = new HashMap<>();
 
-// Not all tasks will create directories, and there may be directories 
for tasks we don't currently own,
-// so we consider all tasks that are either owned or on disk. This 
includes stateless tasks, which should
-// just have an empty changelogOffsets map.
-for (final TaskId id : union(HashSet::new, lockedTaskDirectories, 
tasks.allTaskIds())) {
-final Task task = tasks.contains(id) ? tasks.task(id) : null;
-// Closed and uninitialized tasks don't have any offsets so we 
should read directly from the checkpoint
-if (task != null && task.state() != State.CREATED && task.state() 
!= State.CLOSED) {
+final Map tasks = allTasks();
+final Set createdAndClosedTasks = new HashSet<>();
+for (final Task task : tasks.values()) {
+if (task.state() != State.CREATED && task.state() != State.CLOSED) {
 final Map changelogOffsets = task.changelogOffsets();
 if (changelogOffsets.isEmpty()) {
-log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}", id);
+log.debug("Skipping to encode apparently stateless (or non-logged) offset sum for task {}",
+task.id());
 } else {
-taskOffsetSums.put(id, sumOfChangelogOffsets(id, changelogOffsets));
+taskOffsetSums.put(task.id(), sumOfChangelogOffsets(task.id(), changelogOffsets));
 }
 } else {
-final File checkpointFile = stateDirectory.checkpointFileFor(id);
-try {
-if (checkpointFile.exists()) {
-taskOffsetSums.put(id, sumOfChangelogOffsets(id, new OffsetCheckpoint(checkpointFile).read()));
-}
-} catch (final IOException e) {
-log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e);
+createdAndClosedTasks.add(task.id());
+}
+}
+
+// Not all tasks will create directories, and there may be directories for tasks we don't currently own,
+// so we consider all tasks that are either owned or on disk. This includes stateless tasks, which should
+// just have an empty changelogOffsets map.
+final Set lockedTaskDirectoriesOfNonOwnedTasks = new HashSet<>(lockedTaskDirectories);
+lockedTaskDirectoriesOfNonOwnedTasks.removeAll(tasks.keySet());
+for (final TaskId id : union(HashSet::new, lockedTaskDirectoriesOfNonOwnedTasks, createdAndClosedTasks)) {
+final File checkpointFile = stateDirectory.checkpointFileFor(id);
+try {
+if (checkpointFile.exists()) {
+taskOffsetSums.put(id, sumOfChangelogOffsets(id, new OffsetCheckpoint(checkpointFile).read()));
 }
+} catch (final IOException e) {
+log.warn(String.format("Exception caught while trying to read checkpoint for task %s:", id), e);
 }
 }

Review Comment:
   This is the fix from https://github.com/apache/kafka/pull/13925. It will disappear from this diff once that fix is merged and this PR is rebased.
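   The ownership split in the new `getTaskOffsetSums()` can be illustrated with a simplified, self-contained sketch. It assumes plain `String` task IDs and a hypothetical `State` enum in place of Kafka's `TaskId` and `Task.State`, and it models only which task IDs fall back to reading the checkpoint file, not the offset summation itself.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class CheckpointSelectionSketch {

    // Hypothetical stand-in for the task states relevant here (not Kafka's Task.State).
    enum State { CREATED, RESTORING, RUNNING, CLOSED }

    // Returns the task IDs whose offset sums would be read from the checkpoint file:
    // owned tasks in CREATED or CLOSED state, plus locked directories of non-owned tasks.
    static Set<String> tasksToReadFromCheckpoint(final Map<String, State> ownedTasks,
                                                 final Set<String> lockedTaskDirectories) {
        final Set<String> createdAndClosedTasks = new HashSet<>();
        for (final Map.Entry<String, State> entry : ownedTasks.entrySet()) {
            final State state = entry.getValue();
            if (state == State.CREATED || state == State.CLOSED) {
                createdAndClosedTasks.add(entry.getKey());
            }
        }

        // Locked directories minus owned tasks: owned tasks are handled above.
        final Set<String> lockedDirectoriesOfNonOwnedTasks = new HashSet<>(lockedTaskDirectories);
        lockedDirectoriesOfNonOwnedTasks.removeAll(ownedTasks.keySet());

        // Sorted only to make the printed result deterministic.
        final Set<String> result = new TreeSet<>(lockedDirectoriesOfNonOwnedTasks);
        result.addAll(createdAndClosedTasks);
        return result;
    }

    public static void main(final String[] args) {
        final Map<String, State> owned = new HashMap<>();
        owned.put("0_0", State.RUNNING);
        owned.put("0_1", State.CREATED);
        owned.put("0_2", State.RESTORING);
        final Set<String> locked = new HashSet<>(Set.of("0_0", "0_1", "1_0"));
        System.out.println(tasksToReadFromCheckpoint(owned, locked)); // prints [0_1, 1_0]
    }
}
```

   Owned tasks that are neither CREATED nor CLOSED (`0_0` and `0_2` above) are skipped here because their offsets come from the in-memory `changelogOffsets()`.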






[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-06-29 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246555106


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -846,7 +853,7 @@ void runOnce() {
 
 if (log.isDebugEnabled()) {
 log.debug("Committed all active tasks {} and standby tasks {} in {}ms",
-taskManager.activeTaskIds(), taskManager.standbyTaskIds(), commitLatency);
+taskManager.activeRunningTaskIds(), taskManager.standbyTaskIds(), commitLatency);

Review Comment:
   This is basically a rename to make the code clearer. With the state updater, restoring tasks are owned by the state updater, so `activeRunningTaskIds()` makes explicit that only running active tasks are listed here.
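   A toy model may help show why the old name was ambiguous once restoring tasks move to another thread. This is a hypothetical sketch, not `TaskManager`: it assumes `String` task IDs and a made-up `markRestored()` hook standing in for the state updater handing a restored task back.

```java
import java.util.Set;
import java.util.TreeSet;

public class RunningTaskIdsSketch {

    // Hypothetical model: running active tasks owned by the stream thread, and
    // restoring active tasks currently owned by the state updater.
    private final Set<String> runningOnStreamThread = new TreeSet<>();
    private final Set<String> restoringInStateUpdater = new TreeSet<>();

    void addRestoring(final String taskId) {
        restoringInStateUpdater.add(taskId);
    }

    // Hypothetical hook: the state updater reports that restoration finished.
    void markRestored(final String taskId) {
        if (restoringInStateUpdater.remove(taskId)) {
            runningOnStreamThread.add(taskId);
        }
    }

    // Only the tasks the stream thread currently owns; the explicit name avoids
    // suggesting that restoring tasks are included.
    Set<String> activeRunningTaskIds() {
        return new TreeSet<>(runningOnStreamThread);
    }

    // All active tasks, regardless of which thread currently owns them.
    Set<String> allActiveTaskIds() {
        final Set<String> all = new TreeSet<>(runningOnStreamThread);
        all.addAll(restoringInStateUpdater);
        return all;
    }

    public static void main(final String[] args) {
        final RunningTaskIdsSketch tm = new RunningTaskIdsSketch();
        tm.addRestoring("0_0");
        tm.addRestoring("0_1");
        tm.markRestored("0_0");
        // prints running=[0_0] all=[0_0, 0_1]
        System.out.println("running=" + tm.activeRunningTaskIds() + " all=" + tm.allActiveTaskIds());
    }
}
```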






[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-06-29 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246547471


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/ReadOnlyTask.java:
##
@@ -201,7 +201,7 @@ public boolean commitNeeded() {
 
 @Override
 public StateStore getStore(final String name) {
-throw new UnsupportedOperationException("This task is read-only");
+return task.getStore(name);

Review Comment:
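   Needed in some tests that access stores through a read-only task, so `getStore()` now delegates to the wrapped task instead of throwing.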
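   The pattern is a read-only wrapper that delegates reads and rejects mutations. Below is a minimal sketch of it, assuming a hypothetical `SimpleTask` interface with a single read and a single write method; Kafka's `Task` interface is much larger and is not reproduced here.

```java
import java.util.HashMap;
import java.util.Map;

public class ReadOnlyTaskSketch {

    // Hypothetical, heavily simplified task interface (not Kafka's Task).
    interface SimpleTask {
        Object getStore(String name);
        void addStore(String name, Object store);
    }

    static class InMemoryTask implements SimpleTask {
        private final Map<String, Object> stores = new HashMap<>();

        @Override
        public Object getStore(final String name) {
            return stores.get(name);
        }

        @Override
        public void addStore(final String name, final Object store) {
            stores.put(name, store);
        }
    }

    // Read-only view: reads are delegated to the wrapped task, mutations are rejected.
    static class ReadOnlyTask implements SimpleTask {
        private final SimpleTask task;

        ReadOnlyTask(final SimpleTask task) {
            this.task = task;
        }

        @Override
        public Object getStore(final String name) {
            return task.getStore(name); // delegate instead of throwing
        }

        @Override
        public void addStore(final String name, final Object store) {
            throw new UnsupportedOperationException("This task is read-only");
        }
    }

    public static void main(final String[] args) {
        final InMemoryTask task = new InMemoryTask();
        task.addStore("counts", "store-instance");
        final SimpleTask readOnly = new ReadOnlyTask(task);
        System.out.println(readOnly.getStore("counts")); // prints store-instance
    }
}
```

   Note that the wrapper returns the live store object, so it is read-only only with respect to the task's own methods, not the store's contents.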






[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-06-29 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246546435


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StoreChangelogReader.java:
##
@@ -230,8 +230,11 @@ public StoreChangelogReader(final Time time,
 this.restoreConsumer = restoreConsumer;
 this.stateRestoreListener = stateRestoreListener;
 
-this.stateUpdaterEnabled =
-InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false);
+this.stateUpdaterEnabled = InternalConfig.getBoolean(
+config.originals(),
+InternalConfig.STATE_UPDATER_ENABLED,
+InternalConfig.STATE_UPDATER_ENABLED_DEFAULT
+);

Review Comment:
   Enabling the state updater by default.
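   With the default constant in place, the flag only has to be set explicitly to opt out. The following is a simplified sketch of how a boolean lookup with a fallback default resolves. It assumes semantics similar to `InternalConfig.getBoolean` (an absent key yields the default; `Boolean` and `String` values are interpreted) and is not the Kafka source. The key and default value mirror `StreamsConfig.InternalConfig` in this PR.

```java
import java.util.HashMap;
import java.util.Map;

public class StateUpdaterFlagSketch {

    // Key and default value as in StreamsConfig.InternalConfig in this PR.
    static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__";
    static final boolean STATE_UPDATER_ENABLED_DEFAULT = true;

    // Simplified lookup: an absent key yields the default; Boolean and String values
    // are interpreted; any other type falls back to the default.
    static boolean getBoolean(final Map<String, Object> configs, final String key, final boolean defaultValue) {
        final Object value = configs.getOrDefault(key, defaultValue);
        if (value instanceof Boolean) {
            return (Boolean) value;
        } else if (value instanceof String) {
            return Boolean.parseBoolean((String) value);
        }
        return defaultValue;
    }

    public static void main(final String[] args) {
        final Map<String, Object> originals = new HashMap<>();
        // Not set: the state updater is now enabled by default.
        System.out.println(getBoolean(originals, STATE_UPDATER_ENABLED, STATE_UPDATER_ENABLED_DEFAULT)); // true
        // Explicit opt-out.
        originals.put(STATE_UPDATER_ENABLED, "false");
        System.out.println(getBoolean(originals, STATE_UPDATER_ENABLED, STATE_UPDATER_ENABLED_DEFAULT)); // false
    }
}
```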






[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-06-29 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246546045


##
streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java:
##
@@ -1171,6 +1171,7 @@ public static class InternalConfig {
 
 // Private API to enable the state updater (i.e. state updating on a dedicated thread)
 public static final String STATE_UPDATER_ENABLED = "__state.updater.enabled__";
+public static final boolean STATE_UPDATER_ENABLED_DEFAULT = true;

Review Comment:
   Enabling the state updater by default.






[GitHub] [kafka] cadonna commented on a diff in pull request #13927: KAFKA-10199: Enable state updater by default

2023-06-29 Thread via GitHub


cadonna commented on code in PR #13927:
URL: https://github.com/apache/kafka/pull/13927#discussion_r1246546787


##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -368,8 +368,11 @@ public static StreamThread create(final TopologyMetadata topologyMetadata,
 
 final ThreadCache cache = new ThreadCache(logContext, cacheSizeBytes, streamsMetrics);
 
-final boolean stateUpdaterEnabled =
-InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false);
+final boolean stateUpdaterEnabled = InternalConfig.getBoolean(
+config.originals(),
+InternalConfig.STATE_UPDATER_ENABLED,
+InternalConfig.STATE_UPDATER_ENABLED_DEFAULT
+);

Review Comment:
   Enabling the state updater by default.



##
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##
@@ -551,7 +554,11 @@ public StreamThread(final Time time,
 
 this.numIterations = 1;
 this.eosEnabled = eosEnabled(config);
-this.stateUpdaterEnabled = InternalConfig.getBoolean(config.originals(), InternalConfig.STATE_UPDATER_ENABLED, false);
+this.stateUpdaterEnabled = InternalConfig.getBoolean(
+config.originals(),
+InternalConfig.STATE_UPDATER_ENABLED,
+InternalConfig.STATE_UPDATER_ENABLED_DEFAULT
+);

Review Comment:
   Enabling the state updater by default.


