This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8fb30362205 Fix multi-topic ingestion halt when one Kafka topic is
deleted (#18560)
8fb30362205 is described below
commit 8fb30362205fa382eb1677e15b0c7788d491f0e1
Author: Rekha Seethamraju <[email protected]>
AuthorDate: Wed May 27 01:30:55 2026 -0700
Fix multi-topic ingestion halt when one Kafka topic is deleted (#18560)
* Fix multi-topic ingestion halt when one Kafka topic is deleted
In a multi-topic table, if one Kafka topic becomes inaccessible (e.g. a
backfill topic that is deleted externally while it is still registered in
the table config), PartitionGroupMetadataFetcher.fetchMultipleStreams()
re-threw the exception from computePartitionGroupMetadata(). This caused
the entire getStreamMetadataList() call to fail, which in turn caused
RealtimeSegmentValidationManager.ensureAllPartitionsConsuming() to throw,
halting ingestion repair for all topics on the table — not just the
inaccessible one.
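Fix: the Kafka 3.0 and 4.0 KafkaStreamMetadataProvider now throw
PermanentConsumerException (instead of a plain RuntimeException) when the
topic does not exist. fetchMultipleStreams() catches
PermanentConsumerException per topic, logs a warning, and continues
fetching the remaining topics without recording an exception, so healthy
topics proceed normally through ensureAllPartitionsConsuming().
(An earlier revision of this change also recorded the failed topic names in
_failedTopics, and had the caller (PinotTableIdealStateBuilder) read
getFailedTopics() and emit a PARTITION_GROUP_METADATA_FETCH_ERROR metric
tagged with the topic name. That tracking was removed in the follow-up
commits listed below, so in this change the inaccessible topic is surfaced
only through the WARN log.)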
TransientConsumerException handling is unchanged — transient failures
still return false to trigger a retry via the retry policy. Any other
exception (e.g. an authentication failure) is still recorded and re-thrown.
* Remove failed-topic tracking, since the failed topics no longer need to be stored
* Clean up leftover failed-topic code
---
.../kafka30/KafkaStreamMetadataProvider.java | 3 +-
.../kafka40/KafkaStreamMetadataProvider.java | 3 +-
.../spi/stream/PartitionGroupMetadataFetcher.java | 6 +
.../stream/PartitionGroupMetadataFetcherTest.java | 128 +++++++++++++++++++++
4 files changed, 138 insertions(+), 2 deletions(-)
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
index 18dcd125eab..ebc0a044ccd 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-3.0/src/main/java/org/apache/pinot/plugin/stream/kafka30/KafkaStreamMetadataProvider.java
@@ -48,6 +48,7 @@ import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLagState;
+import org.apache.pinot.spi.stream.PermanentConsumerException;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
@@ -384,7 +385,7 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
if (lastError != null) {
if (topicMissing) {
- throw new RuntimeException("Topic does not exist: " + _topic);
+ throw new PermanentConsumerException(new RuntimeException("Topic does
not exist: " + _topic));
}
if (lastError instanceof TransientConsumerException) {
throw (TransientConsumerException) lastError;
diff --git
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
index bd712ac6fb6..4a9ff545b67 100644
---
a/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
+++
b/pinot-plugins/pinot-stream-ingestion/pinot-kafka-4.0/src/main/java/org/apache/pinot/plugin/stream/kafka40/KafkaStreamMetadataProvider.java
@@ -48,6 +48,7 @@ import org.apache.pinot.spi.stream.OffsetCriteria;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionLagState;
+import org.apache.pinot.spi.stream.PermanentConsumerException;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
@@ -382,7 +383,7 @@ public class KafkaStreamMetadataProvider extends
KafkaPartitionLevelConnectionHa
if (lastError != null) {
if (topicMissing) {
- throw new RuntimeException("Topic does not exist: " + _topic);
+ throw new PermanentConsumerException(new RuntimeException("Topic does
not exist: " + _topic));
}
if (lastError instanceof TransientConsumerException) {
throw (TransientConsumerException) lastError;
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index 2e0443228d6..edf6d6ccdd4 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -151,6 +151,12 @@ public class PartitionGroupMetadataFetcher implements
Callable<Boolean> {
LOGGER.warn("Transient Exception: Could not get StreamMetadata for
topic {}", topicName, e);
_exception = e;
return Boolean.FALSE;
+ } catch (PermanentConsumerException e) {
+ // A confirmed-permanent failure (e.g. topic deleted from Kafka) must not block metadata
+ // fetching for the remaining healthy topics in a multi-topic table. Log and skip this
+ // topic without setting _exception, so the healthy topics proceed normally.
+ LOGGER.warn("Permanent failure fetching StreamMetadata for topic {},
skipping in multi-topic fetch",
+ topicName, e);
} catch (Exception e) {
LOGGER.warn("Could not get StreamMetadata for topic {}", topicName, e);
_exception = e;
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
index 1c229b602da..3a175ef7fcd 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
@@ -366,6 +366,134 @@ public class PartitionGroupMetadataFetcherTest {
}
}
+ /**
+ * When one topic in a multi-topic table is inaccessible (e.g. deleted from
Kafka), the fetcher must
+ * continue fetching metadata for the remaining topics and return partial
results rather than re-throwing
+ * and killing ingestion for all healthy topics.
+ */
+ @Test
+ public void testFetchMultipleStreamsOneTopicPermanentFailure()
+ throws Exception {
+ StreamConfig streamConfig1 = createMockStreamConfig("topic1",
"test-table_REALTIME", false);
+ StreamConfig streamConfig2 = createMockStreamConfig("topic2-deleted",
"test-table_REALTIME", false);
+ List<StreamConfig> streamConfigs = Arrays.asList(streamConfig1,
streamConfig2);
+
+ StreamPartitionMsgOffset offset = mock(StreamPartitionMsgOffset.class);
+ PartitionGroupMetadata metadata = new PartitionGroupMetadata(0, offset);
+
+ // topic1 succeeds, topic2 throws a permanent (non-transient) exception
+ StreamMetadataProvider goodProvider = mock(StreamMetadataProvider.class);
+ when(goodProvider.computePartitionGroupMetadata(anyString(),
any(StreamConfig.class),
+ any(List.class), anyInt(),
anyBoolean())).thenReturn(Collections.singletonList(metadata));
+
+ StreamMetadataProvider badProvider = mock(StreamMetadataProvider.class);
+ when(badProvider.computePartitionGroupMetadata(anyString(),
any(StreamConfig.class),
+ any(List.class), anyInt(), anyBoolean()))
+ .thenThrow(new PermanentConsumerException(new RuntimeException("Topic
does not exist")));
+
+ StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+ when(factory.createStreamMetadataProvider(anyString()))
+ .thenReturn(goodProvider) // called for topic1
+ .thenReturn(badProvider); // called for topic2
+
+ try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider =
Mockito.mockStatic(
+ StreamConsumerFactoryProvider.class)) {
+ mockedProvider.when(() ->
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+ PartitionGroupMetadataFetcher fetcher = new
PartitionGroupMetadataFetcher(
+ streamConfigs, Collections.emptyList(), Collections.emptyList(),
false);
+
+ Boolean result = fetcher.call();
+
+ // Fetch succeeds overall — only topic1's metadata is returned; topic2 is skipped (with a WARN log)
+ Assert.assertTrue(result);
+ Assert.assertEquals(fetcher.getStreamMetadataList().size(), 1);
+
Assert.assertEquals(fetcher.getStreamMetadataList().get(0).getNumPartitions(),
1);
+ Assert.assertNull(fetcher.getException());
+ }
+ }
+
+ @Test
+ public void testFetchMultipleStreamsFailedTopicDoesNotBlockOthersOnRetry()
+ throws Exception {
+ StreamConfig streamConfig1 = createMockStreamConfig("topic1",
"test-table_REALTIME", false);
+ StreamConfig streamConfig2 = createMockStreamConfig("topic2",
"test-table_REALTIME", false);
+ List<StreamConfig> streamConfigs = Arrays.asList(streamConfig1,
streamConfig2);
+
+ StreamPartitionMsgOffset offset = mock(StreamPartitionMsgOffset.class);
+ PartitionGroupMetadata metadata = new PartitionGroupMetadata(0, offset);
+
+ // First call: topic2 throws. Second call: both succeed.
+ StreamMetadataProvider provider = mock(StreamMetadataProvider.class);
+ when(provider.computePartitionGroupMetadata(anyString(),
any(StreamConfig.class),
+ any(List.class), anyInt(), anyBoolean()))
+ .thenReturn(Collections.singletonList(metadata)) // topic1
first call
+ .thenThrow(new PermanentConsumerException(new RuntimeException("Topic
does not exist"))) // topic2 first call
+ .thenReturn(Collections.singletonList(metadata)) // topic1
second call
+ .thenReturn(Collections.singletonList(metadata)); // topic2
second call
+
+ StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+
when(factory.createStreamMetadataProvider(anyString())).thenReturn(provider);
+
+ try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider =
Mockito.mockStatic(
+ StreamConsumerFactoryProvider.class)) {
+ mockedProvider.when(() ->
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+ PartitionGroupMetadataFetcher fetcher = new
PartitionGroupMetadataFetcher(
+ streamConfigs, Collections.emptyList(), Collections.emptyList(),
false);
+
+ // First call: topic2 fails — only topic1's metadata returned, no
exception
+ Boolean result1 = fetcher.call();
+ Assert.assertTrue(result1);
+ Assert.assertEquals(fetcher.getStreamMetadataList().size(), 1);
+ Assert.assertNull(fetcher.getException());
+
+ // Second call: both succeed
+ Boolean result2 = fetcher.call();
+ Assert.assertTrue(result2);
+ Assert.assertEquals(fetcher.getStreamMetadataList().size(), 2);
+ }
+ }
+
+ @Test
+ public void testFetchMultipleStreamsNonPermanentExceptionStillPropagates()
+ throws Exception {
+ StreamConfig streamConfig1 = createMockStreamConfig("topic1",
"test-table_REALTIME", false);
+ StreamConfig streamConfig2 = createMockStreamConfig("topic2",
"test-table_REALTIME", false);
+ List<StreamConfig> streamConfigs = Arrays.asList(streamConfig1,
streamConfig2);
+
+ StreamMetadataProvider goodProvider = mock(StreamMetadataProvider.class);
+ when(goodProvider.computePartitionGroupMetadata(anyString(),
any(StreamConfig.class),
+ any(List.class), anyInt(), anyBoolean()))
+ .thenReturn(Collections.singletonList(new PartitionGroupMetadata(0,
mock(StreamPartitionMsgOffset.class))));
+
+ // Generic RuntimeException (not PermanentConsumerException) — auth error,
NPE, etc.
+ StreamMetadataProvider badProvider = mock(StreamMetadataProvider.class);
+ when(badProvider.computePartitionGroupMetadata(anyString(),
any(StreamConfig.class),
+ any(List.class), anyInt(), anyBoolean()))
+ .thenThrow(new RuntimeException("Auth failure"));
+
+ StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+ when(factory.createStreamMetadataProvider(anyString()))
+ .thenReturn(goodProvider)
+ .thenReturn(badProvider);
+
+ try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider =
Mockito.mockStatic(
+ StreamConsumerFactoryProvider.class)) {
+ mockedProvider.when(() ->
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+ PartitionGroupMetadataFetcher fetcher = new
PartitionGroupMetadataFetcher(
+ streamConfigs, Collections.emptyList(), Collections.emptyList(),
false);
+
+ try {
+ fetcher.call();
+ Assert.fail("Expected RuntimeException to propagate");
+ } catch (RuntimeException e) {
+ Assert.assertEquals(e.getMessage(), "Auth failure");
+ }
+ }
+ }
+
private StreamConfig createMockStreamConfig(String topicName, String
tableName, boolean isEphemeral) {
StreamConfig streamConfig = mock(StreamConfig.class);
when(streamConfig.getTopicName()).thenReturn(topicName);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]