Copilot commented on code in PR #20735:
URL: https://github.com/apache/kafka/pull/20735#discussion_r2447007929
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1587,11 +1587,15 @@ public void handleStreamsRebalanceData() {
}
private void handleMissingSourceTopicsWithTimeout(final String
missingTopicsDetail) {
+ // Determine the timeout: use 2 * heartbeatIntervalMs
+ final int heartbeatIntervalMs =
streamsRebalanceData.get().getHeartbeatIntervalMs();
+ final long timeoutMs = 2 * heartbeatIntervalMs;
Review Comment:
Potential integer overflow when multiplying `heartbeatIntervalMs` by 2. If
`heartbeatIntervalMs` is close to `Integer.MAX_VALUE`, the multiplication will
overflow before being promoted to `long`. Cast `heartbeatIntervalMs` to `long`
before multiplication: `final long timeoutMs = 2L * heartbeatIntervalMs;`
```suggestion
final long timeoutMs = 2L * heartbeatIntervalMs;
```
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1587,11 +1587,15 @@ public void handleStreamsRebalanceData() {
}
private void handleMissingSourceTopicsWithTimeout(final String
missingTopicsDetail) {
+ // Determine the timeout: use 2 * heartbeatIntervalMs
+ final int heartbeatIntervalMs =
streamsRebalanceData.get().getHeartbeatIntervalMs();
+ final long timeoutMs = 2 * heartbeatIntervalMs;
+
// Start timeout tracking on first encounter with missing topics
if (topicsReadyTimer == null) {
- topicsReadyTimer = time.timer(maxPollTimeMs);
+ topicsReadyTimer = time.timer(timeoutMs);
Review Comment:
Missing validation for `heartbeatIntervalMs` being -1 (its default value
when not yet set by a heartbeat response). If no heartbeat has been received,
this will result in `timeoutMs = -2`, causing incorrect timer behavior. Add a
check to fall back to `maxPollTimeMs` when `heartbeatIntervalMs == -1`.
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/StreamsRebalanceDataTest.java:
##########
@@ -437,4 +437,44 @@ public void
streamsRebalanceDataShouldBeConstructedWithEmptyStatuses() {
assertTrue(streamsRebalanceData.statuses().isEmpty());
}
+ @Test
+ public void
streamsRebalanceDataShouldBeConstructedWithHeartbeatIntervalMsSetToMinusOne() {
+ final UUID processId = UUID.randomUUID();
+ final Optional<StreamsRebalanceData.HostInfo> endpoint =
Optional.of(new
+ StreamsRebalanceData.HostInfo("localhost", 9090));
+ final Map<String, StreamsRebalanceData.Subtopology> subtopologies =
Map.of();
+ final Map<String, String> clientTags = Map.of("clientTag1",
+ "clientTagValue1");
+ final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(
+ processId,
+ endpoint,
+ subtopologies,
+ clientTags
+ );
+
+ assertEquals(-1, streamsRebalanceData.getHeartbeatIntervalMs());
+ }
+
+ @Test
+ public void streamsRebalanceDataShouldBeAbleToUpdateHeartbeatIntervalMs() {
+ final UUID processId = UUID.randomUUID();
+ final Optional<StreamsRebalanceData.HostInfo> endpoint =
Optional.of(new
+ StreamsRebalanceData.HostInfo("localhost", 9090));
+ final Map<String, StreamsRebalanceData.Subtopology> subtopologies =
Map.of();
+ final Map<String, String> clientTags = Map.of("clientTag1",
+ "clientTagValue1");
+ final StreamsRebalanceData streamsRebalanceData = new
StreamsRebalanceData(
+ processId,
+ endpoint,
+ subtopologies,
+ clientTags
+ );
+
+ streamsRebalanceData.setHeartbeatIntervalMs(1000);
+ assertEquals(1000, streamsRebalanceData.getHeartbeatIntervalMs());
+ }
+
+
+
+
Review Comment:
Remove unnecessary blank lines at the end of the file. There are three
consecutive blank lines before the closing brace.
```suggestion
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]