yashmayya commented on code in PR #16455: URL: https://github.com/apache/pinot/pull/16455#discussion_r2293550323
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalanceContext.java: ########## @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance.tenant; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; + + +/** + * Default implementation of TenantRebalanceContext that includes parallel and sequential queues + * for managing tenant rebalance operations. 
+ */ +public class DefaultTenantRebalanceContext extends TenantRebalanceContext { + private final ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext> _parallelQueue; + private final Queue<TenantRebalancer.TenantTableRebalanceJobContext> _sequentialQueue; + private final ConcurrentLinkedQueue<TenantRebalancer.TenantTableRebalanceJobContext> _ongoingJobsQueue; + + public DefaultTenantRebalanceContext() { + super(); + _parallelQueue = new ConcurrentLinkedDeque<>(); + _sequentialQueue = new LinkedList<>(); + _ongoingJobsQueue = new ConcurrentLinkedQueue<>(); + } + + public DefaultTenantRebalanceContext(String originalJobId, TenantRebalanceConfig config, int attemptId, + boolean allowRetries, ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext> parallelQueue, Review Comment: What's `allowRetries` being used for in this context? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java: ########## @@ -0,0 +1,307 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.helix.core.rebalance.tenant; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.ArrayList; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import javax.ws.rs.core.Response; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.exception.ControllerApplicationException; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats; +import org.apache.pinot.core.periodictask.BasePeriodicTask; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Periodic task to check if tenant rebalance jobs are stuck and retry them. Controller crashes or restarts could + * make a tenant rebalance job stuck. + * The task checks for each tenant rebalance job's metadata in ZK, look at the TenantTableRebalanceJobContext in + * ongoingJobsQueue, which is a list with table rebalance job ids that the controller is currently processing, to see + * if any of these table rebalance jobs has not updated their progress stats longer than the configured heartbeat + * timeout. 
If so, the tenant rebalance job is considered stuck, and the task will resume the tenant rebalance job by + * aborting all the ongoing table rebalance jobs, move the ongoing TenantTableRebalanceJobContext back to the head of + * the parallel queue, and then trigger the tenant rebalance job again with the updated context, with an attempt job ID + * <p> + * Notice that fundamentally this is not a retry but a resume, since we will not re-do the table rebalance for those + * tables that have already been processed. + */ +public class TenantRebalanceChecker extends BasePeriodicTask { + private final static String TASK_NAME = TenantRebalanceChecker.class.getSimpleName(); + private static final Logger LOGGER = LoggerFactory.getLogger(TenantRebalanceChecker.class); + private static final double RETRY_DELAY_SCALE_FACTOR = 2.0; + private final TenantRebalancer _tenantRebalancer; + private final PinotHelixResourceManager _pinotHelixResourceManager; + + public TenantRebalanceChecker(ControllerConf config, + ControllerMetrics controllerMetrics, Review Comment: `ControllerMetrics` is unused here? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalanceContext.java: ########## @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance.tenant; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; + + +/** + * Default implementation of TenantRebalanceContext that includes parallel and sequential queues + * for managing tenant rebalance operations. + */ +public class DefaultTenantRebalanceContext extends TenantRebalanceContext { + private final ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext> _parallelQueue; + private final Queue<TenantRebalancer.TenantTableRebalanceJobContext> _sequentialQueue; + private final ConcurrentLinkedQueue<TenantRebalancer.TenantTableRebalanceJobContext> _ongoingJobsQueue; + + public DefaultTenantRebalanceContext() { Review Comment: Unused? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java: ########## @@ -0,0 +1,307 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.helix.core.rebalance.tenant; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.ArrayList; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import javax.ws.rs.core.Response; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.exception.ControllerApplicationException; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats; +import org.apache.pinot.core.periodictask.BasePeriodicTask; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Periodic task to check if tenant rebalance jobs are stuck and retry them. Controller crashes or restarts could + * make a tenant rebalance job stuck. + * The task checks for each tenant rebalance job's metadata in ZK, look at the TenantTableRebalanceJobContext in + * ongoingJobsQueue, which is a list with table rebalance job ids that the controller is currently processing, to see + * if any of these table rebalance jobs has not updated their progress stats longer than the configured heartbeat + * timeout. 
If so, the tenant rebalance job is considered stuck, and the task will resume the tenant rebalance job by + * aborting all the ongoing table rebalance jobs, move the ongoing TenantTableRebalanceJobContext back to the head of + * the parallel queue, and then trigger the tenant rebalance job again with the updated context, with an attempt job ID + * <p> + * Notice that fundamentally this is not a retry but a resume, since we will not re-do the table rebalance for those + * tables that have already been processed. + */ +public class TenantRebalanceChecker extends BasePeriodicTask { + private final static String TASK_NAME = TenantRebalanceChecker.class.getSimpleName(); + private static final Logger LOGGER = LoggerFactory.getLogger(TenantRebalanceChecker.class); + private static final double RETRY_DELAY_SCALE_FACTOR = 2.0; + private final TenantRebalancer _tenantRebalancer; + private final PinotHelixResourceManager _pinotHelixResourceManager; + + public TenantRebalanceChecker(ControllerConf config, + ControllerMetrics controllerMetrics, + PinotHelixResourceManager pinotHelixResourceManager, TenantRebalancer tenantRebalancer) { + super(TASK_NAME, config.getTenantRebalanceCheckerFrequencyInSeconds(), + config.getTenantRebalanceCheckerInitialDelayInSeconds()); + _pinotHelixResourceManager = pinotHelixResourceManager; + _tenantRebalancer = tenantRebalancer; + } + + @Override + protected void runTask(Properties periodicTaskProperties) { + checkAndRetryTenantRebalance(); + } + + private void checkAndRetryTenantRebalance() { + Map<String, Map<String, String>> allJobMetadataByJobId = + _pinotHelixResourceManager.getAllJobs(Set.of(ControllerJobTypes.TENANT_REBALANCE), x -> true); + for (Map.Entry<String, Map<String, String>> entry : allJobMetadataByJobId.entrySet()) { + String jobId = entry.getKey(); + Map<String, String> jobZKMetadata = entry.getValue(); + + try { + // Check if the tenant rebalance job is stuck + String tenantRebalanceContextStr = + 
jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT); + String tenantRebalanceProgressStatsStr = + jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS); + if (StringUtils.isEmpty(tenantRebalanceContextStr) || StringUtils.isEmpty(tenantRebalanceProgressStatsStr)) { + // Skip rebalance job: {} as it has no job context or progress stats + LOGGER.info("Skip checking tenant rebalance job: {} as it has no job context or progress stats", jobId); + continue; + } + DefaultTenantRebalanceContext tenantRebalanceContext = + JsonUtils.stringToObject(tenantRebalanceContextStr, DefaultTenantRebalanceContext.class); + TenantRebalanceProgressStats progressStats = + JsonUtils.stringToObject(tenantRebalanceProgressStatsStr, TenantRebalanceProgressStats.class); + long statsUpdatedAt = Long.parseLong(jobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)); + + DefaultTenantRebalanceContext retryTenantRebalanceContext = + prepareRetryIfTenantRebalanceJobStuck(jobZKMetadata, tenantRebalanceContext, statsUpdatedAt); + if (retryTenantRebalanceContext != null) { + for (TenantRebalancer.TenantTableRebalanceJobContext ctx; + (ctx = retryTenantRebalanceContext.getOngoingJobsQueue().poll()) != null; ) { + abortTableRebalanceJob(ctx.getTableName()); + // the existing table rebalance job is aborted, we need to run the rebalance job with a new job ID. + TenantRebalancer.TenantTableRebalanceJobContext newCtx = + new TenantRebalancer.TenantTableRebalanceJobContext( + ctx.getTableName(), UUID.randomUUID().toString(), ctx.shouldRebalanceWithDowntime()); + retryTenantRebalanceContext.getParallelQueue().addFirst(newCtx); + } + // the retry tenant rebalance job id has been created in ZK, we can safely mark the original job as + // aborted, so that this original job will not be picked up again in the future. 
+ markTenantRebalanceJobAsAborted(jobId, jobZKMetadata, tenantRebalanceContext, progressStats); + retryTenantRebalanceJob(retryTenantRebalanceContext, progressStats); + } else { + LOGGER.info("Tenant rebalance job: {} is not stuck", jobId); + } + } catch (JsonProcessingException e) { + // If we cannot parse the job metadata, we skip this job + LOGGER.warn("Failed to parse tenant rebalance context for job: {}, skipping", jobId); + } + } + } + + private void retryTenantRebalanceJob(DefaultTenantRebalanceContext tenantRebalanceContextForRetry, + TenantRebalanceProgressStats progressStats) { + ZkBasedTenantRebalanceObserver observer = + new ZkBasedTenantRebalanceObserver(tenantRebalanceContextForRetry.getJobId(), + tenantRebalanceContextForRetry.getConfig().getTenantName(), + progressStats, tenantRebalanceContextForRetry, _pinotHelixResourceManager); + ((DefaultTenantRebalancer) _tenantRebalancer).rebalanceWithContext(tenantRebalanceContextForRetry, observer); + } + + /** + * Check if the tenant rebalance job is stuck, and prepare to retry it if necessary. + * A tenant rebalance job is considered stuck if: + * 1. There are no ongoing jobs, but there are jobs in the parallel or sequential queue, and the stats have not been + * updated for longer than the heartbeat timeout. + * 2. There are ongoing table rebalance jobs, and at least one of them has not updated its status for longer than the + * heartbeat timeout. + * The retry is prepared by creating a new tenant rebalance job with an incremented attempt ID, and persisting it to + * ZK. + * + * @param jobZKMetadata The ZK metadata of the tenant rebalance job. + * @param tenantRebalanceContext The context of the tenant rebalance job. + * @param statsUpdatedAt The timestamp when the stats were last updated. + * @return The TenantRebalanceContext for retry if the tenant rebalance job is stuck and should be retried, null if + * other controller has prepared the retry first. 
Review Comment: > null if other controller has prepared the retry first Or if tenant rebalance is not stuck and no retry is needed? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java: ########## @@ -0,0 +1,307 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.helix.core.rebalance.tenant; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.ArrayList; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import javax.ws.rs.core.Response; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.exception.ControllerApplicationException; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats; +import org.apache.pinot.core.periodictask.BasePeriodicTask; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Periodic task to check if tenant rebalance jobs are stuck and retry them. Controller crashes or restarts could + * make a tenant rebalance job stuck. + * The task checks for each tenant rebalance job's metadata in ZK, look at the TenantTableRebalanceJobContext in + * ongoingJobsQueue, which is a list with table rebalance job ids that the controller is currently processing, to see + * if any of these table rebalance jobs has not updated their progress stats longer than the configured heartbeat + * timeout. 
If so, the tenant rebalance job is considered stuck, and the task will resume the tenant rebalance job by + * aborting all the ongoing table rebalance jobs, move the ongoing TenantTableRebalanceJobContext back to the head of + * the parallel queue, and then trigger the tenant rebalance job again with the updated context, with an attempt job ID + * <p> + * Notice that fundamentally this is not a retry but a resume, since we will not re-do the table rebalance for those + * tables that have already been processed. + */ +public class TenantRebalanceChecker extends BasePeriodicTask { + private final static String TASK_NAME = TenantRebalanceChecker.class.getSimpleName(); + private static final Logger LOGGER = LoggerFactory.getLogger(TenantRebalanceChecker.class); + private static final double RETRY_DELAY_SCALE_FACTOR = 2.0; + private final TenantRebalancer _tenantRebalancer; + private final PinotHelixResourceManager _pinotHelixResourceManager; + + public TenantRebalanceChecker(ControllerConf config, + ControllerMetrics controllerMetrics, + PinotHelixResourceManager pinotHelixResourceManager, TenantRebalancer tenantRebalancer) { + super(TASK_NAME, config.getTenantRebalanceCheckerFrequencyInSeconds(), + config.getTenantRebalanceCheckerInitialDelayInSeconds()); + _pinotHelixResourceManager = pinotHelixResourceManager; + _tenantRebalancer = tenantRebalancer; + } + + @Override + protected void runTask(Properties periodicTaskProperties) { + checkAndRetryTenantRebalance(); + } + + private void checkAndRetryTenantRebalance() { + Map<String, Map<String, String>> allJobMetadataByJobId = + _pinotHelixResourceManager.getAllJobs(Set.of(ControllerJobTypes.TENANT_REBALANCE), x -> true); + for (Map.Entry<String, Map<String, String>> entry : allJobMetadataByJobId.entrySet()) { + String jobId = entry.getKey(); + Map<String, String> jobZKMetadata = entry.getValue(); + + try { + // Check if the tenant rebalance job is stuck + String tenantRebalanceContextStr = + 
jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT); + String tenantRebalanceProgressStatsStr = + jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS); + if (StringUtils.isEmpty(tenantRebalanceContextStr) || StringUtils.isEmpty(tenantRebalanceProgressStatsStr)) { + // Skip rebalance job: {} as it has no job context or progress stats + LOGGER.info("Skip checking tenant rebalance job: {} as it has no job context or progress stats", jobId); + continue; + } + DefaultTenantRebalanceContext tenantRebalanceContext = + JsonUtils.stringToObject(tenantRebalanceContextStr, DefaultTenantRebalanceContext.class); + TenantRebalanceProgressStats progressStats = + JsonUtils.stringToObject(tenantRebalanceProgressStatsStr, TenantRebalanceProgressStats.class); + long statsUpdatedAt = Long.parseLong(jobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)); + + DefaultTenantRebalanceContext retryTenantRebalanceContext = + prepareRetryIfTenantRebalanceJobStuck(jobZKMetadata, tenantRebalanceContext, statsUpdatedAt); + if (retryTenantRebalanceContext != null) { + for (TenantRebalancer.TenantTableRebalanceJobContext ctx; + (ctx = retryTenantRebalanceContext.getOngoingJobsQueue().poll()) != null; ) { + abortTableRebalanceJob(ctx.getTableName()); + // the existing table rebalance job is aborted, we need to run the rebalance job with a new job ID. + TenantRebalancer.TenantTableRebalanceJobContext newCtx = + new TenantRebalancer.TenantTableRebalanceJobContext( + ctx.getTableName(), UUID.randomUUID().toString(), ctx.shouldRebalanceWithDowntime()); + retryTenantRebalanceContext.getParallelQueue().addFirst(newCtx); + } + // the retry tenant rebalance job id has been created in ZK, we can safely mark the original job as + // aborted, so that this original job will not be picked up again in the future. 
+ markTenantRebalanceJobAsAborted(jobId, jobZKMetadata, tenantRebalanceContext, progressStats); + retryTenantRebalanceJob(retryTenantRebalanceContext, progressStats); + } else { + LOGGER.info("Tenant rebalance job: {} is not stuck", jobId); + } + } catch (JsonProcessingException e) { + // If we cannot parse the job metadata, we skip this job + LOGGER.warn("Failed to parse tenant rebalance context for job: {}, skipping", jobId); + } + } + } + + private void retryTenantRebalanceJob(DefaultTenantRebalanceContext tenantRebalanceContextForRetry, + TenantRebalanceProgressStats progressStats) { + ZkBasedTenantRebalanceObserver observer = + new ZkBasedTenantRebalanceObserver(tenantRebalanceContextForRetry.getJobId(), + tenantRebalanceContextForRetry.getConfig().getTenantName(), + progressStats, tenantRebalanceContextForRetry, _pinotHelixResourceManager); + ((DefaultTenantRebalancer) _tenantRebalancer).rebalanceWithContext(tenantRebalanceContextForRetry, observer); + } + + /** + * Check if the tenant rebalance job is stuck, and prepare to retry it if necessary. + * A tenant rebalance job is considered stuck if: + * 1. There are no ongoing jobs, but there are jobs in the parallel or sequential queue, and the stats have not been + * updated for longer than the heartbeat timeout. + * 2. There are ongoing table rebalance jobs, and at least one of them has not updated its status for longer than the Review Comment: I think these checks make sense but do add one line explaining the logic here (for instance, why we can't simply use heartbeat timeout check on the overall tenant rebalance stats). It's usually more useful to describe _why_ something is being done rather than simply describing _what_ is being done. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalanceContext.java: ########## @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance.tenant; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; + + +/** + * Default implementation of TenantRebalanceContext that includes parallel and sequential queues + * for managing tenant rebalance operations. + */ +public class DefaultTenantRebalanceContext extends TenantRebalanceContext { Review Comment: Why do we need the base abstract `TenantRebalanceContext` class if this is the only implementation? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java: ########## @@ -0,0 +1,307 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance.tenant; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.ArrayList; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import javax.ws.rs.core.Response; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.exception.ControllerApplicationException; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats; +import org.apache.pinot.core.periodictask.BasePeriodicTask; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Periodic task to check if tenant rebalance jobs are stuck and retry them. Controller crashes or restarts could + * make a tenant rebalance job stuck. 
+ * The task checks for each tenant rebalance job's metadata in ZK, look at the TenantTableRebalanceJobContext in + * ongoingJobsQueue, which is a list with table rebalance job ids that the controller is currently processing, to see + * if any of these table rebalance jobs has not updated their progress stats longer than the configured heartbeat + * timeout. If so, the tenant rebalance job is considered stuck, and the task will resume the tenant rebalance job by + * aborting all the ongoing table rebalance jobs, move the ongoing TenantTableRebalanceJobContext back to the head of + * the parallel queue, and then trigger the tenant rebalance job again with the updated context, with an attempt job ID + * <p> + * Notice that fundamentally this is not a retry but a resume, since we will not re-do the table rebalance for those + * tables that have already been processed. + */ +public class TenantRebalanceChecker extends BasePeriodicTask { + private final static String TASK_NAME = TenantRebalanceChecker.class.getSimpleName(); + private static final Logger LOGGER = LoggerFactory.getLogger(TenantRebalanceChecker.class); + private static final double RETRY_DELAY_SCALE_FACTOR = 2.0; + private final TenantRebalancer _tenantRebalancer; + private final PinotHelixResourceManager _pinotHelixResourceManager; + + public TenantRebalanceChecker(ControllerConf config, + ControllerMetrics controllerMetrics, + PinotHelixResourceManager pinotHelixResourceManager, TenantRebalancer tenantRebalancer) { + super(TASK_NAME, config.getTenantRebalanceCheckerFrequencyInSeconds(), + config.getTenantRebalanceCheckerInitialDelayInSeconds()); + _pinotHelixResourceManager = pinotHelixResourceManager; + _tenantRebalancer = tenantRebalancer; + } + + @Override + protected void runTask(Properties periodicTaskProperties) { + checkAndRetryTenantRebalance(); + } + + private void checkAndRetryTenantRebalance() { + Map<String, Map<String, String>> allJobMetadataByJobId = + 
_pinotHelixResourceManager.getAllJobs(Set.of(ControllerJobTypes.TENANT_REBALANCE), x -> true); + for (Map.Entry<String, Map<String, String>> entry : allJobMetadataByJobId.entrySet()) { + String jobId = entry.getKey(); + Map<String, String> jobZKMetadata = entry.getValue(); + + try { + // Check if the tenant rebalance job is stuck + String tenantRebalanceContextStr = + jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT); + String tenantRebalanceProgressStatsStr = + jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS); + if (StringUtils.isEmpty(tenantRebalanceContextStr) || StringUtils.isEmpty(tenantRebalanceProgressStatsStr)) { + // Skip rebalance job: {} as it has no job context or progress stats + LOGGER.info("Skip checking tenant rebalance job: {} as it has no job context or progress stats", jobId); + continue; + } + DefaultTenantRebalanceContext tenantRebalanceContext = + JsonUtils.stringToObject(tenantRebalanceContextStr, DefaultTenantRebalanceContext.class); + TenantRebalanceProgressStats progressStats = + JsonUtils.stringToObject(tenantRebalanceProgressStatsStr, TenantRebalanceProgressStats.class); + long statsUpdatedAt = Long.parseLong(jobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)); + + DefaultTenantRebalanceContext retryTenantRebalanceContext = + prepareRetryIfTenantRebalanceJobStuck(jobZKMetadata, tenantRebalanceContext, statsUpdatedAt); + if (retryTenantRebalanceContext != null) { + for (TenantRebalancer.TenantTableRebalanceJobContext ctx; + (ctx = retryTenantRebalanceContext.getOngoingJobsQueue().poll()) != null; ) { + abortTableRebalanceJob(ctx.getTableName()); + // the existing table rebalance job is aborted, we need to run the rebalance job with a new job ID. 
+ TenantRebalancer.TenantTableRebalanceJobContext newCtx = + new TenantRebalancer.TenantTableRebalanceJobContext( + ctx.getTableName(), UUID.randomUUID().toString(), ctx.shouldRebalanceWithDowntime()); + retryTenantRebalanceContext.getParallelQueue().addFirst(newCtx); + } + // the retry tenant rebalance job id has been created in ZK, we can safely mark the original job as + // aborted, so that this original job will not be picked up again in the future. Review Comment: What happens if the controller crashes after writing the retry job metadata to ZK, but before marking the original as aborted? What if it crashes after writing + aborting, but before retrying (I guess this will simply manifest as a subsequent stuck job + another retry)? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java: ########## @@ -0,0 +1,307 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.helix.core.rebalance.tenant; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.ArrayList; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import javax.ws.rs.core.Response; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.exception.ControllerApplicationException; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats; +import org.apache.pinot.core.periodictask.BasePeriodicTask; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Periodic task to check if tenant rebalance jobs are stuck and retry them. Controller crashes or restarts could + * make a tenant rebalance job stuck. + * The task checks for each tenant rebalance job's metadata in ZK, look at the TenantTableRebalanceJobContext in + * ongoingJobsQueue, which is a list with table rebalance job ids that the controller is currently processing, to see + * if any of these table rebalance jobs has not updated their progress stats longer than the configured heartbeat + * timeout. 
If so, the tenant rebalance job is considered stuck, and the task will resume the tenant rebalance job by + * aborting all the ongoing table rebalance jobs, move the ongoing TenantTableRebalanceJobContext back to the head of + * the parallel queue, and then trigger the tenant rebalance job again with the updated context, with an attempt job ID + * <p> + * Notice that fundamentally this is not a retry but a resume, since we will not re-do the table rebalance for those + * tables that have already been processed. + */ +public class TenantRebalanceChecker extends BasePeriodicTask { + private final static String TASK_NAME = TenantRebalanceChecker.class.getSimpleName(); + private static final Logger LOGGER = LoggerFactory.getLogger(TenantRebalanceChecker.class); + private static final double RETRY_DELAY_SCALE_FACTOR = 2.0; Review Comment: Unused? ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java: ########## @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.helix.core.rebalance.tenant; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.ArrayList; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import javax.ws.rs.core.Response; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.exception.ControllerApplicationException; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats; +import org.apache.pinot.core.periodictask.BasePeriodicTask; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Periodic task to check if tenant rebalance jobs are stuck and retry them. Controller crashes or restarts could + * make a tenant rebalance job stuck. + * The task checks for each tenant rebalance job's metadata in ZK, look at the TenantTableRebalanceJobContext in + * ongoingJobsQueue, which is a list with table rebalance job ids that the controller is currently processing, to see + * if any of these table rebalance jobs has not updated their progress stats longer than the configured heartbeat + * timeout. 
If so, the tenant rebalance job is considered stuck, and the task will resume the tenant rebalance job by + * aborting all the ongoing table rebalance jobs, move the ongoing TenantTableRebalanceJobContext back to the head of + * the parallel queue, and then trigger the tenant rebalance job again with the updated context. + * <p> + * Notice that this is not a retry but a resume, since we will not re-do the table rebalance for those tables that have + * already been processed. + */ +public class TenantRebalanceChecker extends BasePeriodicTask { + private final static String TASK_NAME = TenantRebalanceChecker.class.getSimpleName(); + private static final Logger LOGGER = LoggerFactory.getLogger(TenantRebalanceChecker.class); + private static final double RETRY_DELAY_SCALE_FACTOR = 2.0; + private final TenantRebalancer _tenantRebalancer; + private final PinotHelixResourceManager _pinotHelixResourceManager; + + public TenantRebalanceChecker(ControllerConf config, + ControllerMetrics controllerMetrics, + PinotHelixResourceManager pinotHelixResourceManager, TenantRebalancer tenantRebalancer) { + super(TASK_NAME, config.getTenantRebalanceCheckerFrequencyInSeconds(), + config.getTenantRebalanceCheckerInitialDelayInSeconds()); + _pinotHelixResourceManager = pinotHelixResourceManager; + _tenantRebalancer = tenantRebalancer; + } + + @Override + protected void runTask(Properties periodicTaskProperties) { + checkAndRetryTenantRebalance(); + } + + private void checkAndRetryTenantRebalance() { + Map<String, Map<String, String>> allJobMetadataByJobId = + _pinotHelixResourceManager.getAllJobs(Set.of(ControllerJobTypes.TENANT_REBALANCE), x -> true); + for (Map.Entry<String, Map<String, String>> entry : allJobMetadataByJobId.entrySet()) { + String jobId = entry.getKey(); + Map<String, String> jobZKMetadata = entry.getValue(); + + try { + // Check if the tenant rebalance job is stuck + String tenantRebalanceContextStr = + 
jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT); + String tenantRebalanceProgressStatsStr = + jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS); + if (StringUtils.isEmpty(tenantRebalanceContextStr) || StringUtils.isEmpty(tenantRebalanceProgressStatsStr)) { + // Skip rebalance job: {} as it has no job context or progress stats + LOGGER.info("Skip checking tenant rebalance job: {} as it has no job context or progress stats", jobId); + continue; + } + DefaultTenantRebalanceContext tenantRebalanceContext = + JsonUtils.stringToObject(tenantRebalanceContextStr, DefaultTenantRebalanceContext.class); + TenantRebalanceProgressStats progressStats = + JsonUtils.stringToObject(tenantRebalanceProgressStatsStr, TenantRebalanceProgressStats.class); + long statsUpdatedAt = Long.parseLong(jobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)); + + if (isTenantRebalanceJobStuck(tenantRebalanceContext, statsUpdatedAt)) { + for (TenantRebalancer.TenantTableRebalanceJobContext ctx; + (ctx = tenantRebalanceContext.getOngoingJobsQueue().poll()) != null; ) { + abortTableRebalanceJob(ctx.getTableName()); + tenantRebalanceContext.getParallelQueue().addFirst(ctx); + } + // If the job is stuck, we retry it + resumeTenantRebalanceJob(tenantRebalanceContext, progressStats); + } else { + LOGGER.info("Tenant rebalance job: {} is not stuck", jobId); + } + } catch (JsonProcessingException e) { + // If we cannot parse the job metadata, we skip this job + LOGGER.warn("Failed to parse tenant rebalance context for job: {}, skipping", jobId); + } + } + } + + private void resumeTenantRebalanceJob(DefaultTenantRebalanceContext tenantRebalanceContext, + TenantRebalanceProgressStats progressStats) { + DefaultTenantRebalanceContext tenantRebalanceContextForRetry = + DefaultTenantRebalanceContext.forRetry(tenantRebalanceContext.getOriginalJobId(), + tenantRebalanceContext.getConfig(), tenantRebalanceContext.getAttemptId() + 1, + 
tenantRebalanceContext.getParallelQueue(), + tenantRebalanceContext.getSequentialQueue(), tenantRebalanceContext.getOngoingJobsQueue()); + + ZkBasedTenantRebalanceObserver observer = + new ZkBasedTenantRebalanceObserver(tenantRebalanceContext.getJobId(), + tenantRebalanceContext.getConfig().getTenantName(), + progressStats, tenantRebalanceContextForRetry, _pinotHelixResourceManager); + ((DefaultTenantRebalancer) _tenantRebalancer).rebalanceWithContext(tenantRebalanceContextForRetry, observer); Review Comment: Yes, +1 on consolidating - I made a similar comment elsewhere. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java: ########## @@ -0,0 +1,307 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.helix.core.rebalance.tenant; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.ArrayList; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import javax.ws.rs.core.Response; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.exception.ControllerApplicationException; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats; +import org.apache.pinot.core.periodictask.BasePeriodicTask; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Periodic task to check if tenant rebalance jobs are stuck and retry them. Controller crashes or restarts could + * make a tenant rebalance job stuck. + * The task checks for each tenant rebalance job's metadata in ZK, look at the TenantTableRebalanceJobContext in + * ongoingJobsQueue, which is a list with table rebalance job ids that the controller is currently processing, to see + * if any of these table rebalance jobs has not updated their progress stats longer than the configured heartbeat + * timeout. 
If so, the tenant rebalance job is considered stuck, and the task will resume the tenant rebalance job by + * aborting all the ongoing table rebalance jobs, move the ongoing TenantTableRebalanceJobContext back to the head of + * the parallel queue, and then trigger the tenant rebalance job again with the updated context, with an attempt job ID + * <p> + * Notice that fundamentally this is not a retry but a resume, since we will not re-do the table rebalance for those + * tables that have already been processed. + */ +public class TenantRebalanceChecker extends BasePeriodicTask { + private final static String TASK_NAME = TenantRebalanceChecker.class.getSimpleName(); + private static final Logger LOGGER = LoggerFactory.getLogger(TenantRebalanceChecker.class); + private static final double RETRY_DELAY_SCALE_FACTOR = 2.0; + private final TenantRebalancer _tenantRebalancer; + private final PinotHelixResourceManager _pinotHelixResourceManager; + + public TenantRebalanceChecker(ControllerConf config, + ControllerMetrics controllerMetrics, + PinotHelixResourceManager pinotHelixResourceManager, TenantRebalancer tenantRebalancer) { + super(TASK_NAME, config.getTenantRebalanceCheckerFrequencyInSeconds(), + config.getTenantRebalanceCheckerInitialDelayInSeconds()); + _pinotHelixResourceManager = pinotHelixResourceManager; + _tenantRebalancer = tenantRebalancer; + } + + @Override + protected void runTask(Properties periodicTaskProperties) { + checkAndRetryTenantRebalance(); + } + + private void checkAndRetryTenantRebalance() { + Map<String, Map<String, String>> allJobMetadataByJobId = + _pinotHelixResourceManager.getAllJobs(Set.of(ControllerJobTypes.TENANT_REBALANCE), x -> true); + for (Map.Entry<String, Map<String, String>> entry : allJobMetadataByJobId.entrySet()) { + String jobId = entry.getKey(); + Map<String, String> jobZKMetadata = entry.getValue(); + + try { + // Check if the tenant rebalance job is stuck + String tenantRebalanceContextStr = + 
jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT); + String tenantRebalanceProgressStatsStr = + jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS); + if (StringUtils.isEmpty(tenantRebalanceContextStr) || StringUtils.isEmpty(tenantRebalanceProgressStatsStr)) { + // Skip rebalance job: {} as it has no job context or progress stats + LOGGER.info("Skip checking tenant rebalance job: {} as it has no job context or progress stats", jobId); + continue; + } + DefaultTenantRebalanceContext tenantRebalanceContext = + JsonUtils.stringToObject(tenantRebalanceContextStr, DefaultTenantRebalanceContext.class); + TenantRebalanceProgressStats progressStats = + JsonUtils.stringToObject(tenantRebalanceProgressStatsStr, TenantRebalanceProgressStats.class); + long statsUpdatedAt = Long.parseLong(jobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)); + + DefaultTenantRebalanceContext retryTenantRebalanceContext = + prepareRetryIfTenantRebalanceJobStuck(jobZKMetadata, tenantRebalanceContext, statsUpdatedAt); + if (retryTenantRebalanceContext != null) { + for (TenantRebalancer.TenantTableRebalanceJobContext ctx; + (ctx = retryTenantRebalanceContext.getOngoingJobsQueue().poll()) != null; ) { + abortTableRebalanceJob(ctx.getTableName()); + // the existing table rebalance job is aborted, we need to run the rebalance job with a new job ID. + TenantRebalancer.TenantTableRebalanceJobContext newCtx = + new TenantRebalancer.TenantTableRebalanceJobContext( + ctx.getTableName(), UUID.randomUUID().toString(), ctx.shouldRebalanceWithDowntime()); + retryTenantRebalanceContext.getParallelQueue().addFirst(newCtx); Review Comment: Why is this always added to the parallel queue? 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalanceContext.java: ########## @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance.tenant; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentLinkedQueue; + + +/** + * Default implementation of TenantRebalanceContext that includes parallel and sequential queues + * for managing tenant rebalance operations. + */ +public class DefaultTenantRebalanceContext extends TenantRebalanceContext { + private final ConcurrentLinkedDeque<TenantRebalancer.TenantTableRebalanceJobContext> _parallelQueue; + private final Queue<TenantRebalancer.TenantTableRebalanceJobContext> _sequentialQueue; + private final ConcurrentLinkedQueue<TenantRebalancer.TenantTableRebalanceJobContext> _ongoingJobsQueue; Review Comment: Can you add some comments here explaining how `_parallelQueue` and `_ongoingJobsQueue` are being used and why they need to be concurrent / thread-safe data structures? 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceContext.java: ########## @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance.tenant; + +import com.fasterxml.jackson.annotation.JsonProperty; + + +/** + * Abstract class for tracking job configs and attempt numbers as part of the job ZK metadata to retry failed tenant + * rebalance. + */ +public abstract class TenantRebalanceContext { + protected static final int INITIAL_ATTEMPT_ID = 1; + @JsonProperty("jobId") + private final String _jobId; + @JsonProperty("originalJobId") + private final String _originalJobId; + @JsonProperty("config") + private final TenantRebalanceConfig _config; + @JsonProperty("attemptId") + private final int _attemptId; + // Default to true for all user initiated rebalances, so that they can be retried if they fail or get stuck. + @JsonProperty("allowRetries") + private final boolean _allowRetries; Review Comment: Aren't tenant rebalances always user initiated? 
########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java: ########## @@ -0,0 +1,307 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance.tenant; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.ArrayList; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import javax.ws.rs.core.Response; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.exception.ControllerApplicationException; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats; +import org.apache.pinot.core.periodictask.BasePeriodicTask; +import 
org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Periodic task to check if tenant rebalance jobs are stuck and retry them. Controller crashes or restarts could + * make a tenant rebalance job stuck. + * The task checks for each tenant rebalance job's metadata in ZK, look at the TenantTableRebalanceJobContext in + * ongoingJobsQueue, which is a list with table rebalance job ids that the controller is currently processing, to see + * if any of these table rebalance jobs has not updated their progress stats longer than the configured heartbeat + * timeout. If so, the tenant rebalance job is considered stuck, and the task will resume the tenant rebalance job by + * aborting all the ongoing table rebalance jobs, move the ongoing TenantTableRebalanceJobContext back to the head of + * the parallel queue, and then trigger the tenant rebalance job again with the updated context, with an attempt job ID + * <p> + * Notice that fundamentally this is not a retry but a resume, since we will not re-do the table rebalance for those + * tables that have already been processed. 
+ */ +public class TenantRebalanceChecker extends BasePeriodicTask { + private final static String TASK_NAME = TenantRebalanceChecker.class.getSimpleName(); + private static final Logger LOGGER = LoggerFactory.getLogger(TenantRebalanceChecker.class); + private static final double RETRY_DELAY_SCALE_FACTOR = 2.0; + private final TenantRebalancer _tenantRebalancer; + private final PinotHelixResourceManager _pinotHelixResourceManager; + + public TenantRebalanceChecker(ControllerConf config, + ControllerMetrics controllerMetrics, + PinotHelixResourceManager pinotHelixResourceManager, TenantRebalancer tenantRebalancer) { + super(TASK_NAME, config.getTenantRebalanceCheckerFrequencyInSeconds(), + config.getTenantRebalanceCheckerInitialDelayInSeconds()); + _pinotHelixResourceManager = pinotHelixResourceManager; + _tenantRebalancer = tenantRebalancer; + } + + @Override + protected void runTask(Properties periodicTaskProperties) { + checkAndRetryTenantRebalance(); + } + + private void checkAndRetryTenantRebalance() { + Map<String, Map<String, String>> allJobMetadataByJobId = + _pinotHelixResourceManager.getAllJobs(Set.of(ControllerJobTypes.TENANT_REBALANCE), x -> true); + for (Map.Entry<String, Map<String, String>> entry : allJobMetadataByJobId.entrySet()) { + String jobId = entry.getKey(); + Map<String, String> jobZKMetadata = entry.getValue(); + + try { + // Check if the tenant rebalance job is stuck + String tenantRebalanceContextStr = + jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT); + String tenantRebalanceProgressStatsStr = + jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS); + if (StringUtils.isEmpty(tenantRebalanceContextStr) || StringUtils.isEmpty(tenantRebalanceProgressStatsStr)) { + // Skip rebalance job: {} as it has no job context or progress stats + LOGGER.info("Skip checking tenant rebalance job: {} as it has no job context or progress stats", jobId); + continue; + } + DefaultTenantRebalanceContext 
tenantRebalanceContext = + JsonUtils.stringToObject(tenantRebalanceContextStr, DefaultTenantRebalanceContext.class); + TenantRebalanceProgressStats progressStats = + JsonUtils.stringToObject(tenantRebalanceProgressStatsStr, TenantRebalanceProgressStats.class); + long statsUpdatedAt = Long.parseLong(jobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)); + + DefaultTenantRebalanceContext retryTenantRebalanceContext = + prepareRetryIfTenantRebalanceJobStuck(jobZKMetadata, tenantRebalanceContext, statsUpdatedAt); + if (retryTenantRebalanceContext != null) { + for (TenantRebalancer.TenantTableRebalanceJobContext ctx; + (ctx = retryTenantRebalanceContext.getOngoingJobsQueue().poll()) != null; ) { + abortTableRebalanceJob(ctx.getTableName()); + // the existing table rebalance job is aborted, we need to run the rebalance job with a new job ID. + TenantRebalancer.TenantTableRebalanceJobContext newCtx = + new TenantRebalancer.TenantTableRebalanceJobContext( + ctx.getTableName(), UUID.randomUUID().toString(), ctx.shouldRebalanceWithDowntime()); + retryTenantRebalanceContext.getParallelQueue().addFirst(newCtx); + } + // the retry tenant rebalance job id has been created in ZK, we can safely mark the original job as + // aborted, so that this original job will not be picked up again in the future. 
+ markTenantRebalanceJobAsAborted(jobId, jobZKMetadata, tenantRebalanceContext, progressStats); + retryTenantRebalanceJob(retryTenantRebalanceContext, progressStats); + } else { + LOGGER.info("Tenant rebalance job: {} is not stuck", jobId); + } + } catch (JsonProcessingException e) { + // If we cannot parse the job metadata, we skip this job + LOGGER.warn("Failed to parse tenant rebalance context for job: {}, skipping", jobId); + } + } + } + + private void retryTenantRebalanceJob(DefaultTenantRebalanceContext tenantRebalanceContextForRetry, + TenantRebalanceProgressStats progressStats) { + ZkBasedTenantRebalanceObserver observer = + new ZkBasedTenantRebalanceObserver(tenantRebalanceContextForRetry.getJobId(), + tenantRebalanceContextForRetry.getConfig().getTenantName(), + progressStats, tenantRebalanceContextForRetry, _pinotHelixResourceManager); + ((DefaultTenantRebalancer) _tenantRebalancer).rebalanceWithContext(tenantRebalanceContextForRetry, observer); + } + + /** + * Check if the tenant rebalance job is stuck, and prepare to retry it if necessary. + * A tenant rebalance job is considered stuck if: + * 1. There are no ongoing jobs, but there are jobs in the parallel or sequential queue, and the stats have not been + * updated for longer than the heartbeat timeout. + * 2. There are ongoing table rebalance jobs, and at least one of them has not updated its status for longer than the Review Comment: nit: either use `///` style for a markdown style comment (https://openjdk.org/jeps/467) or else use formatting tags like `<ol>`, `<li>`, `<p>` etc. ########## pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceChecker.java: ########## @@ -0,0 +1,307 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.helix.core.rebalance.tenant; + +import com.fasterxml.jackson.core.JsonProcessingException; +import java.util.ArrayList; +import java.util.Map; +import java.util.Objects; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; +import javax.ws.rs.core.Response; +import org.apache.commons.lang3.StringUtils; +import org.apache.pinot.common.metrics.ControllerMetrics; +import org.apache.pinot.controller.ControllerConf; +import org.apache.pinot.controller.api.exception.ControllerApplicationException; +import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; +import org.apache.pinot.controller.helix.core.controllerjob.ControllerJobTypes; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceJobConstants; +import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; +import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceProgressStats; +import org.apache.pinot.core.periodictask.BasePeriodicTask; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.JsonUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * Periodic task to check if tenant rebalance jobs are stuck and retry them. Controller crashes or restarts could + * make a tenant rebalance job stuck. 
+ * The task checks for each tenant rebalance job's metadata in ZK, looks at the TenantTableRebalanceJobContext in + * ongoingJobsQueue, which is a list with table rebalance job ids that the controller is currently processing, to see + * if any of these table rebalance jobs have not updated their progress stats for longer than the configured heartbeat + * timeout. If so, the tenant rebalance job is considered stuck, and the task will resume the tenant rebalance job by + * aborting all the ongoing table rebalance jobs, move the ongoing TenantTableRebalanceJobContext back to the head of + * the parallel queue, and then trigger the tenant rebalance job again with the updated context, with an attempt job ID. + * <p> + * Notice that fundamentally this is not a retry but a resume, since we will not re-do the table rebalance for those + * tables that have already been processed. + */ +public class TenantRebalanceChecker extends BasePeriodicTask { + private final static String TASK_NAME = TenantRebalanceChecker.class.getSimpleName(); + private static final Logger LOGGER = LoggerFactory.getLogger(TenantRebalanceChecker.class); + private static final double RETRY_DELAY_SCALE_FACTOR = 2.0; + private final TenantRebalancer _tenantRebalancer; + private final PinotHelixResourceManager _pinotHelixResourceManager; + + public TenantRebalanceChecker(ControllerConf config, + ControllerMetrics controllerMetrics, + PinotHelixResourceManager pinotHelixResourceManager, TenantRebalancer tenantRebalancer) { + super(TASK_NAME, config.getTenantRebalanceCheckerFrequencyInSeconds(), + config.getTenantRebalanceCheckerInitialDelayInSeconds()); + _pinotHelixResourceManager = pinotHelixResourceManager; + _tenantRebalancer = tenantRebalancer; + } + + @Override + protected void runTask(Properties periodicTaskProperties) { + checkAndRetryTenantRebalance(); + } + + private void checkAndRetryTenantRebalance() { + Map<String, Map<String, String>> allJobMetadataByJobId = +
_pinotHelixResourceManager.getAllJobs(Set.of(ControllerJobTypes.TENANT_REBALANCE), x -> true); + for (Map.Entry<String, Map<String, String>> entry : allJobMetadataByJobId.entrySet()) { + String jobId = entry.getKey(); + Map<String, String> jobZKMetadata = entry.getValue(); + + try { + // Check if the tenant rebalance job is stuck + String tenantRebalanceContextStr = + jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_CONTEXT); + String tenantRebalanceProgressStatsStr = + jobZKMetadata.get(RebalanceJobConstants.JOB_METADATA_KEY_REBALANCE_PROGRESS_STATS); + if (StringUtils.isEmpty(tenantRebalanceContextStr) || StringUtils.isEmpty(tenantRebalanceProgressStatsStr)) { + // Skip rebalance job: {} as it has no job context or progress stats + LOGGER.info("Skip checking tenant rebalance job: {} as it has no job context or progress stats", jobId); + continue; + } + DefaultTenantRebalanceContext tenantRebalanceContext = + JsonUtils.stringToObject(tenantRebalanceContextStr, DefaultTenantRebalanceContext.class); + TenantRebalanceProgressStats progressStats = + JsonUtils.stringToObject(tenantRebalanceProgressStatsStr, TenantRebalanceProgressStats.class); + long statsUpdatedAt = Long.parseLong(jobZKMetadata.get(CommonConstants.ControllerJob.SUBMISSION_TIME_MS)); + + DefaultTenantRebalanceContext retryTenantRebalanceContext = + prepareRetryIfTenantRebalanceJobStuck(jobZKMetadata, tenantRebalanceContext, statsUpdatedAt); + if (retryTenantRebalanceContext != null) { + for (TenantRebalancer.TenantTableRebalanceJobContext ctx; + (ctx = retryTenantRebalanceContext.getOngoingJobsQueue().poll()) != null; ) { Review Comment: I think a while loop would be more readable here. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
