Gargi-jais11 commented on code in PR #8932: URL: https://github.com/apache/ozone/pull/8932#discussion_r2300575271
########## hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java: ########## @@ -0,0 +1,767 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone.container.diskbalancer; + +import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import 
org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException; +import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration; +import org.apache.hadoop.hdds.server.ServerUtils; +import org.apache.hadoop.hdds.utils.BackgroundService; +import org.apache.hadoop.hdds.utils.BackgroundTask; +import org.apache.hadoop.hdds.utils.BackgroundTaskQueue; +import org.apache.hadoop.hdds.utils.BackgroundTaskResult; +import org.apache.hadoop.hdds.utils.FaultInjector; +import org.apache.hadoop.ozone.OzoneConsts; +import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; +import org.apache.hadoop.ozone.container.common.impl.ContainerData; +import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml; +import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.utils.ContainerLogger; +import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil; +import org.apache.hadoop.ozone.container.common.volume.HddsVolume; +import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet; +import org.apache.hadoop.ozone.container.common.volume.StorageVolume; +import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory; +import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy; +import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeChoosingPolicy; +import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil; +import 
org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; +import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A per-datanode disk balancing service takes in charge + * of moving contains among disks. + */ +public class DiskBalancerService extends BackgroundService { + + private static final Logger LOG = + LoggerFactory.getLogger(DiskBalancerService.class); + + public static final String DISK_BALANCER_DIR = "diskBalancer"; + + private OzoneContainer ozoneContainer; + private final ConfigurationSource conf; + + private double threshold; + private long bandwidthInMB; + private int parallelThread; + private boolean stopAfterDiskEven; + private DiskBalancerVersion version; + + // State field using the new enum + private volatile DiskBalancerOperationalState operationalState = + DiskBalancerOperationalState.STOPPED; + + private AtomicLong totalBalancedBytes = new AtomicLong(0L); + private AtomicLong balancedBytesInLastWindow = new AtomicLong(0L); + private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow()); + + private Set<ContainerID> inProgressContainers; + private static FaultInjector injector; + + /** + * A map that tracks the total bytes which will be freed from each source volume + * during container moves in the current disk balancing cycle. + * + * Unlike committedBytes, which is used for pre-allocating space on + * destination volumes, deltaSizes helps track how many space will be + * freed on the source volumes without modifying their + * committedBytes (which could otherwise go negative). 
+ */ + private Map<HddsVolume, Long> deltaSizes; + private MutableVolumeSet volumeSet; + + private DiskBalancerVolumeChoosingPolicy volumeChoosingPolicy; + private ContainerChoosingPolicy containerChoosingPolicy; + private final File diskBalancerInfoFile; + + private DiskBalancerServiceMetrics metrics; + private long bytesToMove; + private long containerDefaultSize; + + /** + * Defines the operational states of the DiskBalancerService. + */ + public enum DiskBalancerOperationalState { + /** + * DiskBalancer is stopped and will not run unless explicitly started. + * This is the initial state, can be set by admin STOP commands, + * or if the balancer stops itself after disks are even. + */ + STOPPED, + + /** + * DiskBalancer is running normally. + * The service is actively performing disk balancing operations. + */ + RUNNING, + + /** + * DiskBalancer was running but is temporarily paused due to node state changes + * (e.g., node entering maintenance or decommissioning). + * When the node returns to IN_SERVICE, it can resume to RUNNING state. 
+ */ + PAUSED_BY_NODE_STATE + } + + public DiskBalancerService(OzoneContainer ozoneContainer, + long serviceCheckInterval, long serviceCheckTimeout, TimeUnit timeUnit, + int workerSize, ConfigurationSource conf) throws IOException { + super("DiskBalancerService", serviceCheckInterval, timeUnit, workerSize, + serviceCheckTimeout); + this.ozoneContainer = ozoneContainer; + this.conf = conf; + + String diskBalancerInfoPath = getDiskBalancerInfoPath(); + Objects.requireNonNull(diskBalancerInfoPath); + diskBalancerInfoFile = new File(diskBalancerInfoPath); + + inProgressContainers = ConcurrentHashMap.newKeySet(); + deltaSizes = new ConcurrentHashMap<>(); + volumeSet = ozoneContainer.getVolumeSet(); + containerDefaultSize = (long) conf.getStorageSize( + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); + + try { + volumeChoosingPolicy = VolumeChoosingPolicyFactory.getDiskBalancerPolicy(conf); + containerChoosingPolicy = (ContainerChoosingPolicy) + conf.getObject(DiskBalancerConfiguration.class) + .getContainerChoosingPolicyClass().newInstance(); + } catch (Exception e) { + LOG.error("Got exception when initializing DiskBalancerService", e); + throw new IOException(e); + } + + metrics = DiskBalancerServiceMetrics.create(); + + loadDiskBalancerInfo(); + + constructTmpDir(); + } + + /** + * Update DiskBalancerService based on new DiskBalancerInfo. 
+ * @param diskBalancerInfo + * @throws IOException + */ + public synchronized void refresh(DiskBalancerInfo diskBalancerInfo) throws IOException { + applyDiskBalancerInfo(diskBalancerInfo); + } + + private void constructTmpDir() throws IOException { + for (HddsVolume volume: + StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())) { + Path tmpDir = getDiskBalancerTmpDir(volume); + try { + FileUtils.deleteDirectory(tmpDir.toFile()); + FileUtils.forceMkdir(tmpDir.toFile()); + } catch (IOException ex) { + LOG.warn("Can not reconstruct tmp directory under volume {}", volume, + ex); + throw ex; + } + } + } + + /** + * If the diskBalancer.info file exists, load the file. If not exists, + * return the default config. + * @throws IOException + */ + private void loadDiskBalancerInfo() throws IOException { + DiskBalancerInfo diskBalancerInfo; + try { + if (diskBalancerInfoFile.exists()) { + diskBalancerInfo = readDiskBalancerInfoFile(diskBalancerInfoFile); + } else { + boolean shouldRunDefault = + conf.getObject(DiskBalancerConfiguration.class) + .getDiskBalancerShouldRun(); + diskBalancerInfo = new DiskBalancerInfo(shouldRunDefault, + new DiskBalancerConfiguration()); + } + } catch (IOException e) { + LOG.warn("Can not load diskBalancerInfo from diskBalancer.info file. 
" + + "Falling back to default configs", e); + throw e; + } + + applyDiskBalancerInfo(diskBalancerInfo); + } + + private void applyDiskBalancerInfo(DiskBalancerInfo diskBalancerInfo) + throws IOException { + // First store in local file, then update in memory variables + writeDiskBalancerInfoTo(diskBalancerInfo, diskBalancerInfoFile); + + updateOperationalStateFromInfo(diskBalancerInfo); + + setThreshold(diskBalancerInfo.getThreshold()); + setBandwidthInMB(diskBalancerInfo.getBandwidthInMB()); + setParallelThread(diskBalancerInfo.getParallelThread()); + setStopAfterDiskEven(diskBalancerInfo.isStopAfterDiskEven()); + setVersion(diskBalancerInfo.getVersion()); + + // Default executorService is ScheduledThreadPoolExecutor, so we can + // update the poll size by setting corePoolSize. + if ((getExecutorService() instanceof ScheduledThreadPoolExecutor)) { + ((ScheduledThreadPoolExecutor) getExecutorService()) + .setCorePoolSize(parallelThread); + } + } + + /** + * Determines the new operational state based on the provided DiskBalancerInfo + * and updates the service's operationalState if it has changed. + * + * @param diskBalancerInfo The DiskBalancerInfo containing shouldRun and paused flags. 
+ */ + private void updateOperationalStateFromInfo(DiskBalancerInfo diskBalancerInfo) { + DiskBalancerOperationalState newOperationalState = diskBalancerInfo.getOperationalState(); + + if (this.operationalState != newOperationalState) { + LOG.info("DiskBalancer operational state changing from {} to {} " + + "based on DiskBalancerInfo (derived: shouldRun={}, paused={}).", + this.operationalState, newOperationalState, + diskBalancerInfo.isShouldRun(), diskBalancerInfo.isPaused()); + this.operationalState = newOperationalState; + } + } + + private String getDiskBalancerInfoPath() { + String diskBalancerInfoDir = + conf.getObject(DiskBalancerConfiguration.class) + .getDiskBalancerInfoDir(); + if (Strings.isNullOrEmpty(diskBalancerInfoDir)) { + File metaDirPath = ServerUtils.getOzoneMetaDirPath(conf); + if (metaDirPath == null) { + // this means meta data is not found, in theory should not happen at + // this point because should've failed earlier. + throw new IllegalArgumentException("Unable to locate meta data" + + "directory when getting datanode disk balancer file path"); + } + diskBalancerInfoDir = metaDirPath.toString(); + } + // Use default datanode disk balancer file name for file path + return new File(diskBalancerInfoDir, + OzoneConsts.OZONE_SCM_DATANODE_DISK_BALANCER_INFO_FILE_DEFAULT) + .toString(); + } + + /** + * Read {@link DiskBalancerInfo} from a local info file. 
+ * + * @param path DiskBalancerInfo file local path + * @return {@link DatanodeDetails} + * @throws IOException If the conf file is malformed or other I/O exceptions + */ + private synchronized DiskBalancerInfo readDiskBalancerInfoFile( + File path) throws IOException { + if (!path.exists()) { + throw new IOException("DiskBalancerConf file not found."); + } + try { + return DiskBalancerYaml.readDiskBalancerInfoFile(path); + } catch (IOException e) { + LOG.warn("Error loading DiskBalancerInfo yaml from {}", + path.getAbsolutePath(), e); + throw new IOException("Failed to parse DiskBalancerInfo from " + + path.getAbsolutePath(), e); + } + } + + /** + * Persistent a {@link DiskBalancerInfo} to a local file. + * + * @throws IOException when read/write error occurs + */ + private synchronized void writeDiskBalancerInfoTo( + DiskBalancerInfo diskBalancerInfo, File path) + throws IOException { + if (path.exists()) { + if (!path.delete() || !path.createNewFile()) { + throw new IOException("Unable to overwrite the DiskBalancerInfo file."); + } + } else { + if (!path.getParentFile().exists() && + !path.getParentFile().mkdirs()) { + throw new IOException("Unable to create DiskBalancerInfo directories."); + } + } + DiskBalancerYaml.createDiskBalancerInfoFile(diskBalancerInfo, path); + } + + public void setThreshold(double threshold) { + this.threshold = threshold; + } + + public void setBandwidthInMB(long bandwidthInMB) { + this.bandwidthInMB = bandwidthInMB; + } + + public void setParallelThread(int parallelThread) { + this.parallelThread = parallelThread; + } + + public void setStopAfterDiskEven(boolean stopAfterDiskEven) { + this.stopAfterDiskEven = stopAfterDiskEven; + } + + public void setVersion(DiskBalancerVersion version) { + this.version = version; + } + + public DiskBalancerReportProto getDiskBalancerReportProto() { + DiskBalancerReportProto.Builder builder = + DiskBalancerReportProto.newBuilder(); + return builder.setIsRunning(this.operationalState == 
DiskBalancerOperationalState.RUNNING) + .setBalancedBytes(totalBalancedBytes.get()) + .setDiskBalancerConf( + HddsProtos.DiskBalancerConfigurationProto.newBuilder() + .setThreshold(threshold) + .setDiskBandwidthInMB(bandwidthInMB) + .setParallelThread(parallelThread) + .setStopAfterDiskEven(stopAfterDiskEven) + .build()) + .build(); + } + + @Override + public BackgroundTaskQueue getTasks() { + BackgroundTaskQueue queue = new BackgroundTaskQueue(); + + if (this.operationalState == DiskBalancerOperationalState.STOPPED || + this.operationalState == DiskBalancerOperationalState.PAUSED_BY_NODE_STATE) { + return queue; + } + metrics.incrRunningLoopCount(); + + if (shouldDelay()) { + metrics.incrIdleLoopExceedsBandwidthCount(); + return queue; + } + + int availableTaskCount = parallelThread - inProgressContainers.size(); + if (availableTaskCount <= 0) { + LOG.info("No available thread for disk balancer service. " + + "Current thread count is {}.", parallelThread); + return queue; + } + + for (int i = 0; i < availableTaskCount; i++) { + Pair<HddsVolume, HddsVolume> pair = volumeChoosingPolicy + .chooseVolume(volumeSet, threshold, deltaSizes, containerDefaultSize); + if (pair == null) { + continue; + } + HddsVolume sourceVolume = pair.getLeft(), destVolume = pair.getRight(); + ContainerData toBalanceContainer = containerChoosingPolicy + .chooseContainer(ozoneContainer, sourceVolume, inProgressContainers); + if (toBalanceContainer != null) { + DiskBalancerTask task = new DiskBalancerTask(toBalanceContainer, sourceVolume, + destVolume); + queue.add(task); + inProgressContainers.add(ContainerID.valueOf(toBalanceContainer.getContainerID())); + deltaSizes.put(sourceVolume, deltaSizes.getOrDefault(sourceVolume, 0L) + - toBalanceContainer.getBytesUsed()); + } else { + // release destVolume committed bytes + destVolume.incCommittedBytes(0 - containerDefaultSize); + } + } + + if (queue.isEmpty()) { + bytesToMove = 0; + if (stopAfterDiskEven) { + LOG.info("Disk balancer is stopped 
due to disk even as" + " the property StopAfterDiskEven is set to true."); + this.operationalState = DiskBalancerOperationalState.STOPPED; + try { + // Persist the updated shouldRun status into the YAML file + writeDiskBalancerInfoTo(getDiskBalancerInfo(), diskBalancerInfoFile); + } catch (IOException e) { + LOG.warn("Failed to persist updated DiskBalancerInfo to file.", e); + } + } + metrics.incrIdleLoopNoAvailableVolumePairCount(); + } else { + bytesToMove = calculateBytesToMove(volumeSet); Review Comment: Yes, correct. This is being implemented through this ticket https://issues.apache.org/jira/browse/HDDS-13611 . -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
