mjsax commented on code in PR #20284:
URL: https://github.com/apache/kafka/pull/20284#discussion_r2255473660


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1561,6 +1578,33 @@ public void handleStreamsRebalanceData() {
         }
     }
 
+    private void handleMissingSourceTopicsWithTimeout(final String missingTopicsDetail) {
+        // Start timeout tracking on first encounter with missing topics
+        if (topicsReadyTimer == null) {
+            topicsReadyTimer = time.timer(maxPollTimeMs);
+            log.info("Missing source topics detected: {}. Will wait up to {}ms before failing.",
+                missingTopicsDetail, maxPollTimeMs);
+        } else {
+            topicsReadyTimer.update();
+        }
+        
+        if (topicsReadyTimer.isExpired()) {
+            final long elapsedTime = topicsReadyTimer.elapsedMs();
+            final String errorMsg = String.format("Missing source topics: %s. Timeout exceeded after %dms.",
+                missingTopicsDetail, elapsedTime);
+            log.error(errorMsg);
+            
+            // Reset timer for next timeout cycle
+            topicsReadyTimer.updateAndReset(maxPollTimeMs);

Review Comment:
   Why do we need to update the timer? We throw `MissingSourceTopicException` below, and this should shut down the thread?
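
   For illustration, a minimal self-contained sketch of that suggestion (assuming the `Time`/`Timer` utilities from `org.apache.kafka.common.utils` that the hunk above already uses; the wrapper class, the field initialization, and the 300000ms default are hypothetical stand-ins for the real `StreamThread` fields):

   ```java
   import org.apache.kafka.common.utils.Time;
   import org.apache.kafka.common.utils.Timer;
   import org.apache.kafka.streams.errors.MissingSourceTopicException;

   // Hypothetical wrapper: once the timer expires we throw and let the exception
   // shut the thread down, so no updateAndReset() is needed.
   public class MissingSourceTopicsTimeoutSketch {

       private final Time time = Time.SYSTEM;       // StreamThread would use its injected Time
       private final long maxPollTimeMs = 300_000L; // assumed max.poll.interval.ms default
       private Timer topicsReadyTimer;

       void handleMissingSourceTopicsWithTimeout(final String missingTopicsDetail) {
           // Start timeout tracking on the first encounter with missing topics.
           if (topicsReadyTimer == null) {
               topicsReadyTimer = time.timer(maxPollTimeMs);
           } else {
               topicsReadyTimer.update();
           }

           if (topicsReadyTimer.isExpired()) {
               final String errorMsg = String.format(
                   "Missing source topics: %s. Timeout exceeded after %dms.",
                   missingTopicsDetail, topicsReadyTimer.elapsedMs());
               // No timer reset here: the exception is expected to be fatal for the thread.
               throw new MissingSourceTopicException(errorMsg);
           }
       }
   }
   ```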



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -1534,15 +1538,28 @@ private ConsumerRecords<byte[], byte[]> pollRequests(final Duration pollTime) {
 
     public void handleStreamsRebalanceData() {
         if (streamsRebalanceData.isPresent()) {
+            boolean hasMissingSourceTopics = false;
+            String missingTopicsDetail = null;
+            
             for (final StreamsGroupHeartbeatResponseData.Status status : streamsRebalanceData.get().statuses()) {
                 if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code()) {
                     shutdownErrorHook.run();
                 } else if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code()) {
-                    final String errorMsg = String.format("Missing source topics: %s", status.statusDetail());
+                    hasMissingSourceTopics = true;
+                    missingTopicsDetail = status.statusDetail();
+                } else if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.INCORRECTLY_PARTITIONED_TOPICS.code()) {
+                    final String errorMsg = status.statusDetail();
                     log.error(errorMsg);
-                    throw new MissingSourceTopicException(errorMsg);
+                    throw new TopologyException(errorMsg);

Review Comment:
   It seems this case is newly added, but we did not add a new test for it?
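
   A test for this new branch could mirror the existing missing-source-topics tests in `StreamThreadTest`. A rough sketch, assuming the same `thread`/`streamsRebalanceData` wiring as those tests (the test name is made up, and `TopologyException` is assumed to keep the status detail in its message):

   ```java
   @Test
   public void testStreamsProtocolHandleIncorrectlyPartitionedTopics() {
       // Reuses the StreamThread + StreamsRebalanceData setup of the existing
       // missing-source-topics tests; only the status code differs.
       streamsRebalanceData.setStatuses(List.of(
               new StreamsGroupHeartbeatResponseData.Status()
                       .setStatusCode(StreamsGroupHeartbeatResponse.Status.INCORRECTLY_PARTITIONED_TOPICS.code())
                       .setStatusDetail("Incorrectly partitioned topics")
       ));

       final TopologyException exception =
               assertThrows(TopologyException.class, () -> thread.runOnceWithoutProcessingThreads());
       assertTrue(exception.getMessage().contains("Incorrectly partitioned topics"));
   }
   ```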



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java:
##########
@@ -4047,8 +4058,99 @@ public void testStreamsProtocolRunOnceWithProcessingThreadsMissingSourceTopic()
                        .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
                         .setStatusDetail("Missing source topics")
         ));
+        
+        // First call should not throw exception (within timeout)
+        thread.runOnceWithProcessingThreads();
+        
+        // Advance time beyond max.poll.interval.ms (default is 300000ms) to trigger timeout
+        mockTime.sleep(300001);
+        
         final MissingSourceTopicException exception = assertThrows(MissingSourceTopicException.class, () -> thread.runOnceWithProcessingThreads());
-        assertTrue(exception.getMessage().startsWith("Missing source topics"));
+        assertTrue(exception.getMessage().contains("Missing source topics"));
+        assertTrue(exception.getMessage().contains("Timeout exceeded"));
+    }
+
+    @Test
+    public void testStreamsProtocolMissingSourceTopicRecovery() {
+        final ConsumerGroupMetadata consumerGroupMetadata = Mockito.mock(ConsumerGroupMetadata.class);
+        when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        when(mainConsumer.poll(Mockito.any(Duration.class))).thenReturn(new ConsumerRecords<>(Map.of(), Map.of()));
+        when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
+                UUID.randomUUID(),
+                Optional.empty(),
+                Map.of(),
+                Map.of()
+        );
+
+        final Properties props = configProps(false, false, false);
+        final Runnable shutdownErrorHook = mock(Runnable.class);
+        final StreamsConfig config = new StreamsConfig(props);
+        final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(
+                new TopologyMetadata(internalTopologyBuilder, config),
+                StreamsMetadataState.UNKNOWN_HOST,
+                new LogContext(String.format("stream-client [%s] ", CLIENT_ID))
+        );
+        final MockTime mockTime = new MockTime(1);
+        thread = new StreamThread(
+                mockTime,
+                config,
+                null,
+                mainConsumer,
+                consumer,
+                changelogReader,
+                null,
+                mock(TaskManager.class),
+                null,
+                new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime),
+                new TopologyMetadata(internalTopologyBuilder, config),
+                PROCESS_ID,
+                CLIENT_ID,
+                new LogContext(""),
+                null,
+                new AtomicLong(Long.MAX_VALUE),
+                new LinkedList<>(),
+                shutdownErrorHook,
+                HANDLER,
+                null,
+                Optional.of(streamsRebalanceData),
+                streamsMetadataState
+        ).updateThreadMetadata(adminClientId(CLIENT_ID));
+
+        thread.setState(State.STARTING);
+        thread.runOnceWithoutProcessingThreads();
+
+        // Set missing source topics status
+        streamsRebalanceData.setStatuses(List.of(
+                new StreamsGroupHeartbeatResponseData.Status()
+                        .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
+                        .setStatusDetail("Missing source topics")
+        ));
+        
+        // First call should not throw exception (within timeout)
+        thread.runOnceWithoutProcessingThreads();
+        
+        // Advance time but not beyond timeout
+        mockTime.sleep(150000); // Half of max.poll.interval.ms
+        
+        // Should still not throw exception
+        thread.runOnceWithoutProcessingThreads();
+        
+        // Clear the missing source topics (simulate recovery)
+        streamsRebalanceData.setStatuses(List.of());
+        
+        // Should complete without exception (recovery successful)
+        assertDoesNotThrow(() -> thread.runOnceWithoutProcessingThreads());
+        
+        // Set missing topics again - should reset the timeout
+        streamsRebalanceData.setStatuses(List.of(
+                new StreamsGroupHeartbeatResponseData.Status()
+                        .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
+                        .setStatusDetail("Different missing topics")
+        ));
+        
+        // Should not throw immediately (timeout reset)
+        assertDoesNotThrow(() -> thread.runOnceWithoutProcessingThreads());

Review Comment:
   Well, while we did reset the timer, even without the timeout reset we would not throw here, since we did not advance `mockTime` further. To really verify the timeout reset, it seems we would need to advance `mockTime` (e.g. by 20 seconds) after `setStatuses` but before this final check?
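
   For example, a sketch of that adjustment (the sleep amount here is illustrative: it is chosen so that the 150000ms already slept before recovery plus this advance would exceed max.poll.interval.ms for a timer that was never reset, while a freshly started timer stays well within it):

   ```java
   // Re-set the missing-topics status, then advance the mocked clock far enough
   // that only a freshly started timer keeps the next call from throwing.
   streamsRebalanceData.setStatuses(List.of(
           new StreamsGroupHeartbeatResponseData.Status()
                   .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
                   .setStatusDetail("Different missing topics")
   ));

   // 150000ms elapsed before recovery; 200000ms more would expire a non-reset timer
   // (350000ms > 300000ms) but not a fresh one (200000ms < 300000ms).
   mockTime.sleep(200_000);

   assertDoesNotThrow(() -> thread.runOnceWithoutProcessingThreads());
   ```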



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
