This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 9353f158b6 Add new ImplicitRealtimeTablePartitionSelector strategy for
instance assignment (#15930)
9353f158b6 is described below
commit 9353f158b6f59dbc8d19c626f173f8b99f34b1ce
Author: Yash Mayya <[email protected]>
AuthorDate: Fri Jun 6 16:41:54 2025 +0100
Add new ImplicitRealtimeTablePartitionSelector strategy for instance
assignment (#15930)
---
.../ImplicitRealtimeTablePartitionSelector.java | 79 ++++++++
.../instance/InstanceAssignmentDriver.java | 22 +-
.../instance/InstancePartitionSelectorFactory.java | 7 +-
.../InstanceReplicaGroupPartitionSelector.java | 4 +-
.../helix/core/rebalance/RebalanceResult.java | 8 +
.../core/rebalance/RebalanceSummaryResult.java | 88 ++++++++
.../instance/InstanceAssignmentTest.java | 153 ++++++++++++++
.../TableRebalancerClusterStatelessTest.java | 221 ++++++++++++++++++++-
.../tests/TableRebalanceIntegrationTest.java | 80 +++++++-
.../segment/local/utils/TableConfigUtils.java | 30 +++
.../segment/local/utils/TableConfigUtilsTest.java | 71 +++++++
.../table/assignment/InstanceAssignmentConfig.java | 2 +-
12 files changed, 744 insertions(+), 21 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/ImplicitRealtimeTablePartitionSelector.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/ImplicitRealtimeTablePartitionSelector.java
new file mode 100644
index 0000000000..1cb12a725a
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/ImplicitRealtimeTablePartitionSelector.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.assignment.instance;
+
+import com.google.common.annotations.VisibleForTesting;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.spi.config.table.TableConfig;
+import
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+
+
+/**
+ * Variation of {@link InstanceReplicaGroupPartitionSelector} that uses the
number of partitions from the stream
+ * to determine the number of partitions in each replica group.
+ */
+public class ImplicitRealtimeTablePartitionSelector extends
InstanceReplicaGroupPartitionSelector {
+ private final int _numPartitions;
+
+ public ImplicitRealtimeTablePartitionSelector(TableConfig tableConfig,
+ InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig, String
tableNameWithType,
+ @Nullable InstancePartitions existingInstancePartitions, boolean
minimizeDataMovement) {
+ this(replicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions, minimizeDataMovement,
+ // Get the number of partitions from the first stream config
+ // TODO: Revisit this logic to better handle multiple streams in the
future - either validate that they
+ // all have the same number of partitions and use that or
disallow the use of this selector in case the
+ // partition counts differ.
+
StreamConsumerFactoryProvider.create(IngestionConfigUtils.getFirstStreamConfig(tableConfig))
+ .createStreamMetadataProvider(
+ ImplicitRealtimeTablePartitionSelector.class.getSimpleName() +
"-" + tableNameWithType)
+ );
+ }
+
+ @VisibleForTesting
+ ImplicitRealtimeTablePartitionSelector(InstanceReplicaGroupPartitionConfig
replicaGroupPartitionConfig,
+ String tableNameWithType, @Nullable InstancePartitions
existingInstancePartitions, boolean minimizeDataMovement,
+ StreamMetadataProvider streamMetadataProvider) {
+ super(replicaGroupPartitionConfig, tableNameWithType,
existingInstancePartitions, minimizeDataMovement);
+ _numPartitions = getStreamNumPartitions(streamMetadataProvider);
+ }
+
+ private int getStreamNumPartitions(StreamMetadataProvider
streamMetadataProvider) {
+ try (streamMetadataProvider) {
+ return streamMetadataProvider.fetchPartitionCount(10_000L);
+ } catch (Exception e) {
+ throw new RuntimeException("Failed to retrieve partition info for table:
" + _tableNameWithType, e);
+ }
+ }
+
+ @Override
+ protected int getNumPartitions() {
+ return _numPartitions;
+ }
+
+ @Override
+ protected int getNumInstancesPerPartition(int numInstancesPerReplicaGroup) {
+ // This partition selector should only be used for CONSUMING instance
partitions, and we enforce a single instance
+ // per partition in this case.
+ return 1;
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
index f76c9df6a4..04ce082fa6 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentDriver.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.helix.core.assignment.instance;
+import com.google.common.annotations.VisibleForTesting;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
@@ -113,9 +114,26 @@ public class InstanceAssignmentDriver {
// if existingInstancePartitions is null.
boolean minimizeDataMovementFromTableConfig =
instanceAssignmentConfig.isMinimizeDataMovement();
boolean minimizeDataMovement =
minimizeDataMovementEnablement.isEnabled(minimizeDataMovementFromTableConfig);
+ InstancePartitionSelector instancePartitionSelector =
+ InstancePartitionSelectorFactory.getInstance(_tableConfig,
instanceAssignmentConfig.getPartitionSelector(),
+ instanceAssignmentConfig.getReplicaGroupPartitionConfig(),
tableNameWithType, existingInstancePartitions,
+ preConfiguredInstancePartitions, minimizeDataMovement);
+
LOGGER.info("Starting {} instance assignment for table: {} with
minimizeDataMovement: {} (from table config: {}, "
+ "override: {})", instancePartitionsName, tableNameWithType,
minimizeDataMovement,
minimizeDataMovementFromTableConfig, minimizeDataMovementEnablement);
+
+ return getInstancePartitions(instancePartitionsName,
instanceAssignmentConfig, instanceConfigs,
+ existingInstancePartitions, minimizeDataMovement,
instancePartitionSelector);
+ }
+
+ @VisibleForTesting
+ InstancePartitions getInstancePartitions(String instancePartitionsName,
+ InstanceAssignmentConfig instanceAssignmentConfig, List<InstanceConfig>
instanceConfigs,
+ @Nullable InstancePartitions existingInstancePartitions, boolean
minimizeDataMovement,
+ InstancePartitionSelector instancePartitionSelector) {
+ String tableNameWithType = _tableConfig.getTableName();
+
InstanceTagPoolSelector tagPoolSelector =
new
InstanceTagPoolSelector(instanceAssignmentConfig.getTagPoolConfig(),
tableNameWithType,
minimizeDataMovement, existingInstancePartitions);
@@ -132,10 +150,6 @@ public class InstanceAssignmentDriver {
poolToInstanceConfigsMap =
constraintApplier.applyConstraint(poolToInstanceConfigsMap);
}
- InstancePartitionSelector instancePartitionSelector =
-
InstancePartitionSelectorFactory.getInstance(instanceAssignmentConfig.getPartitionSelector(),
- instanceAssignmentConfig.getReplicaGroupPartitionConfig(),
tableNameWithType, existingInstancePartitions,
- preConfiguredInstancePartitions, minimizeDataMovement);
InstancePartitions instancePartitions = new
InstancePartitions(instancePartitionsName);
instancePartitionSelector.selectInstances(poolToInstanceConfigsMap,
instancePartitions);
return instancePartitions;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
index 8a343b1598..d038245913 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstancePartitionSelectorFactory.java
@@ -21,6 +21,7 @@ package
org.apache.pinot.controller.helix.core.assignment.instance;
import java.util.Arrays;
import javax.annotation.Nullable;
import org.apache.pinot.common.assignment.InstancePartitions;
+import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
@@ -30,7 +31,8 @@ public class InstancePartitionSelectorFactory {
private InstancePartitionSelectorFactory() {
}
- public static InstancePartitionSelector
getInstance(InstanceAssignmentConfig.PartitionSelector partitionSelector,
+ public static InstancePartitionSelector getInstance(TableConfig tableConfig,
+ InstanceAssignmentConfig.PartitionSelector partitionSelector,
InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig,
String tableNameWithType,
InstancePartitions existingInstancePartitions, @Nullable
InstancePartitions preConfiguredInstancePartitions,
boolean minimizeDataMovement) {
@@ -44,6 +46,9 @@ public class InstancePartitionSelectorFactory {
case MIRROR_SERVER_SET_PARTITION_SELECTOR:
return new
MirrorServerSetInstancePartitionSelector(instanceReplicaGroupPartitionConfig,
tableNameWithType,
existingInstancePartitions, preConfiguredInstancePartitions,
minimizeDataMovement);
+ case IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR:
+ return new ImplicitRealtimeTablePartitionSelector(tableConfig,
instanceReplicaGroupPartitionConfig,
+ tableNameWithType, existingInstancePartitions,
minimizeDataMovement);
default:
throw new IllegalStateException("Unexpected PartitionSelector: " +
partitionSelector + ", should be from"
+
Arrays.toString(InstanceAssignmentConfig.PartitionSelector.values()));
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
index b8c19ede69..4309d67b67 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceReplicaGroupPartitionSelector.java
@@ -248,7 +248,7 @@ public class InstanceReplicaGroupPartitionSelector extends
InstancePartitionSele
return numInstancesPerReplicaGroup;
}
- private int getNumPartitions() {
+ protected int getNumPartitions() {
// Assign instances within a replica-group to one partition if not
configured
int numPartitions = _replicaGroupPartitionConfig.getNumPartitions();
if (numPartitions <= 0) {
@@ -257,7 +257,7 @@ public class InstanceReplicaGroupPartitionSelector extends
InstancePartitionSele
return numPartitions;
}
- private int getNumInstancesPerPartition(int numInstancesPerReplicaGroup) {
+ protected int getNumInstancesPerPartition(int numInstancesPerReplicaGroup) {
// Assign all instances within a replica-group to each partition if not
configured
int numInstancesPerPartition =
_replicaGroupPartitionConfig.getNumInstancesPerPartition();
if (numInstancesPerPartition > 0) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
index f2737c265b..cfc38a8033 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
@@ -114,4 +114,12 @@ public class RebalanceResult {
// UNKNOWN_ERROR if the job hits on an unexpected exception.
NO_OP, DONE, FAILED, IN_PROGRESS, ABORTED, CANCELLED, UNKNOWN_ERROR
}
+
+ @Override
+ public String toString() {
+ return "RebalanceResult{" + "_jobId='" + _jobId + '\'' + ", _status=" +
_status + ", _description='" + _description
+ + '\'' + ", _instanceAssignment=" + _instanceAssignment + ",
_tierInstanceAssignment="
+ + _tierInstanceAssignment + ", _segmentAssignment=" +
_segmentAssignment + ", _preChecksResult="
+ + _preChecksResult + ", _rebalanceSummaryResult=" +
_rebalanceSummaryResult + '}';
+ }
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
index a7c5da9abf..d99cb55c09 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
@@ -137,6 +137,19 @@ public class RebalanceSummaryResult {
public List<String> getTagList() {
return _tagList;
}
+
+ @Override
+ public String toString() {
+ return "ServerSegmentChangeInfo{"
+ + "_serverStatus=" + _serverStatus
+ + ", _totalSegmentsAfterRebalance=" + _totalSegmentsAfterRebalance
+ + ", _totalSegmentsBeforeRebalance=" + _totalSegmentsBeforeRebalance
+ + ", _segmentsAdded=" + _segmentsAdded
+ + ", _segmentsDeleted=" + _segmentsDeleted
+ + ", _segmentsUnchanged=" + _segmentsUnchanged
+ + ", _tagList=" + _tagList
+ + '}';
+ }
}
public static class RebalanceChangeInfo {
@@ -164,6 +177,14 @@ public class RebalanceSummaryResult {
public int getExpectedValueAfterRebalance() {
return _expectedValueAfterRebalance;
}
+
+ @Override
+ public String toString() {
+ return "RebalanceChangeInfo{"
+ + "_valueBeforeRebalance=" + _valueBeforeRebalance
+ + ", _expectedValueAfterRebalance=" + _expectedValueAfterRebalance
+ + '}';
+ }
}
public static class TagInfo {
@@ -221,6 +242,16 @@ public class RebalanceSummaryResult {
public void increaseNumServerParticipants(int numServers) {
_numServerParticipants += numServers;
}
+
+ @Override
+ public String toString() {
+ return "TagInfo{"
+ + "_tagName='" + _tagName + '\''
+ + ", _numSegmentsUnchanged=" + _numSegmentsUnchanged
+ + ", _numSegmentsToDownload=" + _numSegmentsToDownload
+ + ", _numServerParticipants=" + _numServerParticipants
+ + '}';
+ }
}
@JsonInclude(JsonInclude.Include.NON_NULL)
@@ -295,6 +326,19 @@ public class RebalanceSummaryResult {
public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
return _serverSegmentChangeInfo;
}
+
+ @Override
+ public String toString() {
+ return "ServerInfo{"
+ + "_numServersGettingNewSegments=" + _numServersGettingNewSegments
+ + ", _numServers=" + _numServers
+ + ", _serversAdded=" + _serversAdded
+ + ", _serversRemoved=" + _serversRemoved
+ + ", _serversUnchanged=" + _serversUnchanged
+ + ", _serversGettingNewSegments=" + _serversGettingNewSegments
+ + ", _serverSegmentChangeInfo=" + _serverSegmentChangeInfo
+ + '}';
+ }
}
public static class ConsumingSegmentToBeMovedSummary {
@@ -407,6 +451,26 @@ public class RebalanceSummaryResult {
public int getTotalOffsetsToCatchUpAcrossAllConsumingSegments() {
return _totalOffsetsToCatchUpAcrossAllConsumingSegments;
}
+
+ @Override
+ public String toString() {
+ return "ConsumingSegmentSummaryPerServer{"
+ + "_numConsumingSegmentsToBeAdded=" +
_numConsumingSegmentsToBeAdded
+ + ", _totalOffsetsToCatchUpAcrossAllConsumingSegments=" +
_totalOffsetsToCatchUpAcrossAllConsumingSegments
+ + '}';
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "ConsumingSegmentToBeMovedSummary{"
+ + "_numConsumingSegmentsToBeMoved=" + _numConsumingSegmentsToBeMoved
+ + ", _numServersGettingConsumingSegmentsAdded=" +
_numServersGettingConsumingSegmentsAdded
+ + ", _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp="
+ + _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp
+ + ", _consumingSegmentsToBeMovedWithOldestAgeInMinutes=" +
_consumingSegmentsToBeMovedWithOldestAgeInMinutes
+ + ", _serverConsumingSegmentSummary=" +
_serverConsumingSegmentSummary
+ + '}';
}
}
@@ -501,6 +565,21 @@ public class RebalanceSummaryResult {
public ConsumingSegmentToBeMovedSummary
getConsumingSegmentToBeMovedSummary() {
return _consumingSegmentToBeMovedSummary;
}
+
+ @Override
+ public String toString() {
+ return "SegmentInfo{"
+ + "_totalSegmentsToBeMoved=" + _totalSegmentsToBeMoved
+ + ", _totalSegmentsToBeDeleted=" + _totalSegmentsToBeDeleted
+ + ", _maxSegmentsAddedToASingleServer=" +
_maxSegmentsAddedToASingleServer
+ + ", _estimatedAverageSegmentSizeInBytes=" +
_estimatedAverageSegmentSizeInBytes
+ + ", _totalEstimatedDataToBeMovedInBytes=" +
_totalEstimatedDataToBeMovedInBytes
+ + ", _replicationFactor=" + _replicationFactor
+ + ", _numSegmentsInSingleReplica=" + _numSegmentsInSingleReplica
+ + ", _numSegmentsAcrossAllReplicas=" + _numSegmentsAcrossAllReplicas
+ + ", _consumingSegmentToBeMovedSummary=" +
_consumingSegmentToBeMovedSummary
+ + '}';
+ }
}
public enum ServerStatus {
@@ -509,4 +588,13 @@ public class RebalanceSummaryResult {
// UNCHANGED if the server status is unchanged as part of rebalance;
ADDED, REMOVED, UNCHANGED
}
+
+ @Override
+ public String toString() {
+ return "RebalanceSummaryResult{"
+ + "_serverInfo=" + _serverInfo
+ + ", _segmentInfo=" + _segmentInfo
+ + ", _tagsInfo=" + _tagsInfo
+ + '}';
+ }
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
index 8f8819831e..74badfb7fc 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/assignment/instance/InstanceAssignmentTest.java
@@ -47,11 +47,15 @@ import
org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.utils.CommonConstants.Segment.AssignmentStrategy;
import org.apache.pinot.spi.utils.Enablement;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.Test;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.*;
@@ -59,6 +63,7 @@ public class InstanceAssignmentTest {
private static final String RAW_TABLE_NAME = "myTable";
private static final String TENANT_NAME = "tenant";
private static final String OFFLINE_TAG =
TagNameUtils.getOfflineTagForTenant(TENANT_NAME);
+ private static final String REALTIME_TAG =
TagNameUtils.getRealtimeTagForTenant(TENANT_NAME);
private static final String SERVER_INSTANCE_ID_PREFIX = "Server_localhost_";
private static final String SERVER_INSTANCE_POOL_PREFIX = "_pool_";
private static final String TABLE_NAME_ZERO_HASH_COMPLEMENT = "12";
@@ -499,6 +504,154 @@ public class InstanceAssignmentTest {
assertEquals(instancePartitions.getInstances(9, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 7));
}
+ @Test
+ public void testMinimizeDataMovementImplicitRealtimeTablePartitionSelector()
{
+ int numReplicas = 2;
+ InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 0, 1,
true, null);
+ InstanceAssignmentConfig instanceAssignmentConfig =
+ new InstanceAssignmentConfig(new InstanceTagPoolConfig(REALTIME_TAG,
false, 0, null), null,
+ replicaGroupPartitionConfig,
+
InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(),
true);
+ TableConfig tableConfig =
+ new
TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).setServerTenant(TENANT_NAME)
+ .setNumReplicas(numReplicas)
+
.setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(),
instanceAssignmentConfig))
+ .build();
+
+ int numInstances = 12;
+ List<InstanceConfig> instanceConfigs = new ArrayList<>(numInstances);
+ for (int i = 0; i < numInstances; i++) {
+ InstanceConfig instanceConfig = new
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+ instanceConfig.addTag(REALTIME_TAG);
+ instanceConfigs.add(instanceConfig);
+ }
+
+ // Start without existing InstancePartitions:
+ // Instances should be assigned to 2 replica-groups in a round-robin
fashion, each with 6 instances. Then these 6
+ // instances should be assigned to 6 partitions, each with 1 instances
+ int numPartitions = 6;
+ StreamMetadataProvider streamMetadataProvider =
mock(StreamMetadataProvider.class);
+
when(streamMetadataProvider.fetchPartitionCount(anyLong())).thenReturn(numPartitions);
+ InstancePartitionSelector instancePartitionSelector =
+ new
ImplicitRealtimeTablePartitionSelector(replicaGroupPartitionConfig,
tableConfig.getTableName(), null, true,
+ streamMetadataProvider);
+ InstanceAssignmentDriver driver = new
InstanceAssignmentDriver(tableConfig);
+ InstancePartitions instancePartitions =
+
driver.getInstancePartitions(InstancePartitionsType.CONSUMING.getInstancePartitionsName(RAW_TABLE_NAME),
+ instanceAssignmentConfig, instanceConfigs, null, true,
instancePartitionSelector);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+ assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+ // Math.abs("myTable_REALTIME".hashCode()) % 12 = 0
+
+ // [i0, i1, i10, i11, i2, i3, i4, i5, i6, i7, i8, i9]
+ // r0 r1 r0 r1 r0 r1 r0 r1 r0 r1 r0 r1
+ // r0: [i0, i10, i2, i4, i6, i8]
+ // p0 p1 p2 p3 p4 p5
+ //
+ // r1: [i1, i11, i3, i5, i7, i9]
+ // p0 p1 p2 p3 p4 p5
+
+ assertEquals(instancePartitions.getInstances(0, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+ assertEquals(instancePartitions.getInstances(0, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+ assertEquals(instancePartitions.getInstances(1, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 10));
+ assertEquals(instancePartitions.getInstances(1, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 11));
+ assertEquals(instancePartitions.getInstances(2, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+ assertEquals(instancePartitions.getInstances(2, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+ assertEquals(instancePartitions.getInstances(3, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 4));
+ assertEquals(instancePartitions.getInstances(3, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 5));
+ assertEquals(instancePartitions.getInstances(4, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 6));
+ assertEquals(instancePartitions.getInstances(4, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 7));
+ assertEquals(instancePartitions.getInstances(5, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 8));
+ assertEquals(instancePartitions.getInstances(5, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 9));
+
+ // Increase the number of partitions from 6 to 9. Expect no data movement
for existing partitions.
+ numPartitions = 9;
+
when(streamMetadataProvider.fetchPartitionCount(anyLong())).thenReturn(numPartitions);
+ instancePartitionSelector = new
ImplicitRealtimeTablePartitionSelector(replicaGroupPartitionConfig,
+ tableConfig.getTableName(), instancePartitions, true,
streamMetadataProvider);
+ instancePartitions = driver.getInstancePartitions(
+
InstancePartitionsType.CONSUMING.getInstancePartitionsName(RAW_TABLE_NAME),
instanceAssignmentConfig,
+ instanceConfigs, instancePartitions, true, instancePartitionSelector);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+ assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+ // [i0, i1, i10, i11, i2, i3, i4, i5, i6, i7, i8, i9]
+ // r0 r1 r0 r1 r0 r1 r0 r1 r0 r1 r0 r1
+ // r0: [i0, i10, i2, i4, i6, i8]
+ // p0 p1 p2 p3 p4 p5
+ // p6 p7 p8
+ //
+ // r1: [i1, i11, i3, i5, i7, i9]
+ // p0 p1 p2 p3 p4 p5
+ // p6 p7 p8
+
+ assertEquals(instancePartitions.getInstances(0, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+ assertEquals(instancePartitions.getInstances(0, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+ assertEquals(instancePartitions.getInstances(1, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 10));
+ assertEquals(instancePartitions.getInstances(1, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 11));
+ assertEquals(instancePartitions.getInstances(2, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+ assertEquals(instancePartitions.getInstances(2, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+ assertEquals(instancePartitions.getInstances(3, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 4));
+ assertEquals(instancePartitions.getInstances(3, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 5));
+ assertEquals(instancePartitions.getInstances(4, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 6));
+ assertEquals(instancePartitions.getInstances(4, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 7));
+ assertEquals(instancePartitions.getInstances(5, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 8));
+ assertEquals(instancePartitions.getInstances(5, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 9));
+ assertEquals(instancePartitions.getInstances(6, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+ assertEquals(instancePartitions.getInstances(6, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+ assertEquals(instancePartitions.getInstances(7, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 10));
+ assertEquals(instancePartitions.getInstances(7, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 11));
+ assertEquals(instancePartitions.getInstances(8, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+ assertEquals(instancePartitions.getInstances(8, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+
+ // Add 6 new instances
+ for (int i = numInstances; i < numInstances + 6; i++) {
+ InstanceConfig instanceConfig = new
InstanceConfig(SERVER_INSTANCE_ID_PREFIX + i);
+ instanceConfig.addTag(REALTIME_TAG);
+ instanceConfigs.add(instanceConfig);
+ }
+
+ instancePartitionSelector = new
ImplicitRealtimeTablePartitionSelector(replicaGroupPartitionConfig,
+ tableConfig.getTableName(), instancePartitions, true,
streamMetadataProvider);
+ instancePartitions = driver.getInstancePartitions(
+
InstancePartitionsType.CONSUMING.getInstancePartitionsName(RAW_TABLE_NAME),
instanceAssignmentConfig,
+ instanceConfigs, instancePartitions, true, instancePartitionSelector);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+ assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+ // We're using the minimize data movement based algorithm, so only the new
partitions should be moved to the new
+ // instances, and the existing partition assignments should remain
unchanged.
+ //
+ // [i0, i1, i10, i11, i2, i3, i4, i5, i6, i7, i8, i9]
+ // r0 r1 r0 r1 r0 r1 r0 r1 r0 r1 r0 r1
+ // r0: [i0, i10, i2, i4, i6, i8, i12, i14, i16]
+ // p0 p1 p2 p3 p4 p5 p6 p7 p8
+ //
+ // r1: [i1, i11, i3, i5, i7, i9, i13, i15, i17]
+ // p0 p1 p2 p3 p4 p5 p6 p7 p8
+
+ assertEquals(instancePartitions.getInstances(0, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 0));
+ assertEquals(instancePartitions.getInstances(0, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 1));
+ assertEquals(instancePartitions.getInstances(1, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 10));
+ assertEquals(instancePartitions.getInstances(1, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 11));
+ assertEquals(instancePartitions.getInstances(2, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 2));
+ assertEquals(instancePartitions.getInstances(2, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 3));
+ assertEquals(instancePartitions.getInstances(3, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 4));
+ assertEquals(instancePartitions.getInstances(3, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 5));
+ assertEquals(instancePartitions.getInstances(4, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 6));
+ assertEquals(instancePartitions.getInstances(4, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 7));
+ assertEquals(instancePartitions.getInstances(5, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 8));
+ assertEquals(instancePartitions.getInstances(5, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 9));
+ assertEquals(instancePartitions.getInstances(6, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 12));
+ assertEquals(instancePartitions.getInstances(6, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 13));
+ assertEquals(instancePartitions.getInstances(7, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 14));
+ assertEquals(instancePartitions.getInstances(7, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 15));
+ assertEquals(instancePartitions.getInstances(8, 0),
List.of(SERVER_INSTANCE_ID_PREFIX + 16));
+ assertEquals(instancePartitions.getInstances(8, 1),
List.of(SERVER_INSTANCE_ID_PREFIX + 17));
+ }
+
public void testMirrorServerSetBasedRandom()
throws FileNotFoundException {
testMirrorServerSetBasedRandomInner(10000000);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index a25d178091..64cbff92c0 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -24,8 +24,11 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -76,6 +79,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
private static final String REALTIME_TABLE_NAME =
TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
private static final int NUM_REPLICAS = 3;
private static final String SEGMENT_NAME_PREFIX = "segment_";
+ private static final String PARTITION_COLUMN = "partitionColumn";
private static final String TIERED_TABLE_NAME = "testTable";
private static final String OFFLINE_TIERED_TABLE_NAME =
TableNameBuilder.OFFLINE.tableNameWithType(TIERED_TABLE_NAME);
@@ -95,15 +99,15 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
addFakeBrokerInstancesToAutoJoinHelixCluster(1, true);
}
- /**
- * Dropping instance from cluster requires waiting for live instance gone
and removing instance related ZNodes, which
- * are not the purpose of the test, so combine different rebalance scenarios
into one test:
- * 1. NO_OP rebalance
- * 2. Add servers and rebalance
- * 3. Migrate to replica-group based segment assignment and rebalance
- * 4. Migrate back to non-replica-group based segment assignment and
rebalance
- * 5. Remove (disable) servers and rebalance
- */
+ ///
+ /// Dropping instance from cluster requires waiting for live instance gone
and removing instance related ZNodes, which
+ /// are not the purpose of the test, so combine different rebalance
scenarios into one test:
+ /// 1. NO_OP rebalance
+ /// 2. Add servers and rebalance
+ /// 3. Migrate to replica-group based segment assignment and rebalance
+ /// 4. Migrate back to non-replica-group based segment assignment and
rebalance
+ /// 5. Remove (disable) servers and rebalance
+ ///
@Test
public void testRebalance()
throws Exception {
@@ -766,6 +770,205 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
}
}
+ @Test
+ public void
testRebalanceWithImplicitRealtimeTablePartitionSelectorAndMinimizeDataMovement()
+ throws Exception {
+ int numServers = 6;
+ int numPartitions = 18;
+ int numReplicas = 2;
+
+ for (int i = 0; i < numServers; i++) {
+ String instanceId = SERVER_INSTANCE_ID_PREFIX + i;
+ addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+ }
+
+ InstanceReplicaGroupPartitionConfig replicaGroupPartitionConfig =
+ new InstanceReplicaGroupPartitionConfig(true, 0, numReplicas, 0, 0, 1,
false, null);
+ InstanceAssignmentConfig instanceAssignmentConfig =
+ new InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(null), false, 0,
null), null,
+ replicaGroupPartitionConfig,
+
InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(),
true);
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(numReplicas)
+ .setRoutingConfig(
+ new RoutingConfig(null, null,
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE, false))
+ .setStreamConfigs(
+
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(numPartitions).getStreamConfigsMap())
+
.setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(),
instanceAssignmentConfig))
+ .build();
+
+ // Create the table
+ addDummySchema(RAW_TABLE_NAME);
+ _helixResourceManager.addTable(tableConfig);
+
+ // Add the segments
+ int numSegmentsPerPartition = 4;
+ for (int i = 0; i < numPartitions; i++) {
+ for (int j = 0; j < numSegmentsPerPartition; j++) {
+ _helixResourceManager.addNewSegment(REALTIME_TABLE_NAME,
+
SegmentMetadataMockUtils.mockSegmentMetadataWithPartitionInfo(RAW_TABLE_NAME,
+ SEGMENT_NAME_PREFIX + (i * numSegmentsPerPartition + j),
PARTITION_COLUMN, i), null);
+ }
+ }
+
+ Map<String, Map<String, String>> oldSegmentAssignment =
+
_helixResourceManager.getTableIdealState(REALTIME_TABLE_NAME).getRecord().getMapFields();
+ for (Map.Entry<String, Map<String, String>> entry :
oldSegmentAssignment.entrySet()) {
+ assertEquals(entry.getValue().size(), numReplicas);
+ }
+
+ // Verify that segments are distributed equally across servers
+ Map<String, Integer> numSegmentsPerServer =
getNumSegmentsPerServer(oldSegmentAssignment);
+ for (int i = 0; i < numServers; i++) {
+ String instanceId = SERVER_INSTANCE_ID_PREFIX + i;
+ assertTrue(numSegmentsPerServer.containsKey(instanceId));
+ // Total number of segments is numReplicas * numPartitions *
(numSegmentsPerPartition + 1) because of
+ // CONSUMING segments
+ assertEquals(numSegmentsPerServer.get(instanceId),
+ (numReplicas * numPartitions * (numSegmentsPerPartition + 1)) /
numServers);
+ }
+
+ TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null,
null, null, null);
+ // Rebalance should return NO_OP status since there has been no change
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
rebalanceConfig, null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+
+ // All servers should be assigned to the table
+ Map<InstancePartitionsType, InstancePartitions> instanceAssignment =
rebalanceResult.getInstanceAssignment();
+ assertEquals(instanceAssignment.size(), 1);
+ InstancePartitions instancePartitions =
instanceAssignment.get(InstancePartitionsType.CONSUMING);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+ assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+ // Verify that replica partitions are distributed equally across servers
+ Map<String, Integer> numReplicaPartitionsPerServer =
getNumReplicaPartitionsPerServer(instancePartitions);
+ for (int i = 0; i < numServers; i++) {
+ String instanceId = SERVER_INSTANCE_ID_PREFIX + i;
+ assertTrue(numReplicaPartitionsPerServer.containsKey(instanceId));
+ // Total number of partitions is numReplicas * numPartitions
+ assertEquals(numReplicaPartitionsPerServer.get(instanceId), (numReplicas
* numPartitions) / numServers);
+ }
+
+ // Segment assignment should not change
+ assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
+
+ // Add two new servers
+ int numServersToAdd = 2;
+ Set<String> newServers = new HashSet<>();
+ for (int i = 0; i < numServersToAdd; i++) {
+ String instanceId = SERVER_INSTANCE_ID_PREFIX + (numServers + i);
+ addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+ newServers.add(instanceId);
+ }
+
+ // Check number of segments moved when minimizeDataMovement is not enabled
+ rebalanceConfig.setReassignInstances(true);
+ rebalanceConfig.setIncludeConsuming(true);
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setMinimizeDataMovement(Enablement.DISABLE);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ // Most of the segments end up being moved when minimizeDataMovement is
not enabled due to the round robin way in
+ // which partitions are assigned to instances (see
InstanceReplicaGroupPartitionSelector)
+
assertEquals(rebalanceResult.getRebalanceSummaryResult().getSegmentInfo().getTotalSegmentsToBeMoved(),
130);
+
+ // Rebalance with reassignInstances and minimizeDataMovement enabled
+ rebalanceConfig.setMinimizeDataMovement(Enablement.ENABLE);
+ rebalanceConfig.setDryRun(false);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ instanceAssignment = rebalanceResult.getInstanceAssignment();
+ assertEquals(instanceAssignment.size(), 1);
+ instancePartitions =
instanceAssignment.get(InstancePartitionsType.CONSUMING);
+ assertEquals(instancePartitions.getNumReplicaGroups(), numReplicas);
+ assertEquals(instancePartitions.getNumPartitions(), numPartitions);
+
+ // Get number of segments moved
+ int numSegmentsMoved = getNumSegmentsMoved(oldSegmentAssignment,
rebalanceResult.getSegmentAssignment());
+ // This number is 130 when using the default partition selector in this
same scenario since more segment partitions
+ // will be moved when the instance partitions don't match the segment
partitions (we're setting numPartitions to
+ // the default value of 0 in the table's instance assignment config).
+ assertEquals(numSegmentsMoved, 30);
+
+ // "Repartition" and add two new partitions
+ int newNumPartitions = 20;
+ tableConfig.getIndexingConfig()
+ .setStreamConfigs(
+
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs(newNumPartitions).getStreamConfigsMap());
+ _helixResourceManager.updateTableConfig(tableConfig);
+
+ // Add segments for the new partitions
+ for (int i = numPartitions; i < newNumPartitions; i++) {
+ for (int j = 0; j < numSegmentsPerPartition; j++) {
+ _helixResourceManager.addNewSegment(REALTIME_TABLE_NAME,
+
SegmentMetadataMockUtils.mockSegmentMetadataWithPartitionInfo(RAW_TABLE_NAME,
+ SEGMENT_NAME_PREFIX + (i * numSegmentsPerPartition + j),
PARTITION_COLUMN, i), null);
+ }
+ }
+
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
+ // Verify that the new partitions are assigned to the new servers. Due to
the minimizeDataMovement algorithm, the
+ // previous rebalance resulted in the older servers having 5 partition
replicas each with the newer ones having 3
+ // partition replicas each.
+ instancePartitions =
rebalanceResult.getInstanceAssignment().get(InstancePartitionsType.CONSUMING);
+ for (int i = numPartitions; i < newNumPartitions; i++) {
+ for (int j = 0; j < numReplicas; j++) {
+ for (String instanceId : instancePartitions.getInstances(i, j)) {
+ assertTrue(newServers.contains(instanceId),
+ "Expected new partition " + i + " to be assigned to a new
server, but found it on " + instanceId);
+ }
+ }
+ }
+
+ _helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME);
+
+ for (int i = 0; i < numServers + numServersToAdd; i++) {
+ stopAndDropFakeInstance(SERVER_INSTANCE_ID_PREFIX + i);
+ }
+ }
+
+ private Map<String, Integer> getNumSegmentsPerServer(Map<String, Map<String,
String>> segmentAssignment) {
+ Map<String, Integer> numSegmentsPerServer = new HashMap<>();
+ for (Map<String, String> instanceStateMap : segmentAssignment.values()) {
+ for (String instanceId : instanceStateMap.keySet()) {
+ numSegmentsPerServer.merge(instanceId, 1, Integer::sum);
+ }
+ }
+ return numSegmentsPerServer;
+ }
+
+ private Map<String, Integer>
getNumReplicaPartitionsPerServer(InstancePartitions instancePartitions) {
+ Map<String, Integer> numPartitionsPerServer = new HashMap<>();
+ for (int i = 0; i < instancePartitions.getNumReplicaGroups(); i++) {
+ for (int j = 0; j < instancePartitions.getNumPartitions(); j++) {
+ List<String> instances = instancePartitions.getInstances(j, i);
+ for (String instanceId : instances) {
+ numPartitionsPerServer.merge(instanceId, 1, Integer::sum);
+ }
+ }
+ }
+ return numPartitionsPerServer;
+ }
+
+ private int getNumSegmentsMoved(Map<String, Map<String, String>>
oldSegmentAssignment,
+ Map<String, Map<String, String>> newSegmentAssignment) {
+ int numSegmentsMoved = 0;
+ for (Map.Entry<String, Map<String, String>> entry :
newSegmentAssignment.entrySet()) {
+ String segmentName = entry.getKey();
+ Map<String, String> newInstanceStateMap = entry.getValue();
+ Map<String, String> oldInstanceStateMap =
oldSegmentAssignment.get(segmentName);
+ assertEquals(oldInstanceStateMap.size(), newInstanceStateMap.size());
+ Set<String> commonInstances = new
HashSet<>(newInstanceStateMap.keySet());
+ commonInstances.retainAll(oldInstanceStateMap.keySet());
+ numSegmentsMoved += newInstanceStateMap.size() - commonInstances.size();
+ }
+ return numSegmentsMoved;
+ }
+
@Test
public void testRebalanceBatchSizePerServerErrors()
throws Exception {
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
index 9c778908de..2f51fd6250 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.integration.tests;
+import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.net.URL;
import java.util.ArrayList;
@@ -25,6 +26,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.pinot.common.exception.HttpErrorStatusException;
import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.config.TagNameUtils;
@@ -46,6 +48,7 @@ import org.apache.pinot.spi.config.table.TenantConfig;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceConstraintConfig;
+import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
import
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitionConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
import org.apache.pinot.spi.data.FieldSpec;
@@ -58,10 +61,7 @@ import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.*;
public class TableRebalanceIntegrationTest extends
BaseHybridClusterIntegrationTest {
@@ -88,6 +88,78 @@ public class TableRebalanceIntegrationTest extends
BaseHybridClusterIntegrationT
+ "?type=" + tableType.toString() + "&" +
getQueryString(rebalanceConfig);
}
+ @Test
+ public void testImplicitRealtimeTableInstanceAssignment() throws Exception {
+ // Instance assignment not configured for the table initially, so
INSTANCE_PARTITIONS should not exist.
+ assertThrows("404", IOException.class,
+ () -> sendGetRequest(getControllerBaseApiUrl() + "/tables/" +
getTableName() + "/instancePartitions"));
+
+ // Update table config with instance assignment config, use
IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR to
+ // create partitions in the replica group based on the number of stream
partitions.
+ TableConfig realtimeTableConfig =
getTableConfigBuilder(TableType.REALTIME).build();
+ realtimeTableConfig.setInstanceAssignmentConfigMap(
+ Map.of(InstancePartitionsType.CONSUMING.name(), new
InstanceAssignmentConfig(
+ new
InstanceTagPoolConfig(TagNameUtils.getRealtimeTagForTenant(getServerTenant()),
false, 0, null), null,
+ new InstanceReplicaGroupPartitionConfig(true, 0, 1, 0, 0, 0, true,
null),
+
InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR.name(),
true))
+ );
+ updateTableConfig(realtimeTableConfig);
+
+ // Rebalance the table to reassign instances and create the
INSTANCE_PARTITIONS.
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setReassignInstances(true);
+ rebalanceConfig.setMinAvailableReplicas(-1);
+ rebalanceConfig.setIncludeConsuming(true);
+ sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME));
+
+ // We're using IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR based instance
assignment for this table.
+ // This test verifies that INSTANCE_PARTITIONS is written to ZK after
instance assignment in the rebalance and has
+ // the expected number of partitions.
+
+ TestUtils.waitForCondition(
+ aVoid -> {
+ try {
+ sendGetRequest(getControllerBaseApiUrl() + "/tables/" +
getTableName() + "/instancePartitions");
+ } catch (Exception e) {
+ return false;
+ }
+ return true;
+ }, 10_000, "Expected INSTANCE_PARTITIONS to be created for table after
instance assignment in rebalance"
+ );
+
+ JsonNode instancePartitions = JsonUtils.stringToJsonNode(
+ sendGetRequest(getControllerBaseApiUrl() + "/tables/" + getTableName()
+ "/instancePartitions"));
+
+ assertNotNull(instancePartitions);
+ assertEquals(instancePartitions.size(), 1);
+
+ JsonNode partitionToInstancesMap =
+
instancePartitions.get(InstancePartitionsType.CONSUMING.name()).get("partitionToInstancesMap");
+
+ assertEquals(partitionToInstancesMap.size(), getNumKafkaPartitions()); //
single replica group
+ for (int i = 0; i < getNumKafkaPartitions(); i++) {
+ assertNotNull(partitionToInstancesMap.get(i + "_0")); // partition i,
replica group 0
+ }
+
+ // Reset the table config and rebalance
+ updateTableConfig(getTableConfigBuilder(TableType.REALTIME).build());
+ sendPostRequest(getRebalanceUrl(rebalanceConfig, TableType.REALTIME));
+
+ TestUtils.waitForCondition(
+ aVoid -> {
+ try {
+ sendGetRequest(getControllerBaseApiUrl() + "/tables/" +
getTableName() + "/instancePartitions");
+ } catch (Exception e) {
+ return e.getCause() instanceof HttpErrorStatusException
+ && ((HttpErrorStatusException) e.getCause()).getStatusCode()
== 404;
+ }
+ return false;
+ }, 10_000,
+ "Expected INSTANCE_PARTITIONS to be deleted for table after removing
instance assignment configs and "
+ + "rebalancing"
+ );
+ }
+
@Test
public void testRealtimeRebalancePreCheckMinimizeDataMovement()
throws Exception {
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index decda17d89..c4c08710f9 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -170,6 +170,7 @@ public final class TableConfigUtils {
validateFieldConfigList(tableConfig, schema);
validateInstancePartitionsTypeMapConfig(tableConfig);
validatePartitionedReplicaGroupInstance(tableConfig);
+ validateInstanceAssignmentConfigs(tableConfig);
if (!skipTypes.contains(ValidationType.UPSERT)) {
validateUpsertAndDedupConfig(tableConfig, schema);
validatePartialUpsertStrategies(tableConfig, schema);
@@ -913,6 +914,35 @@ public final class TableConfigUtils {
}
}
+ @VisibleForTesting
+ static void validateInstanceAssignmentConfigs(TableConfig tableConfig) {
+ if (tableConfig.getInstanceAssignmentConfigMap() == null) {
+ return;
+ }
+ for (Map.Entry<String, InstanceAssignmentConfig>
instanceAssignmentConfigMapEntry
+ : tableConfig.getInstanceAssignmentConfigMap().entrySet()) {
+ String instancePartitionsType =
instanceAssignmentConfigMapEntry.getKey();
+ InstanceAssignmentConfig instanceAssignmentConfig =
instanceAssignmentConfigMapEntry.getValue();
+ if (instanceAssignmentConfig.getPartitionSelector()
+ ==
InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR)
{
+ Preconditions.checkState(tableConfig.getTableType() ==
TableType.REALTIME,
+ "IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR can only be used for
REALTIME tables");
+
Preconditions.checkState(InstancePartitionsType.CONSUMING.name().equalsIgnoreCase(instancePartitionsType),
+ "IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR can only be used for
CONSUMING instance partitions type");
+
Preconditions.checkState(instanceAssignmentConfig.getReplicaGroupPartitionConfig().isReplicaGroupBased(),
+ "IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR can only be used with
replica group based instance assignment");
+
Preconditions.checkState(instanceAssignmentConfig.getReplicaGroupPartitionConfig().getNumPartitions()
== 0,
+ "numPartitions should not be explicitly set when using
IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR");
+ // Allow 0 because that's the default (unset) value.
+ Preconditions.checkState(
+
instanceAssignmentConfig.getReplicaGroupPartitionConfig().getNumInstancesPerPartition()
== 0
+ ||
instanceAssignmentConfig.getReplicaGroupPartitionConfig().getNumInstancesPerPartition()
== 1,
+ "numInstancesPerPartition must be 1 when using
IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR");
+ }
+ // TODO: Add more validations for other partition selectors here
+ }
+ }
+
/**
* Validates metrics aggregation when upsert config is enabled
* - Metrics aggregation cannot be enabled when Upsert Config is enabled.
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index 5d51854392..10eab60abf 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -76,7 +76,10 @@ import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
+import static org.mockito.Mockito.when;
import static org.testng.Assert.assertThrows;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.expectThrows;
/**
@@ -2570,6 +2573,74 @@ public class TableConfigUtilsTest {
}
}
+ @Test
+ public void testValidateImplicitRealtimeTablePartitionSelectorConfigs() {
+ InstanceAssignmentConfig instanceAssignmentConfig =
Mockito.mock(InstanceAssignmentConfig.class);
+ when(instanceAssignmentConfig.getPartitionSelector()).thenReturn(
+
InstanceAssignmentConfig.PartitionSelector.IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR);
+
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.OFFLINE).setTableName(TABLE_NAME)
+
.setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(),
instanceAssignmentConfig))
+ .build();
+ IllegalStateException e = expectThrows(IllegalStateException.class,
+ () -> TableConfigUtils.validateInstanceAssignmentConfigs(tableConfig));
+ assertTrue(
+ e.getMessage().contains("IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR
can only be used for REALTIME tables"));
+
+ InstanceReplicaGroupPartitionConfig instanceReplicaGroupPartitionConfig =
+ Mockito.mock(InstanceReplicaGroupPartitionConfig.class);
+
when(instanceReplicaGroupPartitionConfig.isReplicaGroupBased()).thenReturn(true);
+
when(instanceAssignmentConfig.getReplicaGroupPartitionConfig()).thenReturn(instanceReplicaGroupPartitionConfig);
+ TableConfig tableConfig2 = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+
.setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.COMPLETED.name(),
instanceAssignmentConfig))
+ .build();
+ e = expectThrows(IllegalStateException.class,
+ () ->
TableConfigUtils.validateInstanceAssignmentConfigs(tableConfig2));
+ assertTrue(e.getMessage()
+ .contains(
+ "IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR can only be used for
CONSUMING instance partitions type"));
+
+
when(instanceReplicaGroupPartitionConfig.isReplicaGroupBased()).thenReturn(false);
+
when(instanceAssignmentConfig.getReplicaGroupPartitionConfig()).thenReturn(instanceReplicaGroupPartitionConfig);
+ TableConfig tableConfig3 = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+
.setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(),
instanceAssignmentConfig))
+ .build();
+ e = expectThrows(IllegalStateException.class,
+ () ->
TableConfigUtils.validateInstanceAssignmentConfigs(tableConfig3));
+ assertTrue(e.getMessage()
+ .contains(
+ "IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR can only be used with
replica group based instance "
+ + "assignment"));
+
+
when(instanceReplicaGroupPartitionConfig.isReplicaGroupBased()).thenReturn(true);
+ when(instanceReplicaGroupPartitionConfig.getNumPartitions()).thenReturn(1);
+ TableConfig tableConfig4 = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+
.setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(),
instanceAssignmentConfig))
+ .build();
+ e = expectThrows(IllegalStateException.class,
+ () ->
TableConfigUtils.validateInstanceAssignmentConfigs(tableConfig4));
+ assertTrue(e.getMessage()
+ .contains("numPartitions should not be explicitly set when using
IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR"));
+
+
when(instanceReplicaGroupPartitionConfig.isReplicaGroupBased()).thenReturn(true);
+ when(instanceReplicaGroupPartitionConfig.getNumPartitions()).thenReturn(0);
+
when(instanceReplicaGroupPartitionConfig.getNumInstancesPerPartition()).thenReturn(2);
+ TableConfig tableConfig5 = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+
.setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(),
instanceAssignmentConfig))
+ .build();
+ e = expectThrows(IllegalStateException.class,
+ () ->
TableConfigUtils.validateInstanceAssignmentConfigs(tableConfig5));
+ assertTrue(e.getMessage()
+ .contains("numInstancesPerPartition must be 1 when using
IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR"));
+
+ when(instanceReplicaGroupPartitionConfig.getNumPartitions()).thenReturn(0);
+
when(instanceReplicaGroupPartitionConfig.getNumInstancesPerPartition()).thenReturn(0);
+ TableConfig tableConfig6 = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+
.setInstanceAssignmentConfigMap(Map.of(InstancePartitionsType.CONSUMING.name(),
instanceAssignmentConfig))
+ .build();
+ TableConfigUtils.validateInstanceAssignmentConfigs(tableConfig6);
+ }
+
private Map<String, String> getStreamConfigs() {
Map<String, String> streamConfigs = new HashMap<>();
streamConfigs.put("streamType", "kafka");
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
index ad4b22ecaf..973cf368c7 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/assignment/InstanceAssignmentConfig.java
@@ -87,6 +87,6 @@ public class InstanceAssignmentConfig extends BaseJsonConfig {
public enum PartitionSelector {
FD_AWARE_INSTANCE_PARTITION_SELECTOR,
INSTANCE_REPLICA_GROUP_PARTITION_SELECTOR,
- MIRROR_SERVER_SET_PARTITION_SELECTOR
+ MIRROR_SERVER_SET_PARTITION_SELECTOR,
+ IMPLICIT_REALTIME_TABLE_PARTITION_SELECTOR;
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]