This is an automated email from the ASF dual-hosted git repository.

lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new c625b44d8c5 MINOR: Throw exceptions if source topic is missing (#20123)
c625b44d8c5 is described below

commit c625b44d8c587db19c58e62c6428f060ca45decb
Author: Jinhe Zhang <[email protected]>
AuthorDate: Wed Jul 9 15:19:12 2025 -0400

    MINOR: Throw exceptions if source topic is missing (#20123)
    
    In the old protocol, Kafka Streams threw a
    `MissingSourceTopicException` when a source topic was missing. In the new
    protocol, it no longer does so; it only logs the status returned from the
    broker, which indicates that a source topic is missing.
    
    This change:
    1. Throws a `MissingSourceTopicException` when a source topic is missing
    2. Adds unit tests
    3. Modifies integration tests to fit both old and new protocols
    
    Reviewers: Lucas Brutschy <[email protected]>
---
 ...HandlingSourceTopicDeletionIntegrationTest.java |  14 ++-
 .../JoinWithIncompleteMetadataIntegrationTest.java |  14 ++-
 .../streams/processor/internals/StreamThread.java  |   5 +
 .../processor/internals/StreamThreadTest.java      | 117 +++++++++++++++++++++
 4 files changed, 144 insertions(+), 6 deletions(-)

diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java
index b7673cd882b..92da39227f1 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.KafkaStreams;
 import org.apache.kafka.streams.KafkaStreams.State;
 import org.apache.kafka.streams.StreamsBuilder;
@@ -33,10 +34,12 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
+import java.util.Locale;
 import java.util.Properties;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -75,8 +78,9 @@ public class HandlingSourceTopicDeletionIntegrationTest {
         CLUSTER.deleteTopics(INPUT_TOPIC, OUTPUT_TOPIC);
     }
 
-    @Test
-    public void shouldThrowErrorAfterSourceTopicDeleted(final TestInfo 
testName) throws InterruptedException {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void shouldThrowErrorAfterSourceTopicDeleted(final boolean 
useNewProtocol, final TestInfo testName) throws InterruptedException {
         final StreamsBuilder builder = new StreamsBuilder();
         builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), 
Serdes.String()))
             .to(OUTPUT_TOPIC, Produced.with(Serdes.Integer(), 
Serdes.String()));
@@ -91,6 +95,10 @@ public class HandlingSourceTopicDeletionIntegrationTest {
         
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, 
Serdes.String().getClass());
         streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 
NUM_THREADS);
         streamsConfiguration.put(StreamsConfig.METADATA_MAX_AGE_CONFIG, 2000);
+        
+        if (useNewProtocol) {
+            streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
+        }
 
         final Topology topology = builder.build();
         final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, 
streamsConfiguration);
diff --git 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
index 413799042e7..04f35dcfce4 100644
--- 
a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
+++ 
b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.integration;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.GroupProtocol;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -32,10 +33,12 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
+import java.util.Locale;
 import java.util.Properties;
 
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -87,10 +90,15 @@ public class JoinWithIncompleteMetadataIntegrationTest {
         IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
     }
 
-    @Test
-    public void testShouldAutoShutdownOnJoinWithIncompleteMetadata() throws 
InterruptedException {
+    @ParameterizedTest
+    @ValueSource(booleans = {false, true})
+    public void testShouldAutoShutdownOnJoinWithIncompleteMetadata(final 
boolean useNewProtocol) throws InterruptedException {
         STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
         STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
CLUSTER.bootstrapServers());
+        
+        if (useNewProtocol) {
+            STREAMS_CONFIG.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, 
GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
+        }
 
         final KStream<Long, String> notExistStream = 
builder.stream(NON_EXISTENT_INPUT_TOPIC_LEFT);
 
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 86e12bf3f65..85700074a77 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -52,6 +52,7 @@ import org.apache.kafka.streams.StreamsConfig;
 import org.apache.kafka.streams.StreamsConfig.InternalConfig;
 import org.apache.kafka.streams.TaskMetadata;
 import org.apache.kafka.streams.ThreadMetadata;
+import org.apache.kafka.streams.errors.MissingSourceTopicException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
@@ -1536,6 +1537,10 @@ public class StreamThread extends Thread implements 
ProcessingThread {
             for (final StreamsGroupHeartbeatResponseData.Status status : 
streamsRebalanceData.get().statuses()) {
                 if (status.statusCode() == 
StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code()) {
                     shutdownErrorHook.run();
+                } else if (status.statusCode() == 
StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code()) {
+                    final String errorMsg = String.format("Missing source 
topics: %s", status.statusDetail());
+                    log.error(errorMsg);
+                    throw new MissingSourceTopicException(errorMsg);
                 }
             }
 
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 1b4143eceee..603cbe55b0e 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -66,6 +66,7 @@ import org.apache.kafka.streams.StreamsConfig.InternalConfig;
 import org.apache.kafka.streams.ThreadMetadata;
 import org.apache.kafka.streams.TopologyConfig;
 import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
+import org.apache.kafka.streams.errors.MissingSourceTopicException;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.apache.kafka.streams.errors.TaskCorruptedException;
 import org.apache.kafka.streams.errors.TaskMigratedException;
@@ -3875,6 +3876,64 @@ public class StreamThreadTest {
         verify(shutdownErrorHook).run();
     }
 
+    @Test
+    public void 
testStreamsProtocolRunOnceWithoutProcessingThreadsMissingSourceTopic() {
+        final ConsumerGroupMetadata consumerGroupMetadata = 
Mockito.mock(ConsumerGroupMetadata.class);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        when(mainConsumer.poll(Mockito.any(Duration.class))).thenReturn(new 
ConsumerRecords<>(Map.of(), Map.of()));
+        when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
+                UUID.randomUUID(),
+                Optional.empty(),
+                Map.of(),
+                Map.of()
+        );
+        final Runnable shutdownErrorHook = mock(Runnable.class);
+
+        final Properties props = configProps(false, false, false);
+        final StreamsMetadataState streamsMetadataState = new 
StreamsMetadataState(
+                new TopologyMetadata(internalTopologyBuilder, new 
StreamsConfig(props)),
+                StreamsMetadataState.UNKNOWN_HOST,
+                new LogContext(String.format("stream-client [%s] ", CLIENT_ID))
+        );
+        final StreamsConfig config = new StreamsConfig(props);
+        thread = new StreamThread(
+                new MockTime(1),
+                config,
+                null,
+                mainConsumer,
+                consumer,
+                changelogReader,
+                null,
+                mock(TaskManager.class),
+                null,
+                new StreamsMetricsImpl(metrics, CLIENT_ID, 
PROCESS_ID.toString(), mockTime),
+                new TopologyMetadata(internalTopologyBuilder, config),
+                PROCESS_ID,
+                CLIENT_ID,
+                new LogContext(""),
+                null,
+                new AtomicLong(Long.MAX_VALUE),
+                new LinkedList<>(),
+                shutdownErrorHook,
+                HANDLER,
+                null,
+                Optional.of(streamsRebalanceData),
+                streamsMetadataState
+        ).updateThreadMetadata(adminClientId(CLIENT_ID));
+
+        thread.setState(State.STARTING);
+        thread.runOnceWithoutProcessingThreads();
+
+        streamsRebalanceData.setStatuses(List.of(
+                new StreamsGroupHeartbeatResponseData.Status()
+                        
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
+                        .setStatusDetail("Missing source topics")
+        ));
+        final MissingSourceTopicException exception = 
assertThrows(MissingSourceTopicException.class, () -> 
thread.runOnceWithoutProcessingThreads());
+        assertTrue(exception.getMessage().startsWith("Missing source topics"));
+    }
+
     @Test
     public void testStreamsProtocolRunOnceWithProcessingThreads() {
         final ConsumerGroupMetadata consumerGroupMetadata = 
Mockito.mock(ConsumerGroupMetadata.class);
@@ -3934,6 +3993,64 @@ public class StreamThreadTest {
         verify(shutdownErrorHook).run();
     }
 
+    @Test
+    public void 
testStreamsProtocolRunOnceWithProcessingThreadsMissingSourceTopic() {
+        final ConsumerGroupMetadata consumerGroupMetadata = 
Mockito.mock(ConsumerGroupMetadata.class);
+        
when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+        when(mainConsumer.poll(Mockito.any(Duration.class))).thenReturn(new 
ConsumerRecords<>(Map.of(), Map.of()));
+        when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+        final StreamsRebalanceData streamsRebalanceData = new 
StreamsRebalanceData(
+                UUID.randomUUID(),
+                Optional.empty(),
+                Map.of(),
+                Map.of()
+        );
+
+        final Properties props = configProps(false, false, false);
+        final Runnable shutdownErrorHook = mock(Runnable.class);
+        final StreamsConfig config = new StreamsConfig(props);
+        final StreamsMetadataState streamsMetadataState = new 
StreamsMetadataState(
+                new TopologyMetadata(internalTopologyBuilder, config),
+                StreamsMetadataState.UNKNOWN_HOST,
+                new LogContext(String.format("stream-client [%s] ", CLIENT_ID))
+        );
+        thread = new StreamThread(
+                new MockTime(1),
+                config,
+                null,
+                mainConsumer,
+                consumer,
+                changelogReader,
+                null,
+                mock(TaskManager.class),
+                null,
+                new StreamsMetricsImpl(metrics, CLIENT_ID, 
PROCESS_ID.toString(), mockTime),
+                new TopologyMetadata(internalTopologyBuilder, config),
+                PROCESS_ID,
+                CLIENT_ID,
+                new LogContext(""),
+                null,
+                new AtomicLong(Long.MAX_VALUE),
+                new LinkedList<>(),
+                shutdownErrorHook,
+                HANDLER,
+                null,
+                Optional.of(streamsRebalanceData),
+                streamsMetadataState
+        ).updateThreadMetadata(adminClientId(CLIENT_ID));
+
+        thread.setState(State.STARTING);
+        thread.runOnceWithProcessingThreads();
+
+        streamsRebalanceData.setStatuses(List.of(
+                new StreamsGroupHeartbeatResponseData.Status()
+                        
.setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
+                        .setStatusDetail("Missing source topics")
+        ));
+        final MissingSourceTopicException exception = 
assertThrows(MissingSourceTopicException.class, () -> 
thread.runOnceWithoutProcessingThreads());
+        assertTrue(exception.getMessage().startsWith("Missing source topics"));
+    }
+
     @Test
     public void testGetTopicPartitionInfo() {
         assertEquals(

Reply via email to