This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 8885da0d8b Always use segment partition id as stream partition id for
single stream (#15957)
8885da0d8b is described below
commit 8885da0d8b2ff8b3785b0ccfc0ba54cda31027c0
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Jun 6 15:04:57 2025 -0600
Always use segment partition id as stream partition id for single stream
(#15957)
---
.../realtime/PinotLLCRealtimeSegmentManager.java | 86 ++++++++++++++++-----
.../PinotLLCRealtimeSegmentManagerTest.java | 2 +-
.../realtime/RealtimeSegmentDataManager.java | 31 +++++---
.../stream/PartitionGroupConsumptionStatus.java | 28 +++----
.../spi/stream/PartitionGroupMetadataFetcher.java | 89 +++++++++++++++-------
.../pinot/spi/utils/IngestionConfigUtils.java | 25 +++---
6 files changed, 178 insertions(+), 83 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index d2445f3234..a7b04e2766 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -276,20 +276,49 @@ public class PinotLLCRealtimeSegmentManager {
}
// Create a {@link PartitionGroupConsumptionStatus} for each latest segment
- StreamPartitionMsgOffsetFactory offsetFactory =
-
StreamConsumerFactoryProvider.create(streamConfigs.get(0)).createStreamMsgOffsetFactory();
- for (Map.Entry<Integer, LLCSegmentName> entry :
partitionGroupIdToLatestSegment.entrySet()) {
- int partitionGroupId = entry.getKey();
- LLCSegmentName llcSegmentName = entry.getValue();
- SegmentZKMetadata segmentZKMetadata =
- getSegmentZKMetadata(streamConfigs.get(0).getTableNameWithType(),
llcSegmentName.getSegmentName());
- PartitionGroupConsumptionStatus partitionGroupConsumptionStatus =
- new PartitionGroupConsumptionStatus(partitionGroupId,
llcSegmentName.getSequenceNumber(),
- offsetFactory.create(segmentZKMetadata.getStartOffset()),
- segmentZKMetadata.getEndOffset() == null ? null :
offsetFactory.create(segmentZKMetadata.getEndOffset()),
- segmentZKMetadata.getStatus().toString());
- partitionGroupConsumptionStatusList.add(partitionGroupConsumptionStatus);
+ String tableNameWithType = streamConfigs.get(0).getTableNameWithType();
+ int numStreams = streamConfigs.size();
+ if (numStreams == 1) {
+ // Single stream
+ // NOTE: We skip partition id translation logic to handle cases where
custom stream might return partition id
+ // larger than 10000.
+ StreamConfig streamConfig = streamConfigs.get(0);
+ StreamPartitionMsgOffsetFactory offsetFactory =
+
StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory();
+ for (Map.Entry<Integer, LLCSegmentName> entry :
partitionGroupIdToLatestSegment.entrySet()) {
+ int partitionGroupId = entry.getKey();
+ LLCSegmentName llcSegmentName = entry.getValue();
+ SegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(tableNameWithType, llcSegmentName.getSegmentName());
+ PartitionGroupConsumptionStatus partitionGroupConsumptionStatus =
+ new PartitionGroupConsumptionStatus(partitionGroupId,
llcSegmentName.getSequenceNumber(),
+ offsetFactory.create(segmentZKMetadata.getStartOffset()),
+ segmentZKMetadata.getEndOffset() != null ?
offsetFactory.create(segmentZKMetadata.getEndOffset())
+ : null, segmentZKMetadata.getStatus().toString());
+
partitionGroupConsumptionStatusList.add(partitionGroupConsumptionStatus);
+ }
+ } else {
+ // Multiple streams
+ StreamPartitionMsgOffsetFactory[] offsetFactories = new
StreamPartitionMsgOffsetFactory[numStreams];
+ for (Map.Entry<Integer, LLCSegmentName> entry :
partitionGroupIdToLatestSegment.entrySet()) {
+ int partitionGroupId = entry.getKey();
+ int index =
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionGroupId);
+ int streamPartitionId =
IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId);
+ LLCSegmentName llcSegmentName = entry.getValue();
+ SegmentZKMetadata segmentZKMetadata =
getSegmentZKMetadata(tableNameWithType, llcSegmentName.getSegmentName());
+ StreamPartitionMsgOffsetFactory offsetFactory = offsetFactories[index];
+ if (offsetFactory == null) {
+ offsetFactory =
StreamConsumerFactoryProvider.create(streamConfigs.get(index)).createStreamMsgOffsetFactory();
+ offsetFactories[index] = offsetFactory;
+ }
+ PartitionGroupConsumptionStatus partitionGroupConsumptionStatus =
+ new PartitionGroupConsumptionStatus(partitionGroupId,
streamPartitionId, llcSegmentName.getSequenceNumber(),
+ offsetFactory.create(segmentZKMetadata.getStartOffset()),
+ segmentZKMetadata.getEndOffset() != null ?
offsetFactory.create(segmentZKMetadata.getEndOffset())
+ : null, segmentZKMetadata.getStatus().toString());
+
partitionGroupConsumptionStatusList.add(partitionGroupConsumptionStatus);
+ }
}
+
return partitionGroupConsumptionStatusList;
}
@@ -995,18 +1024,37 @@ public class PinotLLCRealtimeSegmentManager {
Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState
idealState) {
Set<Integer> partitionIds = new HashSet<>();
boolean allPartitionIdsFetched = true;
- for (int i = 0; i < streamConfigs.size(); i++) {
- final int index = i;
+ int numStreams = streamConfigs.size();
+ if (numStreams == 1) {
+ // Single stream
+ // NOTE: We skip partition id translation logic to handle cases where
custom stream might return partition id
+ // larger than 10000.
+ StreamConfig streamConfig = streamConfigs.get(0);
try {
- partitionIds.addAll(getPartitionIds(streamConfigs.get(index)).stream()
- .map(partitionId ->
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId,
index))
- .collect(Collectors.toSet()));
+ partitionIds = getPartitionIds(streamConfig);
} catch (UnsupportedOperationException ignored) {
allPartitionIdsFetched = false;
// Stream does not support fetching partition ids. There is a log in
the fallback code which is sufficient
} catch (Exception e) {
allPartitionIdsFetched = false;
- LOGGER.warn("Failed to fetch partition ids for stream: {}",
streamConfigs.get(i).getTopicName(), e);
+ LOGGER.warn("Failed to fetch partition ids for stream: {}",
streamConfig.getTopicName(), e);
+ }
+ } else {
+ // Multiple streams
+ for (int i = 0; i < numStreams; i++) {
+ StreamConfig streamConfig = streamConfigs.get(i);
+ int index = i;
+ try {
+ partitionIds.addAll(getPartitionIds(streamConfig).stream()
+ .map(partitionId ->
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(partitionId,
index))
+ .collect(Collectors.toSet()));
+ } catch (UnsupportedOperationException ignored) {
+ allPartitionIdsFetched = false;
+ // Stream does not support fetching partition ids. There is a log in
the fallback code which is sufficient
+ } catch (Exception e) {
+ allPartitionIdsFetched = false;
+ LOGGER.warn("Failed to fetch partition ids for stream: {}",
streamConfig.getTopicName(), e);
+ }
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index 25c286d3a2..d15f87efbd 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -1367,7 +1367,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
public void testGetPartitionIds()
throws Exception {
List<StreamConfig> streamConfigs =
List.of(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs());
- IdealState idealState = new IdealState("table");
+ IdealState idealState = new IdealState(REALTIME_TABLE_NAME);
FakePinotLLCRealtimeSegmentManager segmentManager = new
FakePinotLLCRealtimeSegmentManager();
segmentManager._numPartitions = 2;
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index c34a6e373e..114f968a11 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -293,12 +293,12 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
private Thread _consumerThread;
// _partitionGroupId represents the Pinot's internal partition number which
will eventually be used as part of
// segment name.
- // _streamPatitionGroupId represents the partition number in the stream
topic, which could be derived from the
+ // _streamPartitionId represents the partition number in the stream topic,
which could be derived from the
// _partitionGroupId and identify which partition of the stream topic this
consumer is consuming from.
// Note that in traditional single topic ingestion mode, those two concepts
were identical which got separated
// in multi-topic ingestion mode.
private final int _partitionGroupId;
- private final int _streamPatitionGroupId;
+ private final int _streamPartitionId;
private final PartitionGroupConsumptionStatus
_partitionGroupConsumptionStatus;
final String _clientId;
private final TransformPipeline _transformPipeline;
@@ -1638,9 +1638,22 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
// TODO Validate configs
IndexingConfig indexingConfig = _tableConfig.getIndexingConfig();
_partitionGroupId = llcSegmentName.getPartitionGroupId();
- _streamPatitionGroupId =
IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(_partitionGroupId);
- _streamConfig = new StreamConfig(_tableNameWithType,
IngestionConfigUtils.getStreamConfigMaps(_tableConfig)
-
.get(IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(_partitionGroupId)));
+ List<Map<String, String>> streamConfigMaps =
IngestionConfigUtils.getStreamConfigMaps(_tableConfig);
+ int numStreams = streamConfigMaps.size();
+ if (numStreams == 1) {
+ // Single stream
+ // NOTE: We skip partition id translation logic to handle cases where
custom stream might return partition id
+ // larger than 10000.
+ _streamPartitionId = _partitionGroupId;
+ _streamConfig = new StreamConfig(_tableNameWithType,
streamConfigMaps.get(0));
+ } else {
+ // Multiple streams
+ _streamPartitionId =
IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(_partitionGroupId);
+ int index =
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(_partitionGroupId);
+ Preconditions.checkState(numStreams > index, "Cannot find stream config
of index: %s for table: %s", index,
+ _tableNameWithType);
+ _streamConfig = new StreamConfig(_tableNameWithType,
streamConfigMaps.get(index));
+ }
_streamConsumerFactory =
StreamConsumerFactoryProvider.create(_streamConfig);
_streamPartitionMsgOffsetFactory =
_streamConsumerFactory.createStreamMsgOffsetFactory();
String streamTopic = _streamConfig.getTopicName();
@@ -1655,9 +1668,9 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
String clientIdSuffix =
instanceDataManagerConfig != null ?
instanceDataManagerConfig.getConsumerClientIdSuffix() : null;
if (StringUtils.isNotBlank(clientIdSuffix)) {
- _clientId = _tableNameWithType + "-" + streamTopic + "-" +
_streamPatitionGroupId + "-" + clientIdSuffix;
+ _clientId = _tableNameWithType + "-" + streamTopic + "-" +
_streamPartitionId + "-" + clientIdSuffix;
} else {
- _clientId = _tableNameWithType + "-" + streamTopic + "-" +
_streamPatitionGroupId;
+ _clientId = _tableNameWithType + "-" + streamTopic + "-" +
_streamPartitionId;
}
_segmentLogger =
LoggerFactory.getLogger(RealtimeSegmentDataManager.class.getName() + "_" +
_segmentNameStr);
_tableStreamName = _tableNameWithType + "_" + streamTopic;
@@ -1977,8 +1990,8 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
private void createPartitionMetadataProvider(String reason) {
closePartitionMetadataProvider();
_segmentLogger.info("Creating new partition metadata provider, reason:
{}", reason);
- _partitionMetadataProvider =
_streamConsumerFactory.createPartitionMetadataProvider(
- _clientId, _streamPatitionGroupId);
+ _partitionMetadataProvider =
+ _streamConsumerFactory.createPartitionMetadataProvider(_clientId,
_streamPartitionId);
}
private void updateIngestionMetrics(RowMetadata metadata) {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java
index bc02df8462..d0405906cd 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupConsumptionStatus.java
@@ -18,49 +18,51 @@
*/
package org.apache.pinot.spi.stream;
-import org.apache.pinot.spi.utils.IngestionConfigUtils;
-
-
/**
* A PartitionGroup is a group of partitions/shards that the same consumer
should consume from.
* This class contains all information which describes the latest state of a
partition group.
* It is constructed by looking at the segment zk metadata of the latest
segment of each partition group.
* It consists of:
* 1. partitionGroupId - A unique ID for the partitionGroup
- * 2. sequenceNumber - The sequenceNumber this partitionGroup is currently at
- * 3. startOffset - The start offset that the latest segment started consuming
from
- * 4. endOffset - The endOffset (if segment consuming from this partition
group has finished consuming the segment
+ * 2. streamPartitionId - Partition ID of the stream that this partitionGroup
belongs to.
+ * 3. sequenceNumber - The sequenceNumber this partitionGroup is currently at
+ * 4. startOffset - The start offset that the latest segment started consuming
from
+ * 5. endOffset - The endOffset (if segment consuming from this partition
group has finished consuming the segment
* and recorded the end
* offset)
- * 5. status - the consumption status IN_PROGRESS/DONE
+ * 6. status - the consumption status IN_PROGRESS/DONE
*
* This information is needed by the stream, when grouping the
partitions/shards into new partition groups.
*/
public class PartitionGroupConsumptionStatus {
-
private final int _partitionGroupId;
- private final int _streamPartitionGroupId;
+ private final int _streamPartitionId;
private int _sequenceNumber;
private StreamPartitionMsgOffset _startOffset;
private StreamPartitionMsgOffset _endOffset;
private String _status;
- public PartitionGroupConsumptionStatus(int partitionGroupId, int
sequenceNumber, StreamPartitionMsgOffset startOffset,
- StreamPartitionMsgOffset endOffset, String status) {
+ public PartitionGroupConsumptionStatus(int partitionGroupId, int
streamPartitionId, int sequenceNumber,
+ StreamPartitionMsgOffset startOffset, StreamPartitionMsgOffset
endOffset, String status) {
_partitionGroupId = partitionGroupId;
- _streamPartitionGroupId =
IngestionConfigUtils.getStreamPartitionIdFromPinotPartitionId(partitionGroupId);
+ _streamPartitionId = streamPartitionId;
_sequenceNumber = sequenceNumber;
_startOffset = startOffset;
_endOffset = endOffset;
_status = status;
}
+ public PartitionGroupConsumptionStatus(int partitionGroupId, int
sequenceNumber, StreamPartitionMsgOffset startOffset,
+ StreamPartitionMsgOffset endOffset, String status) {
+ this(partitionGroupId, partitionGroupId, sequenceNumber, startOffset,
endOffset, status);
+ }
+
public int getPartitionGroupId() {
return _partitionGroupId;
}
public int getStreamPartitionGroupId() {
- return _streamPartitionGroupId;
+ return _streamPartitionId;
}
public int getSequenceNumber() {
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index 53f0e33ed1..bf05ea0285 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -32,23 +32,19 @@ import org.slf4j.LoggerFactory;
* using the {@link StreamMetadataProvider}
*/
public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
-
private static final Logger LOGGER =
LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class);
- private final List<PartitionGroupMetadata> _newPartitionGroupMetadataList;
private final List<StreamConfig> _streamConfigs;
private final List<PartitionGroupConsumptionStatus>
_partitionGroupConsumptionStatusList;
- private Exception _exception;
- private final List<String> _topicNames;
private final boolean _forceGetOffsetFromStream;
+ private final List<PartitionGroupMetadata> _newPartitionGroupMetadataList =
new ArrayList<>();
+
+ private Exception _exception;
public PartitionGroupMetadataFetcher(List<StreamConfig> streamConfigs,
- List<PartitionGroupConsumptionStatus>
partitionGroupConsumptionStatusList,
- boolean forceGetOffsetFromStream) {
- _topicNames =
streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.toList());
+ List<PartitionGroupConsumptionStatus>
partitionGroupConsumptionStatusList, boolean forceGetOffsetFromStream) {
_streamConfigs = streamConfigs;
_partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList;
- _newPartitionGroupMetadataList = new ArrayList<>();
_forceGetOffsetFromStream = forceGetOffsetFromStream;
}
@@ -69,39 +65,74 @@ public class PartitionGroupMetadataFetcher implements
Callable<Boolean> {
public Boolean call()
throws Exception {
_newPartitionGroupMetadataList.clear();
- for (int i = 0; i < _streamConfigs.size(); i++) {
- String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() +
"-"
- + _streamConfigs.get(i).getTableNameWithType() + "-" +
_topicNames.get(i);
- StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(_streamConfigs.get(i));
- final int index = i;
+ return _streamConfigs.size() == 1 ? fetchSingleStream() :
fetchMultipleStreams();
+ }
+
+ private Boolean fetchSingleStream()
+ throws Exception {
+ StreamConfig streamConfig = _streamConfigs.get(0);
+ String topicName = streamConfig.getTopicName();
+ String clientId =
+ PartitionGroupMetadataFetcher.class.getSimpleName() + "-" +
streamConfig.getTableNameWithType() + "-"
+ + topicName;
+ StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
+ try (StreamMetadataProvider streamMetadataProvider =
streamConsumerFactory.createStreamMetadataProvider(
+ StreamConsumerFactory.getUniqueClientId(clientId))) {
+
_newPartitionGroupMetadataList.addAll(streamMetadataProvider.computePartitionGroupMetadata(clientId,
streamConfig,
+ _partitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000,
_forceGetOffsetFromStream));
+ if (_exception != null) {
+ // We had at least one failure, but succeeded now. Log an info
+ LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic
{}", topicName);
+ }
+ } catch (TransientConsumerException e) {
+ LOGGER.warn("Transient Exception: Could not get partition count for
topic {}", topicName, e);
+ _exception = e;
+ return Boolean.FALSE;
+ } catch (Exception e) {
+ LOGGER.warn("Could not get partition count for topic {}", topicName, e);
+ _exception = e;
+ throw e;
+ }
+ return Boolean.TRUE;
+ }
+
+ private Boolean fetchMultipleStreams()
+ throws Exception {
+ int numStreams = _streamConfigs.size();
+ for (int i = 0; i < numStreams; i++) {
+ StreamConfig streamConfig = _streamConfigs.get(i);
+ String topicName = streamConfig.getTopicName();
+ String clientId =
+ PartitionGroupMetadataFetcher.class.getSimpleName() + "-" +
streamConfig.getTableNameWithType() + "-"
+ + topicName;
+ StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
+ int index = i;
List<PartitionGroupConsumptionStatus>
topicPartitionGroupConsumptionStatusList =
_partitionGroupConsumptionStatusList.stream()
- .filter(partitionGroupConsumptionStatus ->
-
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
- partitionGroupConsumptionStatus.getPartitionGroupId())
== index)
+ .filter(partitionGroupConsumptionStatus ->
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(
+ partitionGroupConsumptionStatus.getPartitionGroupId()) ==
index)
.collect(Collectors.toList());
- try (
- StreamMetadataProvider streamMetadataProvider =
streamConsumerFactory.createStreamMetadataProvider(
- StreamConsumerFactory.getUniqueClientId(clientId))) {
+ try (StreamMetadataProvider streamMetadataProvider =
streamConsumerFactory.createStreamMetadataProvider(
+ StreamConsumerFactory.getUniqueClientId(clientId))) {
_newPartitionGroupMetadataList.addAll(
-
streamMetadataProvider.computePartitionGroupMetadata(StreamConsumerFactory.getUniqueClientId(clientId),
- _streamConfigs.get(i),
- topicPartitionGroupConsumptionStatusList,
/*maxWaitTimeMs=*/15000, _forceGetOffsetFromStream).stream()
+ streamMetadataProvider.computePartitionGroupMetadata(clientId,
+ streamConfig, topicPartitionGroupConsumptionStatusList,
/*maxWaitTimeMs=*/15000,
+ _forceGetOffsetFromStream)
+ .stream()
.map(metadata -> new PartitionGroupMetadata(
-
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(
- metadata.getPartitionGroupId(), index),
- metadata.getStartOffset())).collect(Collectors.toList())
- );
+
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(metadata.getPartitionGroupId(),
+ index), metadata.getStartOffset()))
+ .collect(Collectors.toList()));
if (_exception != null) {
// We had at least one failure, but succeeded now. Log an info
- LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic
{}", _topicNames.get(i));
+ LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic
{}", topicName);
}
} catch (TransientConsumerException e) {
- LOGGER.warn("Transient Exception: Could not get partition count for
topic {}", _topicNames.get(i), e);
+ LOGGER.warn("Transient Exception: Could not get partition count for
topic {}", topicName, e);
_exception = e;
return Boolean.FALSE;
} catch (Exception e) {
- LOGGER.warn("Could not get partition count for topic {}",
_topicNames.get(i), e);
+ LOGGER.warn("Could not get partition count for topic {}", topicName,
e);
_exception = e;
throw e;
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
index 99766db9e0..6e51b4e7be 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/IngestionConfigUtils.java
@@ -128,19 +128,20 @@ public final class IngestionConfigUtils {
* Fetches the streamConfig from the list of streamConfigs according to the
partition id.
*/
public static Map<String, String> getStreamConfigMap(TableConfig
tableConfig, int partitionId) {
- if (partitionId < PARTITION_PADDING_OFFSET) {
- return getFirstStreamConfigMap(tableConfig);
+ List<Map<String, String>> streamConfigMaps =
getStreamConfigMaps(tableConfig);
+ int numStreams = streamConfigMaps.size();
+ if (numStreams == 1) {
+ // Single stream
+ // NOTE: We skip partition id translation logic to handle cases where
custom stream might return partition id
+ // larger than 10000.
+ return streamConfigMaps.get(0);
+ } else {
+ // Multiple streams
+ int index = getStreamConfigIndexFromPinotPartitionId(partitionId);
+ Preconditions.checkState(numStreams > index, "Cannot find stream config
of index: %s for table: %s", index,
+ tableConfig.getTableName());
+ return streamConfigMaps.get(index);
}
-
- String tableNameWithType = tableConfig.getTableName();
- Preconditions.checkState(tableConfig.getTableType() == TableType.REALTIME,
- "Cannot fetch streamConfigs for OFFLINE table: %s", tableNameWithType);
- int index = getStreamConfigIndexFromPinotPartitionId(partitionId);
- IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
- Preconditions.checkState(ingestionConfig != null &&
ingestionConfig.getStreamIngestionConfig() != null
- &&
ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps().size() > index,
- "Cannot find stream config of index: %s for table: %s", index,
tableNameWithType);
- return
ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps().get(index);
}
public static List<AggregationConfig> getAggregationConfigs(TableConfig
tableConfig) {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]