sumitagrawl commented on code in PR #8932:
URL: https://github.com/apache/ozone/pull/8932#discussion_r2298488969


##########
hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/storage/DiskBalancerConfiguration.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.storage;
+
+import static org.apache.hadoop.hdds.conf.ConfigTag.DATANODE;
+
+import jakarta.annotation.Nonnull;
+import java.time.Duration;
+import org.apache.hadoop.hdds.conf.Config;
+import org.apache.hadoop.hdds.conf.ConfigGroup;
+import org.apache.hadoop.hdds.conf.ConfigTag;
+import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class contains configuration values for the DiskBalancer.
+ */
+@ConfigGroup(prefix = "hdds.datanode.disk.balancer")
+public final class DiskBalancerConfiguration {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerConfiguration.class);
+
+  @Config(key = "info.dir", type = ConfigType.STRING,
+      defaultValue = "", tags = {ConfigTag.DISKBALANCER},
+      description = "The path where datanode diskBalancer's conf is to be " +
+          "written to. If this property is not defined, Ozone will fall " +
+          "back to the metadata directory instead.")
+  private String infoDir;
+
+  @Config(key = "volume.density.threshold", type = ConfigType.DOUBLE,
+      defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
+      description = "Threshold is a percentage in the range of 0 to 100. A " +
+          "datanode is considered balanced if for each volume, the " +
+          "utilization of the volume (used space to capacity ratio) differs " +
+          "from the utilization of the datanode (used space to capacity " +
+          "ratio of the entire datanode) by no more than the threshold.")
+  private double threshold = 10d;
+
+  @Config(key = "max.disk.throughputInMBPerSec", type = ConfigType.LONG,

Review Comment:
   The config key can be named `max.disk.throughput.mb.per.sec` instead of mixing dot-separated and camelCase styles.
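
   If renamed, the annotation might look like this (a sketch only; the default value and description below are placeholders, since the original lines are truncated in this hunk):

   ```java
   @Config(key = "max.disk.throughput.mb.per.sec", type = ConfigType.LONG,
       defaultValue = "10", tags = {ConfigTag.DISKBALANCER},
       description = "Maximum disk bandwidth, in MB per second, that disk " +
           "balancing is allowed to use.")
   private long diskBandwidthInMB = 10;
   ```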



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerVersion.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+
+/**
+ * Defines versions for the DiskBalancerService.
+ */
+public enum DiskBalancerVersion {
+  ONE(1, "First Version") {

Review Comment:
   How is this versioning helpful? We already have HDDS layout versioning, and this is just a set of configs.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer.policy;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.fs.SpaceUsageSource;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.hadoop.ozone.container.common.volume.AvailableSpaceFilter;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Choose source and destination volumes for disk balancing.
+ *
+ * Source volumes use deltaMap to simulate space that will be freed (pre-deleted).
+ * Destination volumes use committedBytes to account for space already reserved.
+ * Both deltaMap and committedBytes are considered to calculate usage.
+ */
+public class DefaultVolumeChoosingPolicy implements DiskBalancerVolumeChoosingPolicy {
+
+  public static final Logger LOG = LoggerFactory.getLogger(
+      DefaultVolumeChoosingPolicy.class);
+  private final ReentrantLock lock;
+
+  public DefaultVolumeChoosingPolicy(ReentrantLock globalLock) {
+    lock = globalLock;
+  }
+
+  @Override
+  public Pair<HddsVolume, HddsVolume> chooseVolume(MutableVolumeSet volumeSet,
+      double threshold, Map<HddsVolume, Long> deltaMap, long containerSize) {
+    lock.lock();
+    try {
+      double idealUsage = volumeSet.getIdealUsage();
+
+      // Threshold is given as a percentage
+      double normalizedThreshold = threshold / 100;
+      List<HddsVolume> volumes = StorageVolumeUtil
+          .getHddsVolumesList(volumeSet.getVolumesList())
+          .stream()
+          .filter(volume -> {
+            SpaceUsageSource usage = volume.getCurrentUsage();
+
+            return Math.abs(
+                  ((double)((usage.getCapacity() - usage.getAvailable())
+                      + deltaMap.getOrDefault(volume, 0L) + volume.getCommittedBytes()))
+                      / usage.getCapacity() - idealUsage) >= normalizedThreshold;
+
+          }).sorted((v1, v2) -> {
+            SpaceUsageSource usage1 = v1.getCurrentUsage();
+            SpaceUsageSource usage2 = v2.getCurrentUsage();
+
+            return Double.compare(
+                  (double) ((usage2.getCapacity() - usage2.getAvailable())
+                      + deltaMap.getOrDefault(v2, 0L) + v2.getCommittedBytes()) /
+                      usage2.getCapacity(),
+                  (double) ((usage1.getCapacity() - usage1.getAvailable())
+                      + deltaMap.getOrDefault(v1, 0L) + v1.getCommittedBytes()) /
+                      usage1.getCapacity());
+          }).collect(Collectors.toList());
+
+      // Can not generate DiskBalancerTask if we have less than 2 results
+      if (volumes.size() <= 1) {
+        LOG.debug("Can not find appropriate Source volume and Dest Volume.");
+        return null;
+      }
+      AvailableSpaceFilter filter = new AvailableSpaceFilter(containerSize);
+      HddsVolume srcVolume = volumes.get(0);
+      HddsVolume destVolume = volumes.get(volumes.size() - 1);

Review Comment:
   The list is sorted by utilization, so the last volume has the most available space. If even the last volume does not have enough space, there is no need for a while loop checking the previous volumes.
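
   The idea could be sketched with a simplified, self-contained model (`Vol` and the utilization ordering stand in for `HddsVolume` and the sorted stream above; names are hypothetical):

   ```java
   import java.util.List;

   // Volumes are assumed sorted by utilization, descending. The destination
   // candidate is the tail (least utilized); if it cannot fit the container,
   // there is no need to walk back through the list.
   class DestVolumePicker {
     static final class Vol {
       final long capacity;
       final long available;
       Vol(long capacity, long available) {
         this.capacity = capacity;
         this.available = available;
       }
     }

     /** Returns the index of a usable destination volume, or -1 if none. */
     static int pickDest(List<Vol> sortedByUtilizationDesc, long containerSize) {
       int last = sortedByUtilizationDesc.size() - 1;
       if (last < 1) {
         return -1; // need at least two volumes to balance
       }
       // Equal-capacity assumption: the least utilized volume also has the
       // most free space, so if it cannot fit the container, neither can the
       // earlier (fuller) volumes.
       return sortedByUtilizationDesc.get(last).available >= containerSize ? last : -1;
     }
   }
   ```

   Note the assumption: utilization ratio only tracks raw free space when capacities are comparable, which is the common homogeneous-disk case.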



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java:
##########
@@ -897,6 +901,84 @@ public DataScanResult scanData(DataTransferThrottler throttler, Canceler cancele
     return checker.fullCheck(throttler, canceler);
   }
 
+  @Override
+  public void copyContainerDirectory(Path destination) throws IOException {
+    readLock();
+    try {
+      // Closed/ Quasi closed containers are considered for replication by
+      // replication manager if they are under-replicated.
+      ContainerProtos.ContainerDataProto.State state =
+          getContainerData().getState();
+      if (!(state == ContainerProtos.ContainerDataProto.State.CLOSED ||
+          state == ContainerProtos.ContainerDataProto.State.QUASI_CLOSED)) {
+        throw new IllegalStateException(
+            "Only (quasi)closed containers can be exported, but " +
+                "ContainerId=" + getContainerData().getContainerID() +
+                " is in state " + state);
+      }
+
+      if (!containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
+        compactDB();
+        // Close DB (and remove from cache) to avoid concurrent modification
+        // while copying it.
+        BlockUtils.removeDB(containerData, config);
+      }
+
+      if (containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
+        // Synchronize the dump and copy operations,
+        // so concurrent copies don't get their dump files overwritten.
+        // We seldom get concurrent exports for a container,
+        // so it should not influence performance much.
+        synchronized (dumpLock) {
+          BlockUtils.dumpKVContainerDataToFiles(containerData, config);
+          copyContainerToDestination(destination);
+        }
+      } else {
+        copyContainerToDestination(destination);
+      }
+      if (getInjector() != null && getInjector().getException() != null) {

Review Comment:
   The injector has no meaning at this point. It can be implemented with a spy that throws an exception after calling the actual method, since this method is overridable.
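
   The suggestion — inject the fault in the test by overriding the method to throw after running the real implementation, instead of threading a `FaultInjector` through production code — could be sketched like this (types simplified and hypothetical; with Mockito the same effect is achieved via `spy()` plus `doAnswer`/`doThrow`):

   ```java
   import java.io.IOException;
   import java.nio.file.Path;
   import java.nio.file.Paths;

   // Production class: no injector hook needed.
   class Exporter {
     private boolean copied;

     public void copyContainerDirectory(Path destination) throws IOException {
       copied = true; // stand-in for the real copy logic
     }

     public boolean wasCopied() {
       return copied;
     }
   }

   // Test double: runs the real copy, then injects the fault.
   class FaultyExporter extends Exporter {
     @Override
     public void copyContainerDirectory(Path destination) throws IOException {
       super.copyContainerDirectory(destination);
       throw new IOException("injected after real copy");
     }
   }
   ```

   The test exercises the real behavior first and still observes the failure path, without any test-only state in the production class.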



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueContainer.java:
##########
@@ -897,6 +901,84 @@ public DataScanResult scanData(DataTransferThrottler throttler, Canceler cancele
     return checker.fullCheck(throttler, canceler);
   }
 
+  @Override
+  public void copyContainerDirectory(Path destination) throws IOException {
+    readLock();
+    try {
+      // Closed/ Quasi closed containers are considered for replication by
+      // replication manager if they are under-replicated.
+      ContainerProtos.ContainerDataProto.State state =
+          getContainerData().getState();
+      if (!(state == ContainerProtos.ContainerDataProto.State.CLOSED ||
+          state == ContainerProtos.ContainerDataProto.State.QUASI_CLOSED)) {
+        throw new IllegalStateException(
+            "Only (quasi)closed containers can be exported, but " +
+                "ContainerId=" + getContainerData().getContainerID() +
+                " is in state " + state);
+      }
+
+      if (!containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
+        compactDB();
+        // Close DB (and remove from cache) to avoid concurrent modification
+        // while copying it.
+        BlockUtils.removeDB(containerData, config);
+      }
+
+      if (containerData.getSchemaVersion().equals(OzoneConsts.SCHEMA_V3)) {
+        // Synchronize the dump and copy operations,
+        // so concurrent copies don't get their dump files overwritten.
+        // We seldom get concurrent exports for a container,
+        // so it should not influence performance much.
+        synchronized (dumpLock) {
+          BlockUtils.dumpKVContainerDataToFiles(containerData, config);
+          copyContainerToDestination(destination);
+        }
+      } else {
+        copyContainerToDestination(destination);

Review Comment:
   Do we need a write lock for the copy to destination, in case multiple threads do the same thing?
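
   For reference, the locking semantics in question: a read lock admits concurrent holders (so two threads could be copying at once), while a write lock would serialize them. A minimal stand-alone illustration of `ReentrantReadWriteLock` behavior:

   ```java
   import java.util.concurrent.locks.ReentrantReadWriteLock;

   class LockDemo {
     // Multiple read acquisitions can coexist, so holding the read lock
     // does not prevent another copy from starting.
     static boolean readersCanOverlap() {
       ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
       lock.readLock().lock();
       boolean ok = lock.readLock().tryLock(); // second acquisition succeeds
       if (ok) {
         lock.readLock().unlock();
       }
       lock.readLock().unlock();
       return ok;
     }

     // A write lock cannot be taken while a read lock is held
     // (ReentrantReadWriteLock does not support lock upgrading).
     static boolean writerExcluded() {
       ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
       lock.readLock().lock();
       boolean blocked = !lock.writeLock().tryLock();
       lock.readLock().unlock();
       return blocked;
     }
   }
   ```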



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultContainerChoosingPolicy.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer.policy;
+
+import static java.util.concurrent.TimeUnit.HOURS;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.cache.Cache;
+import com.google.common.cache.CacheBuilder;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Choose a container from the specified volume, making sure it's not already being balanced.
+ */
+public class DefaultContainerChoosingPolicy implements ContainerChoosingPolicy {
+  public static final Logger LOG = LoggerFactory.getLogger(
+      DefaultContainerChoosingPolicy.class);
+
+  private static final ThreadLocal<Cache<HddsVolume, Iterator<Container<?>>>> CACHE =

Review Comment:
   Can we avoid the ThreadLocal? It would be better to use an instance-level or service-level field / context.
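
   A sketch of the instance-level alternative (a `ConcurrentHashMap` stands in for the Guava `Cache`, and plain strings stand in for `HddsVolume`/`Container`; all names here are hypothetical):

   ```java
   import java.util.Iterator;
   import java.util.List;
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;

   // The iterator state lives on the policy instance (one per service), so its
   // lifecycle is tied to the service rather than to worker threads, and it can
   // be cleared deterministically when the service stops.
   class ContainerChoosingPolicySketch {
     private final Map<String, Iterator<String>> itersByVolume =
         new ConcurrentHashMap<>();

     String chooseContainer(String volume, List<String> containers) {
       Iterator<String> it =
           itersByVolume.computeIfAbsent(volume, v -> containers.iterator());
       if (it.hasNext()) {
         return it.next();
       }
       itersByVolume.remove(volume); // exhausted; rebuild on next cycle
       return null;
     }
   }
   ```

   With a ThreadLocal, each worker thread gets its own iterator per volume, so two threads can hand out overlapping containers; an instance-level map shares one cursor per volume across the service's workers.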



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerYaml.java:
##########
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.file.Files;
+import org.apache.hadoop.hdds.server.YamlUtils;
+import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerService.DiskBalancerOperationalState;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.DumperOptions;
+import org.yaml.snakeyaml.Yaml;
+
+/**
+ * Class for creating diskbalancer.info file in yaml format.
+ */
+
+public final class DiskBalancerYaml {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerYaml.class);
+
+  private DiskBalancerYaml() {
+    // static helper methods only, no state.
+  }
+
+  /**
+   * Creates a yaml file to store DiskBalancer info.
+   *
+   * @param diskBalancerInfo {@link DiskBalancerInfo}
+   * @param path            Path to diskBalancer.info file
+   */
+  public static void createDiskBalancerInfoFile(
+      DiskBalancerInfo diskBalancerInfo, File path)
+      throws IOException {
+    DumperOptions options = new DumperOptions();

Review Comment:
   We can use the DB to save the disk balancer config instead of a local file.



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java:
##########
@@ -1338,30 +1338,30 @@ private boolean checkContainerClose(KeyValueContainer kvContainer) {
   @Override
   public Container importContainer(ContainerData originalContainerData,
       final InputStream rawContainerStream,
-      final TarContainerPacker packer)
-      throws IOException {
-    Preconditions.checkState(originalContainerData instanceof
-        KeyValueContainerData, "Should be KeyValueContainerData instance");
-
-    KeyValueContainerData containerData = new KeyValueContainerData(
-        (KeyValueContainerData) originalContainerData);
-
-    KeyValueContainer container = new KeyValueContainer(containerData,
-        conf);
+      final TarContainerPacker packer) throws IOException {
+    KeyValueContainer container = createNewContainer(originalContainerData);
 
     HddsVolume targetVolume = originalContainerData.getVolume();
     populateContainerPathFields(container, targetVolume);
     container.importContainerData(rawContainerStream, packer);
-    ContainerLogger.logImported(containerData);
+    ContainerLogger.logImported(container.getContainerData());
     sendICR(container);
     return container;
+  }
 
+  @Override
+  public Container importContainer(ContainerData targetTempContainerData) throws IOException {
+    KeyValueContainer container = createNewContainer(targetTempContainerData);
+    HddsVolume targetVolume = targetTempContainerData.getVolume();
+    populateContainerPathFields(container, targetVolume);
+    container.importContainerData((KeyValueContainerData) targetTempContainerData);

Review Comment:
   What about ContainerLogger and sending the ICR, compared to the implementation above? Why are they not required here?



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/policy/DefaultVolumeChoosingPolicy.java:
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer.policy;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.fs.SpaceUsageSource;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.hadoop.ozone.container.common.volume.AvailableSpaceFilter;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Choose source and destination volumes for disk balancing.
+ *
+ * Source volumes use deltaMap to simulate space that will be freed (pre-deleted).
+ * Destination volumes use committedBytes to account for space already reserved.
+ * Both deltaMap and committedBytes are considered to calculate usage.
+ */
+public class DefaultVolumeChoosingPolicy implements DiskBalancerVolumeChoosingPolicy {
+
+  public static final Logger LOG = LoggerFactory.getLogger(
+      DefaultVolumeChoosingPolicy.class);
+  private final ReentrantLock lock;
+
+  public DefaultVolumeChoosingPolicy(ReentrantLock globalLock) {
+    lock = globalLock;
+  }
+
+  @Override
+  public Pair<HddsVolume, HddsVolume> chooseVolume(MutableVolumeSet volumeSet,
+      double threshold, Map<HddsVolume, Long> deltaMap, long containerSize) {
+    lock.lock();
+    try {
+      double idealUsage = volumeSet.getIdealUsage();
+
+      // Threshold is given as a percentage
+      double normalizedThreshold = threshold / 100;
+      List<HddsVolume> volumes = StorageVolumeUtil

Review Comment:
   The algorithm can be simplified: we only need the min and max utilized volumes to get the source and destination.
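
   The min/max simplification could look like this (a simplified, self-contained sketch: `Vol` and its precomputed utilization stand in for `HddsVolume` and the `getCurrentUsage()`/`deltaMap` arithmetic above; names are hypothetical):

   ```java
   import java.util.Comparator;
   import java.util.List;

   // Instead of sorting every eligible volume (O(n log n)), a single pass
   // finding the most and least utilized volumes is enough to pick the
   // source and destination pair.
   class MinMaxVolumePicker {
     static final class Vol {
       final String name;
       final double utilization; // used / capacity, deltas already applied
       Vol(String name, double utilization) {
         this.name = name;
         this.utilization = utilization;
       }
     }

     /** Returns {source, dest} by utilization, or null if no valid pair. */
     static String[] pick(List<Vol> volumes) {
       if (volumes.size() < 2) {
         return null;
       }
       Comparator<Vol> byUtil = Comparator.comparingDouble(v -> v.utilization);
       Vol source = volumes.stream().max(byUtil).get(); // most utilized
       Vol dest = volumes.stream().min(byUtil).get();   // least utilized
       return source == dest ? null : new String[] {source.name, dest.name};
     }
   }
   ```

   The threshold filter from the original code would still apply before picking; only the sort becomes unnecessary.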



##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerService.java:
##########
@@ -0,0 +1,767 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.container.diskbalancer;
+
+import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.CONTAINER_ALREADY_EXISTS;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.conf.StorageUnit;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
+import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
+import org.apache.hadoop.hdds.server.ServerUtils;
+import org.apache.hadoop.hdds.utils.BackgroundService;
+import org.apache.hadoop.hdds.utils.BackgroundTask;
+import org.apache.hadoop.hdds.utils.BackgroundTaskQueue;
+import org.apache.hadoop.hdds.utils.BackgroundTaskResult;
+import org.apache.hadoop.hdds.utils.FaultInjector;
+import org.apache.hadoop.ozone.OzoneConsts;
+import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
+import org.apache.hadoop.ozone.container.common.impl.ContainerData;
+import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;
+import org.apache.hadoop.ozone.container.common.interfaces.Container;
+import org.apache.hadoop.ozone.container.common.utils.ContainerLogger;
+import org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil;
+import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
+import org.apache.hadoop.ozone.container.common.volume.MutableVolumeSet;
+import org.apache.hadoop.ozone.container.common.volume.StorageVolume;
+import org.apache.hadoop.ozone.container.common.volume.VolumeChoosingPolicyFactory;
+import org.apache.hadoop.ozone.container.diskbalancer.policy.ContainerChoosingPolicy;
+import org.apache.hadoop.ozone.container.diskbalancer.policy.DiskBalancerVolumeChoosingPolicy;
+import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerLocationUtil;
+import org.apache.hadoop.ozone.container.keyvalue.helpers.KeyValueContainerUtil;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A per-datanode disk balancing service that takes charge
+ * of moving containers among disks.
+ */
+public class DiskBalancerService extends BackgroundService {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(DiskBalancerService.class);
+
+  public static final String DISK_BALANCER_DIR = "diskBalancer";
+
+  private OzoneContainer ozoneContainer;
+  private final ConfigurationSource conf;
+
+  private double threshold;
+  private long bandwidthInMB;
+  private int parallelThread;
+  private boolean stopAfterDiskEven;
+  private DiskBalancerVersion version;
+
+  // State field using the new enum
+  private volatile DiskBalancerOperationalState operationalState =
+      DiskBalancerOperationalState.STOPPED;
+
+  private AtomicLong totalBalancedBytes = new AtomicLong(0L);
+  private AtomicLong balancedBytesInLastWindow = new AtomicLong(0L);
+  private AtomicLong nextAvailableTime = new AtomicLong(Time.monotonicNow());
+
+  private Set<ContainerID> inProgressContainers;
+  private static FaultInjector injector;
+
+  /**
+   * A map that tracks the total bytes which will be freed from each source volume
+   * during container moves in the current disk balancing cycle.
+   *
+   * Unlike committedBytes, which is used for pre-allocating space on
+   * destination volumes, deltaSizes helps track how much space will be
+   * freed on the source volumes without modifying their
+   * committedBytes (which could otherwise go negative).
+   */
+  private Map<HddsVolume, Long> deltaSizes;
+  private MutableVolumeSet volumeSet;
+
+  private DiskBalancerVolumeChoosingPolicy volumeChoosingPolicy;
+  private ContainerChoosingPolicy containerChoosingPolicy;
+  private final File diskBalancerInfoFile;
+
+  private DiskBalancerServiceMetrics metrics;
+  private long bytesToMove;
+  private long containerDefaultSize;
+
+  /**
+   * Defines the operational states of the DiskBalancerService.
+   */
+  public enum DiskBalancerOperationalState {
+    /**
+     * DiskBalancer is stopped and will not run unless explicitly started.
+     * This is the initial state, can be set by admin STOP commands,
+     * or if the balancer stops itself after disks are even.
+     */
+    STOPPED,
+
+    /**
+     * DiskBalancer is running normally.
+     * The service is actively performing disk balancing operations.
+     */
+    RUNNING,
+
+    /**
+     * DiskBalancer was running but is temporarily paused due to node state changes
+     * (e.g., node entering maintenance or decommissioning).
+     * When the node returns to IN_SERVICE, it can resume to RUNNING state.
+     */
+    PAUSED_BY_NODE_STATE
+  }
+
+  public DiskBalancerService(OzoneContainer ozoneContainer,
+      long serviceCheckInterval, long serviceCheckTimeout, TimeUnit timeUnit,
+      int workerSize, ConfigurationSource conf) throws IOException {
+    super("DiskBalancerService", serviceCheckInterval, timeUnit, workerSize,
+        serviceCheckTimeout);
+    this.ozoneContainer = ozoneContainer;
+    this.conf = conf;
+
+    String diskBalancerInfoPath = getDiskBalancerInfoPath();
+    Objects.requireNonNull(diskBalancerInfoPath);
+    diskBalancerInfoFile = new File(diskBalancerInfoPath);
+
+    inProgressContainers = ConcurrentHashMap.newKeySet();
+    deltaSizes = new ConcurrentHashMap<>();
+    volumeSet = ozoneContainer.getVolumeSet();
+    containerDefaultSize = (long) conf.getStorageSize(
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
+        ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES);
+
+    try {
+      volumeChoosingPolicy = VolumeChoosingPolicyFactory.getDiskBalancerPolicy(conf);
+      containerChoosingPolicy = (ContainerChoosingPolicy)
+          conf.getObject(DiskBalancerConfiguration.class)
+              .getContainerChoosingPolicyClass().newInstance();
+    } catch (Exception e) {
+      LOG.error("Got exception when initializing DiskBalancerService", e);
+      throw new IOException(e);
+    }
+
+    metrics = DiskBalancerServiceMetrics.create();
+
+    loadDiskBalancerInfo();
+
+    constructTmpDir();
+  }
+
+  /**
+   * Update DiskBalancerService based on new DiskBalancerInfo.
+   * @param diskBalancerInfo
+   * @throws IOException
+   */
+  public synchronized void refresh(DiskBalancerInfo diskBalancerInfo) throws IOException {
+    applyDiskBalancerInfo(diskBalancerInfo);
+  }
+
+  private void constructTmpDir() throws IOException {
+    for (HddsVolume volume:
+        StorageVolumeUtil.getHddsVolumesList(volumeSet.getVolumesList())) {
+      Path tmpDir = getDiskBalancerTmpDir(volume);
+      try {
+        FileUtils.deleteDirectory(tmpDir.toFile());
+        FileUtils.forceMkdir(tmpDir.toFile());
+      } catch (IOException ex) {
+        LOG.warn("Can not reconstruct tmp directory under volume {}", volume,
+            ex);
+        throw ex;
+      }
+    }
+  }
+
+  /**
+   * If the diskBalancer.info file exists, load the file. If it does not exist,
+   * fall back to the default config.
+   * @throws IOException
+   */
+  private void loadDiskBalancerInfo() throws IOException {
+    DiskBalancerInfo diskBalancerInfo;
+    try {
+      if (diskBalancerInfoFile.exists()) {
+        diskBalancerInfo = readDiskBalancerInfoFile(diskBalancerInfoFile);
+      } else {
+        boolean shouldRunDefault =
+            conf.getObject(DiskBalancerConfiguration.class)
+                .getDiskBalancerShouldRun();
+        diskBalancerInfo = new DiskBalancerInfo(shouldRunDefault,
+            new DiskBalancerConfiguration());
+      }
+    } catch (IOException e) {
+      LOG.warn("Can not load diskBalancerInfo from diskBalancer.info file. " +
+          "Falling back to default configs", e);
+      throw e;
+    }
+
+    applyDiskBalancerInfo(diskBalancerInfo);
+  }
+
+  private void applyDiskBalancerInfo(DiskBalancerInfo diskBalancerInfo)
+      throws IOException {
+    // First store in local file, then update in memory variables
+    writeDiskBalancerInfoTo(diskBalancerInfo, diskBalancerInfoFile);
+
+    updateOperationalStateFromInfo(diskBalancerInfo);
+
+    setThreshold(diskBalancerInfo.getThreshold());
+    setBandwidthInMB(diskBalancerInfo.getBandwidthInMB());
+    setParallelThread(diskBalancerInfo.getParallelThread());
+    setStopAfterDiskEven(diskBalancerInfo.isStopAfterDiskEven());
+    setVersion(diskBalancerInfo.getVersion());
+
+    // Default executorService is ScheduledThreadPoolExecutor, so we can
+    // update the pool size by setting corePoolSize.
+    if ((getExecutorService() instanceof ScheduledThreadPoolExecutor)) {
+      ((ScheduledThreadPoolExecutor) getExecutorService())
+          .setCorePoolSize(parallelThread);
+    }
+  }
+
+  /**
+   * Determines the new operational state based on the provided
+   * DiskBalancerInfo and updates the service's operationalState if it has
+   * changed.
+   *
+   * @param diskBalancerInfo the DiskBalancerInfo containing the shouldRun
+   *                         and paused flags
+   */
+  private void updateOperationalStateFromInfo(
+      DiskBalancerInfo diskBalancerInfo) {
+    DiskBalancerOperationalState newOperationalState =
+        diskBalancerInfo.getOperationalState();
+
+    if (this.operationalState != newOperationalState) {
+      LOG.info("DiskBalancer operational state changing from {} to {} " +
+              "based on DiskBalancerInfo (derived: shouldRun={}, paused={}).",
+          this.operationalState, newOperationalState,
+          diskBalancerInfo.isShouldRun(), diskBalancerInfo.isPaused());
+      this.operationalState = newOperationalState;
+    }
+  }
+
+  private String getDiskBalancerInfoPath() {
+    String diskBalancerInfoDir =
+        conf.getObject(DiskBalancerConfiguration.class)
+            .getDiskBalancerInfoDir();
+    if (Strings.isNullOrEmpty(diskBalancerInfoDir)) {
+      File metaDirPath = ServerUtils.getOzoneMetaDirPath(conf);
+      if (metaDirPath == null) {
+        // This means the metadata dir was not found, which in theory should
+        // not happen at this point because startup would have failed earlier.
+        throw new IllegalArgumentException("Unable to locate meta data " +
+            "directory when getting datanode disk balancer file path");
+      }
+      diskBalancerInfoDir = metaDirPath.toString();
+    }
+    // Use default datanode disk balancer file name for file path
+    return new File(diskBalancerInfoDir,
+        OzoneConsts.OZONE_SCM_DATANODE_DISK_BALANCER_INFO_FILE_DEFAULT)
+        .toString();
+  }
+
+  /**
+   * Read {@link DiskBalancerInfo} from a local info file.
+   *
+   * @param path DiskBalancerInfo file local path
+   * @return the parsed {@link DiskBalancerInfo}
+   * @throws IOException if the conf file is malformed or another I/O error
+   *                     occurs
+   */
+  private synchronized DiskBalancerInfo readDiskBalancerInfoFile(
+      File path) throws IOException {
+    if (!path.exists()) {
+      throw new IOException("DiskBalancerConf file not found.");
+    }
+    try {
+      return DiskBalancerYaml.readDiskBalancerInfoFile(path);
+    } catch (IOException e) {
+      LOG.warn("Error loading DiskBalancerInfo yaml from {}",
+          path.getAbsolutePath(), e);
+      throw new IOException("Failed to parse DiskBalancerInfo from "
+          + path.getAbsolutePath(), e);
+    }
+  }
+
+  /**
+   * Persist a {@link DiskBalancerInfo} to a local file.
+   *
+   * @throws IOException when a write error occurs
+   */
+  private synchronized void writeDiskBalancerInfoTo(
+      DiskBalancerInfo diskBalancerInfo, File path)
+      throws IOException {
+    if (path.exists()) {
+      if (!path.delete() || !path.createNewFile()) {
+        throw new IOException(
+            "Unable to overwrite the DiskBalancerInfo file.");
+      }
+    } else {
+      if (!path.getParentFile().exists() &&
+          !path.getParentFile().mkdirs()) {
+        throw new IOException(
+            "Unable to create DiskBalancerInfo directories.");
+      }
+    }
+    DiskBalancerYaml.createDiskBalancerInfoFile(diskBalancerInfo, path);
+  }
+
+  public void setThreshold(double threshold) {
+    this.threshold = threshold;
+  }
+
+  public void setBandwidthInMB(long bandwidthInMB) {
+    this.bandwidthInMB = bandwidthInMB;
+  }
+
+  public void setParallelThread(int parallelThread) {
+    this.parallelThread = parallelThread;
+  }
+
+  public void setStopAfterDiskEven(boolean stopAfterDiskEven) {
+    this.stopAfterDiskEven = stopAfterDiskEven;
+  }
+
+  public void setVersion(DiskBalancerVersion version) {
+    this.version = version;
+  }
+
+  public DiskBalancerReportProto getDiskBalancerReportProto() {
+    DiskBalancerReportProto.Builder builder =
+        DiskBalancerReportProto.newBuilder();
+    return builder
+        .setIsRunning(
+            this.operationalState == DiskBalancerOperationalState.RUNNING)
+        .setBalancedBytes(totalBalancedBytes.get())
+        .setDiskBalancerConf(
+            HddsProtos.DiskBalancerConfigurationProto.newBuilder()
+                .setThreshold(threshold)
+                .setDiskBandwidthInMB(bandwidthInMB)
+                .setParallelThread(parallelThread)
+                .setStopAfterDiskEven(stopAfterDiskEven)
+                .build())
+        .build();
+  }
+
+  @Override
+  public BackgroundTaskQueue getTasks() {
+    BackgroundTaskQueue queue = new BackgroundTaskQueue();
+
+    if (this.operationalState == DiskBalancerOperationalState.STOPPED ||
+        this.operationalState ==
+            DiskBalancerOperationalState.PAUSED_BY_NODE_STATE) {
+      return queue;
+    }
+    metrics.incrRunningLoopCount();
+
+    if (shouldDelay()) {
+      metrics.incrIdleLoopExceedsBandwidthCount();
+      return queue;
+    }
+
+    int availableTaskCount = parallelThread - inProgressContainers.size();
+    if (availableTaskCount <= 0) {
+      LOG.info("No available thread for disk balancer service. " +
+          "Current thread count is {}.", parallelThread);
+      return queue;
+    }
+
+    for (int i = 0; i < availableTaskCount; i++) {
+      Pair<HddsVolume, HddsVolume> pair = volumeChoosingPolicy
+          .chooseVolume(volumeSet, threshold, deltaSizes,
+              containerDefaultSize);
+      if (pair == null) {
+        continue;
+      }
+      HddsVolume sourceVolume = pair.getLeft();
+      HddsVolume destVolume = pair.getRight();
+      ContainerData toBalanceContainer = containerChoosingPolicy
+          .chooseContainer(ozoneContainer, sourceVolume, inProgressContainers);
+      if (toBalanceContainer != null) {
+        DiskBalancerTask task = new DiskBalancerTask(toBalanceContainer,
+            sourceVolume, destVolume);
+        queue.add(task);
+        inProgressContainers.add(
+            ContainerID.valueOf(toBalanceContainer.getContainerID()));
+        deltaSizes.put(sourceVolume, deltaSizes.getOrDefault(sourceVolume, 0L)
+            - toBalanceContainer.getBytesUsed());
+      } else {
+        // release destVolume committed bytes
+        destVolume.incCommittedBytes(0 - containerDefaultSize);
+      }
+    }
+
+    if (queue.isEmpty()) {
+      bytesToMove = 0;
+      if (stopAfterDiskEven) {
+        LOG.info("Disk balancer stopped because disks are even and the " +
+            "property stopAfterDiskEven is set to true.");
+        this.operationalState = DiskBalancerOperationalState.STOPPED;
+        try {
+          // Persist the updated operational state into the YAML file
+          writeDiskBalancerInfoTo(getDiskBalancerInfo(), diskBalancerInfoFile);
+        } catch (IOException e) {
+          LOG.warn("Failed to persist updated DiskBalancerInfo to file.", e);
+        }
+      }
+      metrics.incrIdleLoopNoAvailableVolumePairCount();
+    } else {
+      bytesToMove = calculateBytesToMove(volumeSet);

Review Comment:
   It does not consider committed space when computing bytes to move, which may give wrong info: a volume that is mostly occupied by committed bytes can still be treated as a source volume.
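The reviewer's concern can be illustrated with a minimal sketch. This is a hypothetical standalone example, not the actual `HddsVolume`/`calculateBytesToMove` API: it shows how counting committed bytes as used changes whether a volume looks over-utilized.

```java
/**
 * Sketch: estimate how many bytes a volume should shed, counting committed
 * (reserved-for-write) space as used. Method and class names here are
 * illustrative assumptions, not the real Ozone API.
 */
public final class BytesToMoveSketch {

  /** Effective used space = actually used + committed bytes. */
  static long effectiveUsed(long usedBytes, long committedBytes) {
    return usedBytes + committedBytes;
  }

  /**
   * Bytes this volume should shed to reach the average utilization,
   * or 0 if it is at or below average once committed space is included.
   */
  static long bytesToMoveFrom(long usedBytes, long committedBytes,
      long capacity, double avgUtilization) {
    long effective = effectiveUsed(usedBytes, committedBytes);
    long target = (long) (capacity * avgUtilization);
    return Math.max(0, effective - target);
  }

  public static void main(String[] args) {
    // A volume that is 40% used but has another 40% committed: against a
    // 50% cluster average it should shed 30% of its capacity. Ignoring the
    // committed bytes would make it look under-utilized instead.
    long capacity = 1000L;
    System.out.println(bytesToMoveFrom(400, 400, capacity, 0.5)); // prints 300
    System.out.println(bytesToMoveFrom(400, 0, capacity, 0.5));   // prints 0
  }
}
```

With committed space ignored, the first volume would report 0 bytes to move even though writes already reserved on it will push it well over the average.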


