This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new fe72e939e2 Make clientId to be unique for PartitionGroupMetadataFetcher (#15393)
fe72e939e2 is described below
commit fe72e939e2230049a2b11fb199f69a313a5be7a6
Author: Xiang Fu <[email protected]>
AuthorDate: Mon Mar 31 23:40:13 2025 -0700
Make clientId to be unique for PartitionGroupMetadataFetcher (#15393)
---
.../core/realtime/PinotLLCRealtimeSegmentManager.java | 4 ++--
.../realtime/RealtimeConsumptionRateManager.java | 3 ++-
.../spi/stream/PartitionGroupMetadataFetcher.java | 19 ++++++++++---------
.../pinot/spi/stream/StreamConsumerFactory.java | 10 ++++++++++
.../pinot/spi/stream/StreamMetadataProvider.java | 2 +-
5 files changed, 25 insertions(+), 13 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 9e8bff5f5d..7d7393a06c 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -998,9 +998,9 @@ public class PinotLLCRealtimeSegmentManager {
@VisibleForTesting
Set<Integer> getPartitionIds(StreamConfig streamConfig)
throws Exception {
- String clientId =
+ String clientId = StreamConsumerFactory.getUniqueClientId(
PinotLLCRealtimeSegmentManager.class.getSimpleName() + "-" +
streamConfig.getTableNameWithType() + "-"
- + streamConfig.getTopicName();
+ + streamConfig.getTopicName());
StreamConsumerFactory consumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
try (StreamMetadataProvider metadataProvider =
consumerFactory.createStreamMetadataProvider(clientId)) {
return metadataProvider.fetchPartitionIds(5000L);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
index 0d59899c9e..153c22fd85 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeConsumptionRateManager.java
@@ -201,7 +201,8 @@ public class RealtimeConsumptionRateManager {
@VisibleForTesting
static final PartitionCountFetcher DEFAULT_PARTITION_COUNT_FETCHER =
streamConfig -> {
- String clientId = streamConfig.getTopicName() + "-consumption.rate.manager";
+ String clientId =
+ StreamConsumerFactory.getUniqueClientId(streamConfig.getTopicName() + "-consumption.rate.manager");
StreamConsumerFactory factory =
StreamConsumerFactoryProvider.create(streamConfig);
try (StreamMetadataProvider streamMetadataProvider =
factory.createStreamMetadataProvider(clientId)) {
return
streamMetadataProvider.fetchPartitionCount(/*maxWaitTimeMs*/10_000);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index 158e28ce72..30cbe8bd63 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -78,15 +78,16 @@ public class PartitionGroupMetadataFetcher implements
Callable<Boolean> {
partitionGroupConsumptionStatus.getPartitionGroupId())
== index)
.collect(Collectors.toList());
try (
- StreamMetadataProvider streamMetadataProvider =
- streamConsumerFactory.createStreamMetadataProvider(clientId)) {
- _newPartitionGroupMetadataList.addAll(streamMetadataProvider.computePartitionGroupMetadata(clientId,
- _streamConfigs.get(i),
- topicPartitionGroupConsumptionStatusList,
/*maxWaitTimeMs=*/15000).stream().map(
- metadata -> new PartitionGroupMetadata(
- IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(
- metadata.getPartitionGroupId(), index),
- metadata.getStartOffset())).collect(Collectors.toList())
+ StreamMetadataProvider streamMetadataProvider =
streamConsumerFactory.createStreamMetadataProvider(
+ StreamConsumerFactory.getUniqueClientId(clientId))) {
+ _newPartitionGroupMetadataList.addAll(
+ streamMetadataProvider.computePartitionGroupMetadata(StreamConsumerFactory.getUniqueClientId(clientId),
+ _streamConfigs.get(i),
+ topicPartitionGroupConsumptionStatusList,
/*maxWaitTimeMs=*/15000).stream().map(
+ metadata -> new PartitionGroupMetadata(
+ IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(
+ metadata.getPartitionGroupId(), index),
+ metadata.getStartOffset())).collect(Collectors.toList())
);
if (_exception != null) {
// We had at least one failure, but succeeded now. Log an info
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
index a8c4d22cc3..242cc9491b 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamConsumerFactory.java
@@ -19,12 +19,15 @@
package org.apache.pinot.spi.stream;
import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Factory for a stream which provides a consumer and a metadata provider for
the stream
*/
public abstract class StreamConsumerFactory {
+ private static final AtomicInteger CLIENT_ID_SEQ = new AtomicInteger(0);
+
protected StreamConfig _streamConfig;
/**
@@ -72,4 +75,11 @@ public abstract class StreamConsumerFactory {
String groupId) {
throw new UnsupportedOperationException();
}
+
+ public static String getUniqueClientId(String prefix) {
+ if (prefix == null) {
+ return String.valueOf(CLIENT_ID_SEQ.getAndIncrement());
+ }
+ return prefix + "-" + CLIENT_ID_SEQ.getAndIncrement();
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
index 052993a6d0..64770d3f83 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadataProvider.java
@@ -89,7 +89,7 @@ public interface StreamMetadataProvider extends Closeable {
StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
for (int i = partitionGroupConsumptionStatuses.size(); i < partitionCount;
i++) {
try (StreamMetadataProvider partitionMetadataProvider =
streamConsumerFactory.createPartitionMetadataProvider(
- clientId, i)) {
+ StreamConsumerFactory.getUniqueClientId(clientId), i)) {
StreamPartitionMsgOffset streamPartitionMsgOffset =
partitionMetadataProvider.fetchStreamPartitionOffset(streamConfig.getOffsetCriteria(),
timeoutMillis);
newPartitionGroupMetadataList.add(new PartitionGroupMetadata(i,
streamPartitionMsgOffset));
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]