Copilot commented on code in PR #16455:
URL: https://github.com/apache/pinot/pull/16455#discussion_r2244467978
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java:
##########
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import javax.ws.rs.core.Response;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pinot.common.metrics.ControllerMetrics;
+import org.apache.pinot.controller.ControllerConf;
+import org.apache.pinot.controller.api.exception.ControllerApplicationException;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
+import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats;
+import org.apache.pinot.core.periodictask.BasePeriodicTask;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Periodic task to check whether tenant rebalance jobs are stuck and retry them. Controller crashes or restarts
+ * could leave a tenant rebalance job stuck.
+ * For each tenant rebalance job, the task reads the job's metadata in ZK and inspects the
+ * TenantTableRebalanceJobContext entries in ongoingJobsQueue, the list of table rebalance job ids the controller is
+ * currently processing, to see whether any of those table rebalance jobs has not updated its progress stats for
+ * longer than the configured heartbeat timeout. If so, the tenant rebalance job is considered stuck, and the task
+ * resumes it by aborting all ongoing table rebalance jobs, moving the ongoing TenantTableRebalanceJobContexts back
+ * to the head of the parallel queue, and then triggering the tenant rebalance job again with the updated context.
+ * <p>
+ * Note that this is a resume rather than a retry: tables that have already been processed are not rebalanced again.
+ */
+public class TenantRebalanceChecker extends BasePeriodicTask {
+  private static final String TASK_NAME = TenantRebalanceChecker.class.getSimpleName();
+  private static final Logger LOGGER = LoggerFactory.getLogger(TenantRebalanceChecker.class);
+  private static final double RETRY_DELAY_SCALE_FACTOR = 2.0;
+  private final TenantRebalancer _tenantRebalancer;
+  private final PinotHelixResourceManager _pinotHelixResourceManager;
+
+  public TenantRebalanceChecker(ControllerConf config,
+      ControllerMetrics controllerMetrics,
+      PinotHelixResourceManager pinotHelixResourceManager, TenantRebalancer tenantRebalancer) {
+    super(TASK_NAME, config.getTenantRebalanceCheckerFrequencyInSeconds(),
+        config.getTenantRebalanceCheckerInitialDelayInSeconds());
+    _pinotHelixResourceManager = pinotHelixResourceManager;
+    _tenantRebalancer = tenantRebalancer;
+  }
+
+  @Override
+  protected void runTask(Properties periodicTaskProperties) {
+    checkAndRetryTenantRebalance();
+  }
+
+  private void checkAndRetryTenantRebalance() {
+    Map<String, Map<String, String>> allJobMetadataByJobId =
+        _pinotHelixResourceManager.getAllJobs(Set.of(ControllerJobTypes.TENANT_REBALANCE), x -> true);
+    for (Map.Entry<String, Map<String, String>> entry : allJobMetadataByJobId.entrySet()) {
+      String jobId = entry.getKey();
+      Map<String, String> jobZKMetadata = entry.getValue();
+
+      try {
+        // Check if the tenant rebalance job is stuck
+        String tenantRebalanceContextStr =
+            jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT);
+        String tenantRebalanceProgressStatsStr =
+            jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+        if (StringUtils.isEmpty(tenantRebalanceContextStr) || StringUtils.isEmpty(tenantRebalanceProgressStatsStr)) {
+          // Skip the rebalance job as it has no job context or progress stats
+          LOGGER.info("Skip checking tenant rebalance job: {} as it has no job context or progress stats", jobId);
+          continue;
+        }
+        DefaultTenantRebalanceContext tenantRebalanceContext =
+            JsonUtils.stringToObject(tenantRebalanceContextStr, DefaultTenantRebalanceContext.class);
+        TenantRebalanceProgressStats progressStats =
+            JsonUtils.stringToObject(tenantRebalanceProgressStatsStr, TenantRebalanceProgressStats.class);
+        long statsUpdatedAt = Long.parseLong(jobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
+
+        if (isTenantRebalanceJobStuck(tenantRebalanceContext, statsUpdatedAt)) {
+          for (TenantRebalancer.TenantTableRebalanceJobContext ctx;
+              (ctx = tenantRebalanceContext.getOngoingJobsQueue().poll()) != null; ) {
+            abortTableRebalanceJob(ctx.getTableName());
+            tenantRebalanceContext.getParallelQueue().addFirst(ctx);
+          }
+          // The job is stuck, so resume it
+          resumeTenantRebalanceJob(tenantRebalanceContext, progressStats);
+        } else {
+          LOGGER.info("Tenant rebalance job: {} is not stuck", jobId);
+        }
+      } catch (JsonProcessingException e) {
+        // If we cannot parse the job metadata, we skip this job
+        LOGGER.warn("Failed to parse tenant rebalance context for job: {}, skipping", jobId);
+      }
+    }
+  }
+
+  private void resumeTenantRebalanceJob(DefaultTenantRebalanceContext tenantRebalanceContext,
+      TenantRebalanceProgressStats progressStats) {
+    DefaultTenantRebalanceContext tenantRebalanceContextForRetry =
+        DefaultTenantRebalanceContext.forRetry(tenantRebalanceContext.getOriginalJobId(),
+            tenantRebalanceContext.getConfig(), tenantRebalanceContext.getAttemptId() + 1,
+            tenantRebalanceContext.getParallelQueue(),
+            tenantRebalanceContext.getSequentialQueue(),
+            tenantRebalanceContext.getOngoingJobsQueue());
+
+    ZkBasedTenantRebalanceObserver observer =
+        new ZkBasedTenantRebalanceObserver(tenantRebalanceContext.getJobId(),
+            tenantRebalanceContext.getConfig().getTenantName(),
+            progressStats, tenantRebalanceContextForRetry, _pinotHelixResourceManager);
+    ((DefaultTenantRebalancer) _tenantRebalancer).rebalanceWithContext(tenantRebalanceContextForRetry, observer);

Review Comment:
   The cast to DefaultTenantRebalancer indicates a potential design issue. Consider adding the rebalanceWithContext method to the TenantRebalancer interface to avoid casting and improve type safety.
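   To make that concrete, a minimal sketch of the suggested interface change follows. The parameter types are inferred from the call site above and the `rebalance` signature from elsewhere in this PR, so treat it as an illustration rather than the PR's actual code:

   ```java
   // Hedged sketch only: the method set and parameter types are inferred from the
   // call site ((DefaultTenantRebalancer) _tenantRebalancer).rebalanceWithContext(...).
   public interface TenantRebalancer {
     // Existing entry point, as used elsewhere in this PR.
     TenantRebalanceResult rebalance(TenantRebalanceConfig config);

     // Promoted from DefaultTenantRebalancer so that TenantRebalanceChecker can resume
     // a job through the interface, removing the cast flagged above.
     void rebalanceWithContext(DefaultTenantRebalanceContext tenantRebalanceContext,
         ZkBasedTenantRebalanceObserver observer);
   }
   ```

   One caveat with this shape: exposing DefaultTenantRebalanceContext and ZkBasedTenantRebalanceObserver in the interface couples it to the default implementation, so the method might instead accept more abstract context/observer types if such abstractions exist.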
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalancer.java:
##########
@@ -35,7 +39,9 @@ class TenantTableRebalanceJobContext {
    * @param withDowntime Whether the rebalance should be done with downtime or minAvailableReplicas=0.
    * @return The result of the rebalance operation.
    */

Review Comment:
   The @JsonCreator annotation should include a comment explaining why this constructor needs explicit JSON mapping, especially since it's part of a serialization chain for ZooKeeper persistence.
   ```suggestion
      */
     /**
      * The @JsonCreator annotation is required to explicitly define how this constructor maps JSON properties
      * to the class fields. This is necessary because this class is part of a serialization chain for
      * ZooKeeper persistence, and explicit mapping ensures correct deserialization of JSON objects.
      */
   ```
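   For illustration, the kind of explicit mapping the comment refers to might look like the self-contained sketch below. The field and property names are assumptions based on the getters used elsewhere in this PR (`getTableName()`, `getJobId()`, `shouldRebalanceWithDowntime()`), not the class's actual fields:

   ```java
   // Hypothetical, simplified stand-in for the PR's TenantTableRebalanceJobContext;
   // property names are assumptions.
   import com.fasterxml.jackson.annotation.JsonCreator;
   import com.fasterxml.jackson.annotation.JsonProperty;

   class TenantTableRebalanceJobContext {
     private final String _tableName;
     private final String _jobId;
     private final boolean _withDowntime;

     // Explicit mapping lets Jackson rebuild the object from the JSON persisted in ZK,
     // even though the class is immutable and has no no-arg constructor.
     @JsonCreator
     TenantTableRebalanceJobContext(@JsonProperty("tableName") String tableName,
         @JsonProperty("jobId") String jobId, @JsonProperty("withDowntime") boolean withDowntime) {
       _tableName = tableName;
       _jobId = jobId;
       _withDowntime = withDowntime;
     }

     public String getTableName() {
       return _tableName;
     }

     public String getJobId() {
       return _jobId;
     }

     public boolean shouldRebalanceWithDowntime() {
       return _withDowntime;
     }
   }
   ```

   Without @JsonCreator/@JsonProperty, Jackson would need a no-arg constructor or compiled parameter-name metadata to deserialize the context back out of ZK, so the explicit mapping keeps the round trip stable.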
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java:
##########
@@ -142,20 +129,92 @@ public TenantRebalanceResult rebalance(TenantRebalanceConfig config) {
     return new TenantRebalanceResult(tenantRebalanceJobId, rebalanceResults, config.isVerboseResult());
   }

-  private void doConsumeTablesFromQueue(Queue<TenantTableRebalanceJobContext> queue, RebalanceConfig config,
-      TenantRebalanceObserver observer) {
+  /**
+   * Spins up threads to rebalance the tenant with the given context and observer.
+   * The rebalance operation is performed in parallel for the tables in the parallel queue, then sequentially for
+   * the tables in the sequential queue.
+   * The observer should be initialized with the tenantRebalanceContext in order to track the progress properly.
+   *
+   * @param tenantRebalanceContext The context containing the configuration and queues for the rebalance operation.
+   * @param observer The observer to notify about the rebalance progress and results.
+   */
+  public void rebalanceWithContext(DefaultTenantRebalanceContext tenantRebalanceContext,
+      ZkBasedTenantRebalanceObserver observer) {
+    TenantRebalanceConfig config = tenantRebalanceContext.getConfig();
+    ConcurrentLinkedDeque<TenantTableRebalanceJobContext> parallelQueue = tenantRebalanceContext.getParallelQueue();
+    Queue<TenantTableRebalanceJobContext> sequentialQueue = tenantRebalanceContext.getSequentialQueue();
+    ConcurrentLinkedQueue<TenantTableRebalanceJobContext> ongoingJobs = tenantRebalanceContext.getOngoingJobsQueue();
+
+    observer.onTrigger(TenantRebalanceObserver.Trigger.START_TRIGGER, null, null);
+
+    // ensure at least 1 thread is created to run the sequential table rebalance operations
+    int parallelism = Math.max(config.getDegreeOfParallelism(), 1);
+    AtomicInteger activeThreads = new AtomicInteger(parallelism);
+    try {
+      for (int i = 0; i < parallelism; i++) {
+        _executorService.submit(() -> {
+          doConsumeTablesFromQueueAndRebalance(parallelQueue, ongoingJobs, config, observer);
+          if (activeThreads.decrementAndGet() == 0) {
+            doConsumeTablesFromQueueAndRebalance(sequentialQueue, ongoingJobs, config, observer);
+            observer.onSuccess(String.format("Successfully rebalanced tenant %s.", config.getTenantName()));
+          }
+        });
+      }
+    } catch (Exception exception) {
+      observer.onError(String.format("Failed to rebalance the tenant %s. Cause: %s", config.getTenantName(),
+          exception.getMessage()));
+    }
+  }
+
+  /**
+   * Consumes tables from the given queue of the DefaultTenantRebalanceContext that is being monitored by the
+   * observer and rebalances them using the provided config.
+   * The ongoing jobs are tracked in the ongoingJobs queue, which is also from the monitored
+   * DefaultTenantRebalanceContext.
+   *
+   * @param queue The queue of TenantTableRebalanceJobContext to consume tables from.
+   * @param ongoingJobs The queue to track ongoing rebalance jobs.
+   * @param config The rebalance configuration to use for the rebalancing.
+   * @param observer The observer to notify about the rebalance progress and results; should be initialized with
+   *                 the DefaultTenantRebalanceContext that contains `queue` and `ongoingJobs`.
+   */
+  private void doConsumeTablesFromQueueAndRebalance(Queue<TenantTableRebalanceJobContext> queue,
+      Queue<TenantTableRebalanceJobContext> ongoingJobs, RebalanceConfig config,
+      ZkBasedTenantRebalanceObserver observer) {
     while (true) {
       TenantTableRebalanceJobContext jobContext = queue.poll();
       if (jobContext == null) {
         break;
       }
-      String table = jobContext.getTableName();
+      ongoingJobs.add(jobContext);
+      String tableName = jobContext.getTableName();
+      String rebalanceJobId = jobContext.getJobId();
       RebalanceConfig rebalanceConfig = RebalanceConfig.copy(config);
       rebalanceConfig.setDryRun(false);
       if (jobContext.shouldRebalanceWithDowntime()) {
         rebalanceConfig.setMinAvailableReplicas(0);
       }
-      rebalanceTable(table, rebalanceConfig, jobContext.getJobId(), observer);
+      try {
+        observer.onTrigger(TenantRebalanceObserver.Trigger.REBALANCE_STARTED_TRIGGER, tableName, rebalanceJobId);
+        // Disallow TABLE rebalance checker to retry the rebalance job here, since we want TENANT rebalance checker
+        // to do so

Review Comment:
   The comment mentions coordination between TABLE and TENANT rebalance checkers but doesn't explain the specific behavior or potential race conditions. This coordination logic should be better documented.
   ```suggestion
         // Disallow TABLE rebalance checker to retry the rebalance job here. This is because the TENANT rebalance
         // checker is responsible for managing the overall rebalancing process across multiple tables within a tenant.
         // Allowing the TABLE rebalance checker to retry could lead to conflicts or redundant operations, as it does
         // not have visibility into the broader tenant-level context. By centralizing retry logic in the TENANT
         // rebalance checker, we ensure that retries are coordinated and aligned with the tenant's rebalancing strategy.
         // This approach also helps prevent potential race conditions where both checkers might attempt to retry the
         // same job simultaneously, leading to inconsistent states or errors.
   ```
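   Separately from the review comment above, the thread-coordination trick in rebalanceWithContext (the last worker to drain the parallel queue runs the sequential phase) may be easier to follow in isolation. Below is a small self-contained demo of the same pattern; all names and table ids are illustrative, not from the PR:

   ```java
   // Standalone demo of the "last worker runs the sequential phase" pattern used above.
   import java.util.List;
   import java.util.Queue;
   import java.util.concurrent.ConcurrentLinkedQueue;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicInteger;

   public class PhaseDemo {
     public static void main(String[] args) throws InterruptedException {
       Queue<String> parallel = new ConcurrentLinkedQueue<>(List.of("t1", "t2", "t3"));
       Queue<String> sequential = new ConcurrentLinkedQueue<>(List.of("t4", "t5"));
       int parallelism = 2;
       AtomicInteger activeThreads = new AtomicInteger(parallelism);
       ExecutorService pool = Executors.newFixedThreadPool(parallelism);
       for (int i = 0; i < parallelism; i++) {
         pool.submit(() -> {
           // Phase 1: all workers drain the parallel queue concurrently.
           for (String t; (t = parallel.poll()) != null; ) {
             System.out.println(Thread.currentThread().getName() + " rebalanced " + t);
           }
           // Phase 2: only the last worker to finish phase 1 drains the sequential
           // queue, so sequential tables run one at a time after all parallel work.
           if (activeThreads.decrementAndGet() == 0) {
             for (String t; (t = sequential.poll()) != null; ) {
               System.out.println("sequential rebalance of " + t);
             }
           }
         });
       }
       pool.shutdown();
       pool.awaitTermination(10, TimeUnit.SECONDS);
     }
   }
   ```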
##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceCheckerTest.java:
##########
@@ -0,0 +1,513 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance.tenant;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.ConcurrentLinkedQueue;

Review Comment:
   This import appears to be unused since the code uses ConcurrentLinkedDeque in the actual implementation. Consider removing unused imports.
   ```suggestion
   ```
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java:
##########
@@ -0,0 +1,221 @@
+  private boolean isTenantRebalanceJobStuck(
+      DefaultTenantRebalanceContext tenantRebalanceContext, long statsUpdatedAt) {
+    if (tenantRebalanceContext.getOngoingJobsQueue().isEmpty()) {
+      if (tenantRebalanceContext.getParallelQueue().isEmpty() && tenantRebalanceContext.getSequentialQueue()
+          .isEmpty()) {
+        return false;
+      }
+      // If there are no ongoing jobs but jobs remain in the parallel or sequential queue, the tenant rebalancer
+      // may simply be in the interval between finishing the previous job and consuming the next one. Check the
+      // heartbeat timeout to make sure it has actually been stuck in this state for a long while.
+      long heartbeatTimeoutMs = tenantRebalanceContext.getConfig().getHeartbeatTimeoutInMs();
+      return (System.currentTimeMillis() - statsUpdatedAt >= heartbeatTimeoutMs);
+    }
+
+    // Check whether any ongoing table rebalance job is stuck
+    for (TenantRebalancer.TenantTableRebalanceJobContext ctx : new ArrayList<>(
+        tenantRebalanceContext.getOngoingJobsQueue())) {
+      if (isTableRebalanceJobStuck(ctx.getJobId())) {
+        // If any of the table rebalance jobs is stuck, we consider the tenant rebalance job stuck.
+        LOGGER.info("Found stuck table rebalance job: {} for tenant: {}", ctx.getJobId(),
+            tenantRebalanceContext.getConfig().getTenantName());
+        return true;
+      }
+    }
+    return false;
+  }
+
+  private boolean isTableRebalanceJobStuck(String jobId) {
+    Map<String, String> jobMetadata =
+        _pinotHelixResourceManager.getControllerJobZKMetadata(jobId, ControllerJobTypes.TABLE_REBALANCE);
+    if (jobMetadata == null) {
+      return false;
+    }
+    long statsUpdatedAt = Long.parseLong(jobMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS));
+    String jobStatsInStr = jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS);
+    if (StringUtils.isEmpty(jobStatsInStr)) {
+      // Skip the rebalance job as it has no job progress stats
+      return false;
+    }
+    String jobCtxInStr = jobMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT);
+    if (StringUtils.isEmpty(jobCtxInStr)) {
+      // Skip the rebalance job as it has no job context
+      return false;
+    }
+
+    try {
+      TableRebalanceProgressStats jobStats = JsonUtils.stringToObject(jobStatsInStr, TableRebalanceProgressStats.class);
+      TableRebalanceContext jobCtx = JsonUtils.stringToObject(jobCtxInStr, TableRebalanceContext.class);
+      if (jobStats.getStatus() == RebalanceResult.Status.IN_PROGRESS) {
+        long heartbeatTimeoutMs = jobCtx.getConfig().getHeartbeatTimeoutInMs();
+        if (System.currentTimeMillis() - statsUpdatedAt >= heartbeatTimeoutMs) {
+          LOGGER.info("Found stuck rebalance job: {} that has not updated its status in ZK within "
+              + "heartbeat timeout: {}", jobId, heartbeatTimeoutMs);
+          return true;
+        }
+      }
+    } catch (JsonProcessingException e) {
+      throw new ControllerApplicationException(LOGGER, "Failed to parse table rebalance context for job: " + jobId,
+          Response.Status.INTERNAL_SERVER_ERROR, e);
+    }
+    return false;
+  }
+
+  private void abortTableRebalanceJob(String tableNameWithType) {
+    // TODO: This is a duplicate of a private method in RebalanceChecker, we should refactor it to a common
+    //  place.

Review Comment:
   The TODO comment indicates code duplication that should be addressed. Consider extracting the abortTableRebalanceJob method to a shared utility class to reduce duplication and improve maintainability.
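   One possible shape for that extraction is sketched below, under the assumption that both checkers need only a PinotHelixResourceManager handle. The class name and package placement are illustrative, and the body is left as a stub because the canonical logic is the private method in RebalanceChecker that the TODO points at:

   ```java
   // Illustrative only: class name, location, and signature are assumptions, not the PR's code.
   package org.apache.pinot.controller.helix.core.rebalance;

   import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;

   public final class RebalanceJobUtils {
     private RebalanceJobUtils() {
     }

     /**
      * Aborts any in-progress rebalance job for the given table. Both RebalanceChecker and
      * TenantRebalanceChecker could delegate here instead of keeping duplicate private copies.
      */
     public static void abortTableRebalanceJob(PinotHelixResourceManager pinotHelixResourceManager,
         String tableNameWithType) {
       // Stub: the real implementation would move RebalanceChecker's existing private logic
       // here (e.g. marking the job's progress stats as aborted in ZK metadata).
     }
   }
   ```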
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
