This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 3b80705f89 Fix bug in colocated tenant creation (#10098)
3b80705f89 is described below
commit 3b80705f8963f9e957ab28ed3a26bdc0eb14230d
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Wed Jan 11 11:21:46 2023 -0800
Fix bug in colocated tenant creation (#10098)
---
.../helix/core/PinotHelixResourceManager.java | 74 ++++++++++------------
.../PinotHelixResourceManagerStatelessTest.java | 41 ++++++++++++
2 files changed, 76 insertions(+), 39 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index efb209795e..bca97f48c1 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -990,8 +990,16 @@ public class PinotHelixResourceManager {
}
private void retagInstance(String instanceName, String oldTag, String
newTag) {
- _helixAdmin.removeInstanceTag(_helixClusterName, instanceName, oldTag);
- _helixAdmin.addInstanceTag(_helixClusterName, instanceName, newTag);
+ PropertyKey instanceConfigKey = _keyBuilder.instanceConfig(instanceName);
+ InstanceConfig instanceConfig =
_helixDataAccessor.getProperty(instanceConfigKey);
+ if (instanceConfig == null) {
+ throw new NotFoundException("Failed to find instance config for
instance: " + instanceName);
+ }
+ instanceConfig.removeTag(oldTag);
+ instanceConfig.addTag(newTag);
+ if (!_helixDataAccessor.setProperty(instanceConfigKey, instanceConfig)) {
+ throw new RuntimeException("Failed to set instance config for instance:
" + instanceName);
+ }
}
public PinotResourceManagerResponse updateServerTenant(Tenant serverTenant) {
@@ -1162,51 +1170,39 @@ public class PinotHelixResourceManager {
}
public PinotResourceManagerResponse createServerTenant(Tenant serverTenant) {
- int numberOfInstances = serverTenant.getNumberOfInstances();
- List<String> unTaggedInstanceList = getOnlineUnTaggedServerInstanceList();
- if (unTaggedInstanceList.size() < numberOfInstances) {
+ int numInstances = serverTenant.getNumberOfInstances();
+ int numOfflineInstances = serverTenant.getOfflineInstances();
+ int numRealtimeInstances = serverTenant.getRealtimeInstances();
+ if (numInstances < numOfflineInstances || numInstances <
numRealtimeInstances) {
+ throw new BadRequestException(
+ String.format("Cannot request more offline instances: %d or realtime
instances: %d than total instances: %d",
+ numOfflineInstances, numRealtimeInstances, numInstances));
+ }
+ // TODO: Consider throwing BadRequestException
+ List<String> untaggedInstances = getOnlineUnTaggedServerInstanceList();
+ if (untaggedInstances.size() < numInstances) {
String message = "Failed to allocate server instances to Tag : " +
serverTenant.getTenantName()
- + ", Current number of untagged server instances : " +
unTaggedInstanceList.size()
+ + ", Current number of untagged server instances : " +
untaggedInstances.size()
+ ", Request asked number is : " +
serverTenant.getNumberOfInstances();
LOGGER.error(message);
return PinotResourceManagerResponse.failure(message);
- } else {
- if (serverTenant.isCoLocated()) {
- assignColocatedServerTenant(serverTenant, numberOfInstances,
unTaggedInstanceList);
- } else {
- assignIndependentServerTenant(serverTenant, numberOfInstances,
unTaggedInstanceList);
- }
}
- return PinotResourceManagerResponse.SUCCESS;
- }
-
- private void assignIndependentServerTenant(Tenant serverTenant, int
numberOfInstances,
- List<String> unTaggedInstanceList) {
- String offlineServerTag =
TagNameUtils.getOfflineTagForTenant(serverTenant.getTenantName());
- for (int i = 0; i < serverTenant.getOfflineInstances(); i++) {
- retagInstance(unTaggedInstanceList.get(i),
Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
- }
- String realtimeServerTag =
TagNameUtils.getRealtimeTagForTenant(serverTenant.getTenantName());
- for (int i = 0; i < serverTenant.getRealtimeInstances(); i++) {
- retagInstance(unTaggedInstanceList.get(i +
serverTenant.getOfflineInstances()), Helix.UNTAGGED_SERVER_INSTANCE,
- realtimeServerTag);
- }
- }
-
- private void assignColocatedServerTenant(Tenant serverTenant, int
numberOfInstances,
- List<String> unTaggedInstanceList) {
- int cnt = 0;
- String offlineServerTag =
TagNameUtils.getOfflineTagForTenant(serverTenant.getTenantName());
- for (int i = 0; i < serverTenant.getOfflineInstances(); i++) {
- retagInstance(unTaggedInstanceList.get(cnt++),
Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
+ int index = 0;
+ if (numOfflineInstances > 0) {
+ String offlineServerTag =
TagNameUtils.getOfflineTagForTenant(serverTenant.getTenantName());
+ for (int i = 0; i < numOfflineInstances; i++) {
+ retagInstance(untaggedInstances.get(index),
Helix.UNTAGGED_SERVER_INSTANCE, offlineServerTag);
+ index = (index + 1) % numInstances;
+ }
}
- String realtimeServerTag =
TagNameUtils.getRealtimeTagForTenant(serverTenant.getTenantName());
- for (int i = 0; i < serverTenant.getRealtimeInstances(); i++) {
- retagInstance(unTaggedInstanceList.get(cnt++),
Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag);
- if (cnt == numberOfInstances) {
- cnt = 0;
+ if (numRealtimeInstances > 0) {
+ String realtimeServerTag =
TagNameUtils.getRealtimeTagForTenant(serverTenant.getTenantName());
+ for (int i = 0; i < numRealtimeInstances; i++) {
+ retagInstance(untaggedInstances.get(index),
Helix.UNTAGGED_SERVER_INSTANCE, realtimeServerTag);
+ index = (index + 1) % numInstances;
}
}
+ return PinotResourceManagerResponse.SUCCESS;
}
public PinotResourceManagerResponse createBrokerTenant(Tenant brokerTenant) {
diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index 6569a6a4be..3e66b5c6ac 100644
--- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -588,6 +588,47 @@ public class PinotHelixResourceManagerStatelessTest extends ControllerTest {
resetServerTags();
}
+ @Test
+ public void testCreateColocatedTenant() {
+ untagServers();
+ Tenant serverTenant = new Tenant(TenantRole.SERVER, SERVER_TENANT_NAME,
NUM_SERVER_INSTANCES, NUM_SERVER_INSTANCES,
+ NUM_SERVER_INSTANCES);
+
assertTrue(_helixResourceManager.createServerTenant(serverTenant).isSuccessful());
+ assertEquals(
+
_helixResourceManager.getInstancesWithTag(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME)).size(),
+ NUM_SERVER_INSTANCES);
+ assertEquals(
+
_helixResourceManager.getInstancesWithTag(TagNameUtils.getRealtimeTagForTenant(SERVER_TENANT_NAME)).size(),
+ NUM_SERVER_INSTANCES);
+
assertTrue(_helixResourceManager.getOnlineUnTaggedServerInstanceList().isEmpty());
+
+ untagServers();
+ serverTenant = new Tenant(TenantRole.SERVER, SERVER_TENANT_NAME,
NUM_SERVER_INSTANCES, NUM_SERVER_INSTANCES - 1,
+ NUM_SERVER_INSTANCES - 1);
+
assertTrue(_helixResourceManager.createServerTenant(serverTenant).isSuccessful());
+ assertEquals(
+
_helixResourceManager.getInstancesWithTag(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME)).size(),
+ NUM_SERVER_INSTANCES - 1);
+ assertEquals(
+
_helixResourceManager.getInstancesWithTag(TagNameUtils.getRealtimeTagForTenant(SERVER_TENANT_NAME)).size(),
+ NUM_SERVER_INSTANCES - 1);
+
assertTrue(_helixResourceManager.getOnlineUnTaggedServerInstanceList().isEmpty());
+
+ untagServers();
+ serverTenant = new Tenant(TenantRole.SERVER, SERVER_TENANT_NAME,
NUM_SERVER_INSTANCES - 1, NUM_SERVER_INSTANCES - 1,
+ NUM_SERVER_INSTANCES - 1);
+
assertTrue(_helixResourceManager.createServerTenant(serverTenant).isSuccessful());
+ assertEquals(
+
_helixResourceManager.getInstancesWithTag(TagNameUtils.getOfflineTagForTenant(SERVER_TENANT_NAME)).size(),
+ NUM_SERVER_INSTANCES - 1);
+ assertEquals(
+
_helixResourceManager.getInstancesWithTag(TagNameUtils.getRealtimeTagForTenant(SERVER_TENANT_NAME)).size(),
+ NUM_SERVER_INSTANCES - 1);
+
assertEquals(_helixResourceManager.getOnlineUnTaggedServerInstanceList().size(),
1);
+
+ resetServerTags();
+ }
+
@Test
public void testLeadControllerResource() {
IdealState leadControllerResourceIdealState =
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]