http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java
deleted file mode 100644
index 7c950dc..0000000
--- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ChunkManagerImpl.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
-import org.apache.hadoop.ozone.container.common.helpers.ChunkUtils;
-import org.apache.hadoop.ozone.container.common.interfaces.ChunkManager;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.file.Files;
-import java.nio.file.StandardCopyOption;
-import java.security.NoSuchAlgorithmException;
-import java.util.concurrent.ExecutionException;
-
-import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .Result.CONTAINER_INTERNAL_ERROR;
-import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .Result.UNSUPPORTED_REQUEST;
-
-/**
- * An implementation of ChunkManager that is used by default in ozone.
- */
-public class ChunkManagerImpl implements ChunkManager {
-  static final Logger LOG =
-      LoggerFactory.getLogger(ChunkManagerImpl.class);
-
-  private final ContainerManager containerManager;
-
-  /**
-   * Constructs a ChunkManager.
-   *
-   * @param manager - ContainerManager.
-   */
-  public ChunkManagerImpl(ContainerManager manager) {
-    this.containerManager = manager;
-  }
-
-  /**
-   * writes a given chunk.
-   *
-   * @param pipeline - Name and the set of machines that make this container.
-   * @param keyName - Name of the Key.
-   * @param info - ChunkInfo.
-   * @throws StorageContainerException
-   */
-  @Override
-  public void writeChunk(Pipeline pipeline, String keyName, ChunkInfo info,
-      byte[] data, ContainerProtos.Stage stage)
-      throws StorageContainerException {
-    // we don't want container manager to go away while we are writing chunks.
-    containerManager.readLock();
-
-    // TODO : Take keyManager Write lock here.
-    try {
-      Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
-      String containerName = pipeline.getContainerName();
-      Preconditions.checkNotNull(containerName,
-          "Container name cannot be null");
-      ContainerData container =
-          containerManager.readContainer(containerName);
-      File chunkFile = ChunkUtils.validateChunk(pipeline, container, info);
-      File tmpChunkFile = getTmpChunkFile(chunkFile, info);
-
-      LOG.debug("writing chunk:{} chunk stage:{} chunk file:{} tmp chunk file",
-          info.getChunkName(), stage, chunkFile, tmpChunkFile);
-      switch (stage) {
-      case WRITE_DATA:
-        ChunkUtils.writeData(tmpChunkFile, info, data);
-        break;
-      case COMMIT_DATA:
-        commitChunk(tmpChunkFile, chunkFile, containerName, info.getLen());
-        break;
-      case COMBINED:
-        // directly write to the chunk file
-        long oldSize = chunkFile.length();
-        ChunkUtils.writeData(chunkFile, info, data);
-        long newSize = chunkFile.length();
-        containerManager.incrBytesUsed(containerName, newSize - oldSize);
-        containerManager.incrWriteCount(containerName);
-        containerManager.incrWriteBytes(containerName, info.getLen());
-        break;
-      }
-    } catch (ExecutionException | NoSuchAlgorithmException | IOException e) {
-      LOG.error("write data failed. error: {}", e);
-      throw new StorageContainerException("Internal error: ", e,
-          CONTAINER_INTERNAL_ERROR);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.error("write data failed. error: {}", e);
-      throw new StorageContainerException("Internal error: ", e,
-          CONTAINER_INTERNAL_ERROR);
-    } finally {
-      containerManager.readUnlock();
-    }
-  }
-
-  // Create a temporary file in the same container directory
-  // in the format "<chunkname>.tmp"
-  private static File getTmpChunkFile(File chunkFile, ChunkInfo info)
-      throws StorageContainerException {
-    return new File(chunkFile.getParent(),
-        chunkFile.getName() +
-            OzoneConsts.CONTAINER_CHUNK_NAME_DELIMITER +
-            OzoneConsts.CONTAINER_TEMPORARY_CHUNK_PREFIX);
-  }
-
-  // Commit the chunk by renaming the temporary chunk file to chunk file
-  private void commitChunk(File tmpChunkFile, File chunkFile,
-      String containerName, long chunkLen) throws IOException {
-    long sizeDiff = tmpChunkFile.length() - chunkFile.length();
-    // It is safe to replace here as the earlier chunk if existing should be
-    // caught as part of validateChunk
-    Files.move(tmpChunkFile.toPath(), chunkFile.toPath(),
-        StandardCopyOption.REPLACE_EXISTING);
-    containerManager.incrBytesUsed(containerName, sizeDiff);
-    containerManager.incrWriteCount(containerName);
-    containerManager.incrWriteBytes(containerName, chunkLen);
-  }
-
-  /**
-   * reads the data defined by a chunk.
-   *
-   * @param pipeline - container pipeline.
-   * @param keyName - Name of the Key
-   * @param info - ChunkInfo.
-   * @return byte array
-   * @throws StorageContainerException
-   * TODO: Right now we do not support partial reads and writes of chunks.
-   * TODO: Explore if we need to do that for ozone.
-   */
-  @Override
-  public byte[] readChunk(Pipeline pipeline, String keyName, ChunkInfo info)
-      throws StorageContainerException {
-    containerManager.readLock();
-    try {
-      Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
-      String containerName = pipeline.getContainerName();
-      Preconditions.checkNotNull(containerName,
-          "Container name cannot be null");
-      ContainerData container =
-          containerManager.readContainer(containerName);
-      File chunkFile = ChunkUtils.getChunkFile(pipeline, container, info);
-      ByteBuffer data =  ChunkUtils.readData(chunkFile, info);
-      containerManager.incrReadCount(containerName);
-      containerManager.incrReadBytes(containerName, chunkFile.length());
-      return data.array();
-    } catch (ExecutionException | NoSuchAlgorithmException e) {
-      LOG.error("read data failed. error: {}", e);
-      throw new StorageContainerException("Internal error: ",
-          e, CONTAINER_INTERNAL_ERROR);
-    } catch (InterruptedException e) {
-      Thread.currentThread().interrupt();
-      LOG.error("read data failed. error: {}", e);
-      throw new StorageContainerException("Internal error: ",
-          e, CONTAINER_INTERNAL_ERROR);
-    } finally {
-      containerManager.readUnlock();
-    }
-  }
-
-  /**
-   * Deletes a given chunk.
-   *
-   * @param pipeline - Pipeline.
-   * @param keyName - Key Name
-   * @param info - Chunk Info
-   * @throws StorageContainerException
-   */
-  @Override
-  public void deleteChunk(Pipeline pipeline, String keyName, ChunkInfo info)
-      throws StorageContainerException {
-    containerManager.readLock();
-    try {
-      Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
-      String containerName = pipeline.getContainerName();
-      Preconditions.checkNotNull(containerName,
-          "Container name cannot be null");
-      File chunkFile = ChunkUtils.getChunkFile(pipeline, containerManager
-          .readContainer(containerName), info);
-      if ((info.getOffset() == 0) && (info.getLen() == chunkFile.length())) {
-        FileUtil.fullyDelete(chunkFile);
-        containerManager.decrBytesUsed(containerName, chunkFile.length());
-      } else {
-        LOG.error("Not Supported Operation. Trying to delete a " +
-            "chunk that is in shared file. chunk info : " + info.toString());
-        throw new StorageContainerException("Not Supported Operation. " +
-            "Trying to delete a chunk that is in shared file. chunk info : "
-            + info.toString(), UNSUPPORTED_REQUEST);
-      }
-    } finally {
-      containerManager.readUnlock();
-    }
-  }
-
-  /**
-   * Shutdown the chunkManager.
-   *
-   * In the chunkManager we haven't acquired any resources, so nothing to do
-   * here. This call is made with containerManager Writelock held.
-   */
-  @Override
-  public void shutdown() {
-    Preconditions.checkState(this.containerManager.hasWriteLock());
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerLocationManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerLocationManagerImpl.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerLocationManagerImpl.java
deleted file mode 100644
index b842493..0000000
--- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerLocationManagerImpl.java
+++ /dev/null
@@ -1,149 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.impl;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
-import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.interfaces
-    .ContainerLocationManager;
-import org.apache.hadoop.ozone.container.common.interfaces.ContainerLocationManagerMXBean;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.management.ObjectName;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * A class that tells the ContainerManager where to place the containers.
- * Please note : There is *no* one-to-one correlation between metadata
- * Locations and data Locations.
- *
- *  For example : A user could map all container files to a
- *  SSD but leave data/metadata on bunch of other disks.
- */
-public class ContainerLocationManagerImpl implements ContainerLocationManager,
-    ContainerLocationManagerMXBean {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ContainerLocationManagerImpl.class);
-
-  private final List<ContainerStorageLocation> dataLocations;
-  private int currentIndex;
-  private final List<StorageLocation> metadataLocations;
-  private final ObjectName jmxbean;
-
-  /**
-   * Constructs a Location Manager.
-   * @param metadataLocations  - Refers to the metadataLocations
-   * where we store the container metadata.
-   * @param dataDirs - metadataLocations where we store the actual
-   * data or chunk files.
-   * @param conf - configuration.
-   * @throws IOException
-   */
-  public ContainerLocationManagerImpl(List<StorageLocation> metadataLocations,
-      List<StorageLocation> dataDirs, Configuration conf)
-      throws IOException {
-    dataLocations = new LinkedList<>();
-    for (StorageLocation dataDir : dataDirs) {
-      dataLocations.add(new ContainerStorageLocation(dataDir, conf));
-    }
-    this.metadataLocations = metadataLocations;
-    jmxbean = MBeans.register("OzoneDataNode",
-        ContainerLocationManager.class.getSimpleName(), this);
-  }
-
-  /**
-   * Returns the path where the container should be placed from a set of
-   * metadataLocations.
-   *
-   * @return A path where we should place this container and metadata.
-   * @throws IOException
-   */
-  @Override
-  public Path getContainerPath()
-      throws IOException {
-    Preconditions.checkState(metadataLocations.size() > 0);
-    int index = currentIndex % metadataLocations.size();
-    return Paths.get(metadataLocations.get(index).getNormalizedUri());
-  }
-
-  /**
-   * Returns the path where the container Data file are stored.
-   *
-   * @return  a path where we place the LevelDB and data files of a container.
-   * @throws IOException
-   */
-  @Override
-  public Path getDataPath(String containerName) throws IOException {
-    Path currentPath = Paths.get(
-        dataLocations.get(currentIndex++ % dataLocations.size())
-            .getNormalizedUri());
-    currentPath = currentPath.resolve(OzoneConsts.CONTAINER_PREFIX);
-    return currentPath.resolve(containerName);
-  }
-
-  @Override
-  public StorageLocationReport[] getLocationReport() throws IOException {
-    StorageLocationReport[] reports =
-        new StorageLocationReport[dataLocations.size()];
-    for (int idx = 0; idx < dataLocations.size(); idx++) {
-      ContainerStorageLocation loc = dataLocations.get(idx);
-      long scmUsed = 0;
-      long remaining = 0;
-      try {
-        scmUsed = loc.getScmUsed();
-        remaining = loc.getAvailable();
-      } catch (IOException ex) {
-        LOG.warn("Failed to get scmUsed and remaining for container " +
-            "storage location {}", loc.getNormalizedUri());
-        // reset scmUsed and remaining if df/du failed.
-        scmUsed = 0;
-        remaining = 0;
-      }
-
-      // TODO: handle failed storage
-      // For now, include storage report for location that failed to get df/du.
-      StorageLocationReport r = new StorageLocationReport(
-          loc.getStorageUuId(), false, loc.getCapacity(),
-          scmUsed, remaining);
-      reports[idx] = r;
-    }
-    return reports;
-  }
-
-  /**
-   * Supports clean shutdown of container location du threads.
-   *
-   * @throws IOException
-   */
-  @Override
-  public void shutdown() throws IOException {
-    for (ContainerStorageLocation loc: dataLocations) {
-      loc.shutdown();
-    }
-    MBeans.unregister(jmxbean);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
deleted file mode 100644
index 35a2b64..0000000
--- a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerManagerImpl.java
+++ /dev/null
@@ -1,1101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.impl;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdsl.protocol.DatanodeDetails;
-import org.apache.hadoop.hdsl.protocol.proto.ContainerProtos;
-import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos;
-import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.hdsl.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMNodeReport;
-import org.apache.hadoop.hdsl.protocol.proto
-    .StorageContainerDatanodeProtocolProtos.SCMStorageReport;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
-import org.apache.hadoop.ozone.container.common.helpers.KeyData;
-import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
-import org.apache.hadoop.ozone.container.common.interfaces.*;
-import org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsRequestProto;
-import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
-import org.apache.hadoop.util.ReflectionUtils;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
-import org.apache.hadoop.scm.ScmConfigKeys;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.utils.MetadataKeyFilters;
-import org.apache.hadoop.utils.MetadataStore;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.FilenameFilter;
-import java.io.IOException;
-import java.nio.file.Path;
-import java.nio.file.Paths;
-import java.security.DigestInputStream;
-import java.security.DigestOutputStream;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentNavigableMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.stream.Collectors;
-
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
-import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .Result.CONTAINER_EXISTS;
-import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .Result.CONTAINER_INTERNAL_ERROR;
-import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .Result.CONTAINER_NOT_FOUND;
-import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .Result.INVALID_CONFIG;
-import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .Result.IO_EXCEPTION;
-import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .Result.NO_SUCH_ALGORITHM;
-import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .Result.UNABLE_TO_READ_METADATA_DB;
-import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .Result.UNSUPPORTED_REQUEST;
-import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-   .Result.ERROR_IN_COMPACT_DB;
-import static org.apache.hadoop.hdsl.protocol.proto.ContainerProtos
-    .Result.UNCLOSED_CONTAINER_IO;
-import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION;
-import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_META;
-
-/**
- * A Generic ContainerManagerImpl that will be called from Ozone
- * ContainerManagerImpl. This allows us to support delta changes to ozone
- * version without having to rewrite the containerManager.
- */
-public class ContainerManagerImpl implements ContainerManager {
-  static final Logger LOG =
-      LoggerFactory.getLogger(ContainerManagerImpl.class);
-
-  private final ConcurrentSkipListMap<String, ContainerStatus>
-      containerMap = new ConcurrentSkipListMap<>();
-
-  // Use a non-fair RW lock for better throughput, we may revisit this decision
-  // if this causes fairness issues.
-  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-  private ContainerLocationManager locationManager;
-  private ChunkManager chunkManager;
-  private KeyManager keyManager;
-  private Configuration conf;
-  private DatanodeDetails datanodeDetails;
-
-  private ContainerDeletionChoosingPolicy containerDeletionChooser;
-  private ContainerReportManager containerReportManager;
-
-  /**
-   * Init call that sets up a container Manager.
-   *
-   * @param config - Configuration.
-   * @param containerDirs - List of Metadata Container locations.
-   * @param dnDetails - DatanodeDetails.
-   * @throws IOException
-   */
-  @Override
-  public void init(
-      Configuration config, List<StorageLocation> containerDirs,
-      DatanodeDetails dnDetails) throws IOException {
-    Preconditions.checkNotNull(config, "Config must not be null");
-    Preconditions.checkNotNull(containerDirs, "Container directories cannot " +
-        "be null");
-    Preconditions.checkNotNull(dnDetails, "Datanode Details cannot " +
-        "be null");
-
-    Preconditions.checkState(containerDirs.size() > 0, "Number of container" +
-        " directories must be greater than zero.");
-
-    this.conf = config;
-    this.datanodeDetails = dnDetails;
-
-    readLock();
-    try {
-      containerDeletionChooser = ReflectionUtils.newInstance(conf.getClass(
-          ScmConfigKeys.OZONE_SCM_CONTAINER_DELETION_CHOOSING_POLICY,
-          TopNOrderedContainerDeletionChoosingPolicy.class,
-          ContainerDeletionChoosingPolicy.class), conf);
-
-      for (StorageLocation path : containerDirs) {
-        File directory = Paths.get(path.getNormalizedUri()).toFile();
-        if (!directory.exists() && !directory.mkdirs()) {
-          LOG.error("Container metadata directory doesn't exist "
-              + "and cannot be created. Path: {}", path.toString());
-          throw new StorageContainerException("Container metadata "
-              + "directory doesn't exist and cannot be created " + path
-              .toString(), INVALID_CONFIG);
-        }
-
-        // TODO: This will fail if any directory is invalid.
-        // We should fix this to handle invalid directories and continue.
-        // Leaving it this way to fail fast for time being.
-        if (!directory.isDirectory()) {
-          LOG.error("Invalid path to container metadata directory. path: {}",
-              path.toString());
-          throw new StorageContainerException("Invalid path to container " +
-              "metadata directory." + path, INVALID_CONFIG);
-        }
-        LOG.info("Loading containers under {}", path);
-        File[] files = directory.listFiles(new ContainerFilter());
-        if (files != null) {
-          for (File containerFile : files) {
-            LOG.debug("Loading container {}", containerFile);
-            String containerPath =
-                ContainerUtils.getContainerNameFromFile(containerFile);
-            Preconditions.checkNotNull(containerPath, "Container path cannot" +
-                " be null");
-            readContainerInfo(containerPath);
-          }
-        }
-      }
-
-      List<StorageLocation> dataDirs = new LinkedList<>();
-      for (String dir : config.getStrings(DFS_DATANODE_DATA_DIR_KEY)) {
-        StorageLocation location = StorageLocation.parse(dir);
-        dataDirs.add(location);
-      }
-      this.locationManager =
-          new ContainerLocationManagerImpl(containerDirs, dataDirs, config);
-
-      this.containerReportManager =
-          new ContainerReportManagerImpl(config);
-    } finally {
-      readUnlock();
-    }
-  }
-
-  /**
-   * Reads the Container Info from a file and verifies that checksum match. If
-   * the checksums match, then that file is added to containerMap.
-   *
-   * @param containerName - Name which points to the persisted container.
-   * @throws StorageContainerException
-   */
-  private void readContainerInfo(String containerName)
-      throws StorageContainerException {
-    Preconditions.checkState(containerName.length() > 0,
-        "Container name length cannot be zero.");
-    FileInputStream containerStream = null;
-    DigestInputStream dis = null;
-    FileInputStream metaStream = null;
-    Path cPath = Paths.get(containerName).getFileName();
-    String keyName = null;
-    if (cPath != null) {
-      keyName = cPath.toString();
-    }
-    Preconditions.checkNotNull(keyName,
-        "Container Name  to container key mapping is null");
-
-    try {
-      String containerFileName = containerName.concat(CONTAINER_EXTENSION);
-      String metaFileName = containerName.concat(CONTAINER_META);
-
-      containerStream = new FileInputStream(containerFileName);
-
-      metaStream = new FileInputStream(metaFileName);
-
-      MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-
-      dis = new DigestInputStream(containerStream, sha);
-
-      ContainerProtos.ContainerData containerDataProto =
-          ContainerProtos.ContainerData.parseDelimitedFrom(dis);
-      ContainerData containerData;
-      if (containerDataProto == null) {
-        // Sometimes container metadata might have been created but empty,
-        // when loading the info we get a null, this often means last time
-        // SCM was ending up at some middle phase causing that the metadata
-        // was not populated. Such containers are marked as inactive.
-        containerMap.put(keyName, new ContainerStatus(null));
-        return;
-      }
-      containerData = ContainerData.getFromProtBuf(containerDataProto, conf);
-      ContainerProtos.ContainerMeta meta =
-          ContainerProtos.ContainerMeta.parseDelimitedFrom(metaStream);
-      if (meta != null && !DigestUtils.sha256Hex(sha.digest())
-          .equals(meta.getHash())) {
-        // This means we were not able read data from the disk when booted the
-        // datanode. We are going to rely on SCM understanding that we don't
-        // have valid data for this container when we send container reports.
-        // Hopefully SCM will ask us to delete this container and rebuild it.
-        LOG.error("Invalid SHA found for container data. Name :{}"
-            + "cowardly refusing to read invalid data", containerName);
-        containerMap.put(keyName, new ContainerStatus(null));
-        return;
-      }
-
-      ContainerStatus containerStatus = new ContainerStatus(containerData);
-      // Initialize pending deletion blocks count in in-memory
-      // container status.
-      MetadataStore metadata = KeyUtils.getDB(containerData, conf);
-      List<Map.Entry<byte[], byte[]>> underDeletionBlocks = metadata
-          .getSequentialRangeKVs(null, Integer.MAX_VALUE,
-              MetadataKeyFilters.getDeletingKeyFilter());
-      containerStatus.incrPendingDeletionBlocks(underDeletionBlocks.size());
-
-      List<Map.Entry<byte[], byte[]>> liveKeys = metadata
-          .getRangeKVs(null, Integer.MAX_VALUE,
-              MetadataKeyFilters.getNormalKeyFilter());
-
-      // Get container bytesUsed upon loading container
-      // The in-memory state is updated upon key write or delete
-      // TODO: update containerDataProto and persist it into container MetaFile
-      long bytesUsed = 0;
-      bytesUsed = liveKeys.parallelStream().mapToLong(e-> {
-        KeyData keyData;
-        try {
-          keyData = KeyUtils.getKeyData(e.getValue());
-          return keyData.getSize();
-        } catch (IOException ex) {
-          return 0L;
-        }
-      }).sum();
-      containerStatus.setBytesUsed(bytesUsed);
-
-      containerMap.put(keyName, containerStatus);
-    } catch (IOException | NoSuchAlgorithmException ex) {
-      LOG.error("read failed for file: {} ex: {}", containerName,
-          ex.getMessage());
-
-      // TODO : Add this file to a recovery Queue.
-
-      // Remember that this container is busted and we cannot use it.
-      containerMap.put(keyName, new ContainerStatus(null));
-      throw new StorageContainerException("Unable to read container info",
-          UNABLE_TO_READ_METADATA_DB);
-    } finally {
-      IOUtils.closeStream(dis);
-      IOUtils.closeStream(containerStream);
-      IOUtils.closeStream(metaStream);
-    }
-  }
-
-  /**
-   * Creates a container with the given name.
-   *
-   * @param pipeline -- Nodes which make up this container.
-   * @param containerData - Container Name and metadata.
-   * @throws StorageContainerException - Exception
-   */
-  @Override
-  public void createContainer(Pipeline pipeline, ContainerData containerData)
-      throws StorageContainerException {
-    Preconditions.checkNotNull(containerData, "Container data cannot be null");
-    writeLock();
-    try {
-      if (containerMap.containsKey(containerData.getName())) {
-        LOG.debug("container already exists. {}", containerData.getName());
-        throw new StorageContainerException("container already exists.",
-            CONTAINER_EXISTS);
-      }
-
-      // This is by design. We first write and close the
-      // container Info and metadata to a directory.
-      // Then read back and put that info into the containerMap.
-      // This allows us to make sure that our write is consistent.
-
-      writeContainerInfo(containerData, false);
-      File cFile = new File(containerData.getContainerPath());
-      readContainerInfo(ContainerUtils.getContainerNameFromFile(cFile));
-    } catch (NoSuchAlgorithmException ex) {
-      LOG.error("Internal error: We seem to be running a JVM without a " +
-          "needed hash algorithm.");
-      throw new StorageContainerException("failed to create container",
-          NO_SUCH_ALGORITHM);
-    } finally {
-      writeUnlock();
-    }
-
-  }
-
-  /**
-   * Writes a container to a chosen location and updates the container Map.
-   *
-   * Two files are produced: the container file, holding the serialized
-   * ContainerData, and the metadata file, holding a ContainerMeta record with
-   * the container file name and a hash derived from the bytes written to the
-   * container file. As a side effect the DB path and container path of
-   * {@code containerData} are set before it is serialized.
-   *
-   * The file formats of ContainerData and Container Meta is the following.
-   *
-   * message ContainerData {
-   * required string name = 1;
-   * repeated KeyValue metadata = 2;
-   * optional string dbPath = 3;
-   * optional string containerPath = 4;
-   * optional int64 bytesUsed = 5;
-   * optional int64 size = 6;
-   * }
-   *
-   * message ContainerMeta {
-   * required string fileName = 1;
-   * required string hash = 2;
-   * }
-   *
-   * @param containerData - container Data
-   * @param overwrite - Whether we are overwriting. When false a new location
-   * is requested from the location manager and the metadata directory is
-   * created; when true the existing container path and metadata directory of
-   * {@code containerData} are reused.
-   * @throws StorageContainerException - if no container location can be
-   * determined or an I/O error occurs while writing either file.
-   * @throws NoSuchAlgorithmException - if the configured file hash algorithm
-   * is not available in this JVM.
-   */
-  private void writeContainerInfo(ContainerData containerData,
-      boolean  overwrite)
-      throws StorageContainerException, NoSuchAlgorithmException {
-
-    Preconditions.checkNotNull(this.locationManager,
-        "Internal error: location manager cannot be null");
-
-    FileOutputStream containerStream = null;
-    DigestOutputStream dos = null;
-    FileOutputStream metaStream = null;
-
-    try {
-      Path metadataPath = null;
-      // New container: ask the location manager for a directory.
-      // Overwrite: stay in the directory of the existing container file.
-      Path location = (!overwrite) ? locationManager.getContainerPath():
-          Paths.get(containerData.getContainerPath()).getParent();
-      if (location == null) {
-        throw new StorageContainerException(
-            "Failed to get container file path.",
-            CONTAINER_INTERNAL_ERROR);
-      }
-
-      File containerFile = ContainerUtils.getContainerFile(containerData,
-          location);
-      File metadataFile = ContainerUtils.getMetadataFile(containerData,
-          location);
-      String containerName = containerData.getContainerName();
-
-      if(!overwrite) {
-        // presumably rejects leftover files from an earlier attempt with the
-        // same name -- confirm in ContainerUtils.verifyIsNewContainer
-        ContainerUtils.verifyIsNewContainer(containerFile, metadataFile);
-        metadataPath = this.locationManager.getDataPath(containerName);
-        metadataPath = ContainerUtils.createMetadata(metadataPath,
-            containerName, conf);
-      }  else {
-        metadataPath = ContainerUtils.getMetadataDirectory(containerData);
-      }
-
-      // Opening the streams creates (or truncates) both files on disk.
-      containerStream = new FileOutputStream(containerFile);
-      metaStream = new FileOutputStream(metadataFile);
-      MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-
-      // Everything written through dos also updates the sha digest.
-      dos = new DigestOutputStream(containerStream, sha);
-      // Record the final paths in containerData before serializing it so the
-      // persisted copy carries them.
-      containerData.setDBPath(metadataPath.resolve(
-          ContainerUtils.getContainerDbFileName(containerName))
-          .toString());
-      containerData.setContainerPath(containerFile.toString());
-
-      ContainerProtos.ContainerData protoData = containerData
-          .getProtoBufMessage();
-      protoData.writeDelimitedTo(dos);
-
-      // NOTE(review): sha256Hex presumably hashes its argument again, so the
-      // stored value would be a hash of the digest bytes rather than their
-      // hex encoding -- confirm the reader computes it the same way.
-      ContainerProtos.ContainerMeta protoMeta = ContainerProtos
-          .ContainerMeta.newBuilder()
-          .setFileName(containerFile.toString())
-          .setHash(DigestUtils.sha256Hex(sha.digest()))
-          .build();
-      protoMeta.writeDelimitedTo(metaStream);
-
-    } catch (IOException ex) {
-      // TODO : we need to clean up partially constructed files
-      // The proper way to do would be for a thread
-      // to read all these 3 artifacts and make sure they are
-      // sane. That info needs to come from the replication
-      // pipeline, and if not consistent delete these file.
-
-      // In case of ozone this is *not* a deal breaker since
-      // SCM is guaranteed to generate unique container names.
-      // The saving grace is that we check if we have residue files
-      // lying around when creating a new container. We need to queue
-      // this information to a cleaner thread.
-
-      LOG.error("Creation of container failed. Name: {}, we might need to " +
-              "cleanup partially created artifacts. ",
-          containerData.getContainerName(), ex);
-      throw new StorageContainerException("Container creation failed. ",
-          ex, CONTAINER_INTERNAL_ERROR);
-    } finally {
-      // The streams are null if the failure happened before they were opened.
-      IOUtils.closeStream(dos);
-      IOUtils.closeStream(containerStream);
-      IOUtils.closeStream(metaStream);
-    }
-  }
-
-  /**
-   * Deletes an existing container.
-   *
-   * The container must be present in the containerMap, must carry valid
-   * container data and must not be open. The on-disk artifacts are removed
-   * first and the in-memory entry afterwards, all under the write lock.
-   *
-   * @param pipeline - nodes that make this container.
-   * @param containerName - name of the container.
-   * @param forceDelete - whether this container should be deleted forcibly.
-   * @throws StorageContainerException - if the container is open, unknown or
-   * invalid, or if an I/O error occurs while removing it.
-   */
-  @Override
-  public void deleteContainer(Pipeline pipeline, String containerName,
-      boolean forceDelete) throws StorageContainerException {
-    Preconditions.checkNotNull(containerName, "Container name cannot be null");
-    Preconditions.checkState(containerName.length() > 0,
-        "Container name length cannot be zero.");
-    writeLock();
-    try {
-      // Check the container that is actually being deleted. The name taken
-      // from the pipeline is not validated here and the pipeline may be null.
-      if (isOpen(containerName)) {
-        throw new StorageContainerException(
-            "Deleting an open container is not allowed.",
-            UNCLOSED_CONTAINER_IO);
-      }
-
-      ContainerStatus status = containerMap.get(containerName);
-      if (status == null) {
-        LOG.debug("No such container. Name: {}", containerName);
-        throw new StorageContainerException("No such container. Name : " +
-            containerName, CONTAINER_NOT_FOUND);
-      }
-      if (status.getContainer() == null) {
-        LOG.debug("Invalid container data. Name: {}", containerName);
-        throw new StorageContainerException("Invalid container data. Name : " +
-            containerName, CONTAINER_NOT_FOUND);
-      }
-      ContainerUtils.removeContainer(status.getContainer(), conf, forceDelete);
-      containerMap.remove(containerName);
-    } catch (StorageContainerException e) {
-      // Already carries the right result code; do not re-wrap it below.
-      throw e;
-    } catch (IOException e) {
-      // TODO : An I/O error during delete can leave partial artifacts on the
-      // disk. We will need the cleaner thread to cleanup this information.
-      LOG.error("Failed to cleanup container. Name: {}", containerName, e);
-      throw new StorageContainerException(containerName, e, IO_EXCEPTION);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  /**
-   * A simple interface for container Iterations.
-   * <p/>
-   * This call make no guarantees about consistency of the data between
-   * different list calls. It just returns the best known data at that point of
-   * time. It is possible that using this iteration you can miss certain
-   * container from the listing.
-   * <p/>
-   * When no containers are known, nothing is added to {@code data}.
-   *
-   * @param prefix -  Return keys that match this prefix (currently unused).
-   * @param count - how many to return
-   * @param prevKey - Previous Key Value or empty String. Listing starts
-   * strictly after this key; null or empty starts from the first container.
-   * @param data - Actual containerData; results are appended to this list.
-   * @throws StorageContainerException
-   */
-  @Override
-  public void listContainer(String prefix, long count, String prevKey,
-      List<ContainerData> data) throws StorageContainerException {
-    // TODO : Support list with Prefix and PrevKey
-    Preconditions.checkNotNull(data,
-        "Internal assertion: data cannot be null");
-    readLock();
-    try {
-      ConcurrentNavigableMap<String, ContainerStatus> map;
-      if (prevKey == null || prevKey.isEmpty()) {
-        // Iterate the whole map directly. Going through firstKey() would
-        // throw NoSuchElementException when the map is empty.
-        map = containerMap;
-      } else {
-        map = containerMap.tailMap(prevKey, false);
-      }
-
-      long remaining = count;
-      for (ContainerStatus entry : map.values()) {
-        if (remaining <= 0) {
-          break;
-        }
-        data.add(entry.getContainer());
-        remaining--;
-      }
-    } finally {
-      readUnlock();
-    }
-  }
-
-  /**
-   * Get metadata about a specific container.
-   *
-   * @param containerName - Name of the container
-   * @return ContainerData - Container Data.
-   * @throws StorageContainerException
-   */
-  @Override
-  public ContainerData readContainer(String containerName) throws
-      StorageContainerException {
-    Preconditions.checkNotNull(containerName, "Container name cannot be null");
-    Preconditions.checkState(containerName.length() > 0,
-        "Container name length cannot be zero.");
-    if (!containerMap.containsKey(containerName)) {
-      throw new StorageContainerException("Unable to find the container. Name: 
"
-          + containerName, CONTAINER_NOT_FOUND);
-    }
-    ContainerData cData = containerMap.get(containerName).getContainer();
-    if (cData == null) {
-      throw new StorageContainerException("Invalid container data. Name: "
-          + containerName, CONTAINER_INTERNAL_ERROR);
-    }
-    return cData;
-  }
-
-  /**
-   * Closes a open container, if it is already closed or does not exist a
-   * StorageContainerException is thrown.
-   *
-   * @param containerName - Name of the container.
-   * @throws StorageContainerException
-   */
-  @Override
-  public void closeContainer(String containerName)
-      throws StorageContainerException, NoSuchAlgorithmException {
-    ContainerData containerData = readContainer(containerName);
-    containerData.closeContainer();
-    writeContainerInfo(containerData, true);
-    MetadataStore db = KeyUtils.getDB(containerData, conf);
-
-    // It is ok if this operation takes a bit of time.
-    // Close container is not expected to be instantaneous.
-    try {
-      db.compactDB();
-    } catch (IOException e) {
-      LOG.error("Error in DB compaction while closing container", e);
-      throw new StorageContainerException(e, ERROR_IN_COMPACT_DB);
-    }
-
-    // Active is different from closed. Closed means it is immutable, active
-    // false means we have some internal error that is happening to this
-    // container. This is a way to track damaged containers if we have an
-    // I/O failure, this allows us to take quick action in case of container
-    // issues.
-
-    ContainerStatus status = new ContainerStatus(containerData);
-    containerMap.put(containerName, status);
-  }
-
-  /**
-   * Replaces the data of an existing container.
-   *
-   * Unless {@code forceUpdate} is set, the container must be open and its
-   * container file must exist and be writable; the file is backed up,
-   * rewritten with {@code data}, and restored from the backup if the rewrite
-   * fails. With {@code forceUpdate} only the in-memory entry is replaced.
-   *
-   * @param pipeline - nodes that make this container.
-   * @param containerName - name of the container.
-   * @param data - the new container data.
-   * @param forceUpdate - skip the open/file checks and the on-disk rewrite.
-   * @throws StorageContainerException - if the container is unknown, invalid,
-   * closed (without forceUpdate), or the update fails.
-   */
-  @Override
-  public void updateContainer(Pipeline pipeline, String containerName,
-      ContainerData data, boolean forceUpdate)
-      throws StorageContainerException {
-    Preconditions.checkNotNull(pipeline, "Pipeline cannot be null");
-    Preconditions.checkNotNull(containerName, "Container name cannot be null");
-    Preconditions.checkNotNull(data, "Container data cannot be null");
-    FileOutputStream containerStream = null;
-    DigestOutputStream dos = null;
-    MessageDigest sha = null;
-    File containerFileBK = null, containerFile = null;
-    boolean deleted = false;
-
-    if(!containerMap.containsKey(containerName)) {
-      throw new StorageContainerException("Container doesn't exist. Name :"
-          + containerName, CONTAINER_NOT_FOUND);
-    }
-
-    try {
-      sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-    } catch (NoSuchAlgorithmException e) {
-      throw new StorageContainerException("Unable to create Message Digest,"
-          + " usually this is a java configuration issue.",
-          e, NO_SUCH_ALGORITHM);
-    }
-
-    try {
-      Path location = locationManager.getContainerPath();
-      ContainerData orgData = containerMap.get(containerName).getContainer();
-      if (orgData == null) {
-        // updating a invalid container
-        throw new StorageContainerException("Update a container with invalid"
-            + " container meta data", CONTAINER_INTERNAL_ERROR);
-      }
-
-      if (!forceUpdate && !orgData.isOpen()) {
-        throw new StorageContainerException(
-            "Update a closed container is not allowed. Name: " + containerName,
-            UNSUPPORTED_REQUEST);
-      }
-
-      containerFile = ContainerUtils.getContainerFile(orgData, location);
-      // If forceUpdate is true, there is no need to check
-      // whether the container file exists.
-      if (!forceUpdate) {
-        if (!containerFile.exists() || !containerFile.canWrite()) {
-          throw new StorageContainerException(
-              "Container file not exists or corrupted. Name: " + containerName,
-              CONTAINER_INTERNAL_ERROR);
-        }
-
-        // Backup the container file
-        containerFileBK = File.createTempFile(
-            "tmp_" + System.currentTimeMillis() + "_",
-            containerFile.getName(), containerFile.getParentFile());
-        FileUtils.copyFile(containerFile, containerFileBK);
-
-        deleted = containerFile.delete();
-        containerStream = new FileOutputStream(containerFile);
-        dos = new DigestOutputStream(containerStream, sha);
-
-        ContainerProtos.ContainerData protoData = data.getProtoBufMessage();
-        protoData.writeDelimitedTo(dos);
-      }
-
-      // Update the in-memory map
-      ContainerStatus newStatus = new ContainerStatus(data);
-      containerMap.replace(containerName, newStatus);
-    } catch (IOException e) {
-      // Release the handles on the container file before touching it again;
-      // deleting or renaming over a file that is still open can fail on some
-      // platforms. Closing again in the finally block is harmless.
-      IOUtils.closeStream(dos);
-      IOUtils.closeStream(containerStream);
-
-      // Restore the container file from backup
-      if(containerFileBK != null && containerFileBK.exists() && deleted) {
-        if(containerFile.delete()
-            && containerFileBK.renameTo(containerFile)) {
-          throw new StorageContainerException("Container update failed,"
-              + " container data restored from the backup.",
-              e, CONTAINER_INTERNAL_ERROR);
-        } else {
-          throw new StorageContainerException(
-              "Failed to restore container data from the backup. Name: "
-                  + containerName, e, CONTAINER_INTERNAL_ERROR);
-        }
-      } else {
-        throw new StorageContainerException(
-            e.getMessage(), e, CONTAINER_INTERNAL_ERROR);
-      }
-    } finally {
-      // After a successful restore the backup has been renamed away, so this
-      // only removes a backup that is still lying around.
-      if (containerFileBK != null && containerFileBK.exists()) {
-        if(!containerFileBK.delete()) {
-          LOG.warn("Unable to delete container file backup : {}.",
-              containerFileBK.getAbsolutePath());
-        }
-      }
-      IOUtils.closeStream(dos);
-      IOUtils.closeStream(containerStream);
-    }
-  }
-
-  @VisibleForTesting
-  protected File getContainerFile(ContainerData data) throws IOException {
-    return ContainerUtils.getContainerFile(data,
-        this.locationManager.getContainerPath());
-  }
-
-  /**
-   * Checks if a container exists.
-   *
-   * @param containerName - Name of the container.
-   * @return true if the container is open false otherwise.
-   * @throws StorageContainerException - Throws Exception if we are not able to
-   *                                   find the container.
-   */
-  @Override
-  public boolean isOpen(String containerName) throws StorageContainerException 
{
-    final ContainerStatus status = containerMap.get(containerName);
-    if (status == null) {
-      throw new StorageContainerException(
-          "Container status not found: " + containerName, CONTAINER_NOT_FOUND);
-    }
-    final ContainerData cData = status.getContainer();
-    if (cData == null) {
-      throw new StorageContainerException(
-          "Container not found: " + containerName, CONTAINER_NOT_FOUND);
-    }
-    return cData.isOpen();
-  }
-
-  /**
-   * Supports clean shutdown of container: clears the containerMap and shuts
-   * down the location manager. The caller must already hold the write lock.
-   *
-   * @throws IOException
-   */
-  @Override
-  public void shutdown() throws IOException {
-    final boolean lockHeld = hasWriteLock();
-    Preconditions.checkState(lockHeld,
-        "Assumption that we are holding the lock violated.");
-    containerMap.clear();
-    locationManager.shutdown();
-  }
-
-
-  /**
-   * Exposes the live container map (not a copy) for tests.
-   *
-   * @return the container map.
-   */
-  @VisibleForTesting
-  public ConcurrentSkipListMap<String, ContainerStatus> getContainerMap() {
-    return this.containerMap;
-  }
-
-  /**
-   * Acquires the read lock, blocking until it is available.
-   */
-  @Override
-  public void readLock() {
-    lock.readLock().lock();
-  }
-
-  /**
-   * Acquires the read lock, unless interrupted while waiting.
-   *
-   * @throws InterruptedException - if the thread is interrupted while waiting.
-   */
-  @Override
-  public void readLockInterruptibly() throws InterruptedException {
-    lock.readLock().lockInterruptibly();
-  }
-
-  /**
-   * Releases the read lock.
-   */
-  @Override
-  public void readUnlock() {
-    lock.readLock().unlock();
-  }
-
-  /**
-   * Check if the current thread holds read lock.
-   *
-   * This is a pure query: it inspects the calling thread's read hold count
-   * and never acquires the lock. Using tryLock() here would take a read lock
-   * that nobody releases, which can block writers indefinitely.
-   *
-   * @return true if the calling thread holds at least one read lock.
-   */
-  @Override
-  public boolean hasReadLock() {
-    return this.lock.getReadHoldCount() > 0;
-  }
-
-  /**
-   * Acquires the write lock, blocking until it is available.
-   */
-  @Override
-  public void writeLock() {
-    lock.writeLock().lock();
-  }
-
-  /**
-   * Acquires the write lock, giving up if the thread is interrupted while
-   * waiting.
-   *
-   * @throws InterruptedException - if the thread is interrupted while waiting.
-   */
-  @Override
-  public void writeLockInterruptibly() throws InterruptedException {
-    lock.writeLock().lockInterruptibly();
-  }
-
-  /**
-   * Releases the write lock.
-   */
-  @Override
-  public void writeUnlock() {
-    lock.writeLock().unlock();
-  }
-
-  /**
-   * Tells whether the calling thread currently holds the write lock.
-   *
-   * @return true if the write lock is held by the current thread.
-   */
-  @Override
-  public boolean hasWriteLock() {
-    return lock.writeLock().isHeldByCurrentThread();
-  }
-
-  /**
-   * Gets the chunk Manager.
-   *
-   * @return ChunkManager.
-   */
-  public ChunkManager getChunkManager() {
-    return chunkManager;
-  }
-
-  /**
-   * Installs the chunk manager used by this container manager.
-   *
-   * @param chunkManager - Chunk Manager
-   */
-  public void setChunkManager(final ChunkManager chunkManager) {
-    this.chunkManager = chunkManager;
-  }
-
-  /**
-   * Returns the key manager currently set on this container manager.
-   *
-   * @return KeyManager.
-   */
-  @Override
-  public KeyManager getKeyManager() {
-    return keyManager;
-  }
-
-  /**
-   * Builds the node report from the location manager's storage location
-   * reports, one storage report per location, in the same order.
-   *
-   * @return node report.
-   * @throws IOException - if the location reports cannot be obtained.
-   */
-  @Override
-  public SCMNodeReport getNodeReport() throws IOException {
-    final StorageLocationReport[] locations =
-        locationManager.getLocationReport();
-    final SCMNodeReport.Builder nodeReport = SCMNodeReport.newBuilder();
-    for (int idx = 0; idx < locations.length; idx++) {
-      final StorageLocationReport location = locations[idx];
-      final SCMStorageReport storage = SCMStorageReport.newBuilder()
-          .setStorageUuid(location.getId())
-          .setCapacity(location.getCapacity())
-          .setScmUsed(location.getScmUsed())
-          .setRemaining(location.getRemaining())
-          .build();
-      nodeReport.addStorageReport(idx, storage);
-    }
-    return nodeReport.build();
-  }
-
-
-  /**
-   * Gets container reports.
-   *
-   * Entries without container data (containers that could not be read) are
-   * skipped instead of failing the whole report.
-   *
-   * @return List of all closed containers.
-   * @throws IOException
-   */
-  @Override
-  public List<ContainerData> getContainerReports() throws IOException {
-    LOG.debug("Starting container report iteration.");
-    // No need for locking since containerMap is a ConcurrentSkipListMap
-    // And we can never get the exact state since close might happen
-    // after we iterate a point.
-    return containerMap.values().stream()
-        .map(ContainerStatus::getContainer)
-        // a status may wrap null data; dereferencing it would throw
-        .filter(containerData ->
-            containerData != null && !containerData.isOpen())
-        .collect(Collectors.toList());
-  }
-
-  /**
-   * Get container report.
-   *
-   * Produces a full report with one entry per container in the containerMap.
-   * Entries without container data (containers that could not be read) are
-   * skipped, since there is nothing to report for them.
-   *
-   * @return The container report.
-   * @throws IOException
-   */
-  @Override
-  public ContainerReportsRequestProto getContainerReport() throws IOException {
-    LOG.debug("Starting container report iteration.");
-    // No need for locking since containerMap is a ConcurrentSkipListMap
-    // And we can never get the exact state since close might happen
-    // after we iterate a point.
-    List<ContainerStatus> containers = containerMap.values().stream()
-        .collect(Collectors.toList());
-
-    ContainerReportsRequestProto.Builder crBuilder =
-        ContainerReportsRequestProto.newBuilder();
-
-    // TODO: support delta based container report
-    crBuilder.setDatanodeDetails(datanodeDetails.getProtoBufMessage())
-        .setType(ContainerReportsRequestProto.reportType.fullReport);
-
-    for (ContainerStatus container: containers) {
-      final ContainerData containerData = container.getContainer();
-      if (containerData == null) {
-        // a status may wrap null data; dereferencing it would throw
-        continue;
-      }
-      StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
-          StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
-      ciBuilder.setContainerName(containerData.getContainerName())
-          .setSize(containerData.getMaxSize())
-          .setUsed(containerData.getBytesUsed())
-          .setKeyCount(containerData.getKeyCount())
-          .setReadCount(container.getReadCount())
-          .setWriteCount(container.getWriteCount())
-          .setReadBytes(container.getReadBytes())
-          .setWriteBytes(container.getWriteBytes())
-          .setContainerID(containerData.getContainerID());
-
-      // The hash is only set when the container data provides one.
-      if (containerData.getHash() != null) {
-        ciBuilder.setFinalhash(containerData.getHash());
-      }
-      crBuilder.addReports(ciBuilder.build());
-    }
-
-    return crBuilder.build();
-  }
-
-  /**
-   * Installs the key manager used by this container manager.
-   *
-   * @param keyManager - Key Manager.
-   */
-  @Override
-  public void setKeyManager(final KeyManager keyManager) {
-    this.keyManager = keyManager;
-  }
-
-  /**
-   * File name filter that keeps only names ending with the container
-   * extension.
-   */
-  private static class ContainerFilter implements FilenameFilter {
-    /**
-     * Decides whether a directory entry is a container file.
-     *
-     * @param dir the directory in which the file was found (not consulted).
-     * @param name the name of the file.
-     * @return <code>true</code> if and only if the name ends with the
-     * container extension; <code>false</code> otherwise.
-     */
-    @Override
-    public boolean accept(File dir, String name) {
-      final boolean isContainerFile = name.endsWith(CONTAINER_EXTENSION);
-      return isContainerFile;
-    }
-  }
-
-  @Override
-  public List<ContainerData> chooseContainerForBlockDeletion(
-      int count) throws StorageContainerException {
-    readLock();
-    try {
-      return containerDeletionChooser.chooseContainerForBlockDeletion(
-          count, containerMap);
-    } finally {
-      readUnlock();
-    }
-  }
-
-  /**
-   * Exposes the container deletion choosing policy for tests.
-   *
-   * @return the deletion choosing policy in use.
-   */
-  @VisibleForTesting
-  public ContainerDeletionChoosingPolicy getContainerDeletionChooser() {
-    return this.containerDeletionChooser;
-  }
-
-  /**
-   * Increases the pending deletion block count of a container under the
-   * write lock. No existence check is performed; the container must already
-   * be present in the containerMap.
-   *
-   * @param numBlocks - number of blocks to add.
-   * @param containerId - key of the container in the containerMap.
-   */
-  @Override
-  public void incrPendingDeletionBlocks(int numBlocks, String containerId) {
-    writeLock();
-    try {
-      containerMap.get(containerId).incrPendingDeletionBlocks(numBlocks);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  /**
-   * Decreases the pending deletion block count of a container under the
-   * write lock. No existence check is performed; the container must already
-   * be present in the containerMap.
-   *
-   * @param numBlocks - number of blocks to subtract.
-   * @param containerId - key of the container in the containerMap.
-   */
-  @Override
-  public void decrPendingDeletionBlocks(int numBlocks, String containerId) {
-    writeLock();
-    try {
-      containerMap.get(containerId).decrPendingDeletionBlocks(numBlocks);
-    } finally {
-      writeUnlock();
-    }
-  }
-
-  /**
-   * Bumps the read counter of the container. The container must already be
-   * present in the containerMap; no existence check is performed.
-   *
-   * @param containerName - Name of the container.
-   */
-  @Override
-  public void incrReadCount(String containerName) {
-    containerMap.get(containerName).incrReadCount();
-  }
-
-  /**
-   * Get the read count of the container.
-   *
-   * @param containerName - Name of the container.
-   * @return the current read count.
-   */
-  public long getReadCount(String containerName) {
-    return containerMap.get(containerName).getReadCount();
-  }
-
-  /**
-   * Adds to the counter of bytes read from the container. The container must
-   * already be present in the containerMap; no existence check is performed.
-   *
-   * @param containerName - Name of the container.
-   * @param readBytes     - bytes read from the container.
-   */
-  @Override
-  public void incrReadBytes(String containerName, long readBytes) {
-    containerMap.get(containerName).incrReadBytes(readBytes);
-  }
-
-  /**
-   * Get the number of bytes read from the container, looked up under the
-   * read lock.
-   *
-   * @param containerName - Name of the container.
-   * @return the current read byte count.
-   */
-  public long getReadBytes(String containerName) {
-    readLock();
-    try {
-      return containerMap.get(containerName).getReadBytes();
-    } finally {
-      readUnlock();
-    }
-  }
-
-  /**
-   * Bumps the write counter of the container. The container must already be
-   * present in the containerMap; no existence check is performed.
-   *
-   * @param containerName - Name of the container.
-   */
-  @Override
-  public void incrWriteCount(String containerName) {
-    containerMap.get(containerName).incrWriteCount();
-  }
-
-  /**
-   * Get the write count of the container.
-   *
-   * @param containerName - Name of the container.
-   * @return the current write count.
-   */
-  public long getWriteCount(String containerName) {
-    return containerMap.get(containerName).getWriteCount();
-  }
-
-  /**
-   * Adds to the counter of bytes written into the container. The container
-   * must already be present in the containerMap; no existence check is
-   * performed.
-   *
-   * @param containerName - Name of the container.
-   * @param writeBytes    - bytes written into the container.
-   */
-  @Override
-  public void incrWriteBytes(String containerName, long writeBytes) {
-    containerMap.get(containerName).incrWriteBytes(writeBytes);
-  }
-
-  /**
-   * Get the number of bytes written into the container.
-   *
-   * @param containerName - Name of the container.
-   * @return the current write byte count.
-   */
-  public long getWriteBytes(String containerName) {
-    return containerMap.get(containerName).getWriteBytes();
-  }
-
-  /**
-   * Adds to the bytes used by the container. The container must already be
-   * present in the containerMap; no existence check is performed.
-   *
-   * @param containerName - Name of the container.
-   * @param used          - additional bytes used by the container.
-   * @return the current bytes used.
-   */
-  @Override
-  public long incrBytesUsed(String containerName, long used) {
-    return containerMap.get(containerName).incrBytesUsed(used);
-  }
-
-  /**
-   * Subtracts from the bytes used by the container. The container must
-   * already be present in the containerMap; no existence check is performed.
-   *
-   * @param containerName - Name of the container.
-   * @param used          - additional bytes reclaimed by the container.
-   * @return the current bytes used.
-   */
-  @Override
-  public long decrBytesUsed(String containerName, long used) {
-    return containerMap.get(containerName).decrBytesUsed(used);
-  }
-
-  /**
-   * Get the bytes used by the container.
-   *
-   * @param containerName - Name of the container.
-   * @return the current bytes used.
-   */
-  public long getBytesUsed(String containerName) {
-    return containerMap.get(containerName).getBytesUsed();
-  }
-
-  /**
-   * Reports how many keys the container holds. The container must already be
-   * present in the containerMap; no existence check is performed.
-   *
-   * @param containerName - Name of the container.
-   * @return the current key count.
-   */
-  @Override
-  public long getNumKeys(String containerName) {
-    return containerMap.get(containerName).getNumKeys();
-  }
-
-  /**
-   * Get the container report state to send via HB to SCM, as decided by the
-   * container report manager.
-   *
-   * @return container report state.
-   */
-  @Override
-  public ReportState getContainerReportState() {
-    final ReportState state = containerReportManager.getContainerReportState();
-    return state;
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java
 
b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java
deleted file mode 100644
index a300767..0000000
--- 
a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerReportManagerImpl.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.impl;
-
-import org.apache.commons.lang3.RandomUtils;
-import static org.apache.hadoop.ozone.scm.HdslServerUtil
-    .getScmHeartbeatInterval;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ozone.OzoneConfigKeys;
-import 
org.apache.hadoop.ozone.container.common.interfaces.ContainerReportManager;
-import 
org.apache.hadoop.hdsl.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
-import org.apache.hadoop.util.Time;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * Decides, on the datanode, when a container report is due.
- * // TODO: support incremental/delta container report
- */
-public class ContainerReportManagerImpl implements ContainerReportManager {
-  private static final ReportState NO_CONTAINER_REPORTSTATE =
-      ReportState.newBuilder()
-          .setState(ReportState.states.noContainerReports)
-          .setCount(0).build();
-
-  // Monotonic time of the last full container report; negative until the
-  // first report has been issued.
-  private long lastContainerReportTime;
-  private final long containerReportInterval;
-  private final long heartbeatInterval;
-  private AtomicLong reportCount;
-
-  /**
-   * Reads the container report interval (in milliseconds) and the SCM
-   * heartbeat interval from the configuration.
-   *
-   * @param config - configuration to read the intervals from.
-   */
-  public ContainerReportManagerImpl(Configuration config) {
-    this.lastContainerReportTime = -1;
-    this.reportCount = new AtomicLong(0L);
-    this.containerReportInterval = config.getTimeDuration(
-        OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL,
-        OzoneConfigKeys.OZONE_CONTAINER_REPORT_INTERVAL_DEFAULT,
-        TimeUnit.MILLISECONDS);
-    this.heartbeatInterval = getScmHeartbeatInterval(config);
-  }
-
-  /**
-   * Returns a complete-report state on the first call and whenever more than
-   * the report interval plus a random delay has passed since the last full
-   * report; otherwise returns the shared no-report state.
-   *
-   * @return the report state for the current heartbeat.
-   */
-  public ReportState getContainerReportState() {
-    if (lastContainerReportTime < 0) {
-      return getFullContainerReportState();
-    }
-    // A random delay below the heartbeat interval is added on top of the
-    // container report interval so that the SCM is not overwhelmed by
-    // container reports sent in sync.
-    final long elapsed = Time.monotonicNow() - lastContainerReportTime;
-    final long threshold = containerReportInterval
-        + RandomUtils.nextLong(0, heartbeatInterval);
-    if (elapsed > threshold) {
-      return getFullContainerReportState();
-    }
-    return NO_CONTAINER_REPORTSTATE;
-  }
-
-  /**
-   * Builds a complete-report state carrying the next report count and
-   * records the current monotonic time as the last report time.
-   */
-  private ReportState getFullContainerReportState() {
-    final ReportState.Builder state = ReportState.newBuilder();
-    state.setState(ReportState.states.completeContinerReport);
-    state.setCount(reportCount.incrementAndGet());
-    lastContainerReportTime = Time.monotonicNow();
-    return state.build();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStatus.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStatus.java
 
b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStatus.java
deleted file mode 100644
index 5577323..0000000
--- 
a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStatus.java
+++ /dev/null
@@ -1,217 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.impl;
-
-import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
-
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * This class represents the state of a container. If reading the
- * container encountered an error when we boot up, we will post that
- * info to a recovery queue and keep the info in the containerMap.
- * <p/>
- * If and when the issue is fixed, the expectation is that the recovery
- * thread will delete this entry from the containerMap and insert a new
- * entry instead of modifying this class.
- */
-public class ContainerStatus {
-  private final ContainerData containerData;
-
-  /**
-   * Number of pending deletion blocks in container.
-   */
-  private int numPendingDeletionBlocks;
-
-  private AtomicLong readBytes;
-
-  private AtomicLong writeBytes;
-
-  private AtomicLong readCount;
-
-  private AtomicLong writeCount;
-
-  /**
-   * Creates a Container Status class.
-   *
-   * @param containerData - ContainerData.
-   */
-  ContainerStatus(ContainerData containerData) {
-    this.numPendingDeletionBlocks = 0;
-    this.containerData = containerData;
-    this.readCount = new AtomicLong(0L);
-    this.readBytes =  new AtomicLong(0L);
-    this.writeCount =  new AtomicLong(0L);
-    this.writeBytes =  new AtomicLong(0L);
-  }
-
-  /**
-   * Returns container if it is active. It is not active if we have had an
-   * error and we are waiting for the background threads to fix the issue.
-   *
-   * @return ContainerData.
-   */
-  public ContainerData getContainer() {
-    return containerData;
-  }
-
-  /**
-   * Increase the count of pending deletion blocks.
-   *
-   * @param numBlocks increment number
-   */
-  public void incrPendingDeletionBlocks(int numBlocks) {
-    this.numPendingDeletionBlocks += numBlocks;
-  }
-
-  /**
-   * Decrease the count of pending deletion blocks.
-   *
-   * @param numBlocks decrement number
-   */
-  public void decrPendingDeletionBlocks(int numBlocks) {
-    this.numPendingDeletionBlocks -= numBlocks;
-  }
-
-  /**
-   * Get the number of pending deletion blocks.
-   */
-  public int getNumPendingDeletionBlocks() {
-    return this.numPendingDeletionBlocks;
-  }
-
-  /**
-   * Get the number of bytes read from the container.
-   * @return the number of bytes read from the container.
-   */
-  public long getReadBytes() {
-    return readBytes.get();
-  }
-
-  /**
-   * Increase the number of bytes read from the container.
-   * @param bytes number of bytes read.
-   */
-  public void incrReadBytes(long bytes) {
-    this.readBytes.addAndGet(bytes);
-  }
-
-  /**
-   * Get the number of times the container is read.
-   * @return the number of times the container is read.
-   */
-  public long getReadCount() {
-    return readCount.get();
-  }
-
-  /**
-   * Increase the container read count by 1.
-   */
-  public void incrReadCount() {
-    this.readCount.incrementAndGet();
-  }
-
-  /**
-   * Get the number of bytes written into the container.
-   * @return the number of bytes written into the container.
-   */
-  public long getWriteBytes() {
-    return writeBytes.get();
-  }
-
-  /**
-   * Increase the number of bytes written into the container.
-   * @param bytes the number of bytes written into the container.
-   */
-  public void incrWriteBytes(long bytes) {
-    this.writeBytes.addAndGet(bytes);
-  }
-
-  /**
-   * Get the number of writes into the container.
-   * @return the number of writes into the container.
-   */
-  public long getWriteCount() {
-    return writeCount.get();
-  }
-
-  /**
-   * Increase the number of writes into the container by 1.
-   */
-  public void incrWriteCount() {
-    this.writeCount.incrementAndGet();
-  }
-
-  /**
-   * Get the number of bytes used by the container.
-   * @return the number of bytes used by the container.
-   */
-  public long getBytesUsed() {
-    return containerData.getBytesUsed();
-  }
-
-  /**
-   * Increase the number of bytes used by the container.
-   * @param used number of bytes used by the container.
-   * @return the current number of bytes used by the container after increase.
-   */
-  public long incrBytesUsed(long used) {
-    return containerData.addBytesUsed(used);
-  }
-
-  /**
-   * Set the number of bytes used by the container.
-   * @param used the number of bytes used by the container.
-   */
-  public void setBytesUsed(long used) {
-    containerData.setBytesUsed(used);
-  }
-
-  /**
-   * Decrease the number of bytes used by the container.
-   * @param reclaimed the number of bytes reclaimed from the container.
-   * @return the current number of bytes used by the container after decrease.
-   */
-  public long decrBytesUsed(long reclaimed) {
-    return this.containerData.addBytesUsed(-1L * reclaimed);
-  }
-
-  /**
-   * Get the maximum container size.
-   * @return the maximum container size.
-   */
-  public long getMaxSize() {
-    return containerData.getMaxSize();
-  }
-
-  /**
-   * Set the maximum container size.
-   * @param size the maximum container size.
-   */
-  public void setMaxSize(long size) {
-    this.containerData.setMaxSize(size);
-  }
-
-  /**
-   * Get the number of keys in the container.
-   * @return the number of keys in the container.
-   */
-  public long getNumKeys() {
-    return containerData.getKeyCount();
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java
 
b/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java
deleted file mode 100644
index 07a3a53..0000000
--- 
a/hadoop-hdsl/container-service/src/main/java/org/apache/hadoop/ozone/container/common/impl/ContainerStorageLocation.java
+++ /dev/null
@@ -1,203 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.impl;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.CachingGetSpaceUsed;
-import org.apache.hadoop.fs.DF;
-import org.apache.hadoop.fs.GetSpaceUsed;
-import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
-import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.util.ShutdownHookManager;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.FileNotFoundException;
-import java.io.OutputStreamWriter;
-import java.io.FileOutputStream;
-import java.net.URI;
-import java.nio.charset.StandardCharsets;
-import java.nio.file.Paths;
-import java.util.Scanner;
-
-import static org.apache.hadoop.util.RunJar.SHUTDOWN_HOOK_PRIORITY;
-
-/**
- * Class that wraps the space usage of the Datanode Container Storage Location
- * by SCM containers.
- */
-public class ContainerStorageLocation {
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ContainerStorageLocation.class);
-
-  private static final String DU_CACHE_FILE = "scmUsed";
-  private volatile boolean scmUsedSaved = false;
-
-  private final StorageLocation dataLocation;
-  private final String storageUuId;
-  private final DF usage;
-  private final GetSpaceUsed scmUsage;
-  private final File scmUsedFile;
-
-  public ContainerStorageLocation(StorageLocation dataLoc, Configuration conf)
-      throws IOException {
-    this.dataLocation = dataLoc;
-    this.storageUuId = DatanodeStorage.generateUuid();
-    File dataDir = Paths.get(dataLoc.getNormalizedUri()).resolve(
-        OzoneConsts.CONTAINER_PREFIX).toFile();
-    // Initialize container data root if it does not exist as required by DF/DU
-    if (!dataDir.exists()) {
-      if (!dataDir.mkdirs()) {
-        LOG.error("Unable to create the container storage location at : {}",
-            dataDir);
-        throw new IllegalArgumentException("Unable to create the container" +
-            " storage location at : " + dataDir);
-      }
-    }
-    scmUsedFile = new File(dataDir, DU_CACHE_FILE);
-    // get overall disk usage
-    this.usage = new DF(dataDir, conf);
-    // get SCM specific usage
-    this.scmUsage = new CachingGetSpaceUsed.Builder().setPath(dataDir)
-        .setConf(conf)
-        .setInitialUsed(loadScmUsed())
-        .build();
-
-    // Ensure scm usage is saved during shutdown.
-    ShutdownHookManager.get().addShutdownHook(
-        new Runnable() {
-          @Override
-          public void run() {
-            if (!scmUsedSaved) {
-              saveScmUsed();
-            }
-          }
-        }, SHUTDOWN_HOOK_PRIORITY);
-  }
-
-  public URI getNormalizedUri() {
-    return dataLocation.getNormalizedUri();
-  }
-
-  public String getStorageUuId() {
-    return storageUuId;
-  }
-  public long getCapacity() {
-    long capacity = usage.getCapacity();
-    return (capacity > 0) ? capacity : 0;
-  }
-
-  public long getAvailable() throws IOException {
-    long remaining = getCapacity() - getScmUsed();
-    long available = usage.getAvailable();
-    if (remaining > available) {
-      remaining = available;
-    }
-    return (remaining > 0) ? remaining : 0;
-  }
-
-  public long getScmUsed() throws IOException{
-    return scmUsage.getUsed();
-  }
-
-  public void shutdown() {
-    saveScmUsed();
-    scmUsedSaved = true;
-
-    if (scmUsage instanceof CachingGetSpaceUsed) {
-      IOUtils.cleanupWithLogger(null, ((CachingGetSpaceUsed) scmUsage));
-    }
-  }
-
-  /**
-   * Read in the cached DU value and return it if it is less than 600 seconds
-   * old (DU update interval). Slight imprecision of scmUsed is not critical
-   * and skipping DU can significantly shorten the startup time.
-   * If the cached value is not available or too old, -1 is returned.
-   */
-  long loadScmUsed() {
-    long cachedScmUsed;
-    long mtime;
-    Scanner sc;
-
-    try {
-      sc = new Scanner(scmUsedFile, "UTF-8");
-    } catch (FileNotFoundException fnfe) {
-      return -1;
-    }
-
-    try {
-      // Get the recorded scmUsed from the file.
-      if (sc.hasNextLong()) {
-        cachedScmUsed = sc.nextLong();
-      } else {
-        return -1;
-      }
-      // Get the recorded mtime from the file.
-      if (sc.hasNextLong()) {
-        mtime = sc.nextLong();
-      } else {
-        return -1;
-      }
-
-      // Return the cached value if mtime is okay.
-      if (mtime > 0 && (Time.now() - mtime < 600000L)) {
-        LOG.info("Cached ScmUsed found for {} : {} ", dataLocation,
-            cachedScmUsed);
-        return cachedScmUsed;
-      }
-      return -1;
-    } finally {
-      sc.close();
-    }
-  }
-
-  /**
-   * Write the current scmUsed to the cache file.
-   */
-  void saveScmUsed() {
-    if (scmUsedFile.exists() && !scmUsedFile.delete()) {
-      LOG.warn("Failed to delete old scmUsed file in {}.", dataLocation);
-    }
-    OutputStreamWriter out = null;
-    try {
-      long used = getScmUsed();
-      if (used > 0) {
-        out = new OutputStreamWriter(new FileOutputStream(scmUsedFile),
-            StandardCharsets.UTF_8);
-        // mtime is written last, so that truncated writes won't be valid.
-        out.write(Long.toString(used) + " " + Long.toString(Time.now()));
-        out.flush();
-        out.close();
-        out = null;
-      }
-    } catch (IOException ioe) {
-      // If write failed, the volume might be bad. Since the cache file is
-      // not critical, log the error and continue.
-      LOG.warn("Failed to write scmUsed to " + scmUsedFile, ioe);
-    } finally {
-      IOUtils.cleanupWithLogger(null, out);
-    }
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to