This is an automated email from the ASF dual-hosted git repository. jlli pushed a commit to branch full-auto-poc in repository https://gitbox.apache.org/repos/asf/pinot.git
commit 9864022a2f0e9a7a8c5f2e3e6ab07ae89f658497 Author: Sonam Mandal <soman...@linkedin.com> AuthorDate: Wed Jan 17 13:31:58 2024 -0800 Initial POC code --- .../common/assignment/InstancePartitionsUtils.java | 11 +- .../pinot/common/utils/helix/HelixHelper.java | 10 +- .../pinot/controller/LeadControllerManager.java | 1 + .../PinotInstanceAssignmentRestletResource.java | 4 +- .../api/resources/PinotTenantRestletResource.java | 3 +- ...ineSegmentOnlineOfflineStateModelGenerator.java | 65 +++++++++ .../helix/core/PinotHelixResourceManager.java | 46 ++++-- ...lixSegmentOnlineOfflineStateModelGenerator.java | 2 +- .../helix/core/PinotTableIdealStateBuilder.java | 30 ++++ .../assignment/segment/SegmentAssignmentUtils.java | 8 + .../realtime/PinotLLCRealtimeSegmentManager.java | 50 ++++--- .../helix/core/rebalance/TableRebalancer.java | 46 +++++- .../helix/core/util/HelixSetupUtils.java | 19 +++ .../PinotLLCRealtimeSegmentManagerTest.java | 6 +- .../integration/tests/HelixZNodeSizeLimitTest.java | 19 ++- .../server/starter/helix/BaseServerStarter.java | 9 +- ...flineSegmentOnlineOfflineStateModelFactory.java | 162 +++++++++++++++++++++ .../SegmentOnlineOfflineStateModelFactory.java | 38 +++++ .../org/apache/pinot/tools/HybridQuickstart.java | 5 +- .../tools/admin/command/MoveReplicaGroup.java | 17 ++- .../tools/admin/command/QuickstartRunner.java | 2 +- .../airlineStats_offline_table_config.json | 4 +- 22 files changed, 501 insertions(+), 56 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java index 759d387af4..f8bbd08934 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/assignment/InstancePartitionsUtils.java @@ -23,7 +23,9 @@ import java.util.Collections; import java.util.List; import javax.annotation.Nullable; import org.apache.helix.AccessOption; +import org.apache.helix.ConfigAccessor; import org.apache.helix.HelixManager; +import org.apache.helix.model.ResourceConfig; import org.apache.helix.store.HelixPropertyStore; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.helix.zookeeper.datamodel.ZNRecord; @@ -165,12 +167,19 @@ public class InstancePartitionsUtils { * Persists the instance partitions to Helix property store. */ public static void persistInstancePartitions(HelixPropertyStore<ZNRecord> propertyStore, - InstancePartitions instancePartitions) { + ConfigAccessor configAccessor, String helixClusterName, InstancePartitions instancePartitions) { String path = ZKMetadataProvider .constructPropertyStorePathForInstancePartitions(instancePartitions.getInstancePartitionsName()); if (!propertyStore.set(path, instancePartitions.toZNRecord(), AccessOption.PERSISTENT)) { throw new ZkException("Failed to persist instance partitions: " + instancePartitions); } + + // Set the INSTANCE_PARTITIONS under the RESOURCE config (only modifying set path for now to ensure it works) + // This is just a test to see how to access and update the CONFIG/RESOURCES + String resourceName = "INSTANCE_PARTITION_" + instancePartitions.getInstancePartitionsName(); + ResourceConfig resourceConfig = new ResourceConfig(resourceName); + resourceConfig.setPreferenceLists(instancePartitions.getPartitionToInstancesMap()); + configAccessor.setResourceConfig(helixClusterName, resourceName, resourceConfig); } /** diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java index 4160dd44ef..8f76ba801f 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/helix/HelixHelper.java @@ -185,9 +185,12 @@ public class HelixHelper { String partitionName = it.next(); int numChars = partitionName.length(); Map<String, String> stateMap = is.getInstanceStateMap(partitionName); - for (Map.Entry<String, String> entry : stateMap.entrySet()) { - numChars += entry.getKey().length(); - numChars += entry.getValue().length(); + if (stateMap != null) { + // The stateMap might be NULL for FULL-AUTO segments, so always do this NULL check + for (Map.Entry<String, String> entry : stateMap.entrySet()) { + numChars += entry.getKey().length(); + numChars += entry.getValue().length(); + } } numChars *= is.getNumPartitions(); if (_minNumCharsInISToTurnOnCompression > 0 @@ -200,6 +203,7 @@ public class HelixHelper { }); return idealStateWrapper._idealState; } catch (Exception e) { + LOGGER.error("Caught exception while updating ideal state for resource: " + resourceName, e); throw new RuntimeException("Caught exception while updating ideal state for resource: " + resourceName, e); } } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java index 5c595bae2a..a1a4f7c270 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/LeadControllerManager.java @@ -220,6 +220,7 @@ public class LeadControllerManager { return; } + LOGGER.info("************** onResourceConfigChange() fired ***************"); boolean leadControllerResourceEnabled; try { leadControllerResourceEnabled = LeadControllerUtils.isLeadControllerResourceEnabled(_helixManager); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java index 282431e04b..6fd157d620 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotInstanceAssignmentRestletResource.java @@ -301,7 +301,9 @@ public class PinotInstanceAssignmentRestletResource { private void persistInstancePartitionsHelper(InstancePartitions instancePartitions) { try { LOGGER.info("Persisting instance partitions: {}", instancePartitions); - InstancePartitionsUtils.persistInstancePartitions(_resourceManager.getPropertyStore(), instancePartitions); + InstancePartitionsUtils.persistInstancePartitions(_resourceManager.getPropertyStore(), + _resourceManager.getHelixZkManager().getConfigAccessor(), _resourceManager.getHelixClusterName(), + instancePartitions); } catch (Exception e) { throw new ControllerApplicationException(LOGGER, "Caught Exception while persisting the instance partitions", Response.Status.INTERNAL_SERVER_ERROR, e); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java index 8166427a93..1fea68c75f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTenantRestletResource.java @@ -359,7 +359,8 @@ public class PinotTenantRestletResource { try { LOGGER.info("Persisting instance partitions: {}", instancePartitions); InstancePartitionsUtils.persistInstancePartitions(_pinotHelixResourceManager.getPropertyStore(), - instancePartitions); + _pinotHelixResourceManager.getHelixZkManager().getConfigAccessor(), + _pinotHelixResourceManager.getHelixClusterName(), instancePartitions); } catch (Exception e) { throw new ControllerApplicationException(LOGGER, "Caught Exception while persisting the instance partitions", Response.Status.INTERNAL_SERVER_ERROR, e); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.java new file mode 100644 index 0000000000..e4f93b3a52 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core; + +import org.apache.helix.model.StateModelDefinition; + +/** + * Offline Segment state model generator describes the transitions for offline segment states. + * + * Online to Offline, Online to Dropped + * Offline to Online, Offline to Dropped + * + * This does not include the state transitions for realtime segments (which includes the CONSUMING state) + */ +public class PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator { + private PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator() { + } + + public static final String PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL = "OfflineSegmentOnlineOfflineStateModel"; + + public static final String ONLINE_STATE = "ONLINE"; + public static final String OFFLINE_STATE = "OFFLINE"; + public static final String DROPPED_STATE = "DROPPED"; + + public static StateModelDefinition generatePinotStateModelDefinition() { + StateModelDefinition.Builder builder = + new StateModelDefinition.Builder(PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL); + builder.initialState(OFFLINE_STATE); + + builder.addState(ONLINE_STATE); + builder.addState(OFFLINE_STATE); + builder.addState(DROPPED_STATE); + // Set the initial state when the node starts + + // Add transitions between the states. + builder.addTransition(OFFLINE_STATE, ONLINE_STATE); + builder.addTransition(ONLINE_STATE, OFFLINE_STATE); + builder.addTransition(OFFLINE_STATE, DROPPED_STATE); + + // set constraints on states. + // static constraint + builder.dynamicUpperBound(ONLINE_STATE, "R"); + // dynamic constraint, R means it should be derived based on the replication + // factor. + + StateModelDefinition statemodelDefinition = builder.build(); + return statemodelDefinition; + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 35c942aba6..ae208715fd 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -1579,16 +1579,29 @@ public class PinotHelixResourceManager { LOGGER.info("Adding table {}: Validate table configs", tableNameWithType); validateTableTenantConfig(tableConfig); - IdealState idealState = - PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(), - _enableBatchMessageMode); TableType tableType = tableConfig.getTableType(); + Preconditions.checkState(tableType == TableType.OFFLINE || tableType == TableType.REALTIME, + "Invalid table type: %s", tableType); + + IdealState idealState; + if (tableType == TableType.REALTIME) { + idealState = + PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(), + _enableBatchMessageMode); + } else { + // Creates a FULL-AUTO based ideal state, supported for OFFLINE tables only + idealState = + PinotTableIdealStateBuilder.buildEmptyFullAutoIdealStateFor(tableNameWithType, tableConfig.getReplication(), + _enableBatchMessageMode); + } + // IdealState idealState = + // PinotTableIdealStateBuilder.buildEmptyIdealStateFor(tableNameWithType, tableConfig.getReplication(), + // _enableBatchMessageMode); + // Ensure that table is not created if schema is not present if (ZKMetadataProvider.getSchema(_propertyStore, TableNameBuilder.extractRawTableName(tableNameWithType)) == null) { throw new InvalidTableConfigException("No schema defined for table: " + tableNameWithType); } - Preconditions.checkState(tableType == TableType.OFFLINE || tableType == TableType.REALTIME, - "Invalid table type: %s", tableType); // Add table config LOGGER.info("Adding table {}: Creating table config in the property store", tableNameWithType); @@ -1785,7 +1798,8 @@ public class PinotHelixResourceManager { referenceInstancePartitionsName); } } - InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions); + InstancePartitionsUtils.persistInstancePartitions(_propertyStore, _helixZkManager.getConfigAccessor(), + _helixClusterName, instancePartitions); } } @@ -1803,7 +1817,8 @@ public class PinotHelixResourceManager { instanceAssignmentDriver.assignInstances(tierConfig.getName(), instanceConfigs, null, tableConfig.getInstanceAssignmentConfigMap().get(tierConfig.getName())); LOGGER.info("Persisting instance partitions: {}", instancePartitions); - InstancePartitionsUtils.persistInstancePartitions(_propertyStore, instancePartitions); + InstancePartitionsUtils.persistInstancePartitions(_propertyStore, _helixZkManager.getConfigAccessor(), + _helixClusterName, instancePartitions); } } } @@ -2243,7 +2258,8 @@ public class PinotHelixResourceManager { HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> { assert idealState != null; Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields(); - if (currentAssignment.containsKey(segmentName)) { + Map<String, List<String>> currentAssignmentList = idealState.getRecord().getListFields(); + if (currentAssignment.containsKey(segmentName) && currentAssignmentList.containsKey(segmentName)) { LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName, tableNameWithType); } else { @@ -2251,8 +2267,18 @@ public class PinotHelixResourceManager { segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap); LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances, tableNameWithType); - currentAssignment.put(segmentName, - SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE)); + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType == TableType.REALTIME) { + // TODO: Once REALTIME uses FULL-AUTO only the listFields should be updated + currentAssignment.put(segmentName, + SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE)); + } else { + // TODO: Assess whether to pass in an empty instance list or to set the preferred list + currentAssignmentList.put(segmentName, Collections.emptyList() + /* SegmentAssignmentUtils.getInstanceStateList(assignedInstances) */); + } + // currentAssignment.put(segmentName, + // SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE)); } return idealState; }); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java index 18b6ac0a75..3e52de0359 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixSegmentOnlineOfflineStateModelGenerator.java @@ -69,7 +69,7 @@ public class PinotHelixSegmentOnlineOfflineStateModelGenerator { builder.dynamicUpperBound(ONLINE_STATE, "R"); // dynamic constraint, R means it should be derived based on the replication // factor. - builder.dynamicUpperBound(CONSUMING_STATE, "R"); + // builder.dynamicUpperBound(CONSUMING_STATE, "R"); StateModelDefinition statemodelDefinition = builder.build(); return statemodelDefinition; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java index 23a115417f..63222df7e3 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java @@ -19,8 +19,10 @@ package org.apache.pinot.controller.helix.core; import java.util.List; +import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; import org.apache.helix.model.IdealState; import org.apache.helix.model.builder.CustomModeISBuilder; +import org.apache.helix.model.builder.FullAutoModeISBuilder; import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus; import org.apache.pinot.spi.stream.PartitionGroupMetadata; import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher; @@ -41,6 +43,7 @@ public class PinotTableIdealStateBuilder { public static IdealState buildEmptyIdealStateFor(String tableNameWithType, int numReplicas, boolean enableBatchMessageMode) { + LOGGER.info("Building CUSTOM IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas); CustomModeISBuilder customModeIdealStateBuilder = new CustomModeISBuilder(tableNameWithType); customModeIdealStateBuilder .setStateModel(PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL) @@ -51,6 +54,33 @@ public class PinotTableIdealStateBuilder { return idealState; } + public static IdealState buildEmptyFullAutoIdealStateFor(String tableNameWithType, int numReplicas, + boolean enableBatchMessageMode) { + LOGGER.info("Building FULL-AUTO IdealState for Table: {}, numReplicas: {}", tableNameWithType, numReplicas); + // FULL-AUTO Segment Online-Offline state model with a rebalance strategy, crushed auto-rebalance by default + // TODO: The state model used only works for OFFLINE tables today. Add support for REALTIME state model too + FullAutoModeISBuilder idealStateBuilder = new FullAutoModeISBuilder(tableNameWithType); + idealStateBuilder + .setStateModel( + PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL) + .setNumPartitions(0).setNumReplica(numReplicas).setMaxPartitionsPerNode(1) + // TODO: Revisit the rebalance strategy to use (maybe we add a custom one) + .setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName()); + // The below config guarantees if active number of replicas is no less than minimum active replica, there will + // not be partition movements happened. + // Set min active replicas to 0 and rebalance delay to 5 minutes so that if any master goes offline, Helix + // controller waits at most 5 minutes and then re-calculate the participant assignment. + // TODO: Assess which of these values need to be tweaked, removed, and what additional values that need to be added + idealStateBuilder.setMinActiveReplica(numReplicas - 1); + idealStateBuilder.setRebalanceDelay(300_000); + idealStateBuilder.enableDelayRebalance(); + // Set instance group tag + IdealState idealState = idealStateBuilder.build(); + idealState.setInstanceGroupTag(tableNameWithType); + idealState.setBatchMessageMode(enableBatchMessageMode); + return idealState; + } + /** * Fetches the list of {@link PartitionGroupMetadata} for the new partition groups for the stream, * with the help of the {@link PartitionGroupConsumptionStatus} of the current partitionGroups. diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java index 52f736a555..46e4cc4eca 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/SegmentAssignmentUtils.java @@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.assignment.segment; import com.google.common.base.Preconditions; import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.LinkedHashMap; @@ -272,6 +273,13 @@ public class SegmentAssignmentUtils { return instanceStateMap; } + public static List<String> getInstanceStateList(Collection<String> instances) { + List<String> instanceStateList = new ArrayList<>(); + instanceStateList.addAll(instances); + Collections.sort(instanceStateList); + return instanceStateList; + } + /** * Returns a map from instance name to number of segments to be moved to it. */ diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java index 25e40084ab..003f985e3f 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java @@ -332,12 +332,13 @@ public class PinotLLCRealtimeSegmentManager { long currentTimeMs = getCurrentTimeMs(); Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields(); + Map<String, List<String>> instancesStateList = idealState.getRecord().getListFields(); for (PartitionGroupMetadata partitionGroupMetadata : newPartitionGroupMetadataList) { String segmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions, numPartitionGroups, numReplicas); - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, segmentName, segmentAssignment, - instancePartitionsMap); + updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instancesStateList, null, segmentName, + segmentAssignment, instancePartitionsMap); } setIdealState(realtimeTableName, idealState); @@ -818,6 +819,7 @@ public class PinotLLCRealtimeSegmentManager { try { HelixHelper.updateIdealState(_helixManager, realtimeTableName, idealState -> { assert idealState != null; + // TODO: how to handle such state updates for FULL-AUTO mode? So far we don't enable FULL-AUTO for REALTIME Map<String, String> stateMap = idealState.getInstanceStateMap(segmentName); String state = stateMap.get(instanceName); if (SegmentStateModel.CONSUMING.equals(state)) { @@ -972,7 +974,8 @@ public class PinotLLCRealtimeSegmentManager { throw new HelixHelper.PermanentUpdaterException( "Exceeded max segment completion time for segment " + committingSegmentName); } - updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), committingSegmentName, + updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(), + idealState.getRecord().getListFields(), committingSegmentName, isTablePaused(idealState) ? null : newSegmentName, segmentAssignment, instancePartitionsMap); return idealState; }, RetryPolicies.exponentialBackoffRetryPolicy(10, 1000L, 1.2f)); @@ -984,8 +987,10 @@ public class PinotLLCRealtimeSegmentManager { @VisibleForTesting void updateInstanceStatesForNewConsumingSegment(Map<String, Map<String, String>> instanceStatesMap, + Map<String, List<String>> instanceStatesList, @Nullable String committingSegmentName, @Nullable String newSegmentName, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { + // TODO: Need to figure out the best way to handle committed segments' state change if (committingSegmentName != null) { // Change committing segment state to ONLINE Set<String> instances = instanceStatesMap.get(committingSegmentName).keySet(); @@ -1023,8 +1028,15 @@ public class PinotLLCRealtimeSegmentManager { // Assign instances to the new segment and add instances as state CONSUMING List<String> instancesAssigned = segmentAssignment.assignSegment(newSegmentName, instanceStatesMap, instancePartitionsMap); + // No need to check for tableType as offline tables can never go to CONSUMING state. All callers are for REALTIME instanceStatesMap.put(newSegmentName, SegmentAssignmentUtils.getInstanceStateMap(instancesAssigned, SegmentStateModel.CONSUMING)); + // TODO: Once REALTIME segments move to FULL-AUTO, we cannot update the map. Uncomment below lines to update list. + // Assess whether we should set am empty InstanceStateList for the segment or not. i.e. do we support + // this preferred list concept, and does Helix-Auto even allow preferred list concept (from code reading it + // looks like it does) + // instanceStatesList.put(newSegmentName, Collections.emptyList() + // /*SegmentAssignmentUtils.getInstanceStateList(instancesAssigned)*/); LOGGER.info("Adding new CONSUMING segment: {} to instances: {}", newSegmentName, instancesAssigned); } } @@ -1116,6 +1128,7 @@ public class PinotLLCRealtimeSegmentManager { Collections.singletonMap(InstancePartitionsType.CONSUMING, instancePartitions); Map<String, Map<String, String>> instanceStatesMap = idealState.getRecord().getMapFields(); + Map<String, List<String>> instanceStatesList = idealState.getRecord().getListFields(); long currentTimeMs = getCurrentTimeMs(); StreamPartitionMsgOffsetFactory offsetFactory = StreamConsumerFactoryProvider.create(streamConfig).createStreamMsgOffsetFactory(); @@ -1181,14 +1194,14 @@ public class PinotLLCRealtimeSegmentManager { (offsetFactory.create(latestSegmentZKMetadata.getEndOffset()).toString()), 0); createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas); - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, newSegmentName, - segmentAssignment, instancePartitionsMap); + updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, latestSegmentName, + newSegmentName, segmentAssignment, instancePartitionsMap); } else { // partition group reached end of life LOGGER.info("PartitionGroup: {} has reached end of life. Updating ideal state for segment: {}. " + "Skipping creation of new ZK metadata and new segment in ideal state", partitionGroupId, latestSegmentName); - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, latestSegmentName, null, segmentAssignment, - instancePartitionsMap); + updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, latestSegmentName, + null, segmentAssignment, instancePartitionsMap); } } // else, the metadata should be IN_PROGRESS, which is the right state for a consuming segment. @@ -1212,8 +1225,8 @@ public class PinotLLCRealtimeSegmentManager { partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory, latestSegmentZKMetadata.getStartOffset()); // segments are OFFLINE; start from beginning createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs, - newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment, - instancePartitionsMap, startOffset); + newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, instanceStatesList, + segmentAssignment, instancePartitionsMap, startOffset); } else { if (newPartitionGroupSet.contains(partitionGroupId)) { if (recreateDeletedConsumingSegment && latestSegmentZKMetadata.getStatus().isCompleted() @@ -1231,8 +1244,8 @@ public class PinotLLCRealtimeSegmentManager { partitionGroupIdToSmallestStreamOffset, tableConfig.getTableName(), offsetFactory, latestSegmentZKMetadata.getEndOffset()); createNewConsumingSegment(tableConfig, streamConfig, latestSegmentZKMetadata, currentTimeMs, - newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, segmentAssignment, - instancePartitionsMap, startOffset); + newPartitionGroupMetadataList, instancePartitions, instanceStatesMap, instanceStatesList, + segmentAssignment, instancePartitionsMap, startOffset); } else { LOGGER.error("Got unexpected instance state map: {} for segment: {}", instanceStateMap, latestSegmentName); @@ -1269,8 +1282,8 @@ public class PinotLLCRealtimeSegmentManager { partitionGroupId, realtimeTableName); _controllerMetrics.addMeteredTableValue(realtimeTableName, ControllerMeter.LLC_STREAM_DATA_LOSS, 1L); } - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, previousConsumingSegment, latestSegmentName, - segmentAssignment, instancePartitionsMap); + updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, previousConsumingSegment, + latestSegmentName, segmentAssignment, instancePartitionsMap); } else { LOGGER.error("Got unexpected status: {} in segment ZK metadata for segment: {}", latestSegmentZKMetadata.getStatus(), latestSegmentName); @@ -1285,8 +1298,8 @@ public class PinotLLCRealtimeSegmentManager { String newSegmentName = setupNewPartitionGroup(tableConfig, streamConfig, partitionGroupMetadata, currentTimeMs, instancePartitions, numPartitions, numReplicas); - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment, - instancePartitionsMap); + updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instanceStatesList, null, newSegmentName, + segmentAssignment, instancePartitionsMap); } } @@ -1296,7 +1309,8 @@ public class PinotLLCRealtimeSegmentManager { private void createNewConsumingSegment(TableConfig tableConfig, StreamConfig streamConfig, SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs, List<PartitionGroupMetadata> newPartitionGroupMetadataList, InstancePartitions instancePartitions, - Map<String, Map<String, String>> instanceStatesMap, SegmentAssignment segmentAssignment, + Map<String, Map<String, String>> instanceStatesMap, Map<String, List<String>> instancesStateList, + SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, StreamPartitionMsgOffset startOffset) { int numReplicas = getNumReplicas(tableConfig, instancePartitions); int numPartitions = newPartitionGroupMetadataList.size(); @@ -1307,8 +1321,8 @@ public class PinotLLCRealtimeSegmentManager { createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, currentTimeMs, committingSegmentDescriptor, latestSegmentZKMetadata, instancePartitions, numPartitions, numReplicas); String newSegmentName = newLLCSegmentName.getSegmentName(); - updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, newSegmentName, segmentAssignment, - instancePartitionsMap); + updateInstanceStatesForNewConsumingSegment(instanceStatesMap, instancesStateList, null, newSegmentName, + segmentAssignment, instancePartitionsMap); } private Map<Integer, StreamPartitionMsgOffset> fetchPartitionGroupIdToSmallestOffset(StreamConfig streamConfig) { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java index 5c3514fa7e..f97aa7b06a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java @@ -21,6 +21,7 @@ package org.apache.pinot.controller.helix.core.rebalance; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -158,6 +159,16 @@ public class TableRebalancer { } } + private Map<String, List<String>> convertTargetAssignmentToListFields( + Map<String, Map<String, String>> targetAssignment, boolean setInstances) { + Map<String, List<String>> convertedListField = new HashMap<>(); + for (Map.Entry<String, Map<String, String>> entry : targetAssignment.entrySet()) { + List<String> value = setInstances ? new ArrayList<>(entry.getValue().keySet()) : Collections.emptyList(); + convertedListField.put(entry.getKey(), value); + } + return convertedListField; + } + private RebalanceResult doRebalance(TableConfig tableConfig, RebalanceConfig rebalanceConfig, @Nullable String rebalanceJobId) { long startTimeMs = System.currentTimeMillis(); @@ -299,8 +310,19 @@ public class TableRebalancer { LOGGER.info("For rebalanceId: {}, rebalancing table: {} with downtime", rebalanceJobId, tableNameWithType); // Reuse current IdealState to update the IdealState in cluster + // TODO: Assess how rebalance will change if we use FULL-AUTO mode. In FULL-AUTO mode the mapFields should not + // be set up at all. Maybe we don't even need this TableRebalancer? + TableType tableType = tableConfig.getTableType(); ZNRecord idealStateRecord = currentIdealState.getRecord(); - idealStateRecord.setMapFields(targetAssignment); + if (tableType == TableType.REALTIME) { + // TODO: Only set listFields once REALTIME tables use FULL-AUTO + idealStateRecord.setMapFields(targetAssignment); + } else { + // TODO: Assess whether we should set the preferred host list or not, for now setting empty list + Map<String, List<String>> listFieldsConverted = convertTargetAssignmentToListFields(targetAssignment, false); + idealStateRecord.setListFields(listFieldsConverted); + } + // idealStateRecord.setMapFields(targetAssignment); currentIdealState.setNumPartitions(targetAssignment.size()); currentIdealState.setReplicas(Integer.toString(targetAssignment.values().iterator().next().size())); @@ -496,7 +518,18 @@ public class TableRebalancer { SegmentAssignmentUtils.getNumSegmentsToBeMovedPerInstance(currentAssignment, nextAssignment)); // Reuse current IdealState to update the IdealState in cluster - idealStateRecord.setMapFields(nextAssignment); + // TODO: Assess how rebalance will change if we use FULL-AUTO mode. In FULL-AUTO mode the mapFields should not + // be set up at all. Maybe we don't even need this TableRebalancer? + TableType tableType = tableConfig.getTableType(); + if (tableType == TableType.REALTIME) { + // TODO: Only set listFields once REALTIME tables use FULL-AUTO + idealStateRecord.setMapFields(targetAssignment); + } else { + // TODO: Assess whether we should set the preferred host list or not, for now setting empty list + Map<String, List<String>> listFieldsConverted = convertTargetAssignmentToListFields(targetAssignment, false); + idealStateRecord.setListFields(listFieldsConverted); + } + // idealStateRecord.setMapFields(nextAssignment); idealState.setNumPartitions(nextAssignment.size()); idealState.setReplicas(Integer.toString(nextAssignment.values().iterator().next().size())); @@ -598,7 +631,7 @@ public class TableRebalancer { if (!dryRun && !instancePartitionsUnchanged) { LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions); InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), - instancePartitions); + _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions); } } else { String referenceInstancePartitionsName = tableConfig.getInstancePartitionsMap().get(instancePartitionsType); @@ -614,7 +647,7 @@ public class TableRebalancer { LOGGER.info("Persisting instance partitions: {} (based on {})", instancePartitions, preConfiguredInstancePartitions); InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), - instancePartitions); + _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions); } } else { instancePartitions = @@ -625,7 +658,7 @@ public class TableRebalancer { LOGGER.info("Persisting instance partitions: {} (referencing {})", instancePartitions, referenceInstancePartitionsName); InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), - instancePartitions); + _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions); } } } @@ -730,7 +763,8 @@ public class TableRebalancer { boolean instancePartitionsUnchanged = instancePartitions.equals(existingInstancePartitions); if (!dryRun && !instancePartitionsUnchanged) { LOGGER.info("Persisting instance partitions: {} to ZK", instancePartitions); - InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), instancePartitions); + InstancePartitionsUtils.persistInstancePartitions(_helixManager.getHelixPropertyStore(), + _helixManager.getConfigAccessor(), _helixManager.getClusterName(), instancePartitions); } return Pair.of(instancePartitions, instancePartitionsUnchanged); } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java index 8d21d18b1f..61f6112365 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java @@ -31,6 +31,7 @@ import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; import org.apache.helix.model.IdealState; @@ -45,6 +46,7 @@ import org.apache.helix.zookeeper.impl.client.ZkClient; import org.apache.pinot.common.utils.helix.LeadControllerUtils; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator; +import org.apache.pinot.controller.helix.core.PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator; import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator; import org.apache.pinot.spi.utils.CommonConstants; import org.slf4j.Logger; @@ -79,6 +81,8 @@ public class HelixSetupUtils { new HelixConfigScopeBuilder(ConfigScopeProperty.CLUSTER).forCluster(helixClusterName).build(); Map<String, String> configMap = new HashMap<>(); configMap.put(ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN, Boolean.toString(true)); + configMap.put(ClusterConfig.ClusterConfigProperty.PERSIST_BEST_POSSIBLE_ASSIGNMENT.name(), + Boolean.toString(true)); configMap.put(ENABLE_CASE_INSENSITIVE_KEY, Boolean.toString(DEFAULT_ENABLE_CASE_INSENSITIVE)); configMap.put(DEFAULT_HYPERLOGLOG_LOG2M_KEY, Integer.toString(DEFAULT_HYPERLOGLOG_LOG2M)); configMap.put(CommonConstants.Broker.CONFIG_OF_ENABLE_QUERY_LIMIT_OVERRIDE, Boolean.toString(false)); @@ -147,6 +151,21 @@ public class HelixSetupUtils { helixDataAccessor .createStateModelDef(PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition()); } + + String offlineSegmentStateModelName = + PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.PINOT_OFFLINE_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; + StateModelDefinition offlineStateModelDefinition = helixAdmin.getStateModelDef(helixClusterName, + offlineSegmentStateModelName); + if (offlineStateModelDefinition == null || isUpdateStateModel) { + if (stateModelDefinition == null) { + LOGGER.info("Adding offline segment state model: {} with CONSUMING state", offlineSegmentStateModelName); + } else { + LOGGER.info("Updating offline segment state model: {} to contain CONSUMING state", + offlineSegmentStateModelName); + } + helixDataAccessor.createStateModelDef( + PinotHelixOfflineSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition()); + } } private static void createBrokerResourceIfNeeded(String helixClusterName, HelixAdmin helixAdmin, diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java index 60b83ba24a..a882a47ec7 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java @@ -1221,9 +1221,11 @@ public class PinotLLCRealtimeSegmentManagerTest { void updateIdealStateOnSegmentCompletion(String realtimeTableName, String committingSegmentName, String newSegmentName, SegmentAssignment segmentAssignment, Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) { - updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), committingSegmentName, null, + updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), + _idealState.getRecord().getListFields(), committingSegmentName, null, segmentAssignment, instancePartitionsMap); - updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), null, newSegmentName, + updateInstanceStatesForNewConsumingSegment(_idealState.getRecord().getMapFields(), + _idealState.getRecord().getListFields(), null, newSegmentName, segmentAssignment, instancePartitionsMap); } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HelixZNodeSizeLimitTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HelixZNodeSizeLimitTest.java index 37d3aa6ca3..9dec9df6b6 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HelixZNodeSizeLimitTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/HelixZNodeSizeLimitTest.java @@ -19,7 +19,8 @@ package org.apache.pinot.integration.tests; -import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableList; +import java.util.List; import java.util.Map; import org.apache.helix.zookeeper.constant.ZkSystemPropertyKeys; import org.apache.pinot.common.utils.helix.HelixHelper; @@ -83,15 +84,23 @@ public class HelixZNodeSizeLimitTest extends BaseClusterIntegrationTest { try { HelixHelper.updateIdealState(_helixManager, tableNameWithType, idealState -> { Map<String, Map<String, String>> currentAssignment = idealState.getRecord().getMapFields(); + Map<String, List<String>> currentAssignmentList = idealState.getRecord().getListFields(); for (int i = 0; i < 500_000; i++) { - currentAssignment.put("segment_" + i, - ImmutableMap.of("Server_with_some_reasonable_long_prefix_" + (i % 10), "ONLINE")); - currentAssignment.put("segment_" + i, - ImmutableMap.of("Server_with_some_reasonable_long_prefix_" + (i % 9), "ONLINE")); + // currentAssignment.put("segment_" + i, + // ImmutableMap.of("Server_with_some_reasonable_long_prefix_" + (i % 10), "ONLINE")); + // currentAssignment.put("segment_" + i, + // ImmutableMap.of("Server_with_some_reasonable_long_prefix_" + (i % 9), "ONLINE")); + currentAssignmentList.put("segment_" + i, + ImmutableList.of("Server_with_some_reasonable_long_prefix_" + (i % 10))); + currentAssignmentList.put("segment_" + i, + ImmutableList.of("Server_with_some_reasonable_long_prefix_" + (i % 9))); } return idealState; }); } catch (Exception e) { + System.out.println("exception: " + e); + System.out.println("exception: " + e.getMessage()); + System.out.println("exception: " + e.getCause()); Assert.fail("Exception shouldn't be thrown even if the data size of the ideal state is larger than 1M"); } } diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index 2b771707d2..6e0d157075 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -579,10 +579,15 @@ public abstract class BaseServerStarter implements ServiceStartable { Tracing.ThreadAccountantOps .initializeThreadAccountant(_serverConf.subset(CommonConstants.PINOT_QUERY_SCHEDULER_PREFIX), _instanceId); initSegmentFetcher(_serverConf); - StateModelFactory<?> stateModelFactory = + StateModelFactory<?> stateModelFactoryWithRealtime = new SegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager); + StateModelFactory<?> stateModelFactory = + new OfflineSegmentOnlineOfflineStateModelFactory(_instanceId, instanceDataManager); + _helixManager.getStateMachineEngine() + .registerStateModelFactory(OfflineSegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory); _helixManager.getStateMachineEngine() - .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), stateModelFactory); + .registerStateModelFactory(SegmentOnlineOfflineStateModelFactory.getStateModelName(), + stateModelFactoryWithRealtime); // Start the data manager as a pre-connect callback so that it starts after connecting to the ZK in order to access // the property store, but before receiving state transitions _helixManager.addPreConnectCallback(_serverInstance::startDataManager); diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OfflineSegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OfflineSegmentOnlineOfflineStateModelFactory.java new file mode 100644 index 0000000000..0a0608b44d --- /dev/null +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/OfflineSegmentOnlineOfflineStateModelFactory.java @@ -0,0 +1,162 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.server.starter.helix; + +import com.google.common.base.Preconditions; +import org.apache.helix.NotificationContext; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelFactory; +import org.apache.helix.participant.statemachine.StateModelInfo; +import org.apache.helix.participant.statemachine.Transition; +import org.apache.pinot.common.Utils; +import org.apache.pinot.common.restlet.resources.SegmentErrorInfo; +import org.apache.pinot.core.data.manager.InstanceDataManager; +import org.apache.pinot.segment.local.data.manager.TableDataManager; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.utils.builder.TableNameBuilder; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Data Server layer state model to take over how to operate on: + * 1. Add a new segment + * 2. Refresh an existed now serving segment. + * 3. Delete an existed segment. + * + * This only works for OFFLINE table segments and does not handle the CONSUMING state at all + */ +public class OfflineSegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> { + private final String _instanceId; + private final InstanceDataManager _instanceDataManager; + + public OfflineSegmentOnlineOfflineStateModelFactory(String instanceId, InstanceDataManager instanceDataManager) { + _instanceId = instanceId; + _instanceDataManager = instanceDataManager; + } + + public static String getStateModelName() { + return "OfflineSegmentOnlineOfflineStateModel"; + } + + @Override + public StateModel createNewStateModel(String resourceName, String partitionName) { + return new OfflineSegmentOnlineOfflineStateModelFactory.OfflineSegmentOnlineOfflineStateModel(); + } + + + // Helix seems to need StateModelInfo annotation for 'initialState'. It does not use the 'states' field. + // The transitions in the helix messages indicate the from/to states, and helix uses the + // Transition annotations (but only if StateModelInfo is defined). + @SuppressWarnings("unused") + @StateModelInfo(states = "{'OFFLINE','ONLINE', 'DROPPED'}", initialState = "OFFLINE") + public class OfflineSegmentOnlineOfflineStateModel extends StateModel { + private final Logger _logger = LoggerFactory.getLogger(_instanceId + " - OfflineSegmentOnlineOfflineStateModel"); + + @Transition(from = "OFFLINE", to = "ONLINE") + public void onBecomeOnlineFromOffline(Message message, NotificationContext context) { + _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeOnlineFromOffline() : " + message); + String tableNameWithType = message.getResourceName(); + String segmentName = message.getPartitionName(); + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + Preconditions.checkArgument((tableType != null) && (tableType != TableType.REALTIME), + "TableType is null or is a REALTIME table, offline state model should not be called fo RT"); + try { + _instanceDataManager.addOrReplaceSegment(tableNameWithType, segmentName); + } catch (Exception e) { + String errorMessage = + String.format("Caught exception in state transition OFFLINE -> ONLINE for table: %s, segment: %s", + tableNameWithType, segmentName); + _logger.error(errorMessage, e); + TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(tableNameWithType); + if (tableDataManager != null) { + tableDataManager.addSegmentError(segmentName, + new SegmentErrorInfo(System.currentTimeMillis(), errorMessage, e)); + } + Utils.rethrowException(e); + } + } + + // Remove segment from InstanceDataManager. + // Still keep the data files in local. + @Transition(from = "ONLINE", to = "OFFLINE") + public void onBecomeOfflineFromOnline(Message message, NotificationContext context) { + _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeOfflineFromOnline() : " + message); + String tableNameWithType = message.getResourceName(); + String segmentName = message.getPartitionName(); + try { + _instanceDataManager.offloadSegment(tableNameWithType, segmentName); + } catch (Exception e) { + _logger.error("Caught exception in state transition ONLINE -> OFFLINE for table: {}, segment: {}", + tableNameWithType, segmentName, e); + Utils.rethrowException(e); + } + } + + // Delete segment from local directory. + @Transition(from = "OFFLINE", to = "DROPPED") + public void onBecomeDroppedFromOffline(Message message, NotificationContext context) { + _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeDroppedFromOffline() : " + message); + String tableNameWithType = message.getResourceName(); + String segmentName = message.getPartitionName(); + try { + _instanceDataManager.deleteSegment(tableNameWithType, segmentName); + } catch (Exception e) { + _logger.error("Caught exception in state transition OFFLINE -> DROPPED for table: {}, segment: {}", + tableNameWithType, segmentName, e); + Utils.rethrowException(e); + } + } + + @Transition(from = "ONLINE", to = "DROPPED") + public void onBecomeDroppedFromOnline(Message message, NotificationContext context) { + _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeDroppedFromOnline() : " + message); + String tableNameWithType = message.getResourceName(); + String segmentName = message.getPartitionName(); + try { + _instanceDataManager.offloadSegment(tableNameWithType, segmentName); + _instanceDataManager.deleteSegment(tableNameWithType, segmentName); + } catch (Exception e) { + _logger.error("Caught exception in state transition ONLINE -> DROPPED for table: {}, segment: {}", + tableNameWithType, segmentName, e); + Utils.rethrowException(e); + } + } + + @Transition(from = "ERROR", to = "OFFLINE") + public void onBecomeOfflineFromError(Message message, NotificationContext context) { + _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeOfflineFromError() : " + message); + } + + @Transition(from = "ERROR", to = "DROPPED") + public void onBecomeDroppedFromError(Message message, NotificationContext context) { + _logger.info("OfflineSegmentOnlineOfflineStateModel.onBecomeDroppedFromError() : " + message); + String tableNameWithType = message.getResourceName(); + String segmentName = message.getPartitionName(); + try { + _instanceDataManager.deleteSegment(tableNameWithType, segmentName); + } catch (Exception e) { + _logger.error("Caught exception in state transition ERROR -> DROPPED for table: {}, segment: {}", + tableNameWithType, segmentName, e); + Utils.rethrowException(e); + } + } + } +} diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java index 42d1642c7b..76b6e779dc 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/SegmentOnlineOfflineStateModelFactory.java @@ -44,6 +44,14 @@ import org.slf4j.LoggerFactory; * 1. Add a new segment * 2. Refresh an existed now serving segment. * 3. Delete an existed segment. + * + * TODO: Assess how to handle this state model for FULL-AUTO. Today we have states such as A -> B, B -> C and A -> C. + * FULL-AUTO optimizes such transitions and only calls A -> C in such cases. Due to semantics we need to allow + * REALTIME segments to directly move from OFFLINE -> ONLINE for completed segments which violates what FULL-AUTO + * does. Due to this limitation we never see transitions from OFFLINE -> CONSUMING even though we need this + * transition for all new CONSUMING segments. + * To unblock the POC, for now we have moved the OFFLINE segment state model to a different + * class: OfflineSegmentOnlineOfflineStateModelFactory */ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<StateModel> { private final String _instanceId; @@ -76,6 +84,15 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta _logger.info("SegmentOnlineOfflineStateModel.onBecomeConsumingFromOffline() : " + message); String realtimeTableName = message.getResourceName(); String segmentName = message.getPartitionName(); + + // TODO: This may not be needed if we split the state models between OFFLINE and REALTIME. Commented out for now +// TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName); +// Preconditions.checkNotNull(tableType); +// if (tableType == TableType.OFFLINE) { +// _logger.info("OFFLINE->CONSUMING state transition called for OFFLINE table, treat this as a no-op"); +// return; +// } + try { _instanceDataManager.addRealtimeSegment(realtimeTableName, segmentName); } catch (Exception e) { @@ -97,6 +114,26 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta _logger.info("SegmentOnlineOfflineStateModel.onBecomeOnlineFromConsuming() : " + message); String realtimeTableName = message.getResourceName(); String segmentName = message.getPartitionName(); + TableType tableType = TableNameBuilder.getTableTypeFromTableName(realtimeTableName); + Preconditions.checkNotNull(tableType); + + // TODO: This may not be needed if we split the state models between OFFLINE and REALTIME. Commented out for now +// if (tableType == TableType.OFFLINE) { +// try { +// _instanceDataManager.addOrReplaceSegment(realtimeTableName, segmentName); +// } catch (Exception e) { +// String errorMessage = String.format( +// "Caught exception in state transition CONSUMING -> ONLINE for table: %s, segment: %s", +// realtimeTableName, segmentName); +// _logger.error(errorMessage, e); +// TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName); +// if (tableDataManager != null) { +// tableDataManager.addSegmentError(segmentName, new SegmentErrorInfo(System.currentTimeMillis(), +// errorMessage, e)); +// } +// Utils.rethrowException(e); +// } +// } else { TableDataManager tableDataManager = _instanceDataManager.getTableDataManager(realtimeTableName); Preconditions.checkState(tableDataManager != null, "Failed to find table: %s", realtimeTableName); tableDataManager.onConsumingToOnline(segmentName); @@ -131,6 +168,7 @@ public class SegmentOnlineOfflineStateModelFactory extends StateModelFactory<Sta tableDataManager.releaseSegment(acquiredSegment); } } +// } @Transition(from = "CONSUMING", to = "OFFLINE") public void onBecomeOfflineFromConsuming(Message message, NotificationContext context) { diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java index e5bd332c2b..dc8285472a 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/HybridQuickstart.java @@ -51,7 +51,8 @@ public class HybridQuickstart extends Quickstart { public Map<String, Object> getConfigOverrides() { Map<String, Object> overrides = new HashMap<>(super.getConfigOverrides()); overrides.put("pinot.server.grpc.enable", "true"); - overrides.put("pinot.server.grpc.port", "8090"); + // Commenting out the below to allow running more than 1 server + // overrides.put("pinot.server.grpc.port", "8090"); return overrides; } @@ -114,7 +115,7 @@ public class HybridQuickstart extends Quickstart { quickstartTableRequests.addAll(bootstrapOfflineTableDirectories(quickstartTmpDir)); quickstartTableRequests.addAll(bootstrapStreamTableDirectories(quickstartTmpDir)); final QuickstartRunner runner = - new QuickstartRunner(new ArrayList<>(quickstartTableRequests), 1, 1, 1, 1, quickstartRunnerDir, + new QuickstartRunner(new ArrayList<>(quickstartTableRequests), 1, 1, 4, 1, quickstartRunnerDir, getConfigOverrides()); startKafka(); diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java index 4ece1f5abf..b14cb9e240 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/MoveReplicaGroup.java @@ -29,6 +29,7 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.List; @@ -45,6 +46,7 @@ import org.apache.pinot.common.utils.HashUtil; import org.apache.pinot.common.utils.config.TableConfigUtils; import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.apache.pinot.spi.utils.retry.RetryPolicies; @@ -210,9 +212,22 @@ public class MoveReplicaGroup extends AbstractBaseAdminCommand implements Comman @Override public IdealState apply(@Nullable IdealState input) { Map<String, Map<String, String>> existingMapField = input.getRecord().getMapFields(); + Map<String, List<String>> existingListField = input.getRecord().getListFields(); + TableType tableType = TableNameBuilder.getTableTypeFromTableName(_tableName); for (Map.Entry<String, Map<String, String>> segmentEntry : proposedIdealState.entrySet()) { - existingMapField.put(segmentEntry.getKey(), segmentEntry.getValue()); + // existingMapField.put(segmentEntry.getKey(), segmentEntry.getValue()); + if (tableType == TableType.REALTIME) { + // TODO: Update listField only once REALTIME uses FULL-AUTO + existingMapField.put(segmentEntry.getKey(), segmentEntry.getValue()); + } else { + String segmentName = segmentEntry.getKey(); + Map<String, String> segmentMapping = segmentEntry.getValue(); + List<String> listOfHosts = new ArrayList<>(segmentMapping.keySet()); + Collections.sort(listOfHosts); + // TODO: Assess if we want to add the preferred list of hosts or not + existingListField.put(segmentName, Collections.emptyList() /* listOfHosts */); + } } return input; } diff --git a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java index e1e80e8ae0..310b6dc296 100644 --- a/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java +++ b/pinot-tools/src/main/java/org/apache/pinot/tools/admin/command/QuickstartRunner.java @@ -157,7 +157,7 @@ public class QuickstartRunner { .setZkAddress(_zkExternalAddress != null ? _zkExternalAddress : ZK_ADDRESS).setClusterName(CLUSTER_NAME) .setDataDir(new File(_tempDir, DEFAULT_SERVER_DATA_DIR + i).getAbsolutePath()) .setSegmentDir(new File(_tempDir, DEFAULT_SERVER_SEGMENT_DIR + i).getAbsolutePath()) - .setConfigOverrides(_configOverrides); + .setConfigOverrides(_configOverrides).setMultiStageServerPort(8040 + i).setMultiStageRunnerPort(8100 + i); if (!serverStarter.execute()) { throw new RuntimeException("Failed to start Server"); } diff --git a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json index 856719e058..12d29fc83d 100644 --- a/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json +++ b/pinot-tools/src/main/resources/examples/batch/airlineStats/airlineStats_offline_table_config.json @@ -6,7 +6,7 @@ "timeType": "DAYS", "segmentPushType": "APPEND", "segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy", - "replication": "1" + "replication": "3" }, "tenants": {}, "fieldConfigList": [ @@ -44,7 +44,7 @@ "coldTier": { "encodingType": "RAW", "indexes": { - "text": { + "inverted": { "enabled": "true" } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org