This is an automated email from the ASF dual-hosted git repository.
lucasbru pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new c625b44d8c5 MINOR: Throw exceptions if source topic is missing (#20123)
c625b44d8c5 is described below
commit c625b44d8c587db19c58e62c6428f060ca45decb
Author: Jinhe Zhang <[email protected]>
AuthorDate: Wed Jul 9 15:19:12 2025 -0400
MINOR: Throw exceptions if source topic is missing (#20123)
In the old protocol, Kafka Streams throws a
`MissingSourceTopicException` when a source topic is missing. In the new
protocol, it no longer does so; it only logs the status returned from
the broker, which indicates that a source topic is missing.
This change:
1. Throws a `MissingSourceTopicException` when a source topic is missing
2. Adds unit tests
3. Modifies the integration tests to cover both the old and the new protocol
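For illustration only (not part of this change): a minimal sketch of one way
an application could react if the exception surfaces via the streams uncaught
exception handler. The topic names, application id, bootstrap server, and the
shutdown choice below are placeholders, not taken from this commit.

    import java.util.Properties;

    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.errors.MissingSourceTopicException;
    import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse;

    public class MissingSourceTopicHandlingExample {

        public static void main(final String[] args) {
            final StreamsBuilder builder = new StreamsBuilder();
            builder.stream("input-topic").to("output-topic"); // placeholder topics

            final Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "missing-topic-example"); // placeholder
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder

            final KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.setUncaughtExceptionHandler(exception -> {
                // Depending on the code path, the exception may arrive wrapped, so check the cause too.
                if (exception instanceof MissingSourceTopicException
                        || exception.getCause() instanceof MissingSourceTopicException) {
                    // A missing source topic is not fixed by replacing the thread; stop this client.
                    return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
                }
                return StreamThreadExceptionResponse.REPLACE_THREAD;
            });
            streams.start();
        }
    }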
Reviewers: Lucas Brutschy <[email protected]>
---
...HandlingSourceTopicDeletionIntegrationTest.java | 14 ++-
.../JoinWithIncompleteMetadataIntegrationTest.java | 14 ++-
.../streams/processor/internals/StreamThread.java | 5 +
.../processor/internals/StreamThreadTest.java | 117 +++++++++++++++++++++
4 files changed, 144 insertions(+), 6 deletions(-)
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java
index b7673cd882b..92da39227f1 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/HandlingSourceTopicDeletionIntegrationTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.integration;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KafkaStreams.State;
import org.apache.kafka.streams.StreamsBuilder;
@@ -33,10 +34,12 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestInfo;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
+import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -75,8 +78,9 @@ public class HandlingSourceTopicDeletionIntegrationTest {
CLUSTER.deleteTopics(INPUT_TOPIC, OUTPUT_TOPIC);
}
- @Test
- public void shouldThrowErrorAfterSourceTopicDeleted(final TestInfo testName) throws InterruptedException {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void shouldThrowErrorAfterSourceTopicDeleted(final boolean useNewProtocol, final TestInfo testName) throws InterruptedException {
final StreamsBuilder builder = new StreamsBuilder();
builder.stream(INPUT_TOPIC, Consumed.with(Serdes.Integer(), Serdes.String()))
.to(OUTPUT_TOPIC, Produced.with(Serdes.Integer(), Serdes.String()));
@@ -91,6 +95,10 @@ public class HandlingSourceTopicDeletionIntegrationTest {
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, NUM_THREADS);
streamsConfiguration.put(StreamsConfig.METADATA_MAX_AGE_CONFIG, 2000);
+
+ if (useNewProtocol) {
+ streamsConfiguration.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
+ }
final Topology topology = builder.build();
final KafkaStreams kafkaStreams1 = new KafkaStreams(topology, streamsConfiguration);
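For reference, the new protocol is opted into through the streams group
protocol config, just as the parameterized test above does. A minimal
standalone sketch follows; the application id and bootstrap server are
placeholders.

    import java.util.Locale;
    import java.util.Properties;

    import org.apache.kafka.streams.GroupProtocol;
    import org.apache.kafka.streams.StreamsConfig;

    public class StreamsProtocolConfigExample {

        // Builds a Streams configuration; useNewProtocol=true opts into the new
        // streams group protocol, false keeps the classic consumer protocol.
        public static Properties streamsConfig(final boolean useNewProtocol) {
            final Properties config = new Properties();
            config.put(StreamsConfig.APPLICATION_ID_CONFIG, "protocol-config-example"); // placeholder
            config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");       // placeholder
            if (useNewProtocol) {
                config.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
                    GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
            }
            return config;
        }
    }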
diff --git a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
index 413799042e7..04f35dcfce4 100644
--- a/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
+++ b/streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/JoinWithIncompleteMetadataIntegrationTest.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.integration;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
+import org.apache.kafka.streams.GroupProtocol;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
@@ -32,10 +33,12 @@ import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
-import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
import java.io.IOException;
+import java.util.Locale;
import java.util.Properties;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -87,10 +90,15 @@ public class JoinWithIncompleteMetadataIntegrationTest {
IntegrationTestUtils.purgeLocalStreamsState(STREAMS_CONFIG);
}
- @Test
- public void testShouldAutoShutdownOnJoinWithIncompleteMetadata() throws InterruptedException {
+ @ParameterizedTest
+ @ValueSource(booleans = {false, true})
+ public void testShouldAutoShutdownOnJoinWithIncompleteMetadata(final boolean useNewProtocol) throws InterruptedException {
STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
STREAMS_CONFIG.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
+
+ if (useNewProtocol) {
+ STREAMS_CONFIG.put(StreamsConfig.GROUP_PROTOCOL_CONFIG, GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
+ }
final KStream<Long, String> notExistStream = builder.stream(NON_EXISTENT_INPUT_TOPIC_LEFT);
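Both integration tests are migrated with the same JUnit 5 pattern: the former
@Test becomes a @ParameterizedTest driven by a boolean @ValueSource. A generic
sketch of that pattern, with an illustrative class name and assertion that are
not part of this commit:

    import java.util.Locale;
    import java.util.Properties;

    import org.apache.kafka.streams.GroupProtocol;
    import org.apache.kafka.streams.StreamsConfig;
    import org.junit.jupiter.params.ParameterizedTest;
    import org.junit.jupiter.params.provider.ValueSource;

    import static org.junit.jupiter.api.Assertions.assertEquals;

    class ProtocolParameterizationExample {

        // Runs twice: once with useNewProtocol=false (classic protocol) and once with true.
        @ParameterizedTest
        @ValueSource(booleans = {false, true})
        void configuresTheRequestedProtocol(final boolean useNewProtocol) {
            final Properties config = new Properties();
            if (useNewProtocol) {
                config.put(StreamsConfig.GROUP_PROTOCOL_CONFIG,
                    GroupProtocol.STREAMS.name().toLowerCase(Locale.getDefault()));
            }
            // The group protocol override is present exactly when the new protocol was requested.
            assertEquals(useNewProtocol, config.containsKey(StreamsConfig.GROUP_PROTOCOL_CONFIG));
        }
    }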
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
index 86e12bf3f65..85700074a77 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java
@@ -52,6 +52,7 @@ import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.TaskMetadata;
import org.apache.kafka.streams.ThreadMetadata;
+import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
@@ -1536,6 +1537,10 @@ public class StreamThread extends Thread implements ProcessingThread {
for (final StreamsGroupHeartbeatResponseData.Status status : streamsRebalanceData.get().statuses()) {
if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.SHUTDOWN_APPLICATION.code()) {
shutdownErrorHook.run();
+ } else if (status.statusCode() == StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code()) {
+ final String errorMsg = String.format("Missing source topics: %s", status.statusDetail());
+ log.error(errorMsg);
+ throw new MissingSourceTopicException(errorMsg);
}
}
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
index 1b4143eceee..603cbe55b0e 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
@@ -66,6 +66,7 @@ import org.apache.kafka.streams.StreamsConfig.InternalConfig;
import org.apache.kafka.streams.ThreadMetadata;
import org.apache.kafka.streams.TopologyConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
+import org.apache.kafka.streams.errors.MissingSourceTopicException;
import org.apache.kafka.streams.errors.StreamsException;
import org.apache.kafka.streams.errors.TaskCorruptedException;
import org.apache.kafka.streams.errors.TaskMigratedException;
@@ -3875,6 +3876,64 @@ public class StreamThreadTest {
verify(shutdownErrorHook).run();
}
+ @Test
+ public void testStreamsProtocolRunOnceWithoutProcessingThreadsMissingSourceTopic() {
+ final ConsumerGroupMetadata consumerGroupMetadata = Mockito.mock(ConsumerGroupMetadata.class);
+ when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+ when(mainConsumer.poll(Mockito.any(Duration.class))).thenReturn(new ConsumerRecords<>(Map.of(), Map.of()));
+ when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+ final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
+ UUID.randomUUID(),
+ Optional.empty(),
+ Map.of(),
+ Map.of()
+ );
+ final Runnable shutdownErrorHook = mock(Runnable.class);
+
+ final Properties props = configProps(false, false, false);
+ final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(
+ new TopologyMetadata(internalTopologyBuilder, new StreamsConfig(props)),
+ StreamsMetadataState.UNKNOWN_HOST,
+ new LogContext(String.format("stream-client [%s] ", CLIENT_ID))
+ );
+ final StreamsConfig config = new StreamsConfig(props);
+ thread = new StreamThread(
+ new MockTime(1),
+ config,
+ null,
+ mainConsumer,
+ consumer,
+ changelogReader,
+ null,
+ mock(TaskManager.class),
+ null,
+ new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime),
+ new TopologyMetadata(internalTopologyBuilder, config),
+ PROCESS_ID,
+ CLIENT_ID,
+ new LogContext(""),
+ null,
+ new AtomicLong(Long.MAX_VALUE),
+ new LinkedList<>(),
+ shutdownErrorHook,
+ HANDLER,
+ null,
+ Optional.of(streamsRebalanceData),
+ streamsMetadataState
+ ).updateThreadMetadata(adminClientId(CLIENT_ID));
+
+ thread.setState(State.STARTING);
+ thread.runOnceWithoutProcessingThreads();
+
+ streamsRebalanceData.setStatuses(List.of(
+ new StreamsGroupHeartbeatResponseData.Status()
+ .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
+ .setStatusDetail("Missing source topics")
+ ));
+ final MissingSourceTopicException exception = assertThrows(MissingSourceTopicException.class, () -> thread.runOnceWithoutProcessingThreads());
+ assertTrue(exception.getMessage().startsWith("Missing source topics"));
+ }
+
@Test
public void testStreamsProtocolRunOnceWithProcessingThreads() {
final ConsumerGroupMetadata consumerGroupMetadata = Mockito.mock(ConsumerGroupMetadata.class);
@@ -3934,6 +3993,64 @@ public class StreamThreadTest {
verify(shutdownErrorHook).run();
}
+ @Test
+ public void testStreamsProtocolRunOnceWithProcessingThreadsMissingSourceTopic() {
+ final ConsumerGroupMetadata consumerGroupMetadata = Mockito.mock(ConsumerGroupMetadata.class);
+ when(consumerGroupMetadata.groupInstanceId()).thenReturn(Optional.empty());
+ when(mainConsumer.poll(Mockito.any(Duration.class))).thenReturn(new ConsumerRecords<>(Map.of(), Map.of()));
+ when(mainConsumer.groupMetadata()).thenReturn(consumerGroupMetadata);
+ final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(
+ UUID.randomUUID(),
+ Optional.empty(),
+ Map.of(),
+ Map.of()
+ );
+
+ final Properties props = configProps(false, false, false);
+ final Runnable shutdownErrorHook = mock(Runnable.class);
+ final StreamsConfig config = new StreamsConfig(props);
+ final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(
+ new TopologyMetadata(internalTopologyBuilder, config),
+ StreamsMetadataState.UNKNOWN_HOST,
+ new LogContext(String.format("stream-client [%s] ", CLIENT_ID))
+ );
+ thread = new StreamThread(
+ new MockTime(1),
+ config,
+ null,
+ mainConsumer,
+ consumer,
+ changelogReader,
+ null,
+ mock(TaskManager.class),
+ null,
+ new StreamsMetricsImpl(metrics, CLIENT_ID, PROCESS_ID.toString(), mockTime),
+ new TopologyMetadata(internalTopologyBuilder, config),
+ PROCESS_ID,
+ CLIENT_ID,
+ new LogContext(""),
+ null,
+ new AtomicLong(Long.MAX_VALUE),
+ new LinkedList<>(),
+ shutdownErrorHook,
+ HANDLER,
+ null,
+ Optional.of(streamsRebalanceData),
+ streamsMetadataState
+ ).updateThreadMetadata(adminClientId(CLIENT_ID));
+
+ thread.setState(State.STARTING);
+ thread.runOnceWithProcessingThreads();
+
+ streamsRebalanceData.setStatuses(List.of(
+ new StreamsGroupHeartbeatResponseData.Status()
+ .setStatusCode(StreamsGroupHeartbeatResponse.Status.MISSING_SOURCE_TOPICS.code())
+ .setStatusDetail("Missing source topics")
+ ));
+ final MissingSourceTopicException exception = assertThrows(MissingSourceTopicException.class, () -> thread.runOnceWithProcessingThreads());
+ assertTrue(exception.getMessage().startsWith("Missing source topics"));
+ }
+
@Test
public void testGetTopicPartitionInfo() {
assertEquals(