This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch hotfix-kafka-duplicate-metric-name in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit e5f838b4922e26bf50c61eda35571ac7b138ba37 Author: Jiapeng Tao <[email protected]> AuthorDate: Wed Apr 28 20:52:34 2021 -0700 Add reason of creation to StreamMetadataProvider name. (#6862) --- .../helix/core/PinotTableIdealStateBuilder.java | 5 +++-- .../core/realtime/PinotLLCRealtimeSegmentManager.java | 17 +++++++++++------ .../realtime/PinotLLCRealtimeSegmentManagerTest.java | 9 +++++---- .../pinot/spi/stream/PartitionGroupMetadataFetcher.java | 17 +++++++++++++++-- 4 files changed, 34 insertions(+), 14 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java index 28a9bd3..a98fdfd 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java @@ -148,11 +148,12 @@ public class PinotTableIdealStateBuilder { * @param partitionGroupConsumptionStatusList List of {@link PartitionGroupConsumptionStatus} for the current partition groups. * The size of this list is equal to the number of partition groups, * and is created using the latest segment zk metadata. + * @param reason the reason to get partition group metadata */ public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, - List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) { + List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList, String reason) { PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = - new PartitionGroupMetadataFetcher(streamConfig, partitionGroupConsumptionStatusList); + new PartitionGroupMetadataFetcher(streamConfig, partitionGroupConsumptionStatusList, reason); try { DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher); return partitionGroupMetadataFetcher.getPartitionGroupMetadataList(); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 2e1dcc3..b8eca7c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -75,6 +75,7 @@ import org.apache.pinot.spi.filesystem.PinotFSFactory; import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; import org.apache.pinot.spi.stream.PartitionGroupMetadata; +import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher; import org.apache.pinot.spi.stream.PartitionLevelStreamConfig; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConfigProperties; @@ -257,7 +258,8 @@ public class PinotLLCRealtimeSegmentManager { PartitionLevelStreamConfig streamConfig = new PartitionLevelStreamConfig(tableConfig.getTableName(), IngestionConfigUtils.getStreamConfigMap(tableConfig)); InstancePartitions instancePartitions = getConsumingInstancePartitions(tableConfig); - List<PartitionGroupMetadata> newPartitionGroupMetadataList = getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList()); + List<PartitionGroupMetadata> newPartitionGroupMetadataList = + getNewPartitionGroupMetadataList(streamConfig, Collections.emptyList(), PartitionGroupMetadataFetcher.Reason.TABLE_CREATION.name()); int numPartitionGroups = newPartitionGroupMetadataList.size(); int numReplicas = getNumReplicas(tableConfig, instancePartitions); @@ -502,7 +504,8 @@ public class PinotLLCRealtimeSegmentManager { // Fetches new partition groups, given current list of {@link PartitionGroupConsumptionStatus}. List<PartitionGroupMetadata> newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); + getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList, + PartitionGroupMetadataFetcher.Reason.SEGMENT_COMMITMENT.name() + "-" + committingSegmentPartitionGroupId); Set<Integer> newPartitionGroupSet = newPartitionGroupMetadataList.stream().map(PartitionGroupMetadata::getPartitionGroupId).collect(Collectors.toSet()); int numPartitionGroups = newPartitionGroupMetadataList.size(); @@ -705,9 +708,9 @@ public class PinotLLCRealtimeSegmentManager { */ @VisibleForTesting List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig streamConfig, - List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList) { + List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList, String reason) { return PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfig, - currentPartitionGroupConsumptionStatusList); + currentPartitionGroupConsumptionStatusList, reason); } /** @@ -813,7 +816,8 @@ public class PinotLLCRealtimeSegmentManager { List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList = getPartitionGroupConsumptionStatusList(idealState, streamConfig); List<PartitionGroupMetadata> newPartitionGroupMetadataList = - getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); + getNewPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList, + PartitionGroupMetadataFetcher.Reason.PERIODIC_SEGMENT_VALIDATION.name()); return ensureAllPartitionsConsuming(tableConfig, streamConfig, idealState, newPartitionGroupMetadataList); } else { @@ -1134,7 +1138,8 @@ public class PinotLLCRealtimeSegmentManager { StreamConfig smallestOffsetCriteriaStreamConfig = new StreamConfig(streamConfig.getTableNameWithType(), streamConfigMapWithSmallestOffsetCriteria); List<PartitionGroupMetadata> smallestOffsetCriteriaPartitionGroupMetadata = - getNewPartitionGroupMetadataList(smallestOffsetCriteriaStreamConfig, Collections.emptyList()); + getNewPartitionGroupMetadataList(smallestOffsetCriteriaStreamConfig, Collections.emptyList(), + PartitionGroupMetadataFetcher.Reason.PERIODIC_PARTITION_GROUP_SMALLEST_OFFSET_FETCHER.name() + "-" + partitionGroupId); StreamPartitionMsgOffset partitionStartOffset = null; for (PartitionGroupMetadata info : smallestOffsetCriteriaPartitionGroupMetadata) { if (info.getPartitionGroupId() == partitionGroupId) { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index c3d58da..3db14e7 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -254,7 +254,7 @@ public class PinotLLCRealtimeSegmentManagerTest { // committing segment's partitionGroupId no longer in the newPartitionGroupMetadataList List<PartitionGroupMetadata> partitionGroupMetadataListWithout0 = - segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig, Collections.emptyList()); + segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig, Collections.emptyList(), "TEST_TABLE_CREATION"); partitionGroupMetadataListWithout0.remove(0); segmentManager._partitionGroupMetadataList = partitionGroupMetadataListWithout0; @@ -565,7 +565,8 @@ public class PinotLLCRealtimeSegmentManagerTest { */ // 1 reached end of shard. List<PartitionGroupMetadata> partitionGroupMetadataListWithout1 = - segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig, Collections.emptyList()); + segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfig, Collections.emptyList(), + "TEST_TABLE_CREATION"); partitionGroupMetadataListWithout1.remove(1); segmentManager._partitionGroupMetadataList = partitionGroupMetadataListWithout1; // noop @@ -962,7 +963,7 @@ public class PinotLLCRealtimeSegmentManagerTest { public void ensureAllPartitionsConsuming() { ensureAllPartitionsConsuming(_tableConfig, _streamConfig, _idealState, - getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList())); + getNewPartitionGroupMetadataList(_streamConfig, Collections.emptyList(), "TEST_PERIODIC_SEGMENT_VALIDATION")); } @Override @@ -1029,7 +1030,7 @@ public class PinotLLCRealtimeSegmentManagerTest { @Override List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig streamConfig, - List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList) { + List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList, String reason) { if (_partitionGroupMetadataList != null) { return _partitionGroupMetadataList; } else { diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java index 6cc74ce..8439462 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java @@ -20,6 +20,7 @@ package org.apache.pinot.spi.stream; import java.util.List; import java.util.concurrent.Callable; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,18 +33,27 @@ public class PartitionGroupMetadataFetcher implements Callable<Boolean> { private static final Logger LOGGER = LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class); + public enum Reason { + TABLE_CREATION, SEGMENT_COMMITMENT, PERIODIC_SEGMENT_VALIDATION, PERIODIC_PARTITION_GROUP_SMALLEST_OFFSET_FETCHER + } + private List<PartitionGroupMetadata> _newPartitionGroupMetadataList; private final StreamConfig _streamConfig; private final List<PartitionGroupConsumptionStatus> _partitionGroupConsumptionStatusList; private final StreamConsumerFactory _streamConsumerFactory; private Exception _exception; private final String _topicName; + private final String _reason; - public PartitionGroupMetadataFetcher(StreamConfig streamConfig, List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) { + public PartitionGroupMetadataFetcher( + StreamConfig streamConfig, + List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList, + String reason) { _streamConsumerFactory = StreamConsumerFactoryProvider.create(streamConfig); _topicName = streamConfig.getTopicName(); _streamConfig = streamConfig; _partitionGroupConsumptionStatusList = partitionGroupConsumptionStatusList; + _reason = reason; } public List<PartitionGroupMetadata> getPartitionGroupMetadataList() { @@ -61,7 +71,10 @@ public class PartitionGroupMetadataFetcher implements Callable<Boolean> { @Override public Boolean call() throws Exception { - String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + "-" + _topicName; + String clientId = PartitionGroupMetadataFetcher.class.getSimpleName() + + "-" + _topicName + + "-" + TableNameBuilder.extractRawTableName(_streamConfig.getTableNameWithType()) + + "-" + _reason; try ( StreamMetadataProvider streamMetadataProvider = _streamConsumerFactory.createStreamMetadataProvider(clientId)) { _newPartitionGroupMetadataList = streamMetadataProvider --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
