http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
index 35906c6..0af3c4f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/HdfsServerConstants.java
@@ -189,10 +189,8 @@ public interface HdfsServerConstants {
         return NamenodeRole.NAMENODE;
       }
     }
-
+    
     public void setClusterId(String cid) {
-      Preconditions.checkState(this == UPGRADE || this == UPGRADEONLY
-          || this == FORMAT);
       clusterId = cid;
     }
 
@@ -217,7 +215,6 @@ public interface HdfsServerConstants {
     }
 
     public void setForce(int force) {
-      Preconditions.checkState(this == RECOVER);
       this.force = force;
     }
     
@@ -230,7 +227,6 @@ public interface HdfsServerConstants {
     }
     
     public void setForceFormat(boolean force) {
-      Preconditions.checkState(this == FORMAT);
       isForceFormat = force;
     }
     
@@ -239,7 +235,6 @@ public interface HdfsServerConstants {
     }
     
     public void setInteractiveFormat(boolean interactive) {
-      Preconditions.checkState(this == FORMAT);
       isInteractiveFormat = interactive;
     }
     

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 1f51935..b757fd7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -48,6 +48,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_P
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_KEY;
 import static org.apache.hadoop.util.ExitUtil.terminate;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.hdfs.protocol.proto.ReconfigurationProtocolProtos.ReconfigurationProtocolService;
 
@@ -109,10 +110,8 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
 import org.apache.hadoop.hdfs.server.datanode.checker.DatasetVolumeChecker;
 import org.apache.hadoop.hdfs.server.datanode.checker.StorageLocationChecker;
-import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.util.AutoCloseableLock;
 import org.apache.hadoop.hdfs.client.BlockReportOptions;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
@@ -191,7 +190,6 @@ import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.net.DNS;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
-import org.apache.hadoop.conf.OzoneConfiguration;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SaslPropertiesResolver;
 import org.apache.hadoop.security.SecurityUtil;
@@ -373,7 +371,6 @@ public class DataNode extends ReconfigurableBase
   private final String confVersion;
   private final long maxNumberOfBlocksToLog;
   private final boolean pipelineSupportECN;
-  private final boolean ozoneEnabled;
 
   private final List<String> usersWithLocalPathAccess;
   private final boolean connectToDnViaHostname;
@@ -402,8 +399,6 @@ public class DataNode extends ReconfigurableBase
 
   private final SocketFactory socketFactory;
 
-  private DatanodeStateMachine datanodeStateMachine;
-
   private static Tracer createTracer(Configuration conf) {
     return new Tracer.Builder("DataNode").
         conf(TraceUtils.wrapHadoopConf(DATANODE_HTRACE_PREFIX , conf)).
@@ -414,8 +409,6 @@ public class DataNode extends ReconfigurableBase
 
   private ScheduledThreadPoolExecutor metricsLoggerTimer;
 
-  private ObjectStoreHandler objectStoreHandler = null;
-
   /**
    * Creates a dummy DataNode for testing purpose.
    */
@@ -434,7 +427,6 @@ public class DataNode extends ReconfigurableBase
     this.connectToDnViaHostname = false;
     this.blockScanner = new BlockScanner(this, this.getConf());
     this.pipelineSupportECN = false;
-    this.ozoneEnabled = false;
     this.socketFactory = NetUtils.getDefaultSocketFactory(conf);
     this.dnConf = new DNConf(this);
     initOOBTimeout();
@@ -474,8 +466,6 @@ public class DataNode extends ReconfigurableBase
         DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED,
         DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED_DEFAULT);
 
-    this.ozoneEnabled = DFSUtil.isOzoneEnabled(conf);
-
     confVersion = "core-" +
         conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +
         ",hdfs-" +
@@ -531,7 +521,7 @@ public class DataNode extends ReconfigurableBase
 
   @Override  // ReconfigurableBase
   protected Configuration getNewConf() {
-    return new OzoneConfiguration();
+    return new HdfsConfiguration();
   }
 
   /**
@@ -961,8 +951,8 @@ public class DataNode extends ReconfigurableBase
     // the DN is started by JSVC, pass it along.
     ServerSocketChannel httpServerChannel = secureResources != null ?
         secureResources.getHttpServerChannel() : null;
-    this.httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel,
-        this.objectStoreHandler);
+
+    httpServer = new DatanodeHttpServer(getConf(), this, httpServerChannel);
     httpServer.start();
     if (httpServer.getHttpAddress() != null) {
       infoPort = httpServer.getHttpAddress().getPort();
@@ -1411,9 +1401,7 @@ public class DataNode extends ReconfigurableBase
     
     // global DN settings
     registerMXBean();
-
     initDataXceiver();
-    initObjectStoreHandler();
     startInfoServer();
     pauseMonitor = new JvmPauseMonitor();
     pauseMonitor.init(getConf());
@@ -1454,19 +1442,6 @@ public class DataNode extends ReconfigurableBase
   }
 
   /**
-   * Initializes the object store handler.  This must be called before
-   * initialization of the HTTP server.
-   *
-   * @throws IOException if there is an I/O error
-   */
-  private void initObjectStoreHandler() throws IOException {
-    if (this.ozoneEnabled) {
-      this.objectStoreHandler = new ObjectStoreHandler(getConf());
-      LOG.info("ozone is enabled.");
-    }
-  }
-
-  /**
    * Checks if the DataNode has a secure configuration if security is enabled.
    * There are 2 possible configurations that are considered secure:
    * 1. The server has bound to privileged ports for RPC and HTTP via
@@ -1578,6 +1553,11 @@ public class DataNode extends ReconfigurableBase
         streamingAddr.getAddress().getHostAddress(), hostName, 
         storage.getDatanodeUuid(), getXferPort(), getInfoPort(),
             infoSecurePort, getIpcPort());
+    for (ServicePlugin plugin : plugins) {
+      if (plugin instanceof DataNodeServicePlugin) {
+        ((DataNodeServicePlugin) plugin).onDatanodeIdCreation(dnId);
+      }
+    }
     return new DatanodeRegistration(dnId, storageInfo, 
         new ExportedBlockKeys(), VersionInfo.getVersion());
   }
@@ -1598,31 +1578,15 @@ public class DataNode extends ReconfigurableBase
           + ". Expecting " + storage.getDatanodeUuid());
     }
 
-    if (isOzoneEnabled()) {
-      if (datanodeStateMachine == null) {
-        datanodeStateMachine = new DatanodeStateMachine(
-            getDatanodeId(),
-            getConf());
-        datanodeStateMachine.startDaemon();
+    for (ServicePlugin plugin : plugins) {
+      if (plugin instanceof DataNodeServicePlugin) {
+        ((DataNodeServicePlugin) plugin)
+            .onDatanodeSuccessfulNamenodeRegistration(bpRegistration);
       }
     }
     registerBlockPoolWithSecretManager(bpRegistration, blockPoolId);
   }
-
-  @VisibleForTesting
-  public OzoneContainer getOzoneContainerManager() {
-    return this.datanodeStateMachine.getContainer();
-  }
-
-  @VisibleForTesting
-  public DatanodeStateMachine.DatanodeStates getOzoneStateMachineState() {
-    if (this.datanodeStateMachine != null) {
-      return this.datanodeStateMachine.getContext().getState();
-    }
-    // if the state machine doesn't exist then DN initialization is in progress
-    return DatanodeStateMachine.DatanodeStates.INIT;
-  }
-
+  
   /**
    * After the block pool has contacted the NN, registers that block pool
    * with the secret manager, updating it with the secrets provided by the NN.
@@ -1729,11 +1693,11 @@ public class DataNode extends ReconfigurableBase
   BPOfferService getBPOfferService(String bpid){
     return blockPoolManager.get(bpid);
   }
-
+  
   int getBpOsCount() {
     return blockPoolManager.getAllNamenodeThreads().size();
   }
-
+  
   /**
    * Initializes the {@link #data}. The initialization is done only once, when
    * handshake with the the first namenode is completed.
@@ -2021,7 +1985,7 @@ public class DataNode extends ReconfigurableBase
         }
       }
     }
-
+    
     List<BPOfferService> bposArray = (this.blockPoolManager == null)
         ? new ArrayList<BPOfferService>()
         : this.blockPoolManager.getAllNamenodeThreads();
@@ -2054,6 +2018,7 @@ public class DataNode extends ReconfigurableBase
 
     // Terminate directory scanner and block scanner
     shutdownPeriodicScanners();
+    shutdownDiskBalancer();
 
     // Stop the web server
     if (httpServer != null) {
@@ -2064,17 +2029,6 @@ public class DataNode extends ReconfigurableBase
       }
     }
 
-    // Stop the object store handler
-    if (isOzoneEnabled()) {
-      if (this.objectStoreHandler != null) {
-        this.objectStoreHandler.close();
-        if(datanodeStateMachine != null &&
-            !datanodeStateMachine.isDaemonStopped()) {
-          datanodeStateMachine.stopDaemon();
-        }
-      }
-    }
-
     volumeChecker.shutdownAndWait(1, TimeUnit.SECONDS);
 
     if (storageLocationChecker != null) {
@@ -2088,7 +2042,7 @@ public class DataNode extends ReconfigurableBase
     // shouldRun is set to false here to prevent certain threads from exiting
     // before the restart prep is done.
     this.shouldRun = false;
-
+    
     // wait reconfiguration thread, if any, to exit
     shutdownReconfigurationTask();
 
@@ -2098,8 +2052,9 @@ public class DataNode extends ReconfigurableBase
       while (true) {
         // When shutting down for restart, wait 2.5 seconds before forcing
         // termination of receiver threads.
-        if (!this.shutdownForUpgrade || (this.shutdownForUpgrade && (
-            Time.monotonicNow() - timeNotified > 1000))) {
+        if (!this.shutdownForUpgrade ||
+            (this.shutdownForUpgrade && (Time.monotonicNow() - timeNotified
+                > 1000))) {
           this.threadGroup.interrupt();
           break;
         }
@@ -2110,8 +2065,7 @@ public class DataNode extends ReconfigurableBase
         }
         try {
           Thread.sleep(sleepMs);
-        } catch (InterruptedException e) {
-        }
+        } catch (InterruptedException e) {}
         sleepMs = sleepMs * 3 / 2; // exponential backoff
         if (sleepMs > 200) {
           sleepMs = 200;
@@ -2137,9 +2091,9 @@ public class DataNode extends ReconfigurableBase
       metrics.setDataNodeActiveXceiversCount(0);
     }
 
-    // IPC server needs to be shutdown late in the process, otherwise
-    // shutdown command response won't get sent.
-    if (ipcServer != null) {
+   // IPC server needs to be shutdown late in the process, otherwise
+   // shutdown command response won't get sent.
+   if (ipcServer != null) {
       ipcServer.stop();
     }
 
@@ -2154,7 +2108,7 @@ public class DataNode extends ReconfigurableBase
         LOG.warn("Received exception in BlockPoolManager#shutDownAll", ie);
       }
     }
-
+    
     if (storage != null) {
       try {
         this.storage.unlockAll();
@@ -2175,11 +2129,9 @@ public class DataNode extends ReconfigurableBase
       MBeans.unregister(dataNodeInfoBeanName);
       dataNodeInfoBeanName = null;
     }
-    if (shortCircuitRegistry != null) {
-      shortCircuitRegistry.shutdown();
-    }
+    if (shortCircuitRegistry != null) shortCircuitRegistry.shutdown();
     LOG.info("Shutdown complete.");
-    synchronized (this) {
+    synchronized(this) {
       // it is already false, but setting it again to avoid a findbug warning.
       this.shouldRun = false;
       // Notify the main thread.
@@ -2717,9 +2669,8 @@ public class DataNode extends ReconfigurableBase
    */
   public static DataNode instantiateDataNode(String args [], Configuration conf,
       SecureResources resources) throws IOException {
-    if (conf == null) {
-      conf = new OzoneConfiguration();
-    }
+    if (conf == null)
+      conf = new HdfsConfiguration();
     
     if (args != null) {
       // parse generic hadoop options
@@ -3252,12 +3203,6 @@ public class DataNode extends ReconfigurableBase
           } catch (InterruptedException ie) { }
         }
         shutdown();
-
-        if (isOzoneEnabled()) {
-          if(datanodeStateMachine != null) {
-            datanodeStateMachine.stopDaemon();
-          }
-        }
       }
     };
 
@@ -3552,13 +3497,10 @@ public class DataNode extends ReconfigurableBase
     return metricsLoggerTimer;
   }
 
-  public boolean isOzoneEnabled() {
-    return ozoneEnabled;
-  }
-
   public Tracer getTracer() {
     return tracer;
   }
+
   /**
    * Allows submission of a disk balancer Job.
    * @param planID  - Hash value of the plan.
@@ -3676,4 +3618,14 @@ public class DataNode extends ReconfigurableBase
     }
     return volumeInfoList;
   }
+
+  @Private
+  public SecureResources getSecureResources() {
+    return secureResources;
+  }
+
+  @Private
+  public Collection<ServicePlugin> getPlugins() {
+    return Collections.unmodifiableList(plugins);
+  }
 }
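
The DataNode changes above replace the hard-wired Ozone startup and shutdown
paths with callbacks on any configured ServicePlugin that also implements the
new DataNodeServicePlugin interface. A minimal sketch of wiring such a plugin
into a DataNode, assuming the standard "dfs.datanode.plugins" mechanism
(DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY) and a hypothetical plugin class
org.example.MyDataNodePlugin:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.server.datanode.DataNode;

    public class PluginWiringSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        // The named plugin classes are instantiated reflectively at startup.
        conf.set(DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY,
            "org.example.MyDataNodePlugin");
        DataNode dn = DataNode.instantiateDataNode(args, conf, null);
        // The new getPlugins() accessor exposes the loaded plugins read-only.
        System.out.println("Loaded plugins: " + dn.getPlugins());
      }
    }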

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeServicePlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeServicePlugin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeServicePlugin.java
new file mode 100644
index 0000000..08aae8b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeServicePlugin.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.util.ServicePlugin;
+
+/**
+ * Datanode specific service plugin with additional hooks.
+ */
+public interface DataNodeServicePlugin extends ServicePlugin {
+
+  /**
+   * Extension point to modify the newly created datanode id.
+   *
+   * @param dataNodeId the datanode id, which plugins may modify in place
+   */
+  default void onDatanodeIdCreation(DatanodeID dataNodeId) {
+    //NOOP
+  }
+
+  /**
+   * Extension point invoked after a successful namenode registration.
+   *
+   * @param registration the registration record returned by the namenode
+   */
+  default void onDatanodeSuccessfulNamenodeRegistration(
+      DatanodeRegistration registration) {
+    //NOOP
+  }
+}
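
For reference, a hypothetical implementation of the new hooks might look like
the sketch below; the class name and log output are illustrative only, not
part of this change:

    package org.example;

    import org.apache.hadoop.hdfs.protocol.DatanodeID;
    import org.apache.hadoop.hdfs.server.datanode.DataNodeServicePlugin;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;

    /** Illustrative plugin that logs the two new datanode lifecycle events. */
    public class MyDataNodePlugin implements DataNodeServicePlugin {

      @Override
      public void start(Object service) {
        // 'service' is the owning DataNode (contract inherited from ServicePlugin).
      }

      @Override
      public void stop() {
        // Stop any background work begun in start().
      }

      @Override
      public void close() {
        // Release remaining resources (ServicePlugin extends Closeable).
      }

      @Override
      public void onDatanodeIdCreation(DatanodeID dataNodeId) {
        System.out.println("Datanode id created: " + dataNodeId);
      }

      @Override
      public void onDatanodeSuccessfulNamenodeRegistration(
          DatanodeRegistration registration) {
        System.out.println("Registered with namenode: " + registration);
      }
    }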

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
deleted file mode 100644
index a302462..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ObjectStoreHandler.java
+++ /dev/null
@@ -1,189 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode;
-
-import static org.apache.hadoop.ozone.OzoneConfigKeys.*;
-import static com.sun.jersey.api.core.ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS;
-import static com.sun.jersey.api.core.ResourceConfig.FEATURE_TRACE;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.sun.jersey.api.container.ContainerFactory;
-import com.sun.jersey.api.core.ApplicationAdapter;
-
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ozone.ksm.protocolPB
-    .KeySpaceManagerProtocolClientSideTranslatorPB;
-import org.apache.hadoop.ozone.ksm.protocolPB.KeySpaceManagerProtocolPB;
-import org.apache.hadoop.ozone.client.OzoneClientUtils;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.scm.protocolPB
-    .ScmBlockLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.scm.protocolPB.ScmBlockLocationProtocolPB;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.ipc.Client;
-import org.apache.hadoop.ipc.ProtobufRpcEngine;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.conf.OzoneConfiguration;
-import org.apache.hadoop.scm.protocolPB
-    .StorageContainerLocationProtocolClientSideTranslatorPB;
-import org.apache.hadoop.scm.protocolPB.StorageContainerLocationProtocolPB;
-import org.apache.hadoop.ozone.web.handlers.ServiceFilter;
-import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
-import org.apache.hadoop.ozone.web.ObjectStoreApplication;
-import org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainer;
-import org.apache.hadoop.ozone.web.storage.DistributedStorageHandler;
-import org.apache.hadoop.ozone.web.localstorage.LocalStorageHandler;
-import org.apache.hadoop.security.UserGroupInformation;
-
-/**
- * Implements object store handling within the DataNode process.  This class is
- * responsible for initializing and maintaining the RPC clients and servers and
- * the web application required for the object store implementation.
- */
-public final class ObjectStoreHandler implements Closeable {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(ObjectStoreJerseyContainer.class);
-
-  private final ObjectStoreJerseyContainer objectStoreJerseyContainer;
-  private final KeySpaceManagerProtocolClientSideTranslatorPB
-      keySpaceManagerClient;
-  private final StorageContainerLocationProtocolClientSideTranslatorPB
-      storageContainerLocationClient;
-  private final ScmBlockLocationProtocolClientSideTranslatorPB
-      scmBlockLocationClient;
-  private final StorageHandler storageHandler;
-
-  /**
-   * Creates a new ObjectStoreHandler.
-   *
-   * @param conf configuration
-   * @throws IOException if there is an I/O error
-   */
-  public ObjectStoreHandler(Configuration conf) throws IOException {
-    String shType = conf.getTrimmed(OZONE_HANDLER_TYPE_KEY,
-        OZONE_HANDLER_TYPE_DEFAULT);
-    LOG.info("ObjectStoreHandler initializing with {}: {}",
-        OZONE_HANDLER_TYPE_KEY, shType);
-    boolean ozoneTrace = conf.getBoolean(OZONE_TRACE_ENABLED_KEY,
-        OZONE_TRACE_ENABLED_DEFAULT);
-
-    // Initialize Jersey container for object store web application.
-    if (OzoneConsts.OZONE_HANDLER_DISTRIBUTED.equalsIgnoreCase(shType)) {
-      RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
-          ProtobufRpcEngine.class);
-      long scmVersion =
-          RPC.getProtocolVersion(StorageContainerLocationProtocolPB.class);
-
-      InetSocketAddress scmAddress =
-          OzoneClientUtils.getScmAddressForClients(conf);
-      this.storageContainerLocationClient =
-          new StorageContainerLocationProtocolClientSideTranslatorPB(
-              RPC.getProxy(StorageContainerLocationProtocolPB.class, scmVersion,
-              scmAddress, UserGroupInformation.getCurrentUser(), conf,
-              NetUtils.getDefaultSocketFactory(conf),
-              Client.getRpcTimeout(conf)));
-
-      InetSocketAddress scmBlockAddress =
-          OzoneClientUtils.getScmAddressForBlockClients(conf);
-      this.scmBlockLocationClient =
-          new ScmBlockLocationProtocolClientSideTranslatorPB(
-              RPC.getProxy(ScmBlockLocationProtocolPB.class, scmVersion,
-                  scmBlockAddress, UserGroupInformation.getCurrentUser(), conf,
-                  NetUtils.getDefaultSocketFactory(conf),
-                  Client.getRpcTimeout(conf)));
-
-      RPC.setProtocolEngine(conf, KeySpaceManagerProtocolPB.class,
-          ProtobufRpcEngine.class);
-      long ksmVersion =
-          RPC.getProtocolVersion(KeySpaceManagerProtocolPB.class);
-      InetSocketAddress ksmAddress = OzoneClientUtils.getKsmAddress(conf);
-      this.keySpaceManagerClient =
-          new KeySpaceManagerProtocolClientSideTranslatorPB(
-              RPC.getProxy(KeySpaceManagerProtocolPB.class, ksmVersion,
-              ksmAddress, UserGroupInformation.getCurrentUser(), conf,
-              NetUtils.getDefaultSocketFactory(conf),
-              Client.getRpcTimeout(conf)));
-
-      storageHandler = new DistributedStorageHandler(
-          new OzoneConfiguration(conf),
-          this.storageContainerLocationClient,
-          this.keySpaceManagerClient);
-    } else {
-      if (OzoneConsts.OZONE_HANDLER_LOCAL.equalsIgnoreCase(shType)) {
-        storageHandler = new LocalStorageHandler(conf);
-        this.storageContainerLocationClient = null;
-        this.scmBlockLocationClient = null;
-        this.keySpaceManagerClient = null;
-      } else {
-        throw new IllegalArgumentException(
-            String.format("Unrecognized value for %s: %s,"
-                + " Allowed values are %s,%s",
-                OZONE_HANDLER_TYPE_KEY, shType,
-                OzoneConsts.OZONE_HANDLER_DISTRIBUTED,
-                OzoneConsts.OZONE_HANDLER_LOCAL));
-      }
-    }
-    ApplicationAdapter aa =
-        new ApplicationAdapter(new ObjectStoreApplication());
-    Map<String, Object> settingsMap = new HashMap<>();
-    settingsMap.put(PROPERTY_CONTAINER_REQUEST_FILTERS,
-        ServiceFilter.class.getCanonicalName());
-    settingsMap.put(FEATURE_TRACE, ozoneTrace);
-    aa.setPropertiesAndFeatures(settingsMap);
-    this.objectStoreJerseyContainer = ContainerFactory.createContainer(
-        ObjectStoreJerseyContainer.class, aa);
-    this.objectStoreJerseyContainer.setStorageHandler(storageHandler);
-  }
-
-  /**
-   * Returns the initialized web application container.
-   *
-   * @return initialized web application container
-   */
-  public ObjectStoreJerseyContainer getObjectStoreJerseyContainer() {
-    return this.objectStoreJerseyContainer;
-  }
-
-  /**
-   * Returns the storage handler.
-   *
-   * @return returns the storage handler
-   */
-  public StorageHandler getStorageHandler() {
-    return this.storageHandler;
-  }
-
-  @Override
-  public void close() {
-    LOG.info("Closing ObjectStoreHandler.");
-    storageHandler.close();
-    IOUtils.cleanupWithLogger(LOG, storageContainerLocationClient);
-    IOUtils.cleanupWithLogger(LOG, scmBlockLocationClient);
-    IOUtils.cleanupWithLogger(LOG, keySpaceManagerClient);
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 1852192..c141293 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -1745,7 +1745,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
       }
       assert newReplicaInfo.getState() == ReplicaState.FINALIZED
           : "Replica should be finalized";
-      
+
       volumeMap.add(bpid, newReplicaInfo);
       return newReplicaInfo;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java
index dbc03c2..2e46b28 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/DatanodeHttpServer.java
@@ -51,11 +51,9 @@ import org.apache.hadoop.hdfs.server.common.JspHelper;
 import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.web.webhdfs.DataNodeUGIProvider;
-import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
 import org.apache.hadoop.http.HttpConfig;
 import org.apache.hadoop.http.HttpServer2;
 import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainer;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.security.http.RestCsrfPreventionFilter;
 import org.apache.hadoop.security.ssl.SSLFactory;
@@ -93,19 +91,11 @@ public class DatanodeHttpServer implements Closeable {
 
   public DatanodeHttpServer(final Configuration conf,
       final DataNode datanode,
-      final ServerSocketChannel externalHttpChannel,
-      final ObjectStoreHandler objectStoreHandler)
+      final ServerSocketChannel externalHttpChannel)
     throws IOException {
     this.restCsrfPreventionFilter = createRestCsrfPreventionFilter(conf);
     this.conf = conf;
 
-    final ObjectStoreJerseyContainer finalContainer;
-    if (objectStoreHandler != null) {
-      finalContainer = objectStoreHandler.getObjectStoreJerseyContainer();
-    } else {
-      finalContainer = null;
-    }
-
     Configuration confForInfoServer = new Configuration(conf);
     confForInfoServer.setInt(HttpServer2.HTTP_MAX_THREADS_KEY, 10);
     int proxyPort =
@@ -160,8 +150,7 @@ public class DatanodeHttpServer implements Closeable {
           }
           p.addLast(
               new ChunkedWriteHandler(),
-              new URLDispatcher(jettyAddr, conf, confForCreate,
-                  finalContainer));
+              new URLDispatcher(jettyAddr, conf, confForCreate));
         }
       });
 
@@ -218,8 +207,7 @@ public class DatanodeHttpServer implements Closeable {
             }
             p.addLast(
                 new ChunkedWriteHandler(),
-                new URLDispatcher(jettyAddr, conf, confForCreate,
-                  finalContainer));
+                new URLDispatcher(jettyAddr, conf, confForCreate));
           }
         });
     } else {
@@ -386,4 +374,3 @@ public class DatanodeHttpServer implements Closeable {
     }
   }
 }
-

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/RestCsrfPreventionFilterHandler.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/RestCsrfPreventionFilterHandler.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/RestCsrfPreventionFilterHandler.java
index 4958bb5..e275fb3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/RestCsrfPreventionFilterHandler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/RestCsrfPreventionFilterHandler.java
@@ -43,7 +43,7 @@ import org.apache.hadoop.security.http.RestCsrfPreventionFilter.HttpInteraction;
  * handler drops the request and immediately sends an HTTP 400 response.
  */
 @InterfaceAudience.Private
-final class RestCsrfPreventionFilterHandler
+public final class RestCsrfPreventionFilterHandler
     extends SimpleChannelInboundHandler<HttpRequest> {
 
   private static final Log LOG = DatanodeHttpServer.LOG;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java
index dd958d1..8ec5bf6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/web/URLDispatcher.java
@@ -17,106 +17,42 @@
  */
 package org.apache.hadoop.hdfs.server.datanode.web;
 
-import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX;
-
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelPipeline;
 import io.netty.channel.SimpleChannelInboundHandler;
 import io.netty.handler.codec.http.HttpRequest;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler;
-import org.apache.hadoop.ozone.client.rest.headers.Header;
-import org.apache.hadoop.ozone.web.netty.ObjectStoreJerseyContainer;
-import org.apache.hadoop.ozone.web.netty.RequestDispatchObjectStoreChannelHandler;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
 import java.net.InetSocketAddress;
 
+import static org.apache.hadoop.hdfs.server.datanode.web.webhdfs.WebHdfsHandler.WEBHDFS_PREFIX;
+
 class URLDispatcher extends SimpleChannelInboundHandler<HttpRequest> {
-  protected static final Logger LOG =
-      LoggerFactory.getLogger(URLDispatcher.class);
   private final InetSocketAddress proxyHost;
   private final Configuration conf;
   private final Configuration confForCreate;
-  private final ObjectStoreJerseyContainer objectStoreJerseyContainer;
 
   URLDispatcher(InetSocketAddress proxyHost, Configuration conf,
-                Configuration confForCreate,
-                ObjectStoreJerseyContainer objectStoreJerseyContainer)
-      throws IOException {
+                Configuration confForCreate) {
     this.proxyHost = proxyHost;
     this.conf = conf;
     this.confForCreate = confForCreate;
-    this.objectStoreJerseyContainer = objectStoreJerseyContainer;
   }
 
   @Override
   protected void channelRead0(ChannelHandlerContext ctx, HttpRequest req)
       throws Exception {
+    String uri = req.getUri();
     ChannelPipeline p = ctx.pipeline();
-    if (isWebHdfsRequest(req)) {
+    if (uri.startsWith(WEBHDFS_PREFIX)) {
       WebHdfsHandler h = new WebHdfsHandler(conf, confForCreate);
       p.replace(this, WebHdfsHandler.class.getSimpleName(), h);
       h.channelRead0(ctx, req);
-    } else if (isObjectStoreRequest(req)) {
-      RequestDispatchObjectStoreChannelHandler h =
-          new RequestDispatchObjectStoreChannelHandler(
-              this.objectStoreJerseyContainer);
-      p.replace(this,
-          RequestDispatchObjectStoreChannelHandler.class.getSimpleName(), h);
-      h.channelRead0(ctx, req);
-    } else if (!isObjectStoreRequestHeaders(req)){
+    } else {
       SimpleHttpProxyHandler h = new SimpleHttpProxyHandler(proxyHost);
       p.replace(this, SimpleHttpProxyHandler.class.getSimpleName(), h);
       h.channelRead0(ctx, req);
     }
   }
-
-
-  /*
-   * Returns true if the request has ozone headers
-   *
-   * @param req HTTP request
-   * @return true if request has ozone header, else false
-   */
-
-  private boolean isObjectStoreRequestHeaders(HttpRequest req) {
-    for (String version : req.headers().getAll(Header.OZONE_VERSION_HEADER)) {
-      if (version != null) {
-        LOG.debug("ozone : dispatching call to Ozone, when security is not " +
-            "enabled");
-        return true;
-      }
-    }
-    return false;
-  }
-
-
-  /*
-   * Returns true if the request is to be handled by the object store.
-   *
-   * @param req HTTP request
-   * @return true if the request is to be handled by the object store
-   */
-  private boolean isObjectStoreRequest(HttpRequest req) {
-    if (this.objectStoreJerseyContainer == null) {
-      LOG.debug("ozone : ozone is disabled or when security is enabled, ozone" 
+
-          " is not supported");
-      return false;
-    }
-    return isObjectStoreRequestHeaders(req);
-  }
-
-  /**
-   * Returns true if the request is to be handled by WebHDFS.
-   *
-   * @param req HTTP request
-   * @return true if the request is to be handled by WebHDFS
-   */
-  private boolean isWebHdfsRequest(HttpRequest req) {
-    return req.getUri().startsWith(WEBHDFS_PREFIX);
-  }
 }
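
With the Ozone header inspection removed, request routing in URLDispatcher
reduces to a single URI prefix test. Assuming WEBHDFS_PREFIX carries its usual
value of "/webhdfs/v1" (defined in WebHdfsHandler), the resulting behaviour is:

    // Illustrative only: the dispatch rule after this change.
    String uri = req.getUri();
    if (uri.startsWith("/webhdfs/v1")) {
      // Handled in-process by WebHdfsHandler.
    } else {
      // Everything else is proxied to the internal Jetty server
      // by SimpleHttpProxyHandler.
    }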

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
index 2debf2e..10650da 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamespaceInfo.java
@@ -34,9 +34,9 @@ import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
 /**
- * NamespaceInfo is returned by the name-node in reply
+ * NamespaceInfo is returned by the name-node in reply 
  * to a data-node handshake.
- *
+ * 
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
@@ -110,7 +110,7 @@ public class NamespaceInfo extends StorageInfo {
     this.capabilities = capabilities;
   }
 
-  public NamespaceInfo(int nsID, String clusterID, String bpID,
+  public NamespaceInfo(int nsID, String clusterID, String bpID, 
       long cT) {
     this(nsID, clusterID, bpID, cT, Storage.getBuildVersion(),
         VersionInfo.getVersion());
@@ -122,7 +122,7 @@ public class NamespaceInfo extends StorageInfo {
         VersionInfo.getVersion());
     this.state = st;
   }
-
+  
   public long getCapabilities() {
     return capabilities;
   }
@@ -151,7 +151,7 @@ public class NamespaceInfo extends StorageInfo {
   public String getBlockPoolID() {
     return blockPoolID;
   }
-
+  
   public String getSoftwareVersion() {
     return softwareVersion;
   }
@@ -194,4 +194,4 @@ public class NamespaceInfo extends StorageInfo {
           "BPID=" + storage.getBlockPoolID() + ".");
     }
   }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java
index a1ea3fb..aaa1038 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/GetConf.java
@@ -26,7 +26,6 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Collection;
 import java.util.Set;
 
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -36,8 +35,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.DFSUtil.ConfiguredNNAddress;
-import org.apache.hadoop.ozone.client.OzoneClientUtils;
-import org.apache.hadoop.conf.OzoneConfiguration;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
@@ -81,10 +78,6 @@ public class GetConf extends Configured implements Tool {
         "gets the exclude file path that defines the datanodes " +
         "that need to decommissioned."),
     NNRPCADDRESSES("-nnRpcAddresses", "gets the namenode rpc addresses"),
-    KEYSPACEMANAGER("-keyspacemanagers",
-        "gets list of ozone key space manager nodes in the cluster"),
-    STORAGECONTAINERMANAGER("-storagecontainermanagers",
-        "gets list of ozone storage container manager nodes in the cluster"),
     CONFKEY("-confKey [key]", "gets a specific key from the configuration");
 
     private static final Map<String, CommandHandler> map;
@@ -104,10 +97,6 @@ public class GetConf extends Configured implements Tool {
           new CommandHandler(DFSConfigKeys.DFS_HOSTS_EXCLUDE));
       map.put(StringUtils.toLowerCase(NNRPCADDRESSES.getName()),
           new NNRpcAddressesCommandHandler());
-      map.put(StringUtils.toLowerCase(KEYSPACEMANAGER.getName()),
-          new KeySpaceManagersCommandHandler());
-      map.put(StringUtils.toLowerCase(STORAGECONTAINERMANAGER.getName()),
-          new StorageContainerManagersCommandHandler());
       map.put(StringUtils.toLowerCase(CONFKEY.getName()),
           new PrintConfKeyCommandHandler());
     }
@@ -235,36 +224,9 @@ public class GetConf extends Configured implements Tool {
    * Handler for {@link Command#SECONDARY}
    */
   static class SecondaryNameNodesCommandHandler extends CommandHandler {
-    @Override public int doWorkInternal(GetConf tool, String[] args)
-        throws IOException {
-      tool.printMap(DFSUtil.getSecondaryNameNodeAddresses(tool.getConf()));
-      return 0;
-    }
-  }
-
-  /**
-   * Handler for {@link Command#STORAGECONTAINERMANAGER}.
-   */
-  static class StorageContainerManagersCommandHandler extends CommandHandler {
     @Override
-    public int doWorkInternal(GetConf tool, String[] args) throws IOException {
-      Collection<InetSocketAddress> addresses =
-          OzoneClientUtils.getSCMAddresses(tool.getConf());
-      for (InetSocketAddress addr : addresses) {
-        tool.printOut(addr.getHostName());
-      }
-      return 0;
-    }
-  }
-
-  /**
-   * Handler for {@link Command#KEYSPACEMANAGER}.
-   */
-  static class KeySpaceManagersCommandHandler extends CommandHandler {
-    @Override
-    public int doWorkInternal(GetConf tool, String[] args) throws IOException {
-      tool.printOut(OzoneClientUtils.getKsmAddress(tool.getConf())
-          .getHostName());
+    public int doWorkInternal(GetConf tool, String []args) throws IOException {
+      tool.printMap(DFSUtil.getSecondaryNameNodeAddresses(tool.getConf()));
       return 0;
     }
   }
@@ -395,11 +357,8 @@ public class GetConf extends Configured implements Tool {
     if (DFSUtil.parseHelpArgument(args, USAGE, System.out, true)) {
       System.exit(0);
     }
-
-    Configuration conf = new Configuration();
-    conf.addResource(new HdfsConfiguration());
-    conf.addResource(new OzoneConfiguration());
-    int res = ToolRunner.run(new GetConf(conf), args);
+    
+    int res = ToolRunner.run(new GetConf(new HdfsConfiguration()), args);
     System.exit(res);
   }
 }
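
With the Ozone commands removed, GetConf is driven by HdfsConfiguration alone
and the remaining subcommands work as before; standard usage, shown here for
context:

    $ hdfs getconf -namenodes
    $ hdfs getconf -nnRpcAddresses
    $ hdfs getconf -confKey dfs.blocksize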

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/InconsistentStorageStateException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/InconsistentStorageStateException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/InconsistentStorageStateException.java
deleted file mode 100644
index c3f9234..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/InconsistentStorageStateException.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one
-  * or more contributor license agreements.  See the NOTICE file
-  * distributed with this work for additional information
-  * regarding copyright ownership.  The ASF licenses this file
-  * to you under the Apache License, Version 2.0 (the
-  * "License"); you may not use this file except in compliance
-  * with the License.  You may obtain a copy of the License at
-  *
-  *     http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package org.apache.hadoop.ozone.common;
-
-import java.io.File;
-import java.io.IOException;
-
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-
-/**
- * The exception is thrown when file system state is inconsistent
- * and is not recoverable.
- */
-@InterfaceAudience.Private
-@InterfaceStability.Unstable
-public class InconsistentStorageStateException extends IOException {
-  private static final long serialVersionUID = 1L;
-
-  public InconsistentStorageStateException(String descr) {
-    super(descr);
-  }
-
-  public InconsistentStorageStateException(File dir, String descr) {
-    super("Directory " + getFilePath(dir) + " is in an inconsistent state: "
-        + descr);
-  }
-
-  private static String getFilePath(File dir) {
-    try {
-      return dir.getCanonicalPath();
-    } catch (IOException e) {
-    }
-    return dir.getPath();
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/Storage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/Storage.java
deleted file mode 100644
index cfcddb4..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/Storage.java
+++ /dev/null
@@ -1,249 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.common;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.file.DirectoryStream;
-import java.nio.file.Files;
-import java.nio.file.Path;
-import java.util.Properties;
-
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeType;
-import org.apache.hadoop.util.Time;
-
-/**
- * Storage information file. This Class defines the methods to check
- * the consistency of the storage dir and the version file.
- * <p>
- * Local storage information is stored in a separate file VERSION.
- * It contains type of the node,
- * the storage layout version, the SCM id, and
- * the KSM/SCM state creation time.
- *
- */
-@InterfaceAudience.Private
-public abstract class Storage {
-  private static final Logger LOG = LoggerFactory.getLogger(Storage.class);
-
-  protected static final String STORAGE_DIR_CURRENT = "current";
-  protected static final String STORAGE_FILE_VERSION = "VERSION";
-
-  private final NodeType nodeType;
-  private final File root;
-  private final File storageDir;
-
-  private StorageState state;
-  private StorageInfo storageInfo;
-
-
-  /**
-   * Determines the state of the Version file.
-   */
-  public enum StorageState {
-    NON_EXISTENT, NOT_INITIALIZED, INITIALIZED
-  }
-
-  public Storage(NodeType type, File root, String sdName)
-      throws IOException {
-    this.nodeType = type;
-    this.root = root;
-    this.storageDir = new File(root, sdName);
-    this.state = getStorageState();
-    if (state == StorageState.INITIALIZED) {
-      this.storageInfo = new StorageInfo(type, getVersionFile());
-    } else {
-      this.storageInfo = new StorageInfo(
-          nodeType, StorageInfo.newClusterID(), Time.now());
-      setNodeProperties();
-    }
-  }
-
-  /**
-   * Gets the path of the Storage dir.
-   * @return Stoarge dir path
-   */
-  public String getStorageDir() {
-    return storageDir.getAbsoluteFile().toString();
-  }
-
-  /**
-   * Gets the state of the version file.
-   * @return the state of the Version file
-   */
-  public StorageState getState() {
-    return state;
-  }
-
-  public NodeType getNodeType() {
-    return storageInfo.getNodeType();
-  }
-
-  public String getClusterID() {
-    return storageInfo.getClusterID();
-  }
-
-  public long getCreationTime() {
-    return storageInfo.getCreationTime();
-  }
-
-  public void setClusterId(String clusterId) throws IOException {
-    if (state == StorageState.INITIALIZED) {
-      throw new IOException(
-          "Storage directory " + storageDir + " already initialized.");
-    } else {
-      storageInfo.setClusterId(clusterId);
-    }
-  }
-
-  /**
-   * Retreives the storageInfo instance to read/write the common
-   * version file properties.
-   * @return the instance of the storageInfo class
-   */
-  protected StorageInfo getStorageInfo() {
-    return storageInfo;
-  }
-
-  abstract protected Properties getNodeProperties();
-
-  /**
-   * Sets the Node properties spaecific to KSM/SCM.
-   */
-  private void setNodeProperties() {
-    Properties nodeProperties = getNodeProperties();
-    if (nodeProperties != null) {
-      for (String key : nodeProperties.stringPropertyNames()) {
-        storageInfo.setProperty(key, nodeProperties.getProperty(key));
-      }
-    }
-  }
-
-  /**
-   * Directory {@code current} contains latest files defining
-   * the file system meta-data.
-   *
-   * @return the directory path
-   */
-  private File getCurrentDir() {
-    return new File(storageDir, STORAGE_DIR_CURRENT);
-  }
-
-  /**
-   * File {@code VERSION} contains the following fields:
-   * <ol>
-   * <li>node type</li>
-   * <li>KSM/SCM state creation time</li>
-   * <li>other fields specific for this node type</li>
-   * </ol>
-   * The version file is always written last during storage directory updates.
-   * The existence of the version file indicates that all other files have
-   * been successfully written in the storage directory, the storage is valid
-   * and does not need to be recovered.
-   *
-   * @return the version file path
-   */
-  private File getVersionFile() {
-    return new File(getCurrentDir(), STORAGE_FILE_VERSION);
-  }
-
-
-  /**
-   * Check to see if current/ directory is empty. This method is used
-   * before determining to format the directory.
-   * @throws IOException if unable to list files under the directory.
-   */
-  private void checkEmptyCurrent() throws IOException {
-    File currentDir = getCurrentDir();
-    if (!currentDir.exists()) {
-      // if current/ does not exist, it's safe to format it.
-      return;
-    }
-    try (DirectoryStream<Path> dirStream = Files
-        .newDirectoryStream(currentDir.toPath())) {
-      if (dirStream.iterator().hasNext()) {
-        throw new InconsistentStorageStateException(getCurrentDir(),
-            "Can't initialize the storage directory because the current "
-                + "it is not empty.");
-      }
-    }
-  }
-
-  /**
-   * Check consistency of the storage directory.
-   *
-   * @return state {@link StorageState} of the storage directory
-   * @throws IOException
-   */
-  private StorageState getStorageState() throws IOException {
-    assert root != null : "root is null";
-    String rootPath = root.getCanonicalPath();
-    try { // check that storage exists
-      if (!root.exists()) {
-        // storage directory does not exist
-        LOG.warn("Storage directory " + rootPath + " does not exist");
-        return StorageState.NON_EXISTENT;
-      }
-      // or is inaccessible
-      if (!root.isDirectory()) {
-        LOG.warn(rootPath + "is not a directory");
-        return StorageState.NON_EXISTENT;
-      }
-      if (!FileUtil.canWrite(root)) {
-        LOG.warn("Cannot access storage directory " + rootPath);
-        return StorageState.NON_EXISTENT;
-      }
-    } catch (SecurityException ex) {
-      LOG.warn("Cannot access storage directory " + rootPath, ex);
-      return StorageState.NON_EXISTENT;
-    }
-
-    // check whether current directory is valid
-    File versionFile = getVersionFile();
-    boolean hasCurrent = versionFile.exists();
-
-    if (hasCurrent) {
-      return StorageState.INITIALIZED;
-    } else {
-      checkEmptyCurrent();
-      return StorageState.NOT_INITIALIZED;
-    }
-  }
-
-  /**
-   * Creates the Version file if not present,
-   * otherwise returns with IOException.
-   * @throws IOException
-   */
-  public void initialize() throws IOException {
-    if (state == StorageState.INITIALIZED) {
-      throw new IOException("Storage directory already initialized.");
-    }
-    if (!getCurrentDir().mkdirs()) {
-      throw new IOException("Cannot create directory " + getCurrentDir());
-    }
-    storageInfo.writeTo(getVersionFile());
-  }
-
-}
-

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/StorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/StorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/StorageInfo.java
deleted file mode 100644
index 82ad955..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/StorageInfo.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.ozone.common;
-
-import java.io.IOException;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.File;
-import java.io.RandomAccessFile;
-
-import java.util.Properties;
-import java.util.UUID;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeType;
-
-/**
- * Common class for storage information. This class defines the common
- * properties and functions to set them , write them into the version file
- * and read them from the version file.
- *
- */
-@InterfaceAudience.Private
-public class StorageInfo {
-
-  private Properties properties = new Properties();
-
-  /**
-   * Property to hold node type.
-   */
-  private static final String NODE_TYPE = "nodeType";
-  /**
-   * Property to hold ID of the cluster.
-   */
-  private static final String CLUSTER_ID = "clusterID";
-  /**
-   * Property to hold creation time of the storage.
-   */
-  private static final String CREATION_TIME = "cTime";
-
-  /**
-   * Constructs StorageInfo instance.
-   * @param type
-   *          Type of the node using the storage
-   * @param cid
-   *          Cluster ID
-   * @param cT
-   *          Cluster creation Time
-
-   * @throws IOException
-   */
-  public StorageInfo(NodeType type, String cid, long cT)
-      throws IOException {
-    Preconditions.checkNotNull(type);
-    Preconditions.checkNotNull(cid);
-    Preconditions.checkNotNull(cT);
-    properties.setProperty(NODE_TYPE, type.name());
-    properties.setProperty(CLUSTER_ID, cid);
-    properties.setProperty(CREATION_TIME, String.valueOf(cT));
-  }
-
-  public StorageInfo(NodeType type, File propertiesFile)
-      throws IOException {
-    this.properties = readFrom(propertiesFile);
-    verifyNodeType(type);
-    verifyClusterId();
-    verifyCreationTime();
-  }
-
-  public NodeType getNodeType() {
-    return NodeType.valueOf(properties.getProperty(NODE_TYPE));
-  }
-
-  public String getClusterID() {
-    return properties.getProperty(CLUSTER_ID);
-  }
-
-  public Long  getCreationTime() {
-    String creationTime = properties.getProperty(CREATION_TIME);
-    if(creationTime != null) {
-      return Long.parseLong(creationTime);
-    }
-    return null;
-  }
-
-  public String getProperty(String key) {
-    return properties.getProperty(key);
-  }
-
-  public void setProperty(String key, String value) {
-    properties.setProperty(key, value);
-  }
-
-  public void setClusterId(String clusterId) {
-    properties.setProperty(CLUSTER_ID, clusterId);
-  }
-
-  private void verifyNodeType(NodeType type)
-      throws InconsistentStorageStateException {
-    NodeType nodeType = getNodeType();
-    Preconditions.checkNotNull(nodeType);
-    if(type != nodeType) {
-      throw new InconsistentStorageStateException("Expected NodeType: " + type +
-          ", but found: " + nodeType);
-    }
-  }
-
-  private void verifyClusterId()
-      throws InconsistentStorageStateException {
-    String clusterId = getClusterID();
-    Preconditions.checkNotNull(clusterId);
-    if(clusterId.isEmpty()) {
-      throw new InconsistentStorageStateException("Cluster ID not found");
-    }
-  }
-
-  private void verifyCreationTime() {
-    Long creationTime = getCreationTime();
-    Preconditions.checkNotNull(creationTime);
-  }
-
-
-  public void writeTo(File to)
-      throws IOException {
-    try (RandomAccessFile file = new RandomAccessFile(to, "rws");
-         FileOutputStream out = new FileOutputStream(file.getFD())) {
-      file.seek(0);
-    /*
-     * If server is interrupted before this line,
-     * the version file will remain unchanged.
-     */
-      properties.store(out, null);
-    /*
-     * Now the new fields are flushed to the head of the file, but file
-     * length can still be larger than required and therefore the file can
-     * contain whole or corrupted fields from its old contents at the end.
-     * If the server is interrupted here and restarted later, these extra fields
-     * either should not affect server behavior or should be handled
-     * by the server correctly.
-     */
-      file.setLength(out.getChannel().position());
-    }
-  }
-
-  private Properties readFrom(File from) throws IOException {
-    try (RandomAccessFile file = new RandomAccessFile(from, "rws");
-        FileInputStream in = new FileInputStream(file.getFD())) {
-      Properties props = new Properties();
-      file.seek(0);
-      props.load(in);
-      return props;
-    }
-  }
-
-  /**
-   * Generate new clusterID.
-   *
-   * clusterID is a persistent attribute of the cluster.
-   * It is generated when the cluster is created and remains the same
-   * during the life cycle of the cluster.  When a new SCM node is initialized,
-   * if this is a new cluster, a new clusterID is generated and stored.
-   * @return new clusterID
-   */
-  public static String newClusterID() {
-    return "CID-" + UUID.randomUUID().toString();
-  }
-
-}

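Aside on the removed StorageInfo: writeTo() used a small crash-tolerance
trick, storing the Properties at the head of the file and then truncating
to the channel position so a shorter rewrite cannot leave stale bytes from
a longer previous version behind. A minimal, self-contained sketch of that
pattern (the property keys mirror the deleted constants; the path and
values are illustrative only):

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;
    import java.io.RandomAccessFile;
    import java.util.Properties;
    import java.util.UUID;

    public final class VersionFileSketch {
      private VersionFileSketch() { }

      // Store the properties at offset 0, then clip the file to exactly the
      // bytes just written; an interrupt before store() leaves the old file
      // intact, and the original caveat still applies between store() and
      // setLength().
      static void write(Properties props, File to) throws IOException {
        try (RandomAccessFile file = new RandomAccessFile(to, "rws");
             FileOutputStream out = new FileOutputStream(file.getFD())) {
          file.seek(0);
          props.store(out, null);
          file.setLength(out.getChannel().position());
        }
      }

      public static void main(String[] args) throws IOException {
        Properties props = new Properties();
        props.setProperty("nodeType", "DATANODE");  // illustrative value
        props.setProperty("clusterID", "CID-" + UUID.randomUUID());
        props.setProperty("cTime", String.valueOf(System.currentTimeMillis()));
        write(props, new File("/tmp/VERSION"));     // hypothetical location
      }
    }
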
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/package-info.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/package-info.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/package-info.java
deleted file mode 100644
index 6517e58..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/common/package-info.java
+++ /dev/null
@@ -1,18 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.common;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java
deleted file mode 100644
index 690e3b5..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkInfo.java
+++ /dev/null
@@ -1,185 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.helpers;
-
-import com.google.common.base.Preconditions;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * Java class that represents the ChunkInfo ProtoBuf class. This helper class
- * allows us to convert between the protobuf form and plain Java.
- */
-public class ChunkInfo {
-  private final String chunkName;
-  private final long offset;
-  private final long len;
-  private String checksum;
-  private final Map<String, String> metadata;
-
-
-  /**
-   * Constructs a ChunkInfo.
-   *
-   * @param chunkName - File Name where chunk lives.
-   * @param offset    - offset where Chunk Starts.
-   * @param len       - Length of the Chunk.
-   */
-  public ChunkInfo(String chunkName, long offset, long len) {
-    this.chunkName = chunkName;
-    this.offset = offset;
-    this.len = len;
-    this.metadata = new TreeMap<>();
-  }
-
-  /**
-   * Adds metadata.
-   *
-   * @param key   - Key Name.
-   * @param value - Value.
-   * @throws IOException
-   */
-  public void addMetadata(String key, String value) throws IOException {
-    synchronized (this.metadata) {
-      if (this.metadata.containsKey(key)) {
-        throw new IOException("This key already exists. Key " + key);
-      }
-      metadata.put(key, value);
-    }
-  }
-
-  /**
-   * Gets a Chunkinfo class from the protobuf definitions.
-   *
-   * @param info - Protobuf class
-   * @return ChunkInfo
-   * @throws IOException
-   */
-  public static ChunkInfo getFromProtoBuf(ContainerProtos.ChunkInfo info)
-      throws IOException {
-    Preconditions.checkNotNull(info);
-
-    ChunkInfo chunkInfo = new ChunkInfo(info.getChunkName(), info.getOffset(),
-        info.getLen());
-
-    for (int x = 0; x < info.getMetadataCount(); x++) {
-      chunkInfo.addMetadata(info.getMetadata(x).getKey(),
-          info.getMetadata(x).getValue());
-    }
-
-
-    if (info.hasChecksum()) {
-      chunkInfo.setChecksum(info.getChecksum());
-    }
-    return chunkInfo;
-  }
-
-  /**
-   * Returns a ProtoBuf Message from ChunkInfo.
-   *
-   * @return Protocol Buffer Message
-   */
-  public ContainerProtos.ChunkInfo getProtoBufMessage() {
-    ContainerProtos.ChunkInfo.Builder builder = ContainerProtos
-        .ChunkInfo.newBuilder();
-
-    builder.setChunkName(this.getChunkName());
-    builder.setOffset(this.getOffset());
-    builder.setLen(this.getLen());
-    if (this.getChecksum() != null && !this.getChecksum().isEmpty()) {
-      builder.setChecksum(this.getChecksum());
-    }
-
-    for (Map.Entry<String, String> entry : metadata.entrySet()) {
-      OzoneProtos.KeyValue.Builder keyValBuilder =
-          OzoneProtos.KeyValue.newBuilder();
-      builder.addMetadata(keyValBuilder.setKey(entry.getKey())
-          .setValue(entry.getValue()).build());
-    }
-
-    return builder.build();
-  }
-
-  /**
-   * Returns the chunkName.
-   *
-   * @return - String
-   */
-  public String getChunkName() {
-    return chunkName;
-  }
-
-  /**
-   * Gets the start offset of the given chunk in physical file.
-   *
-   * @return - long
-   */
-  public long getOffset() {
-    return offset;
-  }
-
-  /**
-   * Returns the length of the Chunk.
-   *
-   * @return long
-   */
-  public long getLen() {
-    return len;
-  }
-
-  /**
-   * Returns the SHA256 value of this chunk.
-   *
-   * @return - Hash String
-   */
-  public String getChecksum() {
-    return checksum;
-  }
-
-  /**
-   * Sets the Hash value of this chunk.
-   *
-   * @param checksum - Hash String.
-   */
-  public void setChecksum(String checksum) {
-    this.checksum = checksum;
-  }
-
-  /**
-   * Returns Metadata associated with this Chunk.
-   *
-   * @return - Map of Key,values.
-   */
-  public Map<String, String> getMetadata() {
-    return metadata;
-  }
-
-  @Override
-  public String toString() {
-    return "ChunkInfo{" +
-        "chunkName='" + chunkName +
-        ", offset=" + offset +
-        ", len=" + len +
-        '}';
-  }
-}

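For reference, this is how the removed ChunkInfo helper was driven, going by
the constructor and conversion methods deleted above. It compiles only
against the tree before this commit; the chunk name, metadata key, and
checksum value are illustrative:

    import java.io.IOException;

    import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
    import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;

    public class ChunkInfoRoundTrip {
      public static void main(String[] args) throws IOException {
        // backing file name, offset where the chunk starts, chunk length
        ChunkInfo info = new ChunkInfo("chunk_0001", 0L, 4096L);
        info.addMetadata("OverWriteRequested", "true"); // assumed flag key
        info.setChecksum("deadbeef");                   // illustrative hex

        // Round-trip through the wire form; the copy keeps the same fields.
        ContainerProtos.ChunkInfo proto = info.getProtoBufMessage();
        ChunkInfo copy = ChunkInfo.getFromProtoBuf(proto);
        assert copy.getLen() == info.getLen();
        assert copy.getChecksum().equals(info.getChecksum());
      }
    }
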
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
deleted file mode 100644
index 4eb211b..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ChunkUtils.java
+++ /dev/null
@@ -1,345 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-package org.apache.hadoop.ozone.container.common.helpers;
-
-import com.google.common.base.Preconditions;
-import com.google.protobuf.ByteString;
-import org.apache.commons.codec.binary.Hex;
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.container.common.impl.ChunkManagerImpl;
-import org.apache.hadoop.scm.container.common.helpers.Pipeline;
-import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.channels.AsynchronousFileChannel;
-import java.nio.channels.FileLock;
-import java.nio.file.StandardOpenOption;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.util.concurrent.ExecutionException;
-
-import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
-    .Result.CHECKSUM_MISMATCH;
-import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
-    .Result.CONTAINER_INTERNAL_ERROR;
-import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
-    .Result.CONTAINER_NOT_FOUND;
-import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
-    .Result.INVALID_WRITE_SIZE;
-import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
-    .Result.IO_EXCEPTION;
-import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
-    .Result.OVERWRITE_FLAG_REQUIRED;
-import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
-    .Result.UNABLE_TO_FIND_CHUNK;
-import static org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos
-    .Result.UNABLE_TO_FIND_DATA_DIR;
-
-/**
- * Set of utility functions used by the chunk Manager.
- */
-public final class ChunkUtils {
-
-  /* Never constructed. */
-  private ChunkUtils() {
-  }
-
-  /**
-   * Checks if we are getting a request to overwrite an existing range of
-   * chunk.
-   *
-   * @param chunkFile - File
-   * @param chunkInfo - Buffer to write
-   * @return bool
-   */
-  public static boolean isOverWriteRequested(File chunkFile, ChunkInfo
-      chunkInfo) {
-
-    if (!chunkFile.exists()) {
-      return false;
-    }
-
-    long offset = chunkInfo.getOffset();
-    return offset < chunkFile.length();
-  }
-
-  /**
-   * Overwrite is permitted if and only if the user explicitly asks for it. We
-   * permit this iff the key/value pair contains a flag called
-   * [OverWriteRequested, true].
-   *
-   * @param chunkInfo - Chunk info
-   * @return true if the user asks for it.
-   */
-  public static boolean isOverWritePermitted(ChunkInfo chunkInfo) {
-    String overWrite = chunkInfo.getMetadata().get(OzoneConsts.CHUNK_OVERWRITE);
-    return (overWrite != null) &&
-        (!overWrite.isEmpty()) &&
-        (Boolean.valueOf(overWrite));
-  }
-
-  /**
-   * Validates chunk data and returns a file object to Chunk File that we are
-   * expected to write data to.
-   *
-   * @param pipeline - pipeline.
-   * @param data - container data.
-   * @param info - chunk info.
-   * @return File
-   * @throws StorageContainerException
-   */
-  public static File validateChunk(Pipeline pipeline, ContainerData data,
-      ChunkInfo info) throws StorageContainerException {
-
-    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
-
-    File chunkFile = getChunkFile(pipeline, data, info);
-    if (ChunkUtils.isOverWriteRequested(chunkFile, info)) {
-      if (!ChunkUtils.isOverWritePermitted(info)) {
-        log.error("Rejecting write chunk request. Chunk overwrite " +
-            "without explicit request. {}", info.toString());
-        throw new StorageContainerException("Rejecting write chunk request. " +
-            "OverWrite flag required." + info.toString(),
-            OVERWRITE_FLAG_REQUIRED);
-      }
-    }
-    return chunkFile;
-  }
-
-  /**
-   * Validates that Path to chunk file exists.
-   *
-   * @param pipeline - Container Info.
-   * @param data - Container Data
-   * @param info - Chunk info
-   * @return - File.
-   * @throws StorageContainerException
-   */
-  public static File getChunkFile(Pipeline pipeline, ContainerData data,
-      ChunkInfo info) throws StorageContainerException {
-
-    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
-    if (data == null) {
-      log.error("Invalid container Name: {}", pipeline.getContainerName());
-      throw new StorageContainerException("Unable to find the container Name:" +
-          " " +
-          pipeline.getContainerName(), CONTAINER_NOT_FOUND);
-    }
-
-    File dataDir = ContainerUtils.getDataDirectory(data).toFile();
-    if (!dataDir.exists()) {
-      log.error("Unable to find the data directory: {}", dataDir);
-      throw new StorageContainerException("Unable to find the data directory:" +
-          " " + dataDir, UNABLE_TO_FIND_DATA_DIR);
-    }
-
-    return dataDir.toPath().resolve(info.getChunkName()).toFile();
-
-  }
-
-  /**
-   * Writes the data in chunk Info to the specified location in the chunkfile.
-   *
-   * @param chunkFile - File to write data to.
-   * @param chunkInfo - Data stream to write.
-   * @param data - The data buffer.
-   * @throws StorageContainerException
-   */
-  public static void writeData(File chunkFile, ChunkInfo chunkInfo,
-      byte[] data) throws
-      StorageContainerException, ExecutionException, InterruptedException,
-      NoSuchAlgorithmException {
-
-    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
-    if (data.length != chunkInfo.getLen()) {
-      String err = String.format("data array does not match the length " +
-              "specified. DataLen: %d Byte Array: %d",
-          chunkInfo.getLen(), data.length);
-      log.error(err);
-      throw new StorageContainerException(err, INVALID_WRITE_SIZE);
-    }
-
-    AsynchronousFileChannel file = null;
-    FileLock lock = null;
-
-    try {
-      file =
-          AsynchronousFileChannel.open(chunkFile.toPath(),
-              StandardOpenOption.CREATE,
-              StandardOpenOption.WRITE,
-              StandardOpenOption.SPARSE,
-              StandardOpenOption.SYNC);
-      lock = file.lock().get();
-      if (chunkInfo.getChecksum() != null &&
-          !chunkInfo.getChecksum().isEmpty()) {
-        verifyChecksum(chunkInfo, data, log);
-      }
-      int size = file.write(ByteBuffer.wrap(data), chunkInfo.getOffset()).get();
-      if (size != data.length) {
-        log.error("Invalid write size found. Size:{}  Expected: {} ", size,
-            data.length);
-        throw new StorageContainerException("Invalid write size found. " +
-            "Size: " + size + " Expected: " + data.length, INVALID_WRITE_SIZE);
-      }
-    } catch (IOException e) {
-      throw new StorageContainerException(e, IO_EXCEPTION);
-
-    } finally {
-      if (lock != null) {
-        try {
-          lock.release();
-        } catch (IOException e) {
-          log.error("Unable to release lock ??, Fatal Error.");
-          throw new StorageContainerException(e, CONTAINER_INTERNAL_ERROR);
-
-        }
-      }
-      if (file != null) {
-        try {
-          file.close();
-        } catch (IOException e) {
-          throw new StorageContainerException("Error closing chunk file",
-              e, CONTAINER_INTERNAL_ERROR);
-        }
-      }
-    }
-  }
-
-  /**
-   * Verifies the checksum of a chunk against the data buffer.
-   *
-   * @param chunkInfo - Chunk Info.
-   * @param data - data buffer
-   * @param log - log
-   * @throws NoSuchAlgorithmException
-   * @throws StorageContainerException
-   */
-  private static void verifyChecksum(ChunkInfo chunkInfo, byte[] data, Logger
-      log) throws NoSuchAlgorithmException, StorageContainerException {
-    MessageDigest sha = MessageDigest.getInstance(OzoneConsts.FILE_HASH);
-    sha.update(data);
-    if (!Hex.encodeHexString(sha.digest()).equals(
-        chunkInfo.getChecksum())) {
-      log.error("Checksum mismatch. Provided: {} , computed: {}",
-          chunkInfo.getChecksum(), DigestUtils.sha256Hex(sha.digest()));
-      throw new StorageContainerException("Checksum mismatch. Provided: " +
-          chunkInfo.getChecksum() + " , computed: " +
-          DigestUtils.sha256Hex(sha.digest()), CHECKSUM_MISMATCH);
-    }
-  }
-
-  /**
-   * Reads data from an existing chunk file.
-   *
-   * @param chunkFile - file where data lives.
-   * @param data - chunk definition.
-   * @return ByteBuffer
-   * @throws StorageContainerException
-   * @throws ExecutionException
-   * @throws InterruptedException
-   */
-  public static ByteBuffer readData(File chunkFile, ChunkInfo data) throws
-      StorageContainerException, ExecutionException, InterruptedException,
-      NoSuchAlgorithmException {
-    Logger log = LoggerFactory.getLogger(ChunkManagerImpl.class);
-
-    if (!chunkFile.exists()) {
-      log.error("Unable to find the chunk file. chunk info : {}",
-          data.toString());
-      throw new StorageContainerException("Unable to find the chunk file. " +
-          "chunk info " +
-          data.toString(), UNABLE_TO_FIND_CHUNK);
-    }
-
-    AsynchronousFileChannel file = null;
-    FileLock lock = null;
-    try {
-      file =
-          AsynchronousFileChannel.open(chunkFile.toPath(),
-              StandardOpenOption.READ);
-      lock = file.lock(data.getOffset(), data.getLen(), true).get();
-
-      ByteBuffer buf = ByteBuffer.allocate((int) data.getLen());
-      file.read(buf, data.getOffset()).get();
-
-      if (data.getChecksum() != null && !data.getChecksum().isEmpty()) {
-        verifyChecksum(data, buf.array(), log);
-      }
-
-      return buf;
-    } catch (IOException e) {
-      throw new StorageContainerException(e, IO_EXCEPTION);
-    } finally {
-      if (lock != null) {
-        try {
-          lock.release();
-        } catch (IOException e) {
-          log.error("I/O error is lock release.");
-        }
-      }
-      if (file != null) {
-        IOUtils.closeStream(file);
-      }
-    }
-  }
-
-  /**
-   * Returns a CreateContainer Response. This call is used by create and delete
-   * containers which have null success responses.
-   *
-   * @param msg Request
-   * @return Response.
-   */
-  public static ContainerProtos.ContainerCommandResponseProto
-      getChunkResponse(ContainerProtos.ContainerCommandRequestProto msg) {
-    return ContainerUtils.getContainerResponse(msg);
-  }
-
-  /**
-   * Gets a response to the read chunk calls.
-   *
-   * @param msg - Msg
-   * @param data - Data
-   * @param info - Info
-   * @return Response.
-   */
-  public static ContainerProtos.ContainerCommandResponseProto
-      getReadChunkResponse(ContainerProtos.ContainerCommandRequestProto msg,
-      byte[] data, ChunkInfo info) {
-    Preconditions.checkNotNull(msg);
-
-    ContainerProtos.ReadChunkResponseProto.Builder response =
-        ContainerProtos.ReadChunkResponseProto.newBuilder();
-    response.setChunkData(info.getProtoBufMessage());
-    response.setData(ByteString.copyFrom(data));
-    response.setPipeline(msg.getReadChunk().getPipeline());
-
-    ContainerProtos.ContainerCommandResponseProto.Builder builder =
-        ContainerUtils.getContainerResponse(msg, ContainerProtos.Result
-            .SUCCESS, "");
-    builder.setReadChunk(response);
-    return builder.build();
-  }
-}

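The heart of the removed writeData() is the lock-then-write-then-verify
shape around an AsynchronousFileChannel. A self-contained sketch of that
pattern with the Ozone error codes stripped out (path and payload are
illustrative):

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.channels.AsynchronousFileChannel;
    import java.nio.channels.FileLock;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;
    import java.util.concurrent.ExecutionException;

    public final class LockedChunkWrite {
      private LockedChunkWrite() { }

      // Open an async channel, take an exclusive lock, write at the chunk
      // offset, and fail on a short write -- the same sequence the deleted
      // utility followed.
      static void write(String path, byte[] data, long offset)
          throws IOException, InterruptedException, ExecutionException {
        AsynchronousFileChannel file = null;
        FileLock lock = null;
        try {
          file = AsynchronousFileChannel.open(Paths.get(path),
              StandardOpenOption.CREATE,
              StandardOpenOption.WRITE,
              StandardOpenOption.SPARSE,
              StandardOpenOption.SYNC);
          lock = file.lock().get();
          int size = file.write(ByteBuffer.wrap(data), offset).get();
          if (size != data.length) {
            throw new IOException("Short write: " + size + " of "
                + data.length + " bytes");
          }
        } finally {
          if (lock != null) {
            lock.release();
          }
          if (file != null) {
            file.close();
          }
        }
      }

      public static void main(String[] args) throws Exception {
        write("/tmp/chunk_0001",                    // hypothetical path
            "hello".getBytes(StandardCharsets.UTF_8), 0L);
      }
    }
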
http://git-wip-us.apache.org/repos/asf/hadoop/blob/ce23d9ad/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
deleted file mode 100644
index d0886e1..0000000
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerData.java
+++ /dev/null
@@ -1,326 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- *  with the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- */
-
-package org.apache.hadoop.ozone.container.common.helpers;
-
-import org.apache.commons.codec.digest.DigestUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
-import org.apache.hadoop.ozone.OzoneConsts;
-import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
-import org.apache.hadoop.scm.ScmConfigKeys;
-import org.apache.hadoop.util.Time;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Map;
-import java.util.TreeMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-/**
- * This class maintains the information about a container in the ozone world.
- * <p>
- * A container is a name, along with metadata, which is a set of key-value
- * pairs.
- */
-public class ContainerData {
-
-  private final String containerName;
-  private final Map<String, String> metadata;
-  private String dbPath;  // Path to Level DB Store.
-  // Path to Physical file system where container and checksum are stored.
-  private String containerFilePath;
-  private String hash;
-  private AtomicLong bytesUsed;
-  private long maxSize;
-  private Long containerID;
-  private OzoneProtos.LifeCycleState state;
-
-  /**
-   * Constructs a  ContainerData Object.
-   *
-   * @param containerName - Name
-   */
-  public ContainerData(String containerName, Long containerID,
-      Configuration conf) {
-    this.metadata = new TreeMap<>();
-    this.containerName = containerName;
-    this.maxSize = conf.getLong(ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_KEY,
-        ScmConfigKeys.SCM_CONTAINER_CLIENT_MAX_SIZE_DEFAULT) * OzoneConsts.GB;
-    this.bytesUsed =  new AtomicLong(0L);
-    this.containerID = containerID;
-    this.state = OzoneProtos.LifeCycleState.OPEN;
-  }
-
-  /**
-   * Constructs a ContainerData object from ProtoBuf classes.
-   *
-   * @param protoData - ProtoBuf Message
-   * @throws IOException
-   */
-  public static ContainerData getFromProtBuf(
-      ContainerProtos.ContainerData protoData, Configuration conf)
-      throws IOException {
-    ContainerData data = new ContainerData(protoData.getName(),
-        protoData.getContainerID(), conf);
-    for (int x = 0; x < protoData.getMetadataCount(); x++) {
-      data.addMetadata(protoData.getMetadata(x).getKey(),
-          protoData.getMetadata(x).getValue());
-    }
-
-    if (protoData.hasContainerPath()) {
-      data.setContainerPath(protoData.getContainerPath());
-    }
-
-    if (protoData.hasDbPath()) {
-      data.setDBPath(protoData.getDbPath());
-    }
-
-    if (protoData.hasState()) {
-      data.setState(protoData.getState());
-    }
-
-    if(protoData.hasHash()) {
-      data.setHash(protoData.getHash());
-    }
-
-    if (protoData.hasBytesUsed()) {
-      data.setBytesUsed(protoData.getBytesUsed());
-    }
-
-    if (protoData.hasSize()) {
-      data.setMaxSize(protoData.getSize());
-    }
-    return data;
-  }
-
-  /**
-   * Returns a ProtoBuf Message from ContainerData.
-   *
-   * @return Protocol Buffer Message
-   */
-  public ContainerProtos.ContainerData getProtoBufMessage() {
-    ContainerProtos.ContainerData.Builder builder = ContainerProtos
-        .ContainerData.newBuilder();
-    builder.setName(this.getContainerName());
-    builder.setContainerID(this.getContainerID());
-
-    if (this.getDBPath() != null) {
-      builder.setDbPath(this.getDBPath());
-    }
-
-    if (this.getHash() != null) {
-      builder.setHash(this.getHash());
-    }
-
-    if (this.getContainerPath() != null) {
-      builder.setContainerPath(this.getContainerPath());
-    }
-
-    builder.setState(this.getState());
-
-    for (Map.Entry<String, String> entry : metadata.entrySet()) {
-      OzoneProtos.KeyValue.Builder keyValBuilder =
-          OzoneProtos.KeyValue.newBuilder();
-      builder.addMetadata(keyValBuilder.setKey(entry.getKey())
-          .setValue(entry.getValue()).build());
-    }
-
-    if (this.getBytesUsed() >= 0) {
-      builder.setBytesUsed(this.getBytesUsed());
-    }
-
-    if (this.getKeyCount() >= 0) {
-      builder.setKeyCount(this.getKeyCount());
-    }
-
-    if (this.getMaxSize() >= 0) {
-      builder.setSize(this.getMaxSize());
-    }
-
-    return builder.build();
-  }
-
-  /**
-   * Returns the name of the container.
-   *
-   * @return - name
-   */
-  public String getContainerName() {
-    return containerName;
-  }
-
-  /**
-   * Adds metadata.
-   */
-  public void addMetadata(String key, String value) throws IOException {
-    synchronized (this.metadata) {
-      if (this.metadata.containsKey(key)) {
-        throw new IOException("This key already exists. Key " + key);
-      }
-      metadata.put(key, value);
-    }
-  }
-
-  /**
-   * Returns all metadata.
-   */
-  public Map<String, String> getAllMetadata() {
-    synchronized (this.metadata) {
-      return Collections.unmodifiableMap(this.metadata);
-    }
-  }
-
-  /**
-   * Returns value of a key.
-   */
-  public String getValue(String key) {
-    synchronized (this.metadata) {
-      return metadata.get(key);
-    }
-  }
-
-  /**
-   * Deletes a metadata entry from the map.
-   *
-   * @param key - Key
-   */
-  public void deleteKey(String key) {
-    synchronized (this.metadata) {
-      metadata.remove(key);
-    }
-  }
-
-  /**
-   * Returns path.
-   *
-   * @return - path
-   */
-  public String getDBPath() {
-    return dbPath;
-  }
-
-  /**
-   * Sets path.
-   *
-   * @param path - String.
-   */
-  public void setDBPath(String path) {
-    this.dbPath = path;
-  }
-
-  /**
-   * This function serves as the generic key for ContainerCache class. Both
-   * ContainerData and ContainerKeyData override this function to appropriately
-   * return the right name that can be used in ContainerCache.
-   *
-   * @return String Name.
-   */
-  public String getName() {
-    return getContainerName();
-  }
-
-  /**
-   * Get container file path.
-   * @return - Physical path where container file and checksum is stored.
-   */
-  public String getContainerPath() {
-    return containerFilePath;
-  }
-
-  /**
-   * Set container Path.
-   * @param containerPath - File path.
-   */
-  public void setContainerPath(String containerPath) {
-    this.containerFilePath = containerPath;
-  }
-
-  /**
-   * Get container ID.
-   * @return - container ID.
-   */
-  public synchronized Long getContainerID() {
-    return containerID;
-  }
-
-  public synchronized  void setState(OzoneProtos.LifeCycleState state) {
-    this.state = state;
-  }
-
-  public synchronized OzoneProtos.LifeCycleState getState() {
-    return this.state;
-  }
-
-  /**
-   * checks if the container is open.
-   * @return - boolean
-   */
-  public synchronized  boolean isOpen() {
-    return OzoneProtos.LifeCycleState.OPEN == state;
-  }
-
-  /**
-   * Marks this container as closed.
-   */
-  public synchronized void closeContainer() {
-    // TODO: closed or closing here
-    setState(OzoneProtos.LifeCycleState.CLOSED);
-
-    // Something brain dead for now: name + timestamp of when we get the close
-    // container message.
-    setHash(DigestUtils.sha256Hex(this.getContainerName() +
-        Long.toString(Time.monotonicNow())));
-  }
-
-  /**
-   * Final hash for this container.
-   * @return - Hash
-   */
-  public String getHash() {
-    return hash;
-  }
-
-  public void setHash(String hash) {
-    this.hash = hash;
-  }
-
-  public void setMaxSize(long maxSize) {
-    this.maxSize = maxSize;
-  }
-
-  public long getMaxSize() {
-    return maxSize;
-  }
-
-  public long getKeyCount() {
-    return metadata.size();
-  }
-
-  public void setBytesUsed(long used) {
-    this.bytesUsed.set(used);
-  }
-
-  public long addBytesUsed(long delta) {
-    return this.bytesUsed.addAndGet(delta);
-  }
-
-  public long getBytesUsed() {
-    return bytesUsed.get();
-  }
-}

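And typical use of the removed ContainerData, going by the constructor and
mutators deleted above; again this compiles only against the pre-removal
tree, with illustrative names and sizes:

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ozone.container.common.helpers.ContainerData;

    public class ContainerDataDemo {
      public static void main(String[] args) throws IOException {
        ContainerData data = new ContainerData("container-0001", 1L,
            new Configuration());
        data.addMetadata("owner", "demo");   // illustrative key/value
        data.addBytesUsed(4096);
        data.closeContainer();               // CLOSED + placeholder hash
        System.out.println(data.isOpen());   // false
        System.out.println(data.getHash());  // sha256 hex of name+timestamp
      }
    }
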
