lucasbru commented on code in PR #20284:
URL: https://github.com/apache/kafka/pull/20284#discussion_r2247861071
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -371,6 +372,9 @@ public boolean isStartingRunningOrPartitionAssigned() {
     private volatile KafkaFutureImpl<Uuid> restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>();
     private volatile KafkaFutureImpl<Uuid> producerInstanceIdFuture = new KafkaFutureImpl<>();
+    // Missing source topic timeout tracking
+    private long firstMissingSourceTopicTime = -1L;
Review Comment:
Maybe this would be slightly easier to read if we used
`org.apache.kafka.common.utils.Timer` for this?
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1534,15 +1538,28 @@ private ConsumerRecords<byte[], byte[]> pollRequests(final Duration pollTime) {
     public void handleStreamsRebalanceData() {
         if (streamsRebalanceData.isPresent()) {
+            boolean hasMissingSourceTopics = false;
+            String missingTopicsDetail = null;
+
             for (final StreamsGroupHeartbeatResponseData.Status status : streamsRebalanceData.get().statuses()) {
                 if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code()) {
                     shutdownErrorHook.run();
                 } else if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code()) {
-                    final String errorMsg = String.format("Missing source topics: %s", status.statusDetail());
+                    hasMissingSourceTopics = true;
+                    missingTopicsDetail = status.statusDetail();
+                } else if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.INCORRECTLY_PARTITIONED_TOPICS.code()) {
+                    final String errorMsg = status.statusDetail();
                     log.error(errorMsg);
-                    throw new MissingSourceTopicException(errorMsg);
+                    throw new TopologyException(errorMsg);
                 }
             }
+
+            if (hasMissingSourceTopics) {
+                handleMissingSourceTopicsWithTimeout(missingTopicsDetail);
+            } else {
+                // Reset timeout tracking when no missing source topics are reported
+                firstMissingSourceTopicTime = -1L;
Review Comment:
I think if you use `org.apache.kafka.common.utils.Timer` and call `reset` here,
you don't need the inline comment.
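For illustration, here is a minimal sketch of the deadline-style tracking this suggestion points toward. It deliberately does not use Kafka's `Timer`, so that it runs on the JDK alone: the class `TopicsReadyTimeoutSketch`, its method names, the injected clock, and the 1000 ms timeout are all hypothetical and not part of the PR or of the Kafka API. The idea is only that an explicit "ready" call replaces the `-1L` sentinel assignment and its explanatory comment.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for illustration only; NOT org.apache.kafka.common.utils.Timer.
final class TopicsReadyTimeoutSketch {
    private final LongSupplier clockMs; // injected clock (assumption) so the logic is testable
    private final long timeoutMs;
    private long deadlineMs = -1L;      // negative means "not currently waiting for topics"

    TopicsReadyTimeoutSketch(final LongSupplier clockMs, final long timeoutMs) {
        this.clockMs = clockMs;
        this.timeoutMs = timeoutMs;
    }

    // Call when a heartbeat reports missing topics.
    // Returns true once the topics have been missing for at least timeoutMs.
    boolean onTopicsMissing() {
        final long now = clockMs.getAsLong();
        if (deadlineMs < 0) {
            deadlineMs = now + timeoutMs; // first report starts the countdown
        }
        return now >= deadlineMs;
    }

    // Call when a heartbeat reports no missing topics; clears the countdown.
    void onTopicsReady() {
        deadlineMs = -1L;
    }

    public static void main(final String[] args) {
        final long[] now = {0L}; // fake clock, advanced manually
        final TopicsReadyTimeoutSketch tracker =
            new TopicsReadyTimeoutSketch(() -> now[0], 1000L);

        System.out.println(tracker.onTopicsMissing()); // false: countdown just started
        now[0] = 1000L;
        System.out.println(tracker.onTopicsMissing()); // true: timeout reached
        tracker.onTopicsReady();                       // topics showed up, clear state
        now[0] = 1500L;
        System.out.println(tracker.onTopicsMissing()); // false: countdown restarted
    }
}
```

In `handleStreamsRebalanceData()`, the `else` branch would then reduce to a single self-describing call, and the caller would throw only when the "missing" call returns true. With the real `Timer`, the equivalent calls would need to be checked against its Javadoc.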
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -371,6 +372,9 @@ public boolean isStartingRunningOrPartitionAssigned() {
     private volatile KafkaFutureImpl<Uuid> restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>();
     private volatile KafkaFutureImpl<Uuid> producerInstanceIdFuture = new KafkaFutureImpl<>();
+    // Missing source topic timeout tracking
+    private long firstMissingSourceTopicTime = -1L;
Review Comment:
Also, can we rename this to a more generic `topicsReadyTimer`? I think we
may want to reuse the timer to also time out when internal topics are not
created in time.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1561,6 +1578,33 @@ public void handleStreamsRebalanceData() {
         }
     }
+    private void handleMissingSourceTopicsWithTimeout(final String missingTopicsDetail) {
Review Comment:
Yes, using `org.apache.kafka.common.utils.Timer` should simplify this.
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -371,6 +372,9 @@ public boolean isStartingRunningOrPartitionAssigned() {
     private volatile KafkaFutureImpl<Uuid> restoreConsumerInstanceIdFuture = new KafkaFutureImpl<>();
     private volatile KafkaFutureImpl<Uuid> producerInstanceIdFuture = new KafkaFutureImpl<>();
+    // Missing source topic timeout tracking
Review Comment:
If you describe a member, I'd use a javadoc comment. But this comment isn't
adding anything on top of the variable name, so maybe we can drop it altogether?
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionIntegrationTest.java:
##########
@@ -40,46 +67,21 @@
import org.apache.kafka.streams.kstream.Named;
import org.apache.kafka.streams.kstream.Repartitioned;
import org.apache.kafka.streams.processor.StreamPartitioner;
+import static org.apache.kafka.streams.utils.TestUtils.safeUniqueTestName;
import org.apache.kafka.test.TestUtils;
-
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.TestInfo;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
-
-import java.io.File;
Review Comment:
Can you please revert the import reordering?