This is an automated email from the ASF dual-hosted git repository.
yashmayya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new c7af34e7aa Reject rebalance requests for tables that already have an
in progress rebalance job (#15990)
c7af34e7aa is described below
commit c7af34e7aae0ec0b0772ef9d0842d5970ad491f0
Author: Yash Mayya <[email protected]>
AuthorDate: Tue Jun 10 17:53:46 2025 +0100
Reject rebalance requests for tables that already have an in progress
rebalance job (#15990)
---
.../exception/RebalanceInProgressException.java | 33 +++
.../pinot/controller/BaseControllerStarter.java | 47 +++-
.../api/resources/PinotRealtimeTableResource.java | 2 +-
.../api/resources/PinotTableRestletResource.java | 89 ++----
.../helix/core/PinotHelixResourceManager.java | 142 +---------
.../helix/core/rebalance/RebalanceChecker.java | 47 ++--
.../core/rebalance/TableRebalanceManager.java | 307 +++++++++++++++++++++
.../helix/core/rebalance/TableRebalancer.java | 7 +-
.../rebalance/ZkBasedTableRebalanceObserver.java | 16 +-
.../rebalance/tenant/DefaultTenantRebalancer.java | 19 +-
.../helix/core/relocation/SegmentRelocator.java | 19 +-
.../helix/core/util/ControllerZkHelixUtils.java | 111 ++++++++
.../pinot/controller/helix/ControllerTest.java | 25 ++
.../helix/core/rebalance/RebalanceCheckerTest.java | 31 ++-
.../TableRebalancerClusterStatelessTest.java | 19 +-
.../TestZkBasedTableRebalanceObserver.java | 19 +-
.../rebalance/tenant/TenantRebalancerTest.java | 3 +-
.../core/relocation/SegmentRelocatorTest.java | 11 +-
.../tests/TableRebalanceIntegrationTest.java | 58 ++++
.../apache/pinot/tools/PinotTableRebalancer.java | 22 +-
20 files changed, 732 insertions(+), 295 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/exception/RebalanceInProgressException.java
b/pinot-common/src/main/java/org/apache/pinot/common/exception/RebalanceInProgressException.java
new file mode 100644
index 0000000000..1b14e31394
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/exception/RebalanceInProgressException.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.exception;
+
+/**
+ * Exception thrown when a rebalance operation is attempted while another rebalance is already in progress for the same
+ * table. This helps to prevent concurrent table rebalances.
+ */
+public class RebalanceInProgressException extends Exception {
+ public RebalanceInProgressException(String message) {
+ super(message);
+ }
+
+ public RebalanceInProgressException(String message, Throwable cause) {
+ super(message, cause);
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 6b9b88f88e..545ddd95aa 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -101,6 +101,9 @@ import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentMa
import org.apache.pinot.controller.helix.core.realtime.SegmentCompletionConfig;
import
org.apache.pinot.controller.helix.core.realtime.SegmentCompletionManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceChecker;
+import org.apache.pinot.controller.helix.core.rebalance.RebalancePreChecker;
+import
org.apache.pinot.controller.helix.core.rebalance.RebalancePreCheckerFactory;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
import
org.apache.pinot.controller.helix.core.rebalance.tenant.DefaultTenantRebalancer;
import
org.apache.pinot.controller.helix.core.rebalance.tenant.TenantRebalancer;
import org.apache.pinot.controller.helix.core.relocation.SegmentRelocator;
@@ -203,11 +206,15 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
protected TaskMetricsEmitter _taskMetricsEmitter;
protected PoolingHttpClientConnectionManager _connectionManager;
protected TenantRebalancer _tenantRebalancer;
- protected ExecutorService _tenantRebalanceExecutorService;
+ // This executor should be used by all code paths for user initiated rebalances, so that the controller config
+ // CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS is honored.
+ protected ExecutorService _rebalancerExecutorService;
protected TableSizeReader _tableSizeReader;
protected StorageQuotaChecker _storageQuotaChecker;
protected DiskUtilizationChecker _diskUtilizationChecker;
protected ResourceUtilizationManager _resourceUtilizationManager;
+ protected RebalancePreChecker _rebalancePreChecker;
+ protected TableRebalanceManager _tableRebalanceManager;
@Override
public void init(PinotConfiguration pinotConfiguration)
@@ -264,9 +271,6 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
// Do not use this before the invocation of {@link PinotHelixResourceManager::start()}, which happens in {@link
// ControllerStarter::start()}
_helixResourceManager = createHelixResourceManager();
- _tenantRebalanceExecutorService =
createExecutorService(_config.getControllerExecutorRebalanceNumThreads(),
- "tenant-rebalance-thread-%d");
- _tenantRebalancer = new DefaultTenantRebalancer(_helixResourceManager,
_tenantRebalanceExecutorService);
}
// Initialize the table config tuner registry.
@@ -347,7 +351,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
* @return A new instance of PinotHelixResourceManager.
*/
protected PinotHelixResourceManager createHelixResourceManager() {
- return new PinotHelixResourceManager(_config, _executorService);
+ return new PinotHelixResourceManager(_config);
}
public PinotHelixResourceManager getHelixResourceManager() {
@@ -389,6 +393,14 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
return _staleInstancesCleanupTask;
}
+ public TableRebalanceManager getTableRebalanceManager() {
+ return _tableRebalanceManager;
+ }
+
+ public TableSizeReader getTableSizeReader() {
+ return _tableSizeReader;
+ }
+
@Override
public ServiceRole getServiceRole() {
return ServiceRole.CONTROLLER;
@@ -536,12 +548,20 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
_tableSizeReader =
new TableSizeReader(_executorService, _connectionManager,
_controllerMetrics, _helixResourceManager,
_leadControllerManager);
- _helixResourceManager.registerTableSizeReader(_tableSizeReader);
_storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader,
_controllerMetrics, _leadControllerManager,
_helixResourceManager, _config);
_diskUtilizationChecker = new
DiskUtilizationChecker(_helixResourceManager, _config);
_resourceUtilizationManager = new ResourceUtilizationManager(_config,
_diskUtilizationChecker);
+ _rebalancePreChecker =
RebalancePreCheckerFactory.create(_config.getRebalancePreCheckerClass());
+ _rebalancePreChecker.init(_helixResourceManager, _executorService,
_config.getDiskUtilizationThreshold());
+ _rebalancerExecutorService =
createExecutorService(_config.getControllerExecutorRebalanceNumThreads(),
+ "rebalance-thread-%d");
+ _tableRebalanceManager =
+ new TableRebalanceManager(_helixResourceManager, _controllerMetrics,
_rebalancePreChecker, _tableSizeReader,
+ _rebalancerExecutorService);
+ _tenantRebalancer =
+ new DefaultTenantRebalancer(_tableRebalanceManager,
_helixResourceManager, _rebalancerExecutorService);
// Setting up periodic tasks
List<PeriodicTask> controllerPeriodicTasks =
setupControllerPeriodicTasks();
@@ -579,6 +599,7 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
bind(_helixParticipantInstanceId).named(CONTROLLER_INSTANCE_ID);
bind(_helixResourceManager).to(PinotHelixResourceManager.class);
bind(_helixTaskResourceManager).to(PinotHelixTaskResourceManager.class);
+ bind(_tableRebalanceManager).to(TableRebalanceManager.class);
bind(_segmentCompletionManager).to(SegmentCompletionManager.class);
bind(_taskManager).to(PinotTaskManager.class);
bind(_taskManagerStatusCache).to(TaskManagerStatusCache.class);
@@ -857,15 +878,17 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
new SegmentStatusChecker(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics,
_tableSizeReader);
periodicTasks.add(_segmentStatusChecker);
- _rebalanceChecker = new RebalanceChecker(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics,
- _executorService);
+ _rebalanceChecker =
+ new RebalanceChecker(_tableRebalanceManager, _helixResourceManager,
_leadControllerManager, _config,
+ _controllerMetrics);
periodicTasks.add(_rebalanceChecker);
_realtimeConsumerMonitor =
new RealtimeConsumerMonitor(_config, _helixResourceManager,
_leadControllerManager, _controllerMetrics,
_executorService);
periodicTasks.add(_realtimeConsumerMonitor);
- _segmentRelocator = new SegmentRelocator(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics,
- _executorService, _connectionManager);
+ _segmentRelocator =
+ new SegmentRelocator(_tableRebalanceManager, _helixResourceManager,
_leadControllerManager, _config,
+ _controllerMetrics, _executorService, _connectionManager);
periodicTasks.add(_segmentRelocator);
_staleInstancesCleanupTask =
new StaleInstancesCleanupTask(_helixResourceManager,
_leadControllerManager, _config, _controllerMetrics);
@@ -965,8 +988,8 @@ public abstract class BaseControllerStarter implements
ServiceStartable {
LOGGER.info("Shutting down executor service");
_executorService.shutdownNow();
_executorService.awaitTermination(10L, TimeUnit.SECONDS);
- _tenantRebalanceExecutorService.shutdownNow();
- _tenantRebalanceExecutorService.awaitTermination(10L, TimeUnit.SECONDS);
+ _rebalancerExecutorService.shutdownNow();
+ _rebalancerExecutorService.awaitTermination(10L, TimeUnit.SECONDS);
} catch (final Exception e) {
LOGGER.error("Caught exception while shutting down", e);
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
index 595169ce44..ca6cd35bc0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotRealtimeTableResource.java
@@ -259,7 +259,7 @@ public class PinotRealtimeTableResource {
controllerJobZKMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST,
JsonUtils.objectToString(segmentsYetToBeCommitted));
_pinotHelixResourceManager.addControllerJobToZK(forceCommitJobId,
controllerJobZKMetadata,
- ControllerJobType.FORCE_COMMIT, prev -> true);
+ ControllerJobType.FORCE_COMMIT);
}
Map<String, Object> result = new HashMap<>(controllerJobZKMetadata);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 30d8401165..bdb82ce766 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -45,8 +45,8 @@ import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -73,6 +73,7 @@ import org.apache.helix.model.IdealState;
import org.apache.helix.store.zk.ZkHelixPropertyStore;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.exception.RebalanceInProgressException;
import org.apache.pinot.common.exception.SchemaNotFoundException;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
@@ -97,10 +98,8 @@ import
org.apache.pinot.controller.helix.core.PinotResourceManagerResponse;
import
org.apache.pinot.controller.helix.core.minion.PinotHelixTaskResourceManager;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
-import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
-import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
-import
org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.controller.recommender.RecommenderDriver;
import org.apache.pinot.controller.tuner.TableConfigTunerUtils;
@@ -171,6 +170,9 @@ public class PinotTableRestletResource {
@Inject
PinotHelixResourceManager _pinotHelixResourceManager;
+ @Inject
+ TableRebalanceManager _tableRebalanceManager;
+
@Inject
PinotHelixTaskResourceManager _pinotHelixTaskResourceManager;
@@ -183,9 +185,6 @@ public class PinotTableRestletResource {
@Inject
ControllerMetrics _controllerMetrics;
- @Inject
- ExecutorService _executorService;
-
@Inject
AccessControlFactory _accessControlFactory;
@@ -695,31 +694,28 @@ public class PinotTableRestletResource {
try {
if (dryRun || preChecks || downtime) {
- // For dry-run, preChecks or rebalance with downtime, directly return the rebalance result as it should return
- // immediately
- return _pinotHelixResourceManager.rebalanceTable(tableNameWithType,
rebalanceConfig, rebalanceJobId, false);
+ // For dry-run, preChecks or rebalance with downtime, it's fine to run the rebalance synchronously since it
+ // should be a really short operation.
+ return _tableRebalanceManager.rebalanceTable(tableNameWithType,
rebalanceConfig, rebalanceJobId, false);
} else {
// Make a dry-run first to get the target assignment
rebalanceConfig.setDryRun(true);
RebalanceResult dryRunResult =
- _pinotHelixResourceManager.rebalanceTable(tableNameWithType,
rebalanceConfig, rebalanceJobId, false);
+ _tableRebalanceManager.rebalanceTable(tableNameWithType,
rebalanceConfig, rebalanceJobId, false);
if (dryRunResult.getStatus() == RebalanceResult.Status.DONE) {
// If dry-run succeeded, run rebalance asynchronously
rebalanceConfig.setDryRun(false);
- Future<RebalanceResult> rebalanceResultFuture =
_executorService.submit(() -> {
- try {
- return _pinotHelixResourceManager.rebalanceTable(
- tableNameWithType, rebalanceConfig, rebalanceJobId, true);
- } catch (Throwable t) {
+ CompletableFuture<RebalanceResult> rebalanceResultFuture =
+ _tableRebalanceManager.rebalanceTableAsync(tableNameWithType,
rebalanceConfig, rebalanceJobId, true);
+ rebalanceResultFuture.whenComplete((rebalanceResult, throwable) -> {
+ if (throwable != null) {
String errorMsg = String.format("Caught exception/error while
rebalancing table: %s", tableNameWithType);
- LOGGER.error(errorMsg, t);
- return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED, errorMsg, null, null, null,
- null, null);
+ LOGGER.error(errorMsg, throwable);
}
});
- boolean isJobIdPersisted = waitForRebalanceToPersist(
- dryRunResult.getJobId(), tableNameWithType,
rebalanceResultFuture);
+ boolean isJobIdPersisted =
+ waitForRebalanceToPersist(dryRunResult.getJobId(),
tableNameWithType, rebalanceResultFuture);
if (rebalanceResultFuture.isDone()) {
try {
@@ -744,6 +740,8 @@ public class PinotTableRestletResource {
}
} catch (TableNotFoundException e) {
throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.NOT_FOUND);
+ } catch (RebalanceInProgressException e) {
+ throw new ControllerApplicationException(LOGGER, e.getMessage(),
Response.Status.CONFLICT);
}
}
@@ -783,29 +781,7 @@ public class PinotTableRestletResource {
@Context HttpHeaders headers) {
tableName = DatabaseUtils.translateTableName(tableName, headers);
String tableNameWithType = constructTableNameWithType(tableName,
tableTypeStr);
- List<String> cancelledJobIds = new ArrayList<>();
- boolean updated =
- _pinotHelixResourceManager.updateJobsForTable(tableNameWithType,
ControllerJobType.TABLE_REBALANCE,
- jobMetadata -> {
- String jobId =
jobMetadata.get(CommonConstants.ControllerJob.JOB_ID);
- try {
- String jobStatsInStr =
jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
- TableRebalanceProgressStats jobStats =
- JsonUtils.stringToObject(jobStatsInStr,
TableRebalanceProgressStats.class);
- if (jobStats.getStatus() !=
RebalanceResult.Status.IN_PROGRESS) {
- return;
- }
- cancelledJobIds.add(jobId);
- LOGGER.info("Cancel rebalance job: {} for table: {}", jobId,
tableNameWithType);
- jobStats.setStatus(RebalanceResult.Status.CANCELLED);
-
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
- JsonUtils.objectToString(jobStats));
- } catch (Exception e) {
- LOGGER.error("Failed to cancel rebalance job: {} for table:
{}", jobId, tableNameWithType, e);
- }
- });
- LOGGER.info("Tried to cancel existing jobs at best effort and done: {}",
updated);
- return cancelledJobIds;
+ return _tableRebalanceManager.cancelRebalance(tableNameWithType);
}
@GET
@@ -818,30 +794,7 @@ public class PinotTableRestletResource {
public ServerRebalanceJobStatusResponse rebalanceStatus(
@ApiParam(value = "Rebalance Job Id", required = true)
@PathParam("jobId") String jobId)
throws JsonProcessingException {
- Map<String, String> controllerJobZKMetadata =
getControllerJobMetadata(jobId);
-
- if (controllerJobZKMetadata == null) {
- throw new ControllerApplicationException(LOGGER, "Failed to find
controller job id: " + jobId,
- Response.Status.NOT_FOUND);
- }
- ServerRebalanceJobStatusResponse serverRebalanceJobStatusResponse = new
ServerRebalanceJobStatusResponse();
- TableRebalanceProgressStats tableRebalanceProgressStats =
JsonUtils.stringToObject(
-
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS),
- TableRebalanceProgressStats.class);
-
serverRebalanceJobStatusResponse.setTableRebalanceProgressStats(tableRebalanceProgressStats);
-
- long timeSinceStartInSecs = 0L;
- if (RebalanceResult.Status.DONE !=
tableRebalanceProgressStats.getStatus()) {
- timeSinceStartInSecs = (System.currentTimeMillis() -
tableRebalanceProgressStats.getStartTimeMs()) / 1000;
- }
-
serverRebalanceJobStatusResponse.setTimeElapsedSinceStartInSeconds(timeSinceStartInSecs);
-
- String jobCtxInStr =
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT);
- if (StringUtils.isNotEmpty(jobCtxInStr)) {
- TableRebalanceContext jobCtx = JsonUtils.stringToObject(jobCtxInStr,
TableRebalanceContext.class);
- serverRebalanceJobStatusResponse.setTableRebalanceContext(jobCtx);
- }
- return serverRebalanceJobStatusResponse;
+ return _tableRebalanceManager.getRebalanceStatus(jobId);
}
@GET
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 21894be117..b097ecd05b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -48,7 +48,6 @@ import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.UUID;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -154,15 +153,8 @@ import
org.apache.pinot.controller.helix.core.lineage.LineageManager;
import org.apache.pinot.controller.helix.core.lineage.LineageManagerFactory;
import org.apache.pinot.controller.helix.core.minion.PinotTaskManager;
import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
-import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
-import org.apache.pinot.controller.helix.core.rebalance.RebalancePreChecker;
-import
org.apache.pinot.controller.helix.core.rebalance.RebalancePreCheckerFactory;
-import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
-import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
-import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
-import
org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver;
+import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
import org.apache.pinot.controller.helix.starter.HelixConfig;
-import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.DatabaseConfig;
import org.apache.pinot.spi.config.instance.Instance;
@@ -245,13 +237,10 @@ public class PinotHelixResourceManager {
private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
private TableCache _tableCache;
private final LineageManager _lineageManager;
- private final RebalancePreChecker _rebalancePreChecker;
- private TableSizeReader _tableSizeReader;
public PinotHelixResourceManager(String zkURL, String helixClusterName,
@Nullable String dataDir,
boolean isSingleTenantCluster, boolean enableBatchMessageMode, int
deletedSegmentsRetentionInDays,
- boolean enableTieredSegmentAssignment, LineageManager lineageManager,
RebalancePreChecker rebalancePreChecker,
- @Nullable ExecutorService executorService, double
diskUtilizationThreshold) {
+ boolean enableTieredSegmentAssignment, LineageManager lineageManager) {
_helixZkURL = HelixConfig.getAbsoluteZkPathForHelix(zkURL);
_helixClusterName = helixClusterName;
_dataDir = dataDir;
@@ -274,26 +263,13 @@ public class PinotHelixResourceManager {
_lineageUpdaterLocks[i] = new Object();
}
_lineageManager = lineageManager;
- _rebalancePreChecker = rebalancePreChecker;
- _rebalancePreChecker.init(this, executorService, diskUtilizationThreshold);
- }
-
- public PinotHelixResourceManager(ControllerConf controllerConf, @Nullable
ExecutorService executorService) {
- this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
controllerConf.getDataDir(),
- controllerConf.tenantIsolationEnabled(),
controllerConf.getEnableBatchMessageMode(),
- controllerConf.getDeletedSegmentsRetentionInDays(),
controllerConf.tieredSegmentAssignmentEnabled(),
- LineageManagerFactory.create(controllerConf),
-
RebalancePreCheckerFactory.create(controllerConf.getRebalancePreCheckerClass()),
executorService,
- controllerConf.getRebalanceDiskUtilizationThreshold());
}
public PinotHelixResourceManager(ControllerConf controllerConf) {
this(controllerConf.getZkStr(), controllerConf.getHelixClusterName(),
controllerConf.getDataDir(),
controllerConf.tenantIsolationEnabled(),
controllerConf.getEnableBatchMessageMode(),
controllerConf.getDeletedSegmentsRetentionInDays(),
controllerConf.tieredSegmentAssignmentEnabled(),
- LineageManagerFactory.create(controllerConf),
-
RebalancePreCheckerFactory.create(controllerConf.getRebalancePreCheckerClass()),
null,
- controllerConf.getRebalanceDiskUtilizationThreshold());
+ LineageManagerFactory.create(controllerConf));
}
/**
@@ -447,24 +423,6 @@ public class PinotHelixResourceManager {
return _lineageManager;
}
- /**
- * Get the rebalance pre-checker
- *
- * @return rebalance pre-checker
- */
- public RebalancePreChecker getRebalancePreChecker() {
- return _rebalancePreChecker;
- }
-
- /**
- * Get the table size reader.
- *
- * @return table size reader
- */
- public TableSizeReader getTableSizeReader() {
- return _tableSizeReader;
- }
-
/**
* Instance related APIs
*/
@@ -2059,10 +2017,6 @@ public class PinotHelixResourceManager {
_pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
}
- public void registerTableSizeReader(TableSizeReader tableSizeReader) {
- _tableSizeReader = tableSizeReader;
- }
-
private void assignInstances(TableConfig tableConfig, boolean override) {
String tableNameWithType = tableConfig.getTableName();
String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
@@ -2440,25 +2394,7 @@ public class PinotHelixResourceManager {
*/
public Map<String, Map<String, String>> getAllJobs(Set<String> jobTypes,
Predicate<Map<String, String>> jobMetadataChecker) {
- Map<String, Map<String, String>> controllerJobs = new HashMap<>();
- for (String jobType : jobTypes) {
- String jobResourcePath =
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
- ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, null,
AccessOption.PERSISTENT);
- if (jobsZnRecord == null) {
- continue;
- }
- Map<String, Map<String, String>> jobMetadataMap =
jobsZnRecord.getMapFields();
- for (Map.Entry<String, Map<String, String>> jobMetadataEntry :
jobMetadataMap.entrySet()) {
- String jobId = jobMetadataEntry.getKey();
- Map<String, String> jobMetadata = jobMetadataEntry.getValue();
-
Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.JOB_TYPE).equals(jobType),
- "Got unexpected jobType: %s at jobResourcePath: %s with jobId:
%s", jobType, jobResourcePath, jobId);
- if (jobMetadataChecker.test(jobMetadata)) {
- controllerJobs.put(jobId, jobMetadata);
- }
- }
- }
- return controllerJobs;
+ return ControllerZkHelixUtils.getAllControllerJobs(jobTypes,
jobMetadataChecker, _propertyStore);
}
/**
@@ -2538,37 +2474,12 @@ public class PinotHelixResourceManager {
* @param jobId job's UUID
* @param jobMetadata the job metadata
* @param jobType the type of the job to figure out where job metadata is kept in ZK
- * @param prevJobMetadataChecker to check the previous job metadata before adding new one
* @return boolean representing success / failure of the ZK write step
*/
public boolean addControllerJobToZK(String jobId, Map<String, String>
jobMetadata, String jobType,
Predicate<Map<String, String>> prevJobMetadataChecker) {
-
Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)
!= null,
- "Submission Time in JobMetadata record not set. Cannot expire these
records");
- String jobResourcePath =
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
- Stat stat = new Stat();
- ZNRecord jobsZnRecord = _propertyStore.get(jobResourcePath, stat,
AccessOption.PERSISTENT);
- if (jobsZnRecord != null) {
- Map<String, Map<String, String>> jobMetadataMap =
jobsZnRecord.getMapFields();
- Map<String, String> prevJobMetadata = jobMetadataMap.get(jobId);
- if (!prevJobMetadataChecker.test(prevJobMetadata)) {
- return false;
- }
- jobMetadataMap.put(jobId, jobMetadata);
- if (jobMetadataMap.size() >
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) {
- jobMetadataMap = jobMetadataMap.entrySet().stream().sorted((v1, v2) ->
Long.compare(
-
Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
-
Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))))
- .collect(Collectors.toList()).subList(0,
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK)
- .stream().collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
- }
- jobsZnRecord.setMapFields(jobMetadataMap);
- return _propertyStore.set(jobResourcePath, jobsZnRecord,
stat.getVersion(), AccessOption.PERSISTENT);
- } else {
- jobsZnRecord = new ZNRecord(jobResourcePath);
- jobsZnRecord.setMapField(jobId, jobMetadata);
- return _propertyStore.set(jobResourcePath, jobsZnRecord,
AccessOption.PERSISTENT);
- }
+ return ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId,
jobMetadata, jobType,
+ prevJobMetadataChecker);
}
/**
@@ -3874,47 +3785,10 @@ public class PinotHelixResourceManager {
"Instance: " + instanceName + (enableInstance ? " enable" : "
disable") + " failed, timeout");
}
- /**
- * Entry point for table Rebalacing.
- * @param tableNameWithType
- * @param rebalanceConfig
- * @param trackRebalanceProgress whether to track rebalance progress stats
- * @return RebalanceResult
- * @throws TableNotFoundException
- */
- public RebalanceResult rebalanceTable(String tableNameWithType,
RebalanceConfig rebalanceConfig,
- String rebalanceJobId, boolean trackRebalanceProgress)
- throws TableNotFoundException {
- TableConfig tableConfig = getTableConfig(tableNameWithType);
- if (tableConfig == null) {
- throw new TableNotFoundException("Failed to find table config for table:
" + tableNameWithType);
- }
- Preconditions.checkState(rebalanceJobId != null, "RebalanceId not
populated in the rebalanceConfig");
- ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver = null;
- if (trackRebalanceProgress) {
- zkBasedTableRebalanceObserver = new
ZkBasedTableRebalanceObserver(tableNameWithType, rebalanceJobId,
- TableRebalanceContext.forInitialAttempt(rebalanceJobId,
rebalanceConfig), this);
- }
- return rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId,
rebalanceConfig,
- zkBasedTableRebalanceObserver);
- }
-
- public RebalanceResult rebalanceTable(String tableNameWithType, TableConfig
tableConfig, String rebalanceJobId,
- RebalanceConfig rebalanceConfig, @Nullable ZkBasedTableRebalanceObserver
zkBasedTableRebalanceObserver) {
- Map<String, Set<String>> tierToSegmentsMap = null;
- if (rebalanceConfig.isUpdateTargetTier()) {
- tierToSegmentsMap = updateTargetTier(rebalanceJobId, tableNameWithType,
tableConfig);
- }
- TableRebalancer tableRebalancer =
- new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver,
_controllerMetrics, _rebalancePreChecker,
- _tableSizeReader);
- return tableRebalancer.rebalance(tableConfig, rebalanceConfig,
rebalanceJobId, tierToSegmentsMap);
- }
-
/// Calculates the target tier for the segments within a table, updates the segment ZK metadata and persists the
/// update to ZK.
- @VisibleForTesting
- Map<String, Set<String>> updateTargetTier(String rebalanceJobId, String
tableNameWithType, TableConfig tableConfig) {
+ public Map<String, Set<String>> updateTargetTier(String rebalanceJobId,
String tableNameWithType,
+ TableConfig tableConfig) {
List<Tier> sortedTiers =
CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList()) ?
getSortedTiers(tableConfig) : List.of();
LOGGER.info("For rebalanceId: {}, updating target tiers for segments of
table: {} with tierConfigs: {}",
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
index 817c4129b4..b16c6b8197 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceChecker.java
@@ -27,10 +27,10 @@ import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
+import org.apache.pinot.common.exception.RebalanceInProgressException;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMeter;
@@ -54,15 +54,15 @@ import org.slf4j.LoggerFactory;
public class RebalanceChecker extends ControllerPeriodicTask<Void> {
private static final Logger LOGGER =
LoggerFactory.getLogger(RebalanceChecker.class);
private static final double RETRY_DELAY_SCALE_FACTOR = 2.0;
- private final ExecutorService _executorService;
+ private final TableRebalanceManager _tableRebalanceManager;
- public RebalanceChecker(PinotHelixResourceManager pinotHelixResourceManager,
- LeadControllerManager leadControllerManager, ControllerConf config,
ControllerMetrics controllerMetrics,
- ExecutorService executorService) {
+ public RebalanceChecker(TableRebalanceManager tableRebalanceManager,
+ PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager,
+ ControllerConf config, ControllerMetrics controllerMetrics) {
super(RebalanceChecker.class.getSimpleName(),
config.getRebalanceCheckerFrequencyInSeconds(),
config.getRebalanceCheckerInitialDelayInSeconds(),
pinotHelixResourceManager, leadControllerManager,
controllerMetrics);
- _executorService = executorService;
+ _tableRebalanceManager = tableRebalanceManager;
}
@Override
@@ -152,14 +152,7 @@ public class RebalanceChecker extends
ControllerPeriodicTask<Void> {
// thread, in order to avoid unnecessary ZK reads and making too many ZK
reads in a short time.
TableConfig tableConfig =
_pinotHelixResourceManager.getTableConfig(tableNameWithType);
Preconditions.checkState(tableConfig != null, "Failed to find table config
for table: %s", tableNameWithType);
- _executorService.submit(() -> {
- // Retry rebalance in another thread as rebalance can take time.
- try {
- retryRebalanceTableWithContext(tableNameWithType, tableConfig, jobCtx);
- } catch (Throwable t) {
- LOGGER.error("Failed to retry rebalance for table: {} asynchronously",
tableNameWithType, t);
- }
- });
+ retryRebalanceTableWithContext(tableNameWithType, tableConfig, jobCtx);
}
private void retryRebalanceTableWithContext(String tableNameWithType,
TableConfig tableConfig,
@@ -173,12 +166,23 @@ public class RebalanceChecker extends
ControllerPeriodicTask<Void> {
attemptJobId);
_controllerMetrics.addMeteredTableValue(tableNameWithType,
ControllerMeter.TABLE_REBALANCE_RETRY, 1L);
ZkBasedTableRebalanceObserver observer =
- new ZkBasedTableRebalanceObserver(tableNameWithType, attemptJobId,
retryCtx, _pinotHelixResourceManager);
- RebalanceResult result =
- _pinotHelixResourceManager.rebalanceTable(tableNameWithType,
tableConfig, attemptJobId, rebalanceConfig,
- observer);
- LOGGER.info("New attempt: {} for table: {} is done with result status:
{}", attemptJobId, tableNameWithType,
- result.getStatus());
+ new ZkBasedTableRebalanceObserver(tableNameWithType, attemptJobId,
retryCtx,
+ _pinotHelixResourceManager.getPropertyStore());
+
+ try {
+ _tableRebalanceManager.rebalanceTableAsync(tableNameWithType,
tableConfig, attemptJobId, rebalanceConfig,
+ observer)
+ .whenComplete((result, throwable) -> {
+ if (throwable != null) {
+ LOGGER.error("Failed to retry rebalance for table: {}",
tableNameWithType, throwable);
+ } else {
+ LOGGER.info("New attempt: {} for table: {} is done with result
status: {}", attemptJobId,
+ tableNameWithType, result.getStatus());
+ }
+ });
+ } catch (RebalanceInProgressException e) {
+ LOGGER.warn("Rebalance job for table: {} is already in progress.
Skipping retry.", tableNameWithType, e);
+ }
}
@VisibleForTesting
@@ -309,7 +313,8 @@ public class RebalanceChecker extends
ControllerPeriodicTask<Void> {
}
// The job is considered failed, but it's possible it is still running,
then we might end up with more than one
// rebalance jobs running in parallel for a table. The rebalance
algorithm is idempotent, so this should be fine
- // for the correctness.
+ // for the correctness. Note that we do still abort this job before
retrying, because we don't allow more than
+ // one actively running rebalance job (as per ZK) for a table.
LOGGER.info("Found stuck rebalance job: {} for original job: {}", jobId,
originalJobId);
candidates.computeIfAbsent(originalJobId, (k) -> new
HashSet<>()).add(Pair.of(jobCtx, jobStartTimeMs));
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
new file mode 100644
index 0000000000..87de12b3d7
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalanceManager.java
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
+import javax.ws.rs.NotFoundException;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.exception.RebalanceInProgressException;
+import org.apache.pinot.common.exception.TableNotFoundException;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import
org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
+import org.apache.pinot.controller.util.TableSizeReader;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Single entry point for all table rebalance related operations. This class
should be used to initiate table rebalance
+ * operations, rather than directly instantiating objects of {@link
TableRebalancer}.
+ */
+public class TableRebalanceManager {
+ private static final Logger LOGGER =
LoggerFactory.getLogger(TableRebalanceManager.class);
+
+ private final PinotHelixResourceManager _resourceManager;
+ private final ControllerMetrics _controllerMetrics;
+ private final RebalancePreChecker _rebalancePreChecker;
+ private final TableSizeReader _tableSizeReader;
+ private final ExecutorService _executorService;
+
+ /**
+ * Creates a manager that keeps references to the given collaborators.
+ *
+ * @param resourceManager used to read table configs, job metadata and the
+ * property store, and to update target tiers
+ * @param controllerMetrics passed to each {@link TableRebalancer} created
+ * @param rebalancePreChecker passed to each {@link TableRebalancer} created
+ * @param tableSizeReader passed to each {@link TableRebalancer} created
+ * @param executorService executor on which the async rebalance methods run
+ */
+ public TableRebalanceManager(PinotHelixResourceManager resourceManager,
ControllerMetrics controllerMetrics,
+ RebalancePreChecker rebalancePreChecker, TableSizeReader
tableSizeReader, ExecutorService executorService) {
+ _resourceManager = resourceManager;
+ _controllerMetrics = controllerMetrics;
+ _rebalancePreChecker = rebalancePreChecker;
+ _tableSizeReader = tableSizeReader;
+ _executorService = executorService;
+ }
+
+ /**
+ * Rebalance the table with the given name and type synchronously. It's the
responsibility of the caller to ensure
+ * that this rebalance is run on the rebalance thread pool in the controller
that respects the configuration
+ * {@link
org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS}.
+ *
+ * <p>When {@code trackRebalanceProgress} is true, a
+ * {@link ZkBasedTableRebalanceObserver} is created for an initial attempt;
+ * otherwise the rebalance runs with no ZK-based observer.
+ *
+ * @param tableNameWithType name of the table to rebalance
+ * @param rebalanceConfig configuration for the rebalance operation
+ * @param rebalanceJobId ID of the rebalance job, which is used to track the
progress of the rebalance operation
+ * @param trackRebalanceProgress whether to track rebalance progress stats
in ZK
+ * @return result of the rebalance operation
+ * @throws TableNotFoundException if the table does not exist
+ * @throws RebalanceInProgressException if this is not a dry run and a
+ * rebalance job is already in progress for the table
+ * @throws IllegalStateException if {@code rebalanceJobId} is null
+ */
+ public RebalanceResult rebalanceTable(String tableNameWithType,
RebalanceConfig rebalanceConfig,
+ String rebalanceJobId, boolean trackRebalanceProgress)
+ throws TableNotFoundException, RebalanceInProgressException {
+ TableConfig tableConfig =
_resourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ throw new TableNotFoundException("Failed to find table config for table:
" + tableNameWithType);
+ }
+ Preconditions.checkState(rebalanceJobId != null, "RebalanceId not
populated in the rebalanceConfig");
+ ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver = null;
+ if (trackRebalanceProgress) {
+ zkBasedTableRebalanceObserver = new
ZkBasedTableRebalanceObserver(tableNameWithType, rebalanceJobId,
+ TableRebalanceContext.forInitialAttempt(rebalanceJobId,
rebalanceConfig),
+ _resourceManager.getPropertyStore());
+ }
+ return rebalanceTable(tableNameWithType, tableConfig, rebalanceJobId,
rebalanceConfig,
+ zkBasedTableRebalanceObserver);
+ }
+
+ /**
+ * Rebalance the table with the given name and type asynchronously. The
number of concurrent rebalances permitted
+ * on this controller is configured by
+ * {@link
org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS}
+ *
+ * <p>The table lookup and, unless the rebalance is a dry run, the check for
+ * an in-progress rebalance job run on the calling thread before the task is
+ * submitted, so those failures are thrown directly to the caller. Checked
+ * exceptions raised inside the submitted task are rethrown wrapped in a
+ * {@link RuntimeException}.
+ *
+ * @param tableNameWithType name of the table to rebalance
+ * @param rebalanceConfig configuration for the rebalance operation
+ * @param rebalanceJobId ID of the rebalance job, which is used to track the
progress of the rebalance operation
+ * @param trackRebalanceProgress whether to track rebalance progress stats
in ZK
+ * @return a CompletableFuture that will complete with the result of the
rebalance operation
+ * @throws TableNotFoundException if the table does not exist
+ * @throws RebalanceInProgressException if this is not a dry run and a
+ * rebalance job is already in progress for the table
+ */
+ public CompletableFuture<RebalanceResult> rebalanceTableAsync(String
tableNameWithType,
+ RebalanceConfig rebalanceConfig, String rebalanceJobId, boolean
trackRebalanceProgress)
+ throws TableNotFoundException, RebalanceInProgressException {
+ TableConfig tableConfig =
_resourceManager.getTableConfig(tableNameWithType);
+ if (tableConfig == null) {
+ throw new TableNotFoundException("Failed to find table config for table:
" + tableNameWithType);
+ }
+ if (!rebalanceConfig.isDryRun()) {
+ checkRebalanceJobInProgress(tableNameWithType);
+ }
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return rebalanceTable(tableNameWithType, rebalanceConfig,
rebalanceJobId, trackRebalanceProgress);
+ } catch (TableNotFoundException e) {
+ // Should not happen since we already checked for table existence
+ throw new RuntimeException(e);
+ } catch (RebalanceInProgressException e) {
+ throw new RuntimeException(e);
+ }
+ },
+ _executorService);
+ }
+
+ /**
+ * Rebalance the table with the given name and type asynchronously. The
number of concurrent rebalances permitted
+ * on this controller is configured by
+ * {@link
org.apache.pinot.controller.ControllerConf#CONTROLLER_EXECUTOR_REBALANCE_NUM_THREADS}
+ *
+ * <p>Unless the rebalance is a dry run, the check for an in-progress
+ * rebalance job runs on the calling thread before the task is submitted, so
+ * that failure is thrown directly to the caller. A
+ * {@link RebalanceInProgressException} raised inside the submitted task is
+ * rethrown wrapped in a {@link RuntimeException}.
+ *
+ * @param tableNameWithType name of the table to rebalance
+ * @param tableConfig configuration for the table to rebalance
+ * @param rebalanceJobId ID of the rebalance job, which is used to track the
progress of the rebalance operation
+ * @param rebalanceConfig configuration for the rebalance operation
+ * @param zkBasedTableRebalanceObserver observer to track rebalance progress
in ZK
+ * @return a CompletableFuture that will complete with the result of the
rebalance operation
+ * @throws RebalanceInProgressException if this is not a dry run and a
+ * rebalance job is already in progress for the table
+ */
+ public CompletableFuture<RebalanceResult> rebalanceTableAsync(String
tableNameWithType, TableConfig tableConfig,
+ String rebalanceJobId, RebalanceConfig rebalanceConfig,
+ @Nullable ZkBasedTableRebalanceObserver zkBasedTableRebalanceObserver)
+ throws RebalanceInProgressException {
+ if (!rebalanceConfig.isDryRun()) {
+ checkRebalanceJobInProgress(tableNameWithType);
+ }
+
+ return CompletableFuture.supplyAsync(
+ () -> {
+ try {
+ return rebalanceTable(tableNameWithType, tableConfig,
rebalanceJobId, rebalanceConfig,
+ zkBasedTableRebalanceObserver);
+ } catch (RebalanceInProgressException e) {
+ throw new RuntimeException(e);
+ }
+ },
+ _executorService);
+ }
+
+ /**
+ * Runs a rebalance for the given table on the calling thread using a newly
+ * created {@link TableRebalancer}. Unless the rebalance is a dry run, it
+ * first checks for an in-progress rebalance job for the table. When the
+ * config requests it, segment target tiers are updated before rebalancing;
+ * otherwise a null tier-to-segments map is passed to the rebalancer.
+ *
+ * @param tableNameWithType name of the table to rebalance
+ * @param tableConfig configuration for the table to rebalance
+ * @param rebalanceJobId ID of the rebalance job
+ * @param rebalanceConfig configuration for the rebalance operation
+ * @param zkBasedTableRebalanceObserver observer passed to the rebalancer,
+ * may be null
+ * @return result of the rebalance operation
+ * @throws RebalanceInProgressException if this is not a dry run and a
+ * rebalance job is already in progress for the table
+ */
+ @VisibleForTesting
+ RebalanceResult rebalanceTable(String tableNameWithType, TableConfig
tableConfig, String rebalanceJobId,
+ RebalanceConfig rebalanceConfig, @Nullable ZkBasedTableRebalanceObserver
zkBasedTableRebalanceObserver)
+ throws RebalanceInProgressException {
+
+ if (!rebalanceConfig.isDryRun()) {
+ checkRebalanceJobInProgress(tableNameWithType);
+ }
+
+ Map<String, Set<String>> tierToSegmentsMap;
+ if (rebalanceConfig.isUpdateTargetTier()) {
+ tierToSegmentsMap = _resourceManager.updateTargetTier(rebalanceJobId,
tableNameWithType, tableConfig);
+ } else {
+ tierToSegmentsMap = null;
+ }
+ TableRebalancer tableRebalancer =
+ new TableRebalancer(_resourceManager.getHelixZkManager(),
zkBasedTableRebalanceObserver, _controllerMetrics,
+ _rebalancePreChecker, _tableSizeReader);
+
+ return tableRebalancer.rebalance(tableConfig, rebalanceConfig,
rebalanceJobId, tierToSegmentsMap);
+ }
+
+ /**
+ * Cancels ongoing rebalance jobs (if any) for the given table.
+ *
+ * <p>Only jobs whose stored progress stats report IN_PROGRESS are changed:
+ * their status is set to CANCELLED in the job metadata. Jobs in any other
+ * status are left as they are. If reading or updating the stats of a job
+ * fails, the error is logged and that job is not reported as cancelled.
+ * Cancellation is best effort; the outcome of the metadata update is only
+ * logged.
+ *
+ * @param tableNameWithType name of the table for which to cancel any
ongoing rebalance job
+ * @return the list of job IDs that were cancelled
+ */
+ public List<String> cancelRebalance(String tableNameWithType) {
+ List<String> cancelledJobIds = new ArrayList<>();
+ boolean updated = _resourceManager.updateJobsForTable(tableNameWithType,
ControllerJobType.TABLE_REBALANCE,
+ jobMetadata -> {
+ String jobId = jobMetadata.get(CommonConstants.ControllerJob.JOB_ID);
+ try {
+ String jobStatsInStr =
jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+ TableRebalanceProgressStats jobStats =
+ JsonUtils.stringToObject(jobStatsInStr,
TableRebalanceProgressStats.class);
+ if (jobStats.getStatus() != RebalanceResult.Status.IN_PROGRESS) {
+ return;
+ }
+
+ LOGGER.info("Cancelling rebalance job: {} for table: {}", jobId,
tableNameWithType);
+ jobStats.setStatus(RebalanceResult.Status.CANCELLED);
+
jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+ JsonUtils.objectToString(jobStats));
+ cancelledJobIds.add(jobId);
+ } catch (Exception e) {
+ LOGGER.error("Failed to cancel rebalance job: {} for table: {}",
jobId, tableNameWithType, e);
+ }
+ });
+ LOGGER.info("Tried to cancel existing rebalance jobs for table: {} at best
effort and done: {}", tableNameWithType,
+ updated);
+ return cancelledJobIds;
+ }
+
+ /**
+ * Gets the status of the rebalance job with the given ID.
+ *
+ * <p>The elapsed time in the response is reported as 0 when the job status
+ * is DONE; for any other status it is the time since the job's recorded
+ * start, in whole seconds. The rebalance context is set on the response
+ * only when a non-empty context is present in the job metadata.
+ *
+ * @param jobId ID of the rebalance job to get the status for
+ * @return response containing the status of the rebalance job
+ * @throws JsonProcessingException if there is an error processing the
rebalance progress stats from ZK
+ * @throws NotFoundException if the rebalance job with the given ID does not
exist
+ */
+ public ServerRebalanceJobStatusResponse getRebalanceStatus(String jobId)
+ throws JsonProcessingException {
+ Map<String, String> controllerJobZKMetadata =
+ _resourceManager.getControllerJobZKMetadata(jobId,
ControllerJobType.TABLE_REBALANCE);
+ if (controllerJobZKMetadata == null) {
+ LOGGER.warn("Rebalance job with ID: {} not found", jobId);
+ throw new NotFoundException("Rebalance job with ID: " + jobId + " not
found");
+ }
+ ServerRebalanceJobStatusResponse serverRebalanceJobStatusResponse = new
ServerRebalanceJobStatusResponse();
+ TableRebalanceProgressStats tableRebalanceProgressStats =
JsonUtils.stringToObject(
+
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS),
+ TableRebalanceProgressStats.class);
+
serverRebalanceJobStatusResponse.setTableRebalanceProgressStats(tableRebalanceProgressStats);
+
+ long timeSinceStartInSecs = 0L;
+ if (RebalanceResult.Status.DONE !=
tableRebalanceProgressStats.getStatus()) {
+ timeSinceStartInSecs = (System.currentTimeMillis() -
tableRebalanceProgressStats.getStartTimeMs()) / 1000;
+ }
+
serverRebalanceJobStatusResponse.setTimeElapsedSinceStartInSeconds(timeSinceStartInSecs);
+
+ String jobCtxInStr =
controllerJobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT);
+ if (StringUtils.isNotEmpty(jobCtxInStr)) {
+ TableRebalanceContext jobCtx = JsonUtils.stringToObject(jobCtxInStr,
TableRebalanceContext.class);
+ serverRebalanceJobStatusResponse.setTableRebalanceContext(jobCtx);
+ }
+ return serverRebalanceJobStatusResponse;
+ }
+
+ /**
+ * Rejects the request if a rebalance job is already in progress for the
+ * given table, as reported by
+ * {@link #rebalanceJobInProgress(String, ZkHelixPropertyStore)}.
+ *
+ * @param tableNameWithType name of the table to check
+ * @throws RebalanceInProgressException if an in-progress job is found; the
+ * message includes the table name and the job ID
+ */
+ private void checkRebalanceJobInProgress(String tableNameWithType)
+ throws RebalanceInProgressException {
+ String rebalanceJobInProgress = rebalanceJobInProgress(tableNameWithType,
_resourceManager.getPropertyStore());
+ if (rebalanceJobInProgress != null) {
+ String errorMsg = "Rebalance job is already in progress for table: " +
tableNameWithType + ", jobId: "
+ + rebalanceJobInProgress + ". Please wait for the job to complete or
cancel it before starting a new one.";
+ throw new RebalanceInProgressException(errorMsg);
+ }
+ }
+
+ /**
+ * Checks if there is an ongoing rebalance job for the given table.
+ *
+ * <p>A job counts as ongoing when its stored progress stats report
+ * IN_PROGRESS. Jobs whose progress stats cannot be parsed are treated as
+ * not in progress. If more than one job is in progress, the first one
+ * encountered is returned.
+ *
+ * @param tableNameWithType name of the table to check for ongoing rebalance
jobs
+ * @param propertyStore ZK property store to read the job metadata from
+ * @return jobId of the ongoing rebalance job if one exists, {@code null}
otherwise
+ */
+ @Nullable
+ public static String rebalanceJobInProgress(String tableNameWithType,
ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ // Get all jobMetadata for the given table with a single ZK read.
+ Map<String, Map<String, String>> allJobMetadataByJobId =
+
ControllerZkHelixUtils.getAllControllerJobs(Collections.singleton(ControllerJobType.TABLE_REBALANCE),
+ jobMetadata -> tableNameWithType.equals(
+
jobMetadata.get(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE)),
propertyStore);
+
+ for (Map.Entry<String, Map<String, String>> entry :
allJobMetadataByJobId.entrySet()) {
+ String jobId = entry.getKey();
+ Map<String, String> jobMetadata = entry.getValue();
+ String jobStatsInStr =
jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+
+ TableRebalanceProgressStats jobStats;
+ try {
+ jobStats = JsonUtils.stringToObject(jobStatsInStr,
TableRebalanceProgressStats.class);
+ } catch (Exception e) {
+ // If the job stats cannot be parsed, let's assume that the job is not
in progress.
+ continue;
+ }
+
+ if (jobStats.getStatus() == RebalanceResult.Status.IN_PROGRESS) {
+ return jobId;
+ }
+ }
+
+ return null;
+ }
+}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 3bf80c9f60..cf3874ce2b 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -30,6 +30,7 @@ import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
@@ -156,11 +157,7 @@ public class TableRebalancer {
@Nullable ControllerMetrics controllerMetrics, @Nullable
RebalancePreChecker rebalancePreChecker,
@Nullable TableSizeReader tableSizeReader) {
_helixManager = helixManager;
- if (tableRebalanceObserver != null) {
- _tableRebalanceObserver = tableRebalanceObserver;
- } else {
- _tableRebalanceObserver = new NoOpTableRebalanceObserver();
- }
+ _tableRebalanceObserver =
Objects.requireNonNullElseGet(tableRebalanceObserver,
NoOpTableRebalanceObserver::new);
_helixDataAccessor = helixManager.getHelixDataAccessor();
_controllerMetrics = controllerMetrics;
_rebalancePreChecker = rebalancePreChecker;
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
index 042be1a93d..e1cfdfc833 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/ZkBasedTableRebalanceObserver.java
@@ -25,10 +25,12 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.slf4j.Logger;
@@ -43,7 +45,7 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
private static final Logger LOGGER =
LoggerFactory.getLogger(ZkBasedTableRebalanceObserver.class);
private final String _tableNameWithType;
private final String _rebalanceJobId;
- private final PinotHelixResourceManager _pinotHelixResourceManager;
+ private final ZkHelixPropertyStore<ZNRecord> _propertyStore;
private final TableRebalanceProgressStats _tableRebalanceProgressStats;
private final TableRebalanceContext _tableRebalanceContext;
// These previous stats are used for rollback scenarios where the IdealState
update fails due to a version
@@ -59,13 +61,13 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
private final ControllerMetrics _controllerMetrics;
public ZkBasedTableRebalanceObserver(String tableNameWithType, String
rebalanceJobId,
- TableRebalanceContext tableRebalanceContext, PinotHelixResourceManager
pinotHelixResourceManager) {
+ TableRebalanceContext tableRebalanceContext,
ZkHelixPropertyStore<ZNRecord> propertyStore) {
Preconditions.checkState(tableNameWithType != null, "Table name cannot be
null");
Preconditions.checkState(rebalanceJobId != null, "rebalanceId cannot be
null");
- Preconditions.checkState(pinotHelixResourceManager != null,
"PinotHelixManager cannot be null");
+ Preconditions.checkState(propertyStore != null, "ZkHelixPropertyStore
cannot be null");
_tableNameWithType = tableNameWithType;
_rebalanceJobId = rebalanceJobId;
- _pinotHelixResourceManager = pinotHelixResourceManager;
+ _propertyStore = propertyStore;
_tableRebalanceProgressStats = new TableRebalanceProgressStats();
_tableRebalanceContext = tableRebalanceContext;
_previousStepStats = new
TableRebalanceProgressStats.RebalanceProgressStats();
@@ -279,8 +281,8 @@ public class ZkBasedTableRebalanceObserver implements
TableRebalanceObserver {
private void trackStatsInZk() {
Map<String, String> jobMetadata =
createJobMetadata(_tableNameWithType, _rebalanceJobId,
_tableRebalanceProgressStats, _tableRebalanceContext);
- _pinotHelixResourceManager.addControllerJobToZK(_rebalanceJobId,
jobMetadata, ControllerJobType.TABLE_REBALANCE,
- prevJobMetadata -> {
+ ControllerZkHelixUtils.addControllerJobToZK(_propertyStore,
_rebalanceJobId, jobMetadata,
+ ControllerJobType.TABLE_REBALANCE, prevJobMetadata -> {
// In addition to updating job progress status, the observer also
checks if the job status is IN_PROGRESS.
// If not, then no need to update the job status, and we keep this
status to end the job promptly.
if (prevJobMetadata == null) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
index 661fff70d9..369024faa6 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
@@ -29,20 +29,25 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.pinot.common.exception.RebalanceInProgressException;
import org.apache.pinot.common.exception.TableNotFoundException;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
import org.apache.pinot.spi.config.table.TableConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DefaultTenantRebalancer implements TenantRebalancer {
private static final Logger LOGGER =
LoggerFactory.getLogger(DefaultTenantRebalancer.class);
- PinotHelixResourceManager _pinotHelixResourceManager;
- ExecutorService _executorService;
+ private final TableRebalanceManager _tableRebalanceManager;
+ private final PinotHelixResourceManager _pinotHelixResourceManager;
+ private final ExecutorService _executorService;
- public DefaultTenantRebalancer(PinotHelixResourceManager
pinotHelixResourceManager, ExecutorService executorService) {
+ public DefaultTenantRebalancer(TableRebalanceManager tableRebalanceManager,
+ PinotHelixResourceManager pinotHelixResourceManager, ExecutorService
executorService) {
+ _tableRebalanceManager = tableRebalanceManager;
_pinotHelixResourceManager = pinotHelixResourceManager;
_executorService = executorService;
}
@@ -56,13 +61,13 @@ public class DefaultTenantRebalancer implements
TenantRebalancer {
RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
rebalanceConfig.setDryRun(true);
rebalanceResult.put(table,
- _pinotHelixResourceManager.rebalanceTable(table, rebalanceConfig,
createUniqueRebalanceJobIdentifier(),
- false));
- } catch (TableNotFoundException exception) {
+ _tableRebalanceManager.rebalanceTable(table, rebalanceConfig,
createUniqueRebalanceJobIdentifier(), false));
+ } catch (TableNotFoundException | RebalanceInProgressException
exception) {
rebalanceResult.put(table, new RebalanceResult(null,
RebalanceResult.Status.FAILED, exception.getMessage(),
null, null, null, null, null));
}
});
+
if (config.isDryRun()) {
return new TenantRebalanceResult(null, rebalanceResult,
config.isVerboseResult());
} else {
@@ -198,7 +203,7 @@ public class DefaultTenantRebalancer implements
TenantRebalancer {
TenantRebalanceObserver observer) {
try {
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER,
tableName, rebalanceJobId);
- RebalanceResult result =
_pinotHelixResourceManager.rebalanceTable(tableName, config, rebalanceJobId,
true);
+ RebalanceResult result =
_tableRebalanceManager.rebalanceTable(tableName, config, rebalanceJobId, true);
if (result.getStatus().equals(RebalanceResult.Status.DONE)) {
observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_COMPLETED_TRIGGER,
tableName, null);
} else {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
index 17dd44378b..b2144260c0 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocator.java
@@ -45,6 +45,7 @@ import
org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.periodictask.ControllerPeriodicTask;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.controller.util.TableTierReader;
import org.apache.pinot.spi.config.table.TableConfig;
@@ -65,6 +66,7 @@ import org.slf4j.LoggerFactory;
public class SegmentRelocator extends ControllerPeriodicTask<Void> {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentRelocator.class);
+ private final TableRebalanceManager _tableRebalanceManager;
private final ExecutorService _executorService;
private final HttpClientConnectionManager _connectionManager;
private final boolean _enableLocalTierMigration;
@@ -87,12 +89,14 @@ public class SegmentRelocator extends
ControllerPeriodicTask<Void> {
@Nullable
private final Set<String> _tablesUndergoingRebalance;
- public SegmentRelocator(PinotHelixResourceManager pinotHelixResourceManager,
- LeadControllerManager leadControllerManager, ControllerConf config,
ControllerMetrics controllerMetrics,
- ExecutorService executorService, HttpClientConnectionManager
connectionManager) {
+ public SegmentRelocator(TableRebalanceManager tableRebalanceManager,
+ PinotHelixResourceManager pinotHelixResourceManager,
LeadControllerManager leadControllerManager,
+ ControllerConf config, ControllerMetrics controllerMetrics,
ExecutorService executorService,
+ HttpClientConnectionManager connectionManager) {
super(SegmentRelocator.class.getSimpleName(),
config.getSegmentRelocatorFrequencyInSeconds(),
config.getSegmentRelocatorInitialDelayInSeconds(),
pinotHelixResourceManager, leadControllerManager,
controllerMetrics);
+ _tableRebalanceManager = tableRebalanceManager;
_executorService = executorService;
_connectionManager = connectionManager;
_enableLocalTierMigration =
config.enableSegmentRelocatorLocalTierMigration();
@@ -145,9 +149,9 @@ public class SegmentRelocator extends
ControllerPeriodicTask<Void> {
} else {
LOGGER.info("The previous rebalance has not yet completed, skip
rebalancing table {}", tableNameWithType);
}
- return;
+ } else {
+ putTableToWait(tableNameWithType);
}
- putTableToWait(tableNameWithType);
}
@VisibleForTesting
@@ -225,7 +229,10 @@ public class SegmentRelocator extends
ControllerPeriodicTask<Void> {
// all segments are put on the right servers. If any segments are not on
their target tier, the server local
// tier migration is triggered for them, basically asking the hosting
servers to reload them. The segment
// target tier may get changed between the two sequential actions, but
cluster states converge eventually.
- RebalanceResult rebalance =
_pinotHelixResourceManager.rebalanceTable(tableNameWithType, rebalanceConfig,
+
+ // We're not using the async rebalance API here because we want to run
this on a separate thread pool from the
+ // rebalance thread pool that is used for user initiated rebalances.
+ RebalanceResult rebalance =
_tableRebalanceManager.rebalanceTable(tableNameWithType, rebalanceConfig,
TableRebalancer.createUniqueRebalanceJobIdentifier(), false);
switch (rebalance.getStatus()) {
case NO_OP:
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java
new file mode 100644
index 0000000000..f5f7ee12e8
--- /dev/null
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/ControllerZkHelixUtils.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.util;
+
+import com.google.common.base.Preconditions;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.helix.AccessOption;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.zookeeper.data.Stat;
+
+
+public class ControllerZkHelixUtils {
+
+ private ControllerZkHelixUtils() {
+ // Utility class
+ }
+
+ /**
+ * Adds a new job metadata entry for a controller job like table rebalance
or segment reload into ZK
+ *
+ * @param propertyStore the ZK property store to write to
+ * @param jobId job's UUID
+ * @param jobMetadata the job metadata
+ * @param jobType the type of the job to figure out where the job metadata
is kept in ZK
+ * @param prevJobMetadataChecker tests the job's previous metadata (null if none); if it fails, nothing is written
+ * @return true if written to ZK; false if prevJobMetadataChecker rejects the previous entry or the write fails
+ */
+ public static boolean addControllerJobToZK(ZkHelixPropertyStore<ZNRecord>
propertyStore, String jobId,
+ Map<String, String> jobMetadata, String jobType, Predicate<Map<String,
String>> prevJobMetadataChecker) {
+
Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)
!= null,
+ CommonConstants.ControllerJob.SUBMISSION_TIME_MS
+ + " in JobMetadata record not set. Cannot expire these records");
+ String jobResourcePath =
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
+ Stat stat = new Stat();
+ ZNRecord jobsZnRecord = propertyStore.get(jobResourcePath, stat,
AccessOption.PERSISTENT);
+ if (jobsZnRecord != null) {
+ Map<String, Map<String, String>> jobMetadataMap =
jobsZnRecord.getMapFields();
+ Map<String, String> prevJobMetadata = jobMetadataMap.get(jobId);
+ if (!prevJobMetadataChecker.test(prevJobMetadata)) {
+ return false;
+ }
+ jobMetadataMap.put(jobId, jobMetadata);
+ if (jobMetadataMap.size() >
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK) {
+ jobMetadataMap = jobMetadataMap.entrySet().stream().sorted((v1, v2) ->
Long.compare(
+
Long.parseLong(v2.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)),
+
Long.parseLong(v1.getValue().get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS))))
+ .collect(Collectors.toList()).subList(0,
CommonConstants.ControllerJob.MAXIMUM_CONTROLLER_JOBS_IN_ZK)
+ .stream().collect(Collectors.toMap(Map.Entry::getKey,
Map.Entry::getValue));
+ }
+ jobsZnRecord.setMapFields(jobMetadataMap);
+ return propertyStore.set(jobResourcePath, jobsZnRecord,
stat.getVersion(), AccessOption.PERSISTENT);
+ } else {
+ jobsZnRecord = new ZNRecord(jobResourcePath);
+ jobsZnRecord.setMapField(jobId, jobMetadata);
+ return propertyStore.set(jobResourcePath, jobsZnRecord,
AccessOption.PERSISTENT);
+ }
+ }
+
+ /**
+ * Get all controller jobs from ZK for a given set of job types.
+ * @param jobTypes the set of job types to filter
+ * @param jobMetadataChecker a predicate to filter the job metadata
+ * @param propertyStore the ZK property store to read from
+ * @return a map of jobId to job metadata for all the jobs that match the
given job types and metadata checker
+ */
+ public static Map<String, Map<String, String>>
getAllControllerJobs(Set<String> jobTypes,
+ Predicate<Map<String, String>> jobMetadataChecker,
ZkHelixPropertyStore<ZNRecord> propertyStore) {
+ Map<String, Map<String, String>> controllerJobs = new HashMap<>();
+ for (String jobType : jobTypes) {
+ String jobResourcePath =
ZKMetadataProvider.constructPropertyStorePathForControllerJob(jobType);
+ ZNRecord jobsZnRecord = propertyStore.get(jobResourcePath, null,
AccessOption.PERSISTENT);
+ if (jobsZnRecord == null) {
+ continue;
+ }
+ Map<String, Map<String, String>> jobMetadataMap =
jobsZnRecord.getMapFields();
+ for (Map.Entry<String, Map<String, String>> jobMetadataEntry :
jobMetadataMap.entrySet()) {
+ String jobId = jobMetadataEntry.getKey();
+ Map<String, String> jobMetadata = jobMetadataEntry.getValue();
+
Preconditions.checkState(jobMetadata.get(CommonConstants.ControllerJob.JOB_TYPE).equals(jobType),
+ "Got unexpected jobType: %s at jobResourcePath: %s with jobId:
%s", jobType, jobResourcePath, jobId);
+ if (jobMetadataChecker.test(jobMetadata)) {
+ controllerJobs.put(jobId, jobMetadata);
+ }
+ }
+ }
+ return controllerJobs;
+ }
+}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
index 55e0bb1772..176cec96ae 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java
@@ -34,6 +34,7 @@ import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.hc.client5.http.entity.EntityBuilder;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
@@ -69,6 +70,8 @@ import
org.apache.pinot.controller.api.access.AllowAllAccessFactory;
import org.apache.pinot.controller.api.resources.PauseStatusDetails;
import org.apache.pinot.controller.api.resources.TableViews;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
+import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
import org.apache.pinot.spi.config.table.QueryConfig;
import org.apache.pinot.spi.config.table.QuotaConfig;
@@ -158,6 +161,8 @@ public class ControllerTest {
protected HelixDataAccessor _helixDataAccessor;
protected HelixAdmin _helixAdmin;
protected ZkHelixPropertyStore<ZNRecord> _propertyStore;
+ protected TableRebalanceManager _tableRebalanceManager;
+ protected TableSizeReader _tableSizeReader;
/**
* Acquire the {@link ControllerTest} default instance that can be shared
across different test cases.
@@ -303,6 +308,8 @@ public class ControllerTest {
_controllerDataDir = _controllerConfig.getDataDir();
_helixResourceManager = _controllerStarter.getHelixResourceManager();
_helixManager = _controllerStarter.getHelixControllerManager();
+ _tableRebalanceManager = _controllerStarter.getTableRebalanceManager();
+ _tableSizeReader = _controllerStarter.getTableSizeReader();
_helixDataAccessor = _helixManager.getHelixDataAccessor();
ConfigAccessor configAccessor = _helixManager.getConfigAccessor();
// HelixResourceManager is null in Helix only mode, while HelixManager
is null in Pinot only mode.
@@ -1028,6 +1035,24 @@ public class ControllerTest {
}
}
+ /**
+ * Sends a POST request to the specified URL with the given payload and
returns the status code along with the
+ * stringified response.
+ * @param urlString the URL to send the POST request to
+ * @param payload the payload to send in the POST request
+ * @return a Pair containing the status code and the stringified response
+ */
+ public static Pair<Integer, String> postRequestWithStatusCode(String
urlString, String payload)
+ throws IOException {
+ try {
+ SimpleHttpResponse resp =
+ getHttpClient().sendJsonPostRequest(new URL(urlString).toURI(),
payload, Collections.emptyMap());
+ return Pair.of(resp.getStatusCode(), constructResponse(resp));
+ } catch (URISyntaxException e) {
+ throw new IOException(e);
+ }
+ }
+
public static String sendPostRequestRaw(String urlString, String payload,
Map<String, String> headers)
throws IOException {
try {
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
index 22663fa333..d88b1319c1 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceCheckerTest.java
@@ -26,6 +26,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.AccessOption;
@@ -49,10 +50,7 @@ import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
@@ -248,14 +246,21 @@ public class RebalanceCheckerTest {
PinotHelixResourceManager helixManager =
mock(PinotHelixResourceManager.class);
when(helixManager.getTableConfig(tableName)).thenReturn(tableConfig);
when(helixManager.getAllJobs(any(), any())).thenReturn(allJobMetadata);
- RebalanceChecker checker = new RebalanceChecker(helixManager,
leadController, cfg, metrics, exec);
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
+ when(helixManager.getPropertyStore()).thenReturn(propertyStore);
+ TableRebalanceManager tableRebalanceManager =
mock(TableRebalanceManager.class);
+ when(tableRebalanceManager.rebalanceTableAsync(anyString(),
any(TableConfig.class), anyString(),
+ any(RebalanceConfig.class),
any(ZkBasedTableRebalanceObserver.class))).thenReturn(
+ CompletableFuture.completedFuture(null));
+ RebalanceChecker checker = new RebalanceChecker(tableRebalanceManager,
helixManager, leadController, cfg, metrics);
// Although job1_3 was submitted most recently but job1 had exceeded
maxAttempts. Chose job3 to retry, which got
// stuck at in progress status.
checker.retryRebalanceTable(tableName, allJobMetadata);
// The new retry job is for job3 and attemptId is increased to 2.
ArgumentCaptor<ZkBasedTableRebalanceObserver> observerCaptor =
ArgumentCaptor.forClass(ZkBasedTableRebalanceObserver.class);
- verify(helixManager, times(1)).rebalanceTable(eq(tableName), any(),
anyString(), any(), observerCaptor.capture());
+ verify(tableRebalanceManager, times(1)).rebalanceTableAsync(eq(tableName),
any(), anyString(), any(),
+ observerCaptor.capture());
ZkBasedTableRebalanceObserver observer = observerCaptor.getValue();
jobCtx = observer.getTableRebalanceContext();
assertEquals(jobCtx.getOriginalJobId(), "job3");
@@ -286,12 +291,19 @@ public class RebalanceCheckerTest {
PinotHelixResourceManager helixManager =
mock(PinotHelixResourceManager.class);
TableConfig tableConfig = mock(TableConfig.class);
when(helixManager.getTableConfig(tableName)).thenReturn(tableConfig);
- RebalanceChecker checker = new RebalanceChecker(helixManager,
leadController, cfg, metrics, exec);
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
+ when(helixManager.getPropertyStore()).thenReturn(propertyStore);
+ TableRebalanceManager tableRebalanceManager =
mock(TableRebalanceManager.class);
+ when(tableRebalanceManager.rebalanceTableAsync(anyString(),
any(TableConfig.class), anyString(),
+ any(RebalanceConfig.class),
any(ZkBasedTableRebalanceObserver.class))).thenReturn(
+ CompletableFuture.completedFuture(null));
+ RebalanceChecker checker = new RebalanceChecker(tableRebalanceManager,
helixManager, leadController, cfg, metrics);
checker.retryRebalanceTable(tableName, allJobMetadata);
// Retry for job1 is delayed with 5min backoff.
ArgumentCaptor<ZkBasedTableRebalanceObserver> observerCaptor =
ArgumentCaptor.forClass(ZkBasedTableRebalanceObserver.class);
- verify(helixManager, times(0)).rebalanceTable(eq(tableName), any(),
anyString(), any(), observerCaptor.capture());
+ verify(tableRebalanceManager, never()).rebalanceTable(eq(tableName),
any(), anyString(), any(),
+ observerCaptor.capture());
// Set initial delay to 0 to disable retry backoff.
jobCfg.setRetryInitialDelayInMs(0);
@@ -300,7 +312,8 @@ public class RebalanceCheckerTest {
checker.retryRebalanceTable(tableName, allJobMetadata);
// Retry for job1 is delayed with 0 backoff.
observerCaptor =
ArgumentCaptor.forClass(ZkBasedTableRebalanceObserver.class);
- verify(helixManager, times(1)).rebalanceTable(eq(tableName), any(),
anyString(), any(), observerCaptor.capture());
+ verify(tableRebalanceManager, times(1)).rebalanceTableAsync(eq(tableName),
any(), anyString(), any(),
+ observerCaptor.capture());
}
@Test
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 64cbff92c0..64b8a6e5f5 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -128,7 +128,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
preChecker.init(_helixResourceManager, executorService, 1);
TableRebalancer tableRebalancer =
- new TableRebalancer(_helixManager, null, null, preChecker,
_helixResourceManager.getTableSizeReader());
+ new TableRebalancer(_helixManager, null, null, preChecker,
_tableSizeReader);
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
@@ -699,8 +699,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
ExecutorService executorService = Executors.newFixedThreadPool(10);
DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
preChecker.init(_helixResourceManager, executorService, 1);
- TableRebalancer tableRebalancer = new TableRebalancer(_helixManager,
null, null, preChecker,
- _helixResourceManager.getTableSizeReader());
+ TableRebalancer tableRebalancer = new TableRebalancer(_helixManager,
null, null, preChecker, _tableSizeReader);
// Set up the table with 1 replication factor and strict replica group
enabled
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(1)
@@ -982,8 +981,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
ExecutorService executorService = Executors.newFixedThreadPool(10);
DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
preChecker.init(_helixResourceManager, executorService, 1);
- TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null,
null, preChecker,
- _helixResourceManager.getTableSizeReader());
+ TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null,
null, preChecker, _tableSizeReader);
// Set up the table with 1 replication factor and strict replica group
enabled
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(1)
@@ -1042,7 +1040,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
preChecker.init(_helixResourceManager, executorService, 0.5);
TableRebalancer tableRebalancer =
- new TableRebalancer(_helixManager, null, null, preChecker,
_helixResourceManager.getTableSizeReader());
+ new TableRebalancer(_helixManager, null, null, preChecker,
_tableSizeReader);
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
@@ -1142,7 +1140,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
preChecker.init(_helixResourceManager, executorService, 0.5);
TableRebalancer tableRebalancer =
- new TableRebalancer(_helixManager, null, null, preChecker,
_helixResourceManager.getTableSizeReader());
+ new TableRebalancer(_helixManager, null, null, preChecker,
_tableSizeReader);
TableConfig tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
.setNumReplicas(2)
@@ -1557,7 +1555,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
ExecutorService executorService = Executors.newFixedThreadPool(10);
preChecker.init(_helixResourceManager, executorService, 1);
TableRebalancer tableRebalancer =
- new TableRebalancer(_helixManager, null, null, preChecker,
_helixResourceManager.getTableSizeReader());
+ new TableRebalancer(_helixManager, null, null, preChecker,
_tableSizeReader);
// Try dry-run summary mode
RebalanceConfig rebalanceConfig = new RebalanceConfig();
@@ -2114,7 +2112,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
ConsumingSegmentInfoReader mockConsumingSegmentInfoReader =
Mockito.mock(ConsumingSegmentInfoReader.class);
TableRebalancer tableRebalancerOriginal =
- new TableRebalancer(_helixManager, null, null, null,
_helixResourceManager.getTableSizeReader());
+ new TableRebalancer(_helixManager, null, null, null, _tableSizeReader);
TableConfig tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
.setNumReplicas(numReplica)
@@ -2233,9 +2231,8 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
}
- ConsumingSegmentInfoReader mockConsumingSegmentInfoReader =
Mockito.mock(ConsumingSegmentInfoReader.class);
TableRebalancer tableRebalancerOriginal =
- new TableRebalancer(_helixManager, null, null, null,
_helixResourceManager.getTableSizeReader());
+ new TableRebalancer(_helixManager, null, null, null, _tableSizeReader);
TableConfig tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
.setNumReplicas(numReplica)
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
index b0d86702e9..f8db37a5aa 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TestZkBasedTableRebalanceObserver.java
@@ -24,9 +24,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import org.apache.helix.store.zk.ZkHelixPropertyStore;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
-import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import org.testng.annotations.Test;
@@ -35,6 +36,8 @@ import static
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.Segmen
import static
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.OFFLINE;
import static
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
@@ -45,14 +48,15 @@ import static org.testng.Assert.assertTrue;
public class TestZkBasedTableRebalanceObserver {
@Test
void testZkObserverProgressStats() {
- PinotHelixResourceManager pinotHelixResourceManager =
mock(PinotHelixResourceManager.class);
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
// Mocking this. We will verify using numZkUpdate stat
- when(pinotHelixResourceManager.addControllerJobToZK(any(), any(),
any())).thenReturn(true);
+ when(propertyStore.set(anyString(), any(), anyInt())).thenReturn(true);
+
ControllerMetrics controllerMetrics = ControllerMetrics.get();
TableRebalanceContext retryCtx = new TableRebalanceContext();
retryCtx.setConfig(new RebalanceConfig());
ZkBasedTableRebalanceObserver observer =
- new ZkBasedTableRebalanceObserver("dummy", "dummyId", retryCtx,
pinotHelixResourceManager);
+ new ZkBasedTableRebalanceObserver("dummy", "dummyId", retryCtx,
propertyStore);
Map<String, Map<String, String>> source = new TreeMap<>();
Map<String, Map<String, String>> target = new TreeMap<>();
Map<String, Map<String, String>> targetIntermediate = new TreeMap<>();
@@ -206,14 +210,15 @@ public class TestZkBasedTableRebalanceObserver {
// This is a test to verify if Zk stats are pushed out correctly
@Test
void testZkObserverTracking() {
- PinotHelixResourceManager pinotHelixResourceManager =
mock(PinotHelixResourceManager.class);
+ ZkHelixPropertyStore<ZNRecord> propertyStore =
mock(ZkHelixPropertyStore.class);
// Mocking this. We will verify using numZkUpdate stat
- when(pinotHelixResourceManager.addControllerJobToZK(any(), any(),
any())).thenReturn(true);
+ when(propertyStore.set(anyString(), any(), anyInt())).thenReturn(true);
+
ControllerMetrics controllerMetrics = ControllerMetrics.get();
TableRebalanceContext retryCtx = new TableRebalanceContext();
retryCtx.setConfig(new RebalanceConfig());
ZkBasedTableRebalanceObserver observer =
- new ZkBasedTableRebalanceObserver("dummy", "dummyId", retryCtx,
pinotHelixResourceManager);
+ new ZkBasedTableRebalanceObserver("dummy", "dummyId", retryCtx,
propertyStore);
Map<String, Map<String, String>> source = new TreeMap<>();
Map<String, Map<String, String>> target = new TreeMap<>();
target.put("segment1",
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
index 76189e9268..abac12b974 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancerTest.java
@@ -74,7 +74,8 @@ public class TenantRebalancerTest extends ControllerTest {
addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX +
i, true);
}
- TenantRebalancer tenantRebalancer = new
DefaultTenantRebalancer(_helixResourceManager, _executorService);
+ TenantRebalancer tenantRebalancer =
+ new DefaultTenantRebalancer(_tableRebalanceManager,
_helixResourceManager, _executorService);
// tag all servers and brokers to test tenant
addTenantTagToInstances(TENANT_NAME);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
index 3c08b79d92..e6bbcdd026 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/relocation/SegmentRelocatorTest.java
@@ -37,6 +37,7 @@ import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
import org.apache.pinot.controller.util.TableTierReader;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.util.TestUtils;
@@ -119,8 +120,9 @@ public class SegmentRelocatorTest {
ControllerConf conf = mock(ControllerConf.class);
when(conf.isSegmentRelocatorRebalanceTablesSequentially()).thenReturn(true);
SegmentRelocator relocator =
- new SegmentRelocator(mock(PinotHelixResourceManager.class),
mock(LeadControllerManager.class), conf,
- mock(ControllerMetrics.class), mock(ExecutorService.class),
mock(HttpClientConnectionManager.class));
+ new SegmentRelocator(mock(TableRebalanceManager.class),
mock(PinotHelixResourceManager.class),
+ mock(LeadControllerManager.class), conf,
mock(ControllerMetrics.class), mock(ExecutorService.class),
+ mock(HttpClientConnectionManager.class));
int cnt = 10;
Random random = new Random();
for (int i = 0; i < cnt; i++) {
@@ -150,8 +152,9 @@ public class SegmentRelocatorTest {
ControllerConf conf = mock(ControllerConf.class);
when(conf.isSegmentRelocatorRebalanceTablesSequentially()).thenReturn(true);
SegmentRelocator relocator =
- new SegmentRelocator(mock(PinotHelixResourceManager.class),
mock(LeadControllerManager.class), conf,
- mock(ControllerMetrics.class), mock(ExecutorService.class),
mock(HttpClientConnectionManager.class));
+ new SegmentRelocator(mock(TableRebalanceManager.class),
mock(PinotHelixResourceManager.class),
+ mock(LeadControllerManager.class), conf,
mock(ControllerMetrics.class), mock(ExecutorService.class),
+ mock(HttpClientConnectionManager.class));
ExecutorService runner = Executors.newCachedThreadPool();
Random random = new Random();
int cnt = 10;
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
index 2f51fd6250..11f16e1e53 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/TableRebalanceIntegrationTest.java
@@ -26,7 +26,10 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.exception.HttpErrorStatusException;
+import org.apache.pinot.common.metadata.controllerjob.ControllerJobType;
import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.config.TagNameUtils;
@@ -37,9 +40,13 @@ import org.apache.pinot.common.utils.regex.Pattern;
import
org.apache.pinot.controller.api.resources.ServerReloadControllerJobStatusResponse;
import
org.apache.pinot.controller.helix.core.rebalance.DefaultRebalancePreChecker;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
import
org.apache.pinot.controller.helix.core.rebalance.RebalancePreCheckerResult;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceSummaryResult;
+import
org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
+import org.apache.pinot.controller.helix.core.util.ControllerZkHelixUtils;
import org.apache.pinot.server.starter.helix.BaseServerStarter;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -54,9 +61,11 @@ import
org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.data.MetricFieldSpec;
import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.Enablement;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.StringUtil;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -1258,6 +1267,55 @@ public class TableRebalanceIntegrationTest extends
BaseHybridClusterIntegrationT
assertEquals(numServersUnchanged,
summaryResult.getServerInfo().getServersUnchanged().size());
}
+  @Test
+  public void testDisallowMultipleConcurrentRebalancesOnSameTable() throws Exception {
+    // Verifies that a rebalance request is rejected with 409 CONFLICT while an IN_PROGRESS job exists for the table.
+    // Manually write an IN_PROGRESS rebalance job to ZK instead of trying to collide multiple actual rebalance
+    // attempts which will be prone to race conditions and cause this test to be flaky. We only reject a rebalance job
+    // if there is an IN_PROGRESS rebalance job for the same table in ZK, so we could actually end up with more than
+    // one active rebalance job if both are started at the exact same time since the progress stats are written to ZK
+    // after some initial pre-checks are done. However, rebalances are idempotent, and we don't actually care too much
+    // about avoiding this edge case race condition as long as in most cases we are able to prevent users from
+    // triggering a rebalance for a table that already has an in-progress rebalance job.
+    String jobId = TableRebalancer.createUniqueRebalanceJobIdentifier();
+    String tableNameWithType = TableNameBuilder.forType(TableType.OFFLINE).tableNameWithType(getTableName());
+    TableRebalanceProgressStats progressStats = new TableRebalanceProgressStats();
+    progressStats.setStatus(RebalanceResult.Status.IN_PROGRESS);
+    Map<String, String> jobMetadata = new HashMap<>();
+    jobMetadata.put(CommonConstants.ControllerJob.TABLE_NAME_WITH_TYPE, tableNameWithType);
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_ID, jobId);
+    jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(System.currentTimeMillis()));
+    jobMetadata.put(CommonConstants.ControllerJob.JOB_TYPE, ControllerJobType.TABLE_REBALANCE);
+    jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+        JsonUtils.objectToString(progressStats));
+    // Add a new server (to force change in instance assignment) and enable reassignInstances to ensure that the
+    // rebalance is not a NO_OP. It is started before the job is written so that the finally block covers the rest.
+    BaseServerStarter serverStarter = startOneServer(NUM_SERVERS);
+    try {
+      ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId, jobMetadata, ControllerJobType.TABLE_REBALANCE,
+          prevJobMetadata -> true);
+      createServerTenant(getServerTenant(), 1, 0);
+      RebalanceConfig rebalanceConfig = new RebalanceConfig();
+      rebalanceConfig.setReassignInstances(true);
+      Pair<Integer, String> response =
+          postRequestWithStatusCode(getRebalanceUrl(rebalanceConfig, TableType.OFFLINE), null);
+      assertEquals(response.getLeft(), Response.Status.CONFLICT.getStatusCode());
+      assertTrue(response.getRight().contains("Rebalance job is already in progress for table"));
+    } finally {
+      // Update the job status to DONE to allow other tests to run, even when an assertion above has failed
+      progressStats.setStatus(RebalanceResult.Status.DONE);
+      jobMetadata.put(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS,
+          JsonUtils.objectToString(progressStats));
+      ControllerZkHelixUtils.addControllerJobToZK(_propertyStore, jobId, jobMetadata, ControllerJobType.TABLE_REBALANCE,
+          prevJobMetadata -> true);
+      // Stop the added server
+      serverStarter.stop();
+      TestUtils.waitForCondition(
+          aVoid -> getHelixResourceManager().dropInstance(serverStarter.getInstanceId()).isSuccessful(),
+          60_000L, "Failed to drop added server");
+    }
+  }
+
private String getReloadJobIdFromResponse(String response) {
Pattern pattern = new
JavaUtilPattern("([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
Matcher matcher = pattern.matcher(response);
diff --git
a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
index 39ccd02602..f935a22ba7 100644
--- a/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
+++ b/pinot-tools/src/main/java/org/apache/pinot/tools/PinotTableRebalancer.java
@@ -22,7 +22,10 @@ import com.google.common.base.Preconditions;
import org.apache.pinot.common.metadata.ZKMetadataProvider;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceManager;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
+import
org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.utils.Enablement;
@@ -55,7 +58,22 @@ public class PinotTableRebalancer extends PinotZKChanger {
public RebalanceResult rebalance(String tableNameWithType) {
TableConfig tableConfig =
ZKMetadataProvider.getTableConfig(_propertyStore, tableNameWithType);
Preconditions.checkState(tableConfig != null, "Failed to find table config
for table: " + tableNameWithType);
- return new TableRebalancer(_helixManager).rebalance(tableConfig,
_rebalanceConfig,
- TableRebalancer.createUniqueRebalanceJobIdentifier());
+ // Create the job id up front: it is shared by the FAILED result, the observer and the rebalance call below.
+ String jobId = TableRebalancer.createUniqueRebalanceJobIdentifier();
+ // Unless this is a dry run, refuse to start while another rebalance job is reported in progress for the table.
+ if (!_rebalanceConfig.isDryRun()) {
+ String rebalanceJobInProgress =
TableRebalanceManager.rebalanceJobInProgress(tableNameWithType, _propertyStore); // treated as "none" when null
+ if (rebalanceJobInProgress != null) {
+ String errorMsg = "Rebalance job is already in progress for table: " +
tableNameWithType + ", jobId: "
+ + rebalanceJobInProgress + ". Please wait for the job to complete
or cancel it before starting a new one.";
+ return new RebalanceResult(jobId, RebalanceResult.Status.FAILED,
errorMsg, null, null, null, null, null); // rejected: report FAILED under the new jobId instead of rebalancing
+ }
+ }
+ // Run the rebalance with an observer created for the initial attempt of this job and given the property store.
+ ZkBasedTableRebalanceObserver rebalanceObserver = new
ZkBasedTableRebalanceObserver(tableNameWithType, jobId,
+ TableRebalanceContext.forInitialAttempt(jobId, _rebalanceConfig),
_propertyStore);
+ // NOTE(review): the last three TableRebalancer constructor arguments are passed as null; confirm that is supported.
+ return new TableRebalancer(_helixManager, rebalanceObserver, null, null,
null)
+ .rebalance(tableConfig, _rebalanceConfig, jobId);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org