J-HowHuang commented on code in PR #16455:
URL: https://github.com/apache/pinot/pull/16455#discussion_r2294440247


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java:
##########
@@ -0,0 +1,307 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.Set;
+import java.util.UUID;
+import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
+import org.apache.pinot.core.periodictask.BasePeriodicTask;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Periodic task to check if tenant rebalance jobs are stuck and retry them. Controller crashes or restarts could
+ * make a tenant rebalance job stuck.
+ * For each tenant rebalance job, the task checks the job's metadata in ZK and looks at the
+ * TenantTableRebalanceJobContext entries in ongoingJobsQueue, a list of the table rebalance job ids that the
+ * controller is currently processing, to see whether any of these table rebalance jobs has not updated its progress
+ * stats for longer than the configured heartbeat timeout. If so, the tenant rebalance job is considered stuck, and
+ * the task will resume it by aborting all the ongoing table rebalance jobs, moving the ongoing
+ * TenantTableRebalanceJobContext entries back to the head of the parallel queue, and then triggering the tenant
+ * rebalance job again with the updated context, under a new attempt job ID.
+ * <p>
+ * Notice that fundamentally this is not a retry but a resume, since we will not re-do the table rebalance for those
+ * tables that have already been processed.
+ */
+public class TenantRebalanceChecker extends BasePeriodicTask {
+  private static final String TASK_NAME = TenantRebalanceChecker.class.getSimpleName();
+  private static final Logger LOGGER = LoggerFactory.getLogger(TenantRebalanceChecker.class);
+  private static final double RETRY_DELAY_SCALE_FACTOR = 2.0;
+  private final TenantRebalancer _tenantRebalancer;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+
+  public TenantRebalanceChecker(ControllerConf config,
+      ControllerMetrics controllerMetrics,
+      PinotHelixResourceManager pinotHelixResourceManager, TenantRebalancer tenantRebalancer) {
+    super(TASK_NAME, config.getTenantRebalanceCheckerFrequencyInSeconds(),
+        config.getTenantRebalanceCheckerInitialDelayInSeconds());
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _tenantRebalancer = tenantRebalancer;
+  }
+
+  @Override
+  protected void runTask(Properties periodicTaskProperties) {
+    checkAndRetryTenantRebalance();
+  }
+
+  private void checkAndRetryTenantRebalance() {
+    Map<String, Map<String, String>> allJobMetadataByJobId =
+        _pinotHelixResourceManager.getAllJobs(Set.of(ControllerJobTypes.TENANT_REBALANCE), x -> true);
+    for (Map.Entry<String, Map<String, String>> entry : allJobMetadataByJobId.entrySet()) {
+      String jobId = entry.getKey();
+      Map<String, String> jobZKMetadata = entry.getValue();
+
+      try {
+        // Check if the tenant rebalance job is stuck
+        String tenantRebalanceContextStr =
+            jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT);
+        String tenantRebalanceProgressStatsStr =
+            jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+        if (StringUtils.isEmpty(tenantRebalanceContextStr) || StringUtils.isEmpty(tenantRebalanceProgressStatsStr)) {
+          LOGGER.info("Skip checking tenant rebalance job: {} as it has no job context or progress stats", jobId);
+          continue;
+        }
+        DefaultTenantRebalanceContext tenantRebalanceContext =
+            JsonUtils.stringToObject(tenantRebalanceContextStr, DefaultTenantRebalanceContext.class);
+        TenantRebalanceProgressStats progressStats =
+            JsonUtils.stringToObject(tenantRebalanceProgressStatsStr, TenantRebalanceProgressStats.class);
+        long statsUpdatedAt = Long.parseLong(jobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
+
+        DefaultTenantRebalanceContext retryTenantRebalanceContext =
+            prepareRetryIfTenantRebalanceJobStuck(jobZKMetadata, tenantRebalanceContext, statsUpdatedAt);
+        if (retryTenantRebalanceContext != null) {
+          for (TenantRebalancer.TenantTableRebalanceJobContext ctx;
+              (ctx = retryTenantRebalanceContext.getOngoingJobsQueue().poll()) != null; ) {
+            abortTableRebalanceJob(ctx.getTableName());
+            // the existing table rebalance job is aborted, we need to run the rebalance job with a new job ID.
+            TenantRebalancer.TenantTableRebalanceJobContext newCtx =
+                new TenantRebalancer.TenantTableRebalanceJobContext(
+                    ctx.getTableName(), UUID.randomUUID().toString(), ctx.shouldRebalanceWithDowntime());
+            retryTenantRebalanceContext.getParallelQueue().addFirst(newCtx);
+          }
+          // the retry tenant rebalance job id has been created in ZK, we can safely mark the original job as
+          // aborted, so that this original job will not be picked up again in the future.

Review Comment:
   For job `jobId_123`, if it's stuck, the checker (controller) will (see the sketch after this list):
   1. Create a new retry job metadata `jobId_123_2` in ZK
   2. Abort all ongoing _table_ rebalance jobs in `jobId_123`
   3. Clear all queues in `jobId_123`'s context, which is what I meant by aborting the original tenant rebalance job
   4. Start the retry job `jobId_123_2`
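   
   A minimal sketch of that sequence, reusing the queue accessors from the diff above; the helpers `createRetryJobMetadataInZk`, `markOriginalJobAborted`, and `startTenantRebalance` are hypothetical placeholders, not the PR's actual APIs:
   
   ```java
   // Sketch only: resume path for a stuck tenant rebalance job such as jobId_123.
   void resumeStuckJob(String originalJobId, DefaultTenantRebalanceContext retryContext) {
     // 1. Persist the retry job metadata (e.g. jobId_123_2) in ZK first, so a crash
     //    after this point is recoverable from the retry job's own metadata.
     String retryJobId = createRetryJobMetadataInZk(originalJobId, retryContext); // hypothetical
     // 2. Abort every ongoing table rebalance job, and re-queue each table at the
     //    head of the parallel queue under a fresh table-level job ID.
     TenantRebalancer.TenantTableRebalanceJobContext ctx;
     while ((ctx = retryContext.getOngoingJobsQueue().poll()) != null) {
       abortTableRebalanceJob(ctx.getTableName());
       retryContext.getParallelQueue().addFirst(new TenantRebalancer.TenantTableRebalanceJobContext(
           ctx.getTableName(), UUID.randomUUID().toString(), ctx.shouldRebalanceWithDowntime()));
     }
     // 3. Clear the original job's queues so later checker rounds will not pick it up again.
     markOriginalJobAborted(originalJobId); // hypothetical
     // 4. Kick off the retry with the updated context.
     startTenantRebalance(retryJobId, retryContext); // hypothetical
   }
   ```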
   
   If a controller crash happens:
   * Before 1: the next round of the checker will pick up `jobId_123` and retry it again.
   * Between 1 & 2: the next round of the checker will pick up both `jobId_123_2` and `jobId_123` as they're both considered stuck, but `jobId_123` will not be retried because `jobId_123_2` already exists in ZK. A `jobId_123_3` will then be created that retries `jobId_123_2`.
   * Between 2 & 3: I found a bug here. If the controller crashes at this point, the next round of the checker would see `jobId_123_2` as not stuck and would not retry it, because all the jobs in the ongoing job queue of `jobId_123_2` are `ABORTED`, and an `ABORTED` table job was not considered stuck. I've changed the method `isTableRebalanceJobStuck` to cover this scenario (see the sketch after this list), so now it is considered stuck and a `jobId_123_3` will be spawned to retry it.
   * Between 3 & 4: step 3 does not change the metadata of `jobId_123_2`, so `jobId_123_2` will be considered stuck as in the previous scenario. `jobId_123` will not be picked up because its queues are empty.
   * After 4: falls into the same scenario as a typical stuck tenant rebalance job.
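   
   For the `isTableRebalanceJobStuck` change mentioned above, a hedged sketch of the updated check; the `getStatus()` accessor and `_heartbeatTimeoutMs` field are assumptions for illustration, not necessarily the PR's exact code:
   
   ```java
   // Sketch only: a table rebalance job counts as stuck if its progress stats went
   // stale OR if it is already ABORTED (e.g. a retry crashed between steps 2 and 3).
   private boolean isTableRebalanceJobStuck(TableRebalanceProgressStats stats, long statsUpdatedAtMs) {
     // Assumed: getStatus() returns the RebalanceResult.Status name as a String.
     if (RebalanceResult.Status.ABORTED.name().equals(stats.getStatus())) {
       // An ABORTED ongoing job means a previous retry was interrupted after aborting
       // the table jobs but before re-queueing them; treat it as stuck so the next
       // checker round spawns a new retry attempt.
       return true;
     }
     // Heartbeat check: no progress-stats update for longer than the configured
     // timeout means the job is stuck. _heartbeatTimeoutMs is an assumed field.
     return System.currentTimeMillis() - statsUpdatedAtMs > _heartbeatTimeoutMs;
   }
   ```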
   
   Let me know if this makes sense; these scenarios have only been tested in my monkey brain sandbox.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
