This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch full-auto-oss-abstraction in repository https://gitbox.apache.org/repos/asf/pinot.git
commit f0e458d1baec2deb7084a2f37afcee3ee586ceac Author: jlli_LinkedIn <j...@linkedin.com> AuthorDate: Fri Mar 22 16:00:43 2024 -0700 Extract methods for Pinot table ideal state --- .../pinot/controller/BaseControllerStarter.java | 4 + .../helix/core/PinotHelixResourceManager.java | 14 +- .../helix/core/PinotTableIdealStateHelper.java | 145 --------------------- .../DefaultPinotTableIdealStateHelper.java | 76 +++++++++++ .../FullAutoPinotTableIdealStateHelper.java | 57 ++++++++ .../PinotTableIdealStateHelper.java | 57 ++++++++ .../PinotTableIdealStateHelperFactory.java | 26 ++++ .../realtime/MissingConsumingSegmentFinder.java | 4 +- .../realtime/PinotLLCRealtimeSegmentManager.java | 4 +- .../helix/core/retention/RetentionManagerTest.java | 10 +- 10 files changed, 236 insertions(+), 161 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java index 281c397401..c509738fca 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java @@ -88,6 +88,7 @@ import org.apache.pinot.controller.helix.RealtimeConsumerMonitor; import org.apache.pinot.controller.helix.SegmentStatusChecker; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; import org.apache.pinot.controller.helix.core.cleanup.StaleInstancesCleanupTask; +import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory; import org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager; import org.apache.pinot.controller.helix.core.minion.PinotTaskManager; import org.apache.pinot.controller.helix.core.minion.TaskMetricsEmitter; @@ -245,6 +246,9 @@ public abstract class BaseControllerStarter implements ServiceStartable { _tenantRebalancer = new 
DefaultTenantRebalancer(_helixResourceManager, _tenantRebalanceExecutorService); } + // Initialize the ideal state helper for Pinot tables. + PinotTableIdealStateHelperFactory.init(_config); + // Initialize the table config tuner registry. TableConfigTunerRegistry.init(_config.getTableConfigTunerPackages()); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 57c75d7618..1fa664b062 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -142,6 +142,8 @@ import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssign import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; +import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelper; +import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory; import org.apache.pinot.controller.helix.core.lineage.LineageManager; import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; @@ -232,11 +234,14 @@ public class PinotHelixResourceManager { private SegmentDeletionManager _segmentDeletionManager; private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager; private TableCache _tableCache; + + private final PinotTableIdealStateHelper _pinotTableIdealStateHelper; private final LineageManager _lineageManager; public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullable 
String dataDir, boolean isSingleTenantCluster, boolean enableBatchMessageMode, int deletedSegmentsRetentionInDays, - boolean enableTieredSegmentAssignment, LineageManager lineageManager) { + boolean enableTieredSegmentAssignment, PinotTableIdealStateHelper pinotTableIdealStateHelper, + LineageManager lineageManager) { _helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL); _helixClusterName = helixClusterName; _dataDir = dataDir; @@ -258,6 +263,7 @@ public class PinotHelixResourceManager { for (int i = 0; i < _tableUpdaterLocks.length; i++) { _tableUpdaterLocks[i] = new Object(); } + _pinotTableIdealStateHelper = pinotTableIdealStateHelper; _lineageManager = lineageManager; } @@ -265,7 +271,7 @@ public class PinotHelixResourceManager { this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(), controllerConf.getDataDir(), controllerConf.tenantIsolationEnabled(), controllerConf.getEnableBatchMessageMode(), controllerConf.getDeletedSegmentsRetentionInDays(), controllerConf.tieredSegmentAssignmentEnabled(), - LineageManagerFactory.create(controllerConf)); + PinotTableIdealStateHelperFactory.create(), LineageManagerFactory.create(controllerConf)); } /** @@ -1583,9 +1589,7 @@ public class PinotHelixResourceManager { Preconditions.checkState(tableType == TableType.OFFLINE || tableType == TableType.REALTIME, "Invalid table type: %s", tableType); - IdealState idealState = - PinotTableIdealStateHelper.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(), - _enableBatchMessageMode); + IdealState idealState = _pinotTableIdealStateHelper.buildEmptyIdealStateFor(tableConfig, _enableBatchMessageMode); // if (tableType == TableType.REALTIME) { // idealState = // PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(), diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java deleted file mode 100644 index 37c4b7555b..0000000000 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateHelper.java +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.pinot.controller.helix.core; - -import java.util.List; -import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; -import org.apache.helix.model.IdealState; -import org.apache.helix.model.builder.CustomModeISBuilder; -import org.apache.helix.model.builder.FullAutoModeISBuilder; -import org.apache.pinot.spi.config.table.TableType; -import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; -import org.apache.pinot.spi.stream.PartitionGroupMetadata; -import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher; -import org.apache.pinot.spi.stream.StreamConfig; -import org.apache.pinot.spi.utils.builder.TableNameBuilder; -import org.apache.pinot.spi.utils.retry.RetryPolicies; -import org.apache.pinot.spi.utils.retry.RetryPolicy; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - - -public class PinotTableIdealStateHelper { - private PinotTableIdealStateHelper() { - } - - private static final Logger LOGGER = LoggerFactory.getLogger(PinotTableIdealStateHelper.class); - private static final RetryPolicy DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY = - RetryPolicies.randomDelayRetryPolicy(3, 100L, 200L); - - public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int numReplicas, - boolean enableBatchMessageMode) { - LOGGER.info("Building CUSTOM IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas); - CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(tableNameWithType); - customModeIdealStateBuilder - .setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL) - .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1); - IdealState idealState = customModeIdealStateBuilder.build(); - idealState.setInstanceGroupTag(tableNameWithType); - idealState.setBatchMessageMode(enableBatchMessageMode); - return idealState; - } - - public static IdealState buildEmptyFullAutoIdealStateFor(String 
tableNameWithType, int numReplicas, - boolean enableBatchMessageMode) { - LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas); - TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); - String stateModel; - if (tableType == null) { - throw new RuntimeException("Failed to get table type from table name: " + tableNameWithType); - } else if (TableType.OFFLINE.equals(tableType)) { - stateModel = - PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; - } else { - stateModel = - PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; - } - - // FULL-AUTO Segment Online-Offline state model with a rebalance strategy, crushed auto-rebalance by default - // TODO: The state model used only works for OFFLINE tables today. Add support for REALTIME state model too - FullAutoModeISBuilder idealStateBuilder = new FullAutoModeISBuilder(tableNameWithType); - idealStateBuilder - .setStateModel(stateModel) - .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1) - // TODO: Revisit the rebalance strategy to use (maybe we add a custom one) - .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName()); - // The below config guarantees if active number of replicas is no less than minimum active replica, there will - // not be partition movements happened. - // Set min active replicas to 0 and rebalance delay to 5 minutes so that if any master goes offline, Helix - // controller waits at most 5 minutes and then re-calculate the participant assignment. 
- // TODO: Assess which of these values need to be tweaked, removed, and what additional values that need to be added - idealStateBuilder.setMinActiveReplica(numReplicas - 1); - idealStateBuilder.setRebalanceDelay(300_000); - idealStateBuilder.enableDelayRebalance(); - // Set instance group tag - IdealState idealState = idealStateBuilder.build(); - idealState.setInstanceGroupTag(tableNameWithType); - idealState.setBatchMessageMode(enableBatchMessageMode); - return idealState; - } - - /** - * Fetches the list of {@link PartitionGroupMetadata} for the new partition groups for the stream, - * with the help of the {@link PartitionGroupConsumptionStatus} of the current partitionGroups. - * - * Reasons why <code>partitionGroupConsumptionStatusList</code> is needed: - * - * 1) - * The current {@link PartitionGroupConsumptionStatus} is used to determine the offsets that have been consumed for - * a partition group. - * An example of where the offsets would be used: - * e.g. If partition group 1 contains shardId 1, with status DONE and endOffset 150. There's 2 possibilities: - * 1) the stream indicates that shardId's last offset is 200. - * This tells Pinot that partition group 1 still has messages which haven't been consumed, and must be included in - * the response. - * 2) the stream indicates that shardId's last offset is 150, - * This tells Pinot that all messages of partition group 1 have been consumed, and it need not be included in the - * response. - * Thus, this call will skip a partition group when it has reached end of life and all messages from that partition - * group have been consumed. - * - * The current {@link PartitionGroupConsumptionStatus} is also used to know about existing groupings of partitions, - * and accordingly make the new partition groups. - * e.g. Assume that partition group 1 has status IN_PROGRESS and contains shards 0,1,2 - * and partition group 2 has status DONE and contains shards 3,4. 
- * In the above example, the <code>partitionGroupConsumptionStatusList</code> indicates that - * the collection of shards in partition group 1, should remain unchanged in the response, - * whereas shards 3,4 can be added to new partition groups if needed. - * - * @param streamConfig the streamConfig from the tableConfig - * @param partitionGroupConsumptionStatusList List of {@link PartitionGroupConsumptionStatus} for the current - * partition groups. - * The size of this list is equal to the number of partition groups, - * and is created using the latest segment zk metadata. - */ - public static List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, - List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) { - PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = - new PartitionGroupMetadataFetcher(streamConfig, partitionGroupConsumptionStatusList); - try { - DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher); - return partitionGroupMetadataFetcher.getPartitionGroupMetadataList(); - } catch (Exception e) { - Exception fetcherException = partitionGroupMetadataFetcher.getException(); - LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of table: {}", streamConfig.getTopicName(), - streamConfig.getTableNameWithType(), fetcherException); - throw new RuntimeException(fetcherException); - } - } -} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/DefaultPinotTableIdealStateHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/DefaultPinotTableIdealStateHelper.java new file mode 100644 index 0000000000..4ccac131ba --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/DefaultPinotTableIdealStateHelper.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.idealstatehelper; + +import java.util.List; +import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.builder.CustomModeISBuilder; +import org.apache.helix.model.builder.FullAutoModeISBuilder; +import org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator; +import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; +import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher; +import org.apache.pinot.spi.stream.StreamConfig; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.apache.pinot.spi.utils.retry.RetryPolicies; +import org.apache.pinot.spi.utils.retry.RetryPolicy; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class DefaultPinotTableIdealStateHelper implements PinotTableIdealStateHelper { + private static final Logger LOGGER = 
LoggerFactory.getLogger(DefaultPinotTableIdealStateHelper.class); + private static final RetryPolicy DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY = + RetryPolicies.randomDelayRetryPolicy(3, 100L, 200L); + + @Override + public IdealState buildEmptyIdealStateFor(TableConfig tableConfig, boolean enableBatchMessageMode) { + String tableNameWithType = tableConfig.getTableName(); + int numReplicas = tableConfig.getReplication(); + LOGGER.info("Building CUSTOM IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas); + CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(tableNameWithType); + customModeIdealStateBuilder + .setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL) + .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1); + IdealState idealState = customModeIdealStateBuilder.build(); + idealState.setInstanceGroupTag(tableNameWithType); + idealState.setBatchMessageMode(enableBatchMessageMode); + return idealState; + } + + @Override + public List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, + List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList) { + PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = + new PartitionGroupMetadataFetcher(streamConfig, partitionGroupConsumptionStatusList); + try { + DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher); + return partitionGroupMetadataFetcher.getPartitionGroupMetadataList(); + } catch (Exception e) { + Exception fetcherException = partitionGroupMetadataFetcher.getException(); + LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of table: {}", streamConfig.getTopicName(), + streamConfig.getTableNameWithType(), fetcherException); + throw new RuntimeException(fetcherException); + } + } +} diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/FullAutoPinotTableIdealStateHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/FullAutoPinotTableIdealStateHelper.java new file mode 100644 index 0000000000..c8d2000ac2 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/FullAutoPinotTableIdealStateHelper.java @@ -0,0 +1,57 @@ +package org.apache.pinot.controller.helix.core.idealstatehelper; + +import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.builder.FullAutoModeISBuilder; +import org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class FullAutoPinotTableIdealStateHelper extends DefaultPinotTableIdealStateHelper { + private static final Logger LOGGER = LoggerFactory.getLogger(FullAutoPinotTableIdealStateHelper.class); + + @Override + public IdealState buildEmptyIdealStateFor(TableConfig tableConfig, boolean enableBatchMessageMode) { + String tableNameWithType = tableConfig.getTableName(); + int numReplicas = tableConfig.getReplication(); + + LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas); + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + String stateModel; + if (tableType == null) { + throw new RuntimeException("Failed to get table type from table name: " + tableNameWithType); + } else if (TableType.OFFLINE.equals(tableType)) { + stateModel = + 
PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; + } else { + stateModel = + PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; + } + + // FULL-AUTO Segment Online-Offline state model with a rebalance strategy, CrushEd auto-rebalance by default + // TODO: The state model used only works for OFFLINE tables today. Add support for REALTIME state model too + FullAutoModeISBuilder idealStateBuilder = new FullAutoModeISBuilder(tableNameWithType); + idealStateBuilder + .setStateModel(stateModel) + .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1) + // TODO: Revisit the rebalance strategy to use (maybe we add a custom one) + .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName()); + // The below config guarantees that, as long as the number of active replicas is no less than the minimum number + // of active replicas, no partition movements will happen. + // Set min active replicas to numReplicas - 1 and rebalance delay to 5 minutes so that if any instance goes + // offline, the Helix controller waits at most 5 minutes and then recalculates the participant assignment.
+ // TODO: Assess which of these values need to be tweaked, removed, and what additional values that need to be added + idealStateBuilder.setMinActiveReplica(numReplicas - 1); + idealStateBuilder.setRebalanceDelay(300_000); + idealStateBuilder.enableDelayRebalance(); + // Set instance group tag + IdealState idealState = idealStateBuilder.build(); + idealState.setInstanceGroupTag(tableNameWithType); + idealState.setBatchMessageMode(enableBatchMessageMode); + return idealState; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelper.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelper.java new file mode 100644 index 0000000000..84297cc665 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelper.java @@ -0,0 +1,57 @@ +package org.apache.pinot.controller.helix.core.idealstatehelper; + +import java.util.List; +import org.apache.helix.model.IdealState; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; +import org.apache.pinot.spi.stream.PartitionGroupMetadata; +import org.apache.pinot.spi.stream.StreamConfig; + + +public interface PinotTableIdealStateHelper { + + /** + * Builds an empty ideal state for the Pinot table. + * @param tableConfig table config. + * @param enableBatchMessageMode whether to enable batch message mode when building the ideal state. + */ + IdealState buildEmptyIdealStateFor(TableConfig tableConfig, boolean enableBatchMessageMode); + + /** + * Fetches the list of {@link PartitionGroupMetadata} for the new partition groups for the stream, + * with the help of the {@link PartitionGroupConsumptionStatus} of the current partitionGroups. 
+ * + * Reasons why <code>partitionGroupConsumptionStatusList</code> is needed: + * + * 1) + * The current {@link PartitionGroupConsumptionStatus} is used to determine the offsets that have been consumed for + * a partition group. + * An example of where the offsets would be used: + * e.g. If partition group 1 contains shardId 1, with status DONE and endOffset 150. There are 2 possibilities: + * 1) the stream indicates that shardId's last offset is 200. + * This tells Pinot that partition group 1 still has messages which haven't been consumed, and must be included in + * the response. + * 2) the stream indicates that shardId's last offset is 150. + * This tells Pinot that all messages of partition group 1 have been consumed, and it need not be included in the + * response. + * Thus, this call will skip a partition group when it has reached end of life and all messages from that partition + * group have been consumed. + * + * The current {@link PartitionGroupConsumptionStatus} is also used to know about existing groupings of partitions, + * and accordingly make the new partition groups. + * e.g. Assume that partition group 1 has status IN_PROGRESS and contains shards 0,1,2 + * and partition group 2 has status DONE and contains shards 3,4. + * In the above example, the <code>partitionGroupConsumptionStatusList</code> indicates that + * the collection of shards in partition group 1 should remain unchanged in the response, + * whereas shards 3,4 can be added to new partition groups if needed. + * + * @param streamConfig the streamConfig from the tableConfig + * @param partitionGroupConsumptionStatusList List of {@link PartitionGroupConsumptionStatus} for the current + * partition groups. + * The size of this list is equal to the number of partition groups, + * and is created using the latest segment zk metadata.
+ */ + List<PartitionGroupMetadata> getPartitionGroupMetadataList(StreamConfig streamConfig, + List<PartitionGroupConsumptionStatus> partitionGroupConsumptionStatusList); + +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelperFactory.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelperFactory.java new file mode 100644 index 0000000000..0f1890db6c --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/idealstatehelper/PinotTableIdealStateHelperFactory.java @@ -0,0 +1,26 @@ +package org.apache.pinot.controller.helix.core.idealstatehelper; + +import org.apache.pinot.controller.ControllerConf; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class PinotTableIdealStateHelperFactory { + private static final Logger LOGGER = LoggerFactory.getLogger(PinotTableIdealStateHelperFactory.class); + private static PinotTableIdealStateHelper INSTANCE = null; + private static ControllerConf _controllerConf; + + private PinotTableIdealStateHelperFactory() { + } + + public static void init(ControllerConf controllerConf) { + _controllerConf = controllerConf; + } + + public static PinotTableIdealStateHelper create() { + if (INSTANCE == null) { + INSTANCE = new DefaultPinotTableIdealStateHelper(); + } + return INSTANCE; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java index c9850856cd..94d158220c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java @@ -36,7 +36,7 @@ import 
org.apache.pinot.common.metrics.ControllerGauge; import org.apache.pinot.common.metrics.ControllerMeter; import org.apache.pinot.common.metrics.ControllerMetrics; import org.apache.pinot.common.utils.LLCSegmentName; -import org.apache.pinot.controller.helix.core.PinotTableIdealStateHelper; +import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory; import org.apache.pinot.spi.stream.OffsetCriteria; import org.apache.pinot.spi.stream.StreamConfig; import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider; @@ -79,7 +79,7 @@ public class MissingConsumingSegmentFinder { _partitionGroupIdToLargestStreamOffsetMap = new HashMap<>(); streamConfig.setOffsetCriteria(OffsetCriteria.LARGEST_OFFSET_CRITERIA); try { - PinotTableIdealStateHelper.getPartitionGroupMetadataList(streamConfig, Collections.emptyList()) + PinotTableIdealStateHelperFactory.create().getPartitionGroupMetadataList(streamConfig, Collections.emptyList()) .forEach(metadata -> { _partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), metadata.getStartOffset()); }); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 40215c43a4..0f1b68e8f9 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -68,9 +68,9 @@ import org.apache.pinot.controller.api.events.MetadataEventNotifierFactory; import org.apache.pinot.controller.api.resources.Constants; import org.apache.pinot.controller.api.resources.PauseStatus; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; -import org.apache.pinot.controller.helix.core.PinotTableIdealStateHelper; import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory; +import org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory; import org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor; import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdateManager; import org.apache.pinot.controller.helix.core.realtime.segment.FlushThresholdUpdater; @@ -817,7 +817,7 @@ public class PinotLLCRealtimeSegmentManager { @VisibleForTesting List<PartitionGroupMetadata> getNewPartitionGroupMetadataList(StreamConfig streamConfig, List<PartitionGroupConsumptionStatus> currentPartitionGroupConsumptionStatusList) { - return PinotTableIdealStateHelper.getPartitionGroupMetadataList(streamConfig, + return PinotTableIdealStateHelperFactory.create().getPartitionGroupMetadataList(streamConfig, currentPartitionGroupConsumptionStatusList); } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java index dc988ad669..017b0b2a5a 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/retention/RetentionManagerTest.java @@ -33,8 +33,8 @@ import org.apache.pinot.common.utils.LLCSegmentName; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.LeadControllerManager; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; -import org.apache.pinot.controller.helix.core.PinotTableIdealStateHelper; import org.apache.pinot.controller.helix.core.SegmentDeletionManager; +import 
org.apache.pinot.controller.helix.core.idealstatehelper.PinotTableIdealStateHelperFactory; import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils; import org.apache.pinot.spi.config.table.TableConfig; import org.apache.pinot.spi.config.table.TableType; @@ -270,9 +270,7 @@ public class RetentionManagerTest { final int replicaCount = tableConfig.getReplication(); List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>(); - - IdealState idealState = - PinotTableIdealStateHelper.buildEmptyIdealStateFor(REALTIME_TABLE_NAME, replicaCount, true); + IdealState idealState = PinotTableIdealStateHelperFactory.create().buildEmptyIdealStateFor(tableConfig, true); final int kafkaPartition = 5; final long millisInDays = TimeUnit.DAYS.toMillis(1); @@ -334,9 +332,7 @@ public class RetentionManagerTest { final int replicaCount = tableConfig.getReplication(); List<SegmentZKMetadata> segmentsZKMetadata = new ArrayList<>(); - - IdealState idealState = - PinotTableIdealStateHelper.buildEmptyIdealStateFor(REALTIME_TABLE_NAME, replicaCount, true); + IdealState idealState = PinotTableIdealStateHelperFactory.create().buildEmptyIdealStateFor(tableConfig, true); final int kafkaPartition = 5; final long millisInDays = TimeUnit.DAYS.toMillis(1); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org