http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/resources/ozone-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
new file mode 100644
index 0000000..8018d29
--- /dev/null
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -0,0 +1,1031 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<!-- Do not modify this file directly.  Instead, copy entries that you -->
+<!-- wish to modify from this file into ozone-site.xml and change them -->
+<!-- there.  If ozone-site.xml does not already exist, create it.      -->
+
+<!--Tags supported are OZONE, CBLOCK, MANAGEMENT, SECURITY, PERFORMANCE,   -->
+<!--DEBUG, CLIENT, SERVER, KSM, SCM, CRITICAL, RATIS, CONTAINER, REQUIRED, -->
+<!--REST, STORAGE, PIPELINE, STANDALONE                                    -->
+
+<configuration>
+
+  <!--Container Settings used by Datanode-->
+  <property>
+    <name>ozone.container.cache.size</name>
+    <value>1024</value>
+    <tag>PERFORMANCE, CONTAINER, STORAGE</tag>
+    <description>The open container is cached on the data node side. We
+      maintain an LRU cache for caching the recently used containers. This
+      setting controls the size of that cache.
+    </description>
+  </property>
+  <property>
+    <name>dfs.container.ipc</name>
+    <value>9859</value>
+    <tag>OZONE, CONTAINER, MANAGEMENT</tag>
+    <description>The ipc port number of the container.</description>
+  </property>
+  <property>
+    <name>dfs.container.ipc.random.port</name>
+    <value>false</value>
+    <tag>OZONE, DEBUG, CONTAINER</tag>
+    <description>Allocates a random free port for the ozone container. This
+      is used only while running unit tests.
+    </description>
+  </property>
+  <property>
+    <name>dfs.container.ratis.datanode.storage.dir</name>
+    <value/>
+    <tag>OZONE, CONTAINER, STORAGE, MANAGEMENT, RATIS</tag>
+    <description>This directory is used for storing Ratis metadata like logs.
+      If this is not set, the default metadata dirs are used and a warning is
+      logged. Ideally, this should be mapped to a fast disk like an SSD.
+    </description>
+  </property>
+  <property>
+    <name>dfs.container.ratis.enabled</name>
+    <value>false</value>
+    <tag>OZONE, MANAGEMENT, PIPELINE, RATIS</tag>
+    <description>Ozone supports different kinds of replication pipelines.
+      Ratis is one of the replication pipelines supported by ozone.
+    </description>
+  </property>
+  <property>
+    <name>dfs.container.ratis.ipc</name>
+    <value>9858</value>
+    <tag>OZONE, CONTAINER, PIPELINE, RATIS, MANAGEMENT</tag>
+    <description>The ipc port number of the container for Ratis.</description>
+  </property>
+  <property>
+    <name>dfs.container.ratis.ipc.random.port</name>
+    <value>false</value>
+    <tag>OZONE,DEBUG</tag>
+    <description>Allocates a random free port for the ozone Ratis port for
+      the container. This is used only while running unit tests.
+    </description>
+  </property>
+  <property>
+    <name>dfs.container.ratis.rpc.type</name>
+    <value>GRPC</value>
+    <tag>OZONE, RATIS, MANAGEMENT</tag>
+    <description>Ratis supports different kinds of transports like netty, GRPC,
+      Hadoop RPC
+      etc. This picks one of those for this cluster.
+    </description>
+  </property>
+  <property>
+    <name>dfs.container.ratis.num.write.chunk.threads</name>
+    <value>60</value>
+    <tag>OZONE, RATIS, PERFORMANCE</tag>
+    <description>Maximum number of threads in the thread pool that Ratis
+      will use for writing chunks (60 by default).
+    </description>
+  </property>
+  <property>
+    <name>dfs.container.ratis.segment.size</name>
+    <value>1073741824</value>
+    <tag>OZONE, RATIS, PERFORMANCE</tag>
+    <description>The size of the raft segment used by Apache Ratis on
+      datanodes. (1 GB by default)
+    </description>
+  </property>
+  <property>
+    <name>dfs.container.ratis.segment.preallocated.size</name>
+    <value>134217728</value>
+    <tag>OZONE, RATIS, PERFORMANCE</tag>
+    <description>The size of the buffer which is preallocated for the raft
+      segment used by Apache Ratis on datanodes. (128 MB by default)
+    </description>
+  </property>
+  <property>
+    <name>ozone.container.report.interval</name>
+    <value>60000ms</value>
+    <tag>OZONE, CONTAINER, MANAGEMENT</tag>
+    <description>Time interval for the datanode to send its container report.
+      Each datanode periodically sends a container report upon receiving
+      sendContainerReport from SCM. The unit can be defined with a
+      postfix (ns,ms,s,m,h,d).</description>
+  </property>
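+  <!-- Illustrative example (not part of the shipped defaults): interval
+       settings such as ozone.container.report.interval accept a time unit
+       postfix, so an override in ozone-site.xml could look like:
+       <property>
+         <name>ozone.container.report.interval</name>
+         <value>2m</value>
+       </property>
+  -->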
+  <!--Ozone Settings-->
+  <property>
+    <name>ozone.administrators</name>
+    <value/>
+    <tag>OZONE, SECURITY</tag>
+    <description>Ozone administrator users delimited by a comma.
+      If not set, only the user who launches an ozone service will be the
+      admin user. This property must be set if ozone services are started by
+      different users. Otherwise, the RPC layer will reject calls from other
+      servers which are started by users not in the list.
+    </description>
+  </property>
+  <property>
+    <name>ozone.block.deleting.container.limit.per.interval</name>
+    <value>10</value>
+    <tag>OZONE, PERFORMANCE, SCM</tag>
+    <description>A maximum number of containers to be scanned by the block
+      deleting service per time interval. The block deleting service spawns a
+      thread to handle block deletions in a container. This property is used
+      to throttle the number of threads spawned for block deletions.
+    </description>
+  </property>
+  <property>
+    <name>ozone.block.deleting.limit.per.task</name>
+    <value>1000</value>
+    <tag>OZONE, PERFORMANCE, SCM</tag>
+    <description>A maximum number of blocks to be deleted by the block
+      deleting service per time interval. This property is used to throttle
+      the actual number of block deletions on a data node per container.
+    </description>
+  </property>
+  <property>
+    <name>ozone.block.deleting.service.interval</name>
+    <value>1m</value>
+    <tag>OZONE, PERFORMANCE, SCM</tag>
+    <description>Time interval of the block deleting service.
+      The block deleting service runs on each datanode periodically and
+      deletes blocks queued for deletion. Unit could be defined with
+      postfix (ns,ms,s,m,h,d)
+    </description>
+  </property>
+  <property>
+    <name>ozone.block.deleting.service.timeout</name>
+    <value>300000ms</value>
+    <tag>OZONE, PERFORMANCE, SCM</tag>
+    <description>A timeout value of the block deletion service. If this is
+      set greater than 0, the service will stop waiting for the block
+      deleting completion after this time. If timeouts happen for a large
+      proportion of block deletions, this needs to be increased together with
+      ozone.block.deleting.limit.per.task. This setting supports multiple
+      time unit suffixes as described in dfs.heartbeat.interval. If no suffix
+      is specified, then milliseconds is assumed.
+    </description>
+  </property>
+  <property>
+    <name>ozone.client.connection.timeout</name>
+    <value>5000ms</value>
+    <tag>OZONE, PERFORMANCE, CLIENT</tag>
+    <description>Connection timeout for Ozone client in milliseconds.
+    </description>
+  </property>
+  <property>
+    <name>ozone.client.protocol</name>
+    <value>org.apache.hadoop.ozone.client.rpc.RpcClient</value>
+    <tag>OZONE, CLIENT, MANAGEMENT</tag>
+    <description>Protocol class to be used by the client to connect to the
+      ozone cluster.
+      The built-in implementations include:
+      org.apache.hadoop.ozone.client.rpc.RpcClient for RPC
+      org.apache.hadoop.ozone.client.rest.RestClient for REST
+      The default is the RpcClient. Please do not change this unless you have
+      a very good understanding of what you are doing.
+    </description>
+  </property>
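+  <!-- A sketch of switching to the REST client listed above, as an
+       ozone-site.xml override (illustrative only):
+       <property>
+         <name>ozone.client.protocol</name>
+         <value>org.apache.hadoop.ozone.client.rest.RestClient</value>
+       </property>
+  -->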
+  <property>
+    <name>ozone.client.socket.timeout</name>
+    <value>5000ms</value>
+    <tag>OZONE, CLIENT</tag>
+    <description>Socket timeout for Ozone client. Unit could be defined with
+      postfix (ns,ms,s,m,h,d)</description>
+  </property>
+  <property>
+    <name>ozone.enabled</name>
+    <value>false</value>
+    <tag>OZONE, REQUIRED</tag>
+    <description>
+      Status of the Ozone Object Storage service.
+      Set to true to enable Ozone.
+      Set to false to disable Ozone.
+      Unless this value is set to true, Ozone services will not be started in
+      the cluster.
+
+      Please note: By default ozone is disabled on a hadoop cluster.
+    </description>
+  </property>
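+  <!-- A minimal ozone-site.xml override that turns Ozone on (illustrative
+       only):
+       <property>
+         <name>ozone.enabled</name>
+         <value>true</value>
+       </property>
+  -->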
+  <property>
+    <name>ozone.handler.type</name>
+    <value>distributed</value>
+    <tag>OZONE, REST</tag>
+    <description>
+      Tells ozone which storage handler to use. The possible values are:
+      distributed - The Ozone distributed storage handler, which speaks to
+      KSM/SCM on the backend and provides REST services to clients.
+      local - Local Storage handler strictly for testing - To be removed.
+    </description>
+  </property>
+  <property>
+    <name>ozone.key.deleting.limit.per.task</name>
+    <value>1000</value>
+    <tag>KSM, PERFORMANCE</tag>
+    <description>
+      A maximum number of keys to be scanned by the key deleting service
+      per time interval in KSM. Those keys are sent to the delete metadata
+      and generate transactions in SCM for the next async deletion between
+      SCM and DataNode.
+    </description>
+  </property>
+  <property>
+    <name>ozone.ksm.address</name>
+    <value/>
+    <tag>KSM, REQUIRED</tag>
+    <description>
+      The address of the Ozone KSM service. This allows clients to discover
+      the KSM's address.
+    </description>
+  </property>
+  <property>
+    <name>ozone.ksm.group.rights</name>
+    <value>READ_WRITE</value>
+    <tag>KSM, SECURITY</tag>
+    <description>
+      Default group permissions in Ozone KSM.
+    </description>
+  </property>
+  <property>
+    <name>ozone.ksm.handler.count.key</name>
+    <value>20</value>
+    <tag>KSM, PERFORMANCE</tag>
+    <description>
+      The number of RPC handler threads for KSM service endpoints.
+    </description>
+  </property>
+  <property>
+    <name>ozone.ksm.http-address</name>
+    <value>0.0.0.0:9874</value>
+    <tag>KSM, MANAGEMENT</tag>
+    <description>
+      The address and the base port where the KSM web UI will listen on.
+
+      If the port is 0, then the server will start on a free port. However, it
+      is best to specify a well-known port, so it is easy to connect and see
+      the KSM management UI.
+    </description>
+  </property>
+  <property>
+    <name>ozone.ksm.http-bind-host</name>
+    <value>0.0.0.0</value>
+    <tag>KSM, MANAGEMENT</tag>
+    <description>
+      The actual address the KSM web server will bind to. If this optional
+      address is set, it overrides only the hostname portion of
+      ozone.ksm.http-address.
+    </description>
+  </property>
+  <property>
+    <name>ozone.ksm.http.enabled</name>
+    <value>true</value>
+    <tag>KSM, MANAGEMENT</tag>
+    <description>
+      Property to enable or disable KSM web user interface.
+    </description>
+  </property>
+  <property>
+    <name>ozone.ksm.https-address</name>
+    <value>0.0.0.0:9875</value>
+    <tag>KSM, MANAGEMENT, SECURITY</tag>
+    <description>
+      The address and the base port where the KSM web UI will listen
+      on using HTTPS.
+      If the port is 0 then the server will start on a free port.
+    </description>
+  </property>
+  <property>
+    <name>ozone.ksm.https-bind-host</name>
+    <value>0.0.0.0</value>
+    <tag>KSM, MANAGEMENT, SECURITY</tag>
+    <description>
+      The actual address the KSM web server will bind to using HTTPS.
+      If this optional address is set, it overrides only the hostname portion
+      of ozone.ksm.https-address.
+    </description>
+  </property>
+  <property>
+    <name>ozone.ksm.keytab.file</name>
+    <value/>
+    <tag>KSM, SECURITY</tag>
+    <description>
+      The keytab file for Kerberos authentication in KSM.
+    </description>
+  </property>
+  <property>
+    <name>ozone.ksm.db.cache.size.mb</name>
+    <value>128</value>
+    <tag>KSM, PERFORMANCE</tag>
+    <description>
+      The size of the KSM DB cache in MB that is used for caching files.
+      This value is set to an abnormally low value in the default
+      configuration. That is to make unit testing easy. Generally, this value
+      should be set to something like 16GB or more, if you intend to use
+      Ozone at scale.
+
+      A large value for this key allows a proportionally larger amount of KSM
+      metadata to be cached in memory. This makes KSM operations faster.
+    </description>
+  </property>
+  <property>
+    <name>ozone.ksm.user.max.volume</name>
+    <value>1024</value>
+    <tag>KSM, MANAGEMENT</tag>
+    <description>
+      The maximum number of volumes a user can have on a cluster. Increasing
+      or decreasing this number has no real impact on the ozone cluster. This
+      is defined only for operational purposes. Only an administrator can
+      create a volume; once a volume is created, there are no restrictions on
+      the number of buckets or keys inside each bucket a user can create.
+    </description>
+  </property>
+  <property>
+    <name>ozone.ksm.user.rights</name>
+    <value>READ_WRITE</value>
+    <tag>KSM, SECURITY</tag>
+    <description>
+      Default user permissions used in KSM.
+    </description>
+  </property>
+  <property>
+    <name>ozone.localstorage.root</name>
+    <value>${hadoop.tmp.dir}/ozone</value>
+    <tag>OZONE, DEBUG</tag>
+    <description>
+      This is used only for testing purposes. This value is used by the local
+      storage handler to simulate a REST backend. This is useful only when
+      debugging the REST front end independent of KSM and SCM. To be removed.
+    </description>
+  </property>
+  <property>
+    <name>ozone.metadata.dirs</name>
+    <value/>
+    <tag>OZONE, KSM, SCM, CONTAINER, REQUIRED, STORAGE</tag>
+    <description>
+      Ozone metadata is shared among KSM, which acts as the namespace
+      manager for ozone, SCM, which acts as the block manager, and data
+      nodes, which maintain the name of the key (Key Name and BlockIDs). This
+      replicated and distributed metadata store is maintained under the
+      directory pointed to by this key. Since metadata can be I/O intensive,
+      at least on KSM and SCM we recommend having SSDs. If you have the
+      luxury of mapping this path to SSDs on all machines in the cluster,
+      that will be excellent.
+
+      If Ratis metadata directories are not specified, Ratis server will emit a
+      warning and use this path for storing its metadata too.
+    </description>
+  </property>
+  <property>
+    <name>ozone.metastore.impl</name>
+    <value>RocksDB</value>
+    <tag>OZONE, KSM, SCM, CONTAINER, STORAGE</tag>
+    <description>
+      Ozone metadata store implementation. Ozone metadata is distributed
+      across multiple services such as ksm and scm, and is stored in local
+      key-value databases. This property determines which database library to
+      use. Supported values are LevelDB and RocksDB.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.metastore.rocksdb.statistics</name>
+    <value>ALL</value>
+    <tag>OZONE, KSM, SCM, STORAGE, PERFORMANCE</tag>
+    <description>
+      The statistics level of the rocksdb store. If you use any value from
+      org.rocksdb.StatsLevel (e.g. ALL or EXCEPT_DETAILED_TIMERS), the
+      rocksdb statistics will be exposed over a JMX bean with the chosen
+      setting. Set it to OFF to not initialize rocksdb statistics at all.
+      Please note that collecting statistics could have a 5-10% performance
+      penalty. Check the rocksdb documentation for more details.
+    </description>
+  </property>
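+  <!-- For example, statistics collection can be switched off entirely with
+       an ozone-site.xml override (illustrative only):
+       <property>
+         <name>ozone.metastore.rocksdb.statistics</name>
+         <value>OFF</value>
+       </property>
+  -->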
+
+  <property>
+    <name>ozone.scm.block.client.address</name>
+    <value/>
+    <tag>OZONE, SCM</tag>
+    <description>The address of the Ozone SCM block client service. If not
+      defined, the value of ozone.scm.client.address is used.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.block.client.bind.host</name>
+    <value>0.0.0.0</value>
+    <tag>OZONE, SCM</tag>
+    <description>
+      The hostname or IP address used by the SCM block client
+      endpoint to bind.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.block.client.port</name>
+    <value>9863</value>
+    <tag>OZONE, SCM</tag>
+    <description>
+      The port number of the Ozone SCM block client service.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.block.deletion.max.retry</name>
+    <value>4096</value>
+    <tag>OZONE, SCM</tag>
+    <description>
+      SCM wraps up many blocks in a deletion transaction and sends that to
+      the data node for physical deletion periodically. This property
+      determines how many times SCM is going to retry sending a deletion
+      operation to the data node.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.block.size.in.mb</name>
+    <value>256</value>
+    <tag>OZONE, SCM</tag>
+    <description>
+      The default size of a scm block in MB. This maps to the default
+      Ozone block size.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.chunk.size</name>
+    <value>16777216</value>
+    <tag>OZONE, SCM, CONTAINER, PERFORMANCE</tag>
+    <description>
+      The chunk size for reading/writing chunk operations in bytes.
+
+      The chunk size defaults to 16MB. If the value configured is more than
+      the maximum size (16MB), it will be reset to the maximum size. This
+      maps to the network packet sizes and file write operations in the
+      client to datanode protocol.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.client.address</name>
+    <value/>
+    <tag>OZONE, SCM, REQUIRED</tag>
+    <description>
+      The address of the Ozone SCM client service. This is a required setting.
+
+      It is a string in the host:port format. The port number is optional
+      and defaults to 9860.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.client.bind.host</name>
+    <value>0.0.0.0</value>
+    <tag>OZONE, SCM, MANAGEMENT</tag>
+    <description>The hostname or IP address used by the SCM client endpoint to
+      bind.
+      This setting is used by the SCM only and never used by clients.
+
+      The setting can be useful in multi-homed setups to restrict the
+      availability of the SCM client service to a specific interface.
+
+      The default is appropriate for most clusters.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.client.port</name>
+    <value>9860</value>
+    <tag>OZONE, SCM, MANAGEMENT</tag>
+    <description>The port number of the Ozone SCM client service.</description>
+  </property>
+  <property>
+    <name>ozone.scm.container.deletion-choosing.policy</name>
+    <value>
+      org.apache.hadoop.ozone.container.common.impl.TopNOrderedContainerDeletionChoosingPolicy
+    </value>
+    <tag>OZONE, MANAGEMENT</tag>
+    <description>
+      The policy used for choosing desired containers for block deletion.
+      Datanode selects a number of containers to process block deletion
+      in a certain interval defined by ozone.block.deleting.service.interval.
+      The number of containers to process in each interval is defined
+      by ozone.block.deleting.container.limit.per.interval. This property is
+      used to configure the policy applied while selecting containers.
+      Two policies are currently supported:
+      RandomContainerDeletionChoosingPolicy and
+      TopNOrderedContainerDeletionChoosingPolicy.
+      org.apache.hadoop.ozone.container.common.impl.RandomContainerDeletionChoosingPolicy
+      implements a simple random policy that returns a random list of
+      containers.
+      org.apache.hadoop.ozone.container.common.impl.TopNOrderedContainerDeletionChoosingPolicy
+      implements a policy that chooses containers in descending order of the
+      number of pending deletion blocks.
+    </description>
+  </property>
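+  <!-- A sketch of selecting the random policy listed above, as an
+       ozone-site.xml override (illustrative only):
+       <property>
+         <name>ozone.scm.container.deletion-choosing.policy</name>
+         <value>org.apache.hadoop.ozone.container.common.impl.RandomContainerDeletionChoosingPolicy</value>
+       </property>
+  -->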
+  <property>
+    <name>ozone.scm.container.placement.impl</name>
+    <value>
+      org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRandom
+    </value>
+    <tag>OZONE, MANAGEMENT</tag>
+    <description>Placement policy class for containers.
+      Defaults to SCMContainerPlacementRandom.class
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.container.provision_batch_size</name>
+    <value>20</value>
+    <tag>OZONE, PERFORMANCE</tag>
+    <description>Pre-provision specified number of containers for block
+      allocation.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.container.report.processing.interval</name>
+    <value>60s</value>
+    <tag>OZONE, PERFORMANCE</tag>
+    <description>Time interval for scm to process container reports
+      for a node pool. Scm handles node pool reports in a cyclic clock
+      manner; it fetches pools periodically with this time interval.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.container.reports.wait.timeout</name>
+    <value>300s</value>
+    <tag>OZONE, PERFORMANCE, MANAGEMENT</tag>
+    <description>Maximum time to wait in seconds for processing all container
+      reports from a node pool. It determines the timeout for a node pool
+      report.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.container.size.gb</name>
+    <value>5</value>
+    <tag>OZONE, PERFORMANCE, MANAGEMENT</tag>
+    <description>
+      Default container size used by Ozone. This value is specified in GB.
+      There are two considerations while picking this number: the speed at
+      which a container can be replicated, determined by the network speed,
+      and the metadata that each container generates. Selecting a large
+      number creates less SCM metadata, but recovery time will be longer.
+      5GB is a number that maps to quick replication times in gigabit
+      networks, but still balances the amount of metadata.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.datanode.address</name>
+    <value/>
+    <tag>OZONE, MANAGEMENT</tag>
+    <description>
+      The address of the Ozone SCM service used for internal
+      communication between the DataNodes and the SCM.
+
+      It is a string in the host:port format. The port number is optional
+      and defaults to 9861.
+
+      This setting is optional. If unspecified then the hostname portion
+      is picked from the ozone.scm.client.address setting and the
+      default service port of 9861 is chosen.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.datanode.bind.host</name>
+    <value/>
+    <tag>OZONE, MANAGEMENT</tag>
+    <description>
+      The hostname or IP address used by the SCM service endpoint to
+      bind.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.datanode.id</name>
+    <value/>
+    <tag>OZONE, MANAGEMENT</tag>
+    <description>The path that datanodes will use to store the datanode ID.
+      If this value is not set, then the datanode ID is created under the
+      metadata directory.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.datanode.port</name>
+    <value>9861</value>
+    <tag>OZONE, MANAGEMENT</tag>
+    <description>
+      The port number of the Ozone SCM service.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.db.cache.size.mb</name>
+    <value>128</value>
+    <tag>OZONE, PERFORMANCE</tag>
+    <description>SCM keeps track of the Containers in the cluster. This DB
+      holds the container metadata. This value is set to a small value to
+      make unit testing run smoothly. In production, we recommend a value of
+      16GB or higher. This allows SCM to avoid disk I/O's while looking up
+      the container location.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.dead.node.interval</name>
+    <value>10m</value>
+    <tag>OZONE, MANAGEMENT</tag>
+    <description>
+      The duration without heartbeats before a node is tagged as dead.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.handler.count.key</name>
+    <value>10</value>
+    <tag>OZONE, MANAGEMENT, PERFORMANCE</tag>
+    <description>
+      The number of RPC handler threads for each SCM service
+      endpoint.
+
+      The default is appropriate for small clusters (tens of nodes).
+
+      Set a value that is appropriate for the cluster size. Generally, HDFS
+      recommends that the RPC handler count be set to 20 * log2(Cluster Size)
+      with an upper limit of 200. However, SCM will not have the same amount
+      of traffic as the Namenode, so a value much smaller than that will work
+      well too.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.heartbeat.interval</name>
+    <value>30s</value>
+    <tag>OZONE, MANAGEMENT</tag>
+    <description>
+      The heartbeat interval from a data node to SCM. Yes,
+      it is not 3 but 30, since most data nodes will be heartbeating via
+      Ratis heartbeats. If a client is not able to talk to a data node, it
+      will notify KSM/SCM eventually. So a 30 second HB seems to work. This
+      assumes that the replication strategy used is Ratis; if not, this value
+      should be set to something smaller, like 3 seconds.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.heartbeat.log.warn.interval.count</name>
+    <value>10</value>
+    <tag>OZONE, MANAGEMENT</tag>
+    <description>
+      Defines how frequently we log a missed heartbeat to SCM. In the default
+      case, we write a warning message for every ten consecutive heartbeats
+      that we miss to SCM. This helps reduce clutter in the data node log,
+      but the trade-off is that the log will contain fewer of these
+      statements.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.heartbeat.rpc-timeout</name>
+    <value>1000</value>
+    <tag>OZONE, MANAGEMENT</tag>
+    <description>
+      Timeout value for the RPC from Datanode to SCM in milliseconds.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.heartbeat.thread.interval</name>
+    <value>3s</value>
+    <tag>OZONE, MANAGEMENT</tag>
+    <description>
+      When a heartbeat from a data node arrives at SCM, it is queued for
+      processing with the time stamp of when the heartbeat arrived. There is
+      a heartbeat processing thread inside SCM that runs at a specified
+      interval. This value controls how frequently this thread is run.
+
+      There are some assumptions built into SCM, such as this value should
+      allow the heartbeat processing thread to run at least three times more
+      frequently than heartbeats and at least five times more than the stale
+      node detection time. If you specify a wrong value, SCM will gracefully
+      refuse to run. For more info look at the node manager tests in SCM.
+
+      In short, you don't need to change this.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.http-address</name>
+    <value>0.0.0.0:9876</value>
+    <tag>OZONE, MANAGEMENT</tag>
+    <description>
+      The address and the base port where the SCM web ui will listen on.
+
+      If the port is 0 then the server will start on a free port.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.http-bind-host</name>
+    <value>0.0.0.0</value>
+    <tag>OZONE, MANAGEMENT</tag>
+    <description>
+      The actual address the SCM web server will bind to. If this
+      optional address is set, it overrides only the hostname portion of
+      ozone.scm.http-address.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.http.enabled</name>
+    <value>true</value>
+    <tag>OZONE, MANAGEMENT</tag>
+    <description>
+      Property to enable or disable SCM web ui.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.https-address</name>
+    <value>0.0.0.0:9877</value>
+    <tag>OZONE, MANAGEMENT</tag>
+    <description>
+      The address and the base port where the SCM web UI will listen
+      on using HTTPS.
+
+      If the port is 0 then the server will start on a free port.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.https-bind-host</name>
+    <value>0.0.0.0</value>
+    <tag>OZONE, MANAGEMENT</tag>
+    <description>
+      The actual address the SCM web server will bind to using HTTPS.
+      If this optional address is set, it overrides only the hostname portion
+      of ozone.scm.https-address.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.keytab.file</name>
+    <value/>
+    <tag>OZONE, SECURITY</tag>
+    <description>
+      The keytab file for Kerberos authentication in SCM.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.max.container.report.threads</name>
+    <value>100</value>
+    <tag>OZONE, PERFORMANCE</tag>
+    <description>
+      Maximum number of threads to process container reports in scm.
+      Each container report from a data node is processed by scm in a worker
+      thread, fetched from a thread pool. This property is used to control the
+      maximum size of the thread pool.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.max.hb.count.to.process</name>
+    <value>5000</value>
+    <tag>OZONE, MANAGEMENT, PERFORMANCE</tag>
+    <description>
+      The maximum number of heartbeats to process per loop of the
+      heartbeat process thread. Please see
+      ozone.scm.heartbeat.thread.interval
+      for more info.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.max.nodepool.processing.threads</name>
+    <value>1</value>
+    <tag>OZONE, MANAGEMENT, PERFORMANCE</tag>
+    <description>
+      Controls the number of node pools that can be processed in parallel by
+      the Container Supervisor.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.names</name>
+    <value/>
+    <tag>OZONE</tag>
+    <description>
+      The value of this property is a set of DNS | DNS:PORT | IP
+      Address | IP:PORT entries, written as a comma separated string, e.g.
+      scm1, scm2:8020, 7.7.7.7:7777.
+      This property allows datanodes to discover where SCM is, so that
+      datanodes can send heartbeats to SCM.
+    </description>
+  </property>
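+  <!-- Using the example values from the description above, an ozone-site.xml
+       override could look like (illustrative only):
+       <property>
+         <name>ozone.scm.names</name>
+         <value>scm1,scm2:8020,7.7.7.7:7777</value>
+       </property>
+  -->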
+  <property>
+    <name>ozone.scm.stale.node.interval</name>
+    <value>90s</value>
+    <tag>OZONE, MANAGEMENT</tag>
+    <description>
+      The interval for stale node flagging. Please
+      see ozone.scm.heartbeat.thread.interval before changing this value.
+    </description>
+  </property>
+  <property>
+    <name>ozone.trace.enabled</name>
+    <value>false</value>
+    <tag>OZONE, DEBUG</tag>
+    <description>
+      Setting this flag to true dumps the HTTP request/response in
+      the logs. Very useful when debugging the REST protocol.
+    </description>
+  </property>
+  <property>
+    <name>ozone.web.authentication.kerberos.principal</name>
+    <value/>
+    <tag>OZONE, SECURITY</tag>
+    <description>
+      The server principal used by the SCM and KSM for web UI SPNEGO
+      authentication when Kerberos security is enabled. This is typically set
+      to HTTP/_h...@realm.tld The SPNEGO server principal begins with the
+      prefix HTTP/ by convention.
+
+      If the value is '*', the web server will attempt to login with
+      every principal specified in the keytab file.
+    </description>
+  </property>
+
+  <!--Client Settings-->
+  <property>
+    <name>scm.container.client.idle.threshold</name>
+    <value>10s</value>
+    <tag>OZONE, PERFORMANCE</tag>
+    <description>
+      In the standalone pipelines, the SCM clients use netty to
+      communicate with the container. It also uses connection pooling to
+      reduce client side overheads. This allows a connection to stay idle for
+      a while before the connection is closed.
+    </description>
+  </property>
+  <property>
+    <name>scm.container.client.max.size</name>
+    <value>256</value>
+    <tag>OZONE, PERFORMANCE</tag>
+    <description>
+      Controls the maximum number of connections that we cache via
+      client connection pooling. If the number of connections
+      exceeds this count, then the oldest idle connection is evicted.
+    </description>
+  </property>
+
+  <property>
+    <name>scm.container.client.max.outstanding.requests</name>
+    <value>100</value>
+    <tag>OZONE, PERFORMANCE</tag>
+    <description>
+      Controls the maximum number of outstanding async requests that can be
+      handled by the Standalone as well as the Ratis client.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.scm.container.creation.lease.timeout</name>
+    <value>60s</value>
+    <tag>OZONE, SCM</tag>
+    <description>
+      Container creation timeout to be used by SCM. When a
+      BEGIN_CREATE event happens, the container is moved from the ALLOCATED
+      to the CREATING state. SCM will then wait for the configured amount of
+      time for a COMPLETE_CREATE event; if it does not receive one, it will
+      move the container to DELETING.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.key.preallocation.maxsize</name>
+    <value>134217728</value>
+    <tag>OZONE, KSM, PERFORMANCE</tag>
+    <description>
+      When a new key write request is sent to KSM, if a size is requested, at
+      most 128MB is allocated at request time. If the client needs more space
+      for the write, separate block allocation requests will be made.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.client.list.cache</name>
+    <value>1000</value>
+    <tag>OZONE, PERFORMANCE</tag>
+    <description>
+      The cache size used by the client for list calls.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.replication</name>
+    <value>3</value>
+    <tag>OZONE, CLIENT</tag>
+    <description>
+      Default replication value. The actual number of replicas can be
+      specified when writing the key. The default is used if replication
+      is not specified. Supported values: 1 and 3.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.replication.type</name>
+    <value>RATIS</value>
+    <tag>OZONE, CLIENT</tag>
+    <description>
+      Default replication type to be used while writing key into ozone. The
+      value can be specified when writing the key, default is used when
+      nothing is specified. Supported values: RATIS, STAND_ALONE and CHAINED.
+    </description>
+  </property>
+  <property>
+    <name>ozone.scm.container.close.threshold</name>
+    <value>0.9f</value>
+    <tag>OZONE, SCM</tag>
+    <description>
+      This determines the threshold to be used for closing a container.
+      When the container used percentage reaches this threshold,
+      the container will be closed. Value should be a positive, non-zero
+      percentage in float notation (X.Yf), with 1.0f meaning 100%.
+    </description>
+  </property>
+  <property>
+    <name>ozone.rest.client.http.connection.max</name>
+    <value>100</value>
+    <tag>OZONE, CLIENT</tag>
+    <description>
+      This defines the overall connection limit for the connection pool used in
+      RestClient.
+    </description>
+  </property>
+  <property>
+    <name>ozone.rest.client.http.connection.per-route.max</name>
+    <value>20</value>
+    <tag>OZONE, CLIENT</tag>
+    <description>
+      This defines the connection limit per one HTTP route/host. Total max
+      connection is limited by ozone.rest.client.http.connection.max property.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.open.key.cleanup.service.interval.seconds</name>
+    <value>86400</value>
+    <tag>OZONE, KSM, PERFORMANCE</tag>
+    <description>
+      A background job periodically checks open key entries and deletes the
+      expired ones. This entry controls the interval of this cleanup check.
+    </description>
+  </property>
+
+  <property>
+    <name>ozone.open.key.expire.threshold</name>
+    <value>86400</value>
+    <tag>OZONE, KSM, PERFORMANCE</tag>
+    <description>
+      Controls how long an open key operation is considered active.
+      Specifically, if a key has been open longer than the value of this
+      config entry, that open key is considered expired (e.g. due to a
+      client crash). Defaults to 24 hours.
+    </description>
+  </property>
+
+
+  <property>
+    <name>hdds.rest.rest-csrf.enabled</name>
+    <value>false</value>
+    <description>
+      If true, then enables Object Store REST server protection against
+      cross-site request forgery (CSRF).
+    </description>
+  </property>
+
+  <property>
+    <name>hdds.rest.http-address</name>
+    <value>0.0.0.0:9880</value>
+    <description>The http address of Object Store REST server inside the
+      datanode.</description>
+  </property>
+
+
+  <property>
+    <name>hdds.rest.netty.high.watermark</name>
+    <value>65535</value>
+    <description>
+      High watermark configuration to Netty for Object Store REST server.
+    </description>
+  </property>
+
+  <property>
+    <name>hdds.rest.netty.low.watermark</name>
+    <value>32768</value>
+    <description>
+      Low watermark configuration to Netty for Object Store REST server.
+    </description>
+  </property>
+
+
+</configuration>
\ No newline at end of file
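
Several of the settings above are time durations with unit postfixes. As a
minimal sketch of how a consumer might read them, assuming Hadoop's standard
Configuration#getTimeDuration API (the OzoneConfExample class name is
hypothetical; the key names mirror the defaults above):

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;

public final class OzoneConfExample {
  public static void main(String[] args) {
    // ozone-default.xml ships the defaults; ozone-site.xml carries overrides.
    Configuration conf = new Configuration();
    conf.addResource("ozone-default.xml");
    conf.addResource("ozone-site.xml");

    // Values such as "60000ms", "1m" or "30s" parse to the requested unit.
    long reportIntervalMs = conf.getTimeDuration(
        "ozone.container.report.interval", 60000, TimeUnit.MILLISECONDS);
    long heartbeatSecs = conf.getTimeDuration(
        "ozone.scm.heartbeat.interval", 30, TimeUnit.SECONDS);

    System.out.println(reportIntervalMs + " ms, " + heartbeatSecs + " s");
  }
}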

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/TestArchive.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/TestArchive.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/TestArchive.java
new file mode 100644
index 0000000..f53f770
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/TestArchive.java
@@ -0,0 +1,114 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.RandomUtils;
+import org.apache.hadoop.fs.FileUtil;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Iterator;
+import java.util.Random;
+import java.util.zip.Adler32;
+import java.util.zip.Checksum;
+
+/**
+ * Test archive creation and unpacking.
+ */
+public class TestArchive {
+  private static final int DIR_COUNT = 10;
+  private static final int SUB_DIR_COUNT = 3;
+  private static final int FILE_COUNT = 10;
+  private long checksumWrite = 0L;
+  private long checksumRead = 0L;
+  private long tmp = 0L;
+  private Checksum crc = new Adler32();
+
+  @Rule
+  public TemporaryFolder folder = new TemporaryFolder();
+
+  @Rule
+  public TemporaryFolder outputFolder = new TemporaryFolder();
+
+
+  @Before
+  public void setUp() throws Exception {
+    Random r = new Random();
+    final int megaByte = 1024 * 1024;
+
+    for (int x = 0; x < DIR_COUNT; x++) {
+      File subdir = folder.newFolder(String.format("dir%d", x));
+      for (int y = 0; y < SUB_DIR_COUNT; y++) {
+        File targetDir = new File(subdir.getPath().concat(File.separator)
+            .concat(String.format("subdir%d%d", x, y)));
+        if (!targetDir.mkdirs()) {
+          throw new IOException("Failed to create subdirectory. " +
+              targetDir.toString());
+        }
+        for (int z = 0; z < FILE_COUNT; z++) {
+          Path temp = Paths.get(targetDir.getPath().concat(File.separator)
+              .concat(String.format("File%d.txt", z)));
+          byte[] buf = RandomUtils.nextBytes(r.nextInt(megaByte));
+          Files.write(temp, buf);
+          crc.reset();
+          crc.update(buf, 0, buf.length);
+          tmp = crc.getValue();
+          checksumWrite += tmp;
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testArchive() throws Exception {
+    File archiveFile = new File(outputFolder.getRoot() + File.separator
+        + "test.container.zip");
+    long zipCheckSum = FileUtil.zip(folder.getRoot(), archiveFile);
+    Assert.assertTrue(zipCheckSum > 0);
+    File decomp = new File(outputFolder.getRoot() + File.separator +
+        "decompress");
+    if (!decomp.exists() && !decomp.mkdirs()) {
+      throw new IOException("Unable to create the destination directory. " +
+          decomp.getPath());
+    }
+
+    FileUtil.unZip(archiveFile, decomp);
+    String[] patterns = {"txt"};
+    Iterator<File> iter = FileUtils.iterateFiles(decomp, patterns, true);
+    int count = 0;
+    while (iter.hasNext()) {
+      count++;
+      byte[] buf = Files.readAllBytes(iter.next().toPath());
+      crc.reset();
+      crc.update(buf, 0, buf.length);
+      tmp = crc.getValue();
+      checksumRead += tmp;
+    }
+    Assert.assertEquals(DIR_COUNT * SUB_DIR_COUNT * FILE_COUNT, count);
+    Assert.assertEquals(checksumWrite, checksumRead);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/package-info.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/package-info.java
new file mode 100644
index 0000000..7966941
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm;
+/**
+ Test cases for SCM client classes.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java
new file mode 100644
index 0000000..6b26b60
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/TestMetadataStore.java
@@ -0,0 +1,414 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.utils.BatchOperation;
+import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
+import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
+import org.apache.hadoop.utils.MetadataStore;
+import org.apache.hadoop.utils.MetadataStoreBuilder;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Test class for ozone metadata store.
+ */
+@RunWith(Parameterized.class)
+public class TestMetadataStore {
+
+  private final String storeImpl;
+
+  public TestMetadataStore(String metadataImpl) {
+    this.storeImpl = metadataImpl;
+  }
+
+  @Parameters
+  public static Collection<Object[]> data() {
+    return Arrays.asList(new Object[][] {
+        {OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB},
+        {OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB}
+    });
+  }
+
+  private MetadataStore store;
+  private File testDir;
+  private static final int MAX_GETRANGE_LENGTH = 100;
+
+  @Rule
+  public ExpectedException expectedException = ExpectedException.none();
+
+  @Before
+  public void init() throws IOException {
+    testDir = GenericTestUtils.getTestDir(getClass().getSimpleName()
+        + "-" + storeImpl.toLowerCase());
+
+    Configuration conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, storeImpl);
+
+    store = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setCreateIfMissing(true)
+        .setDbFile(testDir)
+        .build();
+
+    // Add 20 entries.
+    // {a0 : a-value0} to {a9 : a-value9}
+    // {b0 : b-value0} to {b9 : b-value9}
+    for (int i=0; i<10; i++) {
+      store.put(getBytes("a" + i), getBytes("a-value" + i));
+      store.put(getBytes("b" + i), getBytes("b-value" + i));
+    }
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    store.close();
+    store.destroy();
+    FileUtils.deleteDirectory(testDir);
+  }
+
+  private byte[] getBytes(String str) {
+    return str == null ? null :
+        DFSUtilClient.string2Bytes(str);
+  }
+
+  private String getString(byte[] bytes) {
+    return bytes == null ? null :
+        DFSUtilClient.bytes2String(bytes);
+  }
+
+  @Test
+  public void testGetDelete() throws IOException {
+    for (int i=0; i<10; i++) {
+      byte[] va = store.get(getBytes("a" + i));
+      Assert.assertEquals("a-value" + i, getString(va));
+
+      byte[] vb = store.get(getBytes("b" + i));
+      Assert.assertEquals("b-value" + i, getString(vb));
+    }
+
+    String keyToDel = "del-" + UUID.randomUUID().toString();
+    store.put(getBytes(keyToDel), getBytes(keyToDel));
+    Assert.assertEquals(keyToDel, getString(store.get(getBytes(keyToDel))));
+    store.delete(getBytes(keyToDel));
+    Assert.assertNull(store.get(getBytes(keyToDel)));
+  }
+
+  @Test
+  public void testPeekFrom() throws IOException {
+    // Test peek from an element that has prev as well as next
+    testPeek("a3", "a2", "a4");
+
+    // Test peek from an element that only has prev
+    testPeek("b9", "b8", null);
+
+    // Test peek from an element that only has next
+    testPeek("a0", null, "a1");
+  }
+
+  private String getExpectedValue(String key) {
+    if (key == null) {
+      return null;
+    }
+    char[] arr = key.toCharArray();
+    return new StringBuilder().append(arr[0]).append("-value")
+        .append(arr[arr.length - 1]).toString();
+  }
+
+  private void testPeek(String peekKey, String prevKey, String nextKey)
+      throws IOException {
+    // Look for current
+    String k = null;
+    String v = null;
+    ImmutablePair<byte[], byte[]> current =
+        store.peekAround(0, getBytes(peekKey));
+    if (current != null) {
+      k = getString(current.getKey());
+      v = getString(current.getValue());
+    }
+    Assert.assertEquals(peekKey, k);
+    Assert.assertEquals(getExpectedValue(peekKey), v);
+
+    // Look for prev
+    k = null;
+    v = null;
+    ImmutablePair<byte[], byte[]> prev =
+        store.peekAround(-1, getBytes(peekKey));
+    if (prev != null) {
+      k = getString(prev.getKey());
+      v = getString(prev.getValue());
+    }
+    Assert.assertEquals(prevKey, k);
+    Assert.assertEquals(getExpectedValue(prevKey), v);
+
+    // Look for next
+    k = null;
+    v = null;
+    ImmutablePair<byte[], byte[]> next =
+        store.peekAround(1, getBytes(peekKey));
+    if (next != null) {
+      k = getString(next.getKey());
+      v = getString(next.getValue());
+    }
+    Assert.assertEquals(nextKey, k);
+    Assert.assertEquals(getExpectedValue(nextKey), v);
+  }
+
+  @Test
+  public void testIterateKeys() throws IOException {
+    // iterate keys from b0
+    ArrayList<String> result = Lists.newArrayList();
+    store.iterate(getBytes("b0"), (k, v) -> {
+      // b-value{i}
+      String value = getString(v);
+      char num = value.charAt(value.length() - 1);
+      // each value adds 1
+      int i = Character.getNumericValue(num) + 1;
+      value = value.substring(0, value.length() - 1) + i;
+      result.add(value);
+      return true;
+    });
+
+    Assert.assertFalse(result.isEmpty());
+    for (int i=0; i<result.size(); i++) {
+      Assert.assertEquals("b-value" + (i+1), result.get(i));
+    }
+
+    // iterate from a non-existent key
+    result.clear();
+    store.iterate(getBytes("xyz"), (k, v) -> {
+      result.add(getString(v));
+      return true;
+    });
+    Assert.assertTrue(result.isEmpty());
+
+    // iterate from the beginning
+    result.clear();
+    store.iterate(null, (k, v) -> {
+      result.add(getString(v));
+      return true;
+    });
+    Assert.assertEquals(20, result.size());
+  }
+
+  @Test
+  public void testGetRangeKVs() throws IOException {
+    List<Map.Entry<byte[], byte[]>> result = null;
+
+    // Set empty startKey will return values from beginning.
+    result = store.getRangeKVs(null, 5);
+    Assert.assertEquals(5, result.size());
+    Assert.assertEquals("a-value2", getString(result.get(2).getValue()));
+
+    // Empty list if startKey doesn't exist.
+    result = store.getRangeKVs(getBytes("a12"), 5);
+    Assert.assertEquals(0, result.size());
+
+    // Returns max available entries after a valid startKey.
+    result = store.getRangeKVs(getBytes("b0"), MAX_GETRANGE_LENGTH);
+    Assert.assertEquals(10, result.size());
+    Assert.assertEquals("b0", getString(result.get(0).getKey()));
+    Assert.assertEquals("b-value0", getString(result.get(0).getValue()));
+    result = store.getRangeKVs(getBytes("b0"), 5);
+    Assert.assertEquals(5, result.size());
+
+    // Both startKey and count are honored.
+    result = store.getRangeKVs(getBytes("a9"), 2);
+    Assert.assertEquals(2, result.size());
+    Assert.assertEquals("a9", getString(result.get(0).getKey()));
+    Assert.assertEquals("a-value9", getString(result.get(0).getValue()));
+    Assert.assertEquals("b0", getString(result.get(1).getKey()));
+    Assert.assertEquals("b-value0", getString(result.get(1).getValue()));
+
+    // Filter keys by prefix.
+    // It should return all "b*" entries.
+    MetadataKeyFilter filter1 = new KeyPrefixFilter("b");
+    result = store.getRangeKVs(null, 100, filter1);
+    Assert.assertEquals(10, result.size());
+    Assert.assertTrue(result.stream().allMatch(entry ->
+        new String(entry.getKey()).startsWith("b")
+    ));
+    Assert.assertEquals(20, filter1.getKeysScannedNum());
+    Assert.assertEquals(10, filter1.getKeysHintedNum());
+    result = store.getRangeKVs(null, 3, filter1);
+    Assert.assertEquals(3, result.size());
+    result = store.getRangeKVs(getBytes("b3"), 1, filter1);
+    Assert.assertEquals("b-value3", getString(result.get(0).getValue()));
+
+    // Define a customized filter that filters keys by suffix.
+    // Returns all "*2" entries.
+    MetadataKeyFilter filter2 = (preKey, currentKey, nextKey)
+        -> getString(currentKey).endsWith("2");
+    result = store.getRangeKVs(null, MAX_GETRANGE_LENGTH, filter2);
+    Assert.assertEquals(2, result.size());
+    Assert.assertEquals("a2", getString(result.get(0).getKey()));
+    Assert.assertEquals("b2", getString(result.get(1).getKey()));
+    result = store.getRangeKVs(null, 1, filter2);
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("a2", getString(result.get(0).getKey()));
+
+    // Apply multiple filters.
+    result = store.getRangeKVs(null, MAX_GETRANGE_LENGTH, filter1, filter2);
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("b2", getString(result.get(0).getKey()));
+    Assert.assertEquals("b-value2", getString(result.get(0).getValue()));
+
+    // If filter is null, no effect.
+    result = store.getRangeKVs(null, 1, null);
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("a0", getString(result.get(0).getKey()));
+  }
+
+  @Test
+  public void testGetSequentialRangeKVs() throws IOException {
+    MetadataKeyFilter suffixFilter = (preKey, currentKey, nextKey)
+        -> DFSUtil.bytes2String(currentKey).endsWith("2");
+    // Supposed to return a2 and b2
+    List<Map.Entry<byte[], byte[]>> result =
+        store.getRangeKVs(null, MAX_GETRANGE_LENGTH, suffixFilter);
+    Assert.assertEquals(2, result.size());
+    Assert.assertEquals("a2", DFSUtil.bytes2String(result.get(0).getKey()));
+    Assert.assertEquals("b2", DFSUtil.bytes2String(result.get(1).getKey()));
+
+    // Supposed to return just a2, because when it iterates to a3,
+    // the filter no longer matches and it should stop there.
+    result = store.getSequentialRangeKVs(null,
+        MAX_GETRANGE_LENGTH, suffixFilter);
+    Assert.assertEquals(1, result.size());
+    Assert.assertEquals("a2", DFSUtil.bytes2String(result.get(0).getKey()));
+  }
+
+  @Test
+  public void testGetRangeLength() throws IOException {
+    List<Map.Entry<byte[], byte[]>> result = null;
+
+    result = store.getRangeKVs(null, 0);
+    Assert.assertEquals(0, result.size());
+
+    result = store.getRangeKVs(null, 1);
+    Assert.assertEquals(1, result.size());
+
+    // Count less than zero is invalid.
+    expectedException.expect(IllegalArgumentException.class);
+    expectedException.expectMessage("Invalid count given");
+    store.getRangeKVs(null, -1);
+  }
+
+  @Test
+  public void testInvalidStartKey() throws IOException {
+    // If startKey is invalid, the returned list should be empty.
+    List<Map.Entry<byte[], byte[]>> kvs =
+        store.getRangeKVs(getBytes("unknownKey"), MAX_GETRANGE_LENGTH);
+    Assert.assertEquals(0, kvs.size());
+  }
+
+  @Test
+  public void testDestroyDB() throws IOException {
+    // Create a new DB to test destroy().
+    Configuration conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, storeImpl);
+
+    File dbDir = GenericTestUtils.getTestDir(getClass().getSimpleName()
+        + "-" + storeImpl.toLowerCase() + "-toDestroy");
+    MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setCreateIfMissing(true)
+        .setDbFile(dbDir)
+        .build();
+
+    dbStore.put(getBytes("key1"), getBytes("value1"));
+    dbStore.put(getBytes("key2"), getBytes("value2"));
+
+    Assert.assertFalse(dbStore.isEmpty());
+    Assert.assertTrue(dbDir.exists());
+    Assert.assertTrue(dbDir.listFiles().length > 0);
+
+    dbStore.destroy();
+
+    Assert.assertFalse(dbDir.exists());
+  }
+
+  @Test
+  public void testBatchWrite() throws IOException {
+    Configuration conf = new OzoneConfiguration();
+    conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL, storeImpl);
+
+    File dbDir = GenericTestUtils.getTestDir(getClass().getSimpleName()
+        + "-" + storeImpl.toLowerCase() + "-batchWrite");
+    MetadataStore dbStore = MetadataStoreBuilder.newBuilder()
+        .setConf(conf)
+        .setCreateIfMissing(true)
+        .setDbFile(dbDir)
+        .build();
+
+    List<String> expectedResult = Lists.newArrayList();
+    for (int i = 0; i < 10; i++) {
+      dbStore.put(getBytes("batch-" + i), getBytes("batch-value-" + i));
+      expectedResult.add("batch-" + i);
+    }
+
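+    // Queue three deletes and one put in a single batch; writeBatch below
+    // applies them together (atomically, assuming the underlying store
+    // supports atomic batch writes).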
+    BatchOperation batch = new BatchOperation();
+    batch.delete(getBytes("batch-2"));
+    batch.delete(getBytes("batch-3"));
+    batch.delete(getBytes("batch-4"));
+    batch.put(getBytes("batch-new-2"), getBytes("batch-new-value-2"));
+
+    expectedResult.remove("batch-2");
+    expectedResult.remove("batch-3");
+    expectedResult.remove("batch-4");
+    expectedResult.add("batch-new-2");
+
+    dbStore.writeBatch(batch);
+
+    Iterator<String> it = expectedResult.iterator();
+    AtomicInteger count = new AtomicInteger(0);
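+    // Walk the whole store in key order (a null startKey means the first
+    // entry) and verify every visited key matches the expected sequence.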
+    dbStore.iterate(null, (key, value) -> {
+      count.incrementAndGet();
+      return it.hasNext() && it.next().equals(getString(key));
+    });
+
+    Assert.assertEquals(8, count.get());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/TestOzoneAcls.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/TestOzoneAcls.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/TestOzoneAcls.java
new file mode 100644
index 0000000..03c45c5
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/TestOzoneAcls.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone;
+
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Set;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+/**
+ * This class tests ACL storage and retrieval in the ozone store.
+ */
+public class TestOzoneAcls {
+
+  @Test
+  public void testAclParse() {
+    HashMap<String, Boolean> testMatrix;
+    testMatrix = new HashMap<>();
+
+    testMatrix.put("user:bilbo:r", Boolean.TRUE);
+    testMatrix.put("user:bilbo:w", Boolean.TRUE);
+    testMatrix.put("user:bilbo:rw", Boolean.TRUE);
+    testMatrix.put("user:bilbo:wr", Boolean.TRUE);
+    testMatrix.put("    user:bilbo:wr   ", Boolean.TRUE);
+
+
+    // ACLs make no judgment on the validity of
+    // user names; it is for the userAuth interface
+    // to determine whether a user name is real.
+    testMatrix.put(" user:*:rw", Boolean.TRUE);
+    testMatrix.put(" user:~!:rw", Boolean.TRUE);
+
+
+    testMatrix.put("", Boolean.FALSE);
+    testMatrix.put(null, Boolean.FALSE);
+    testMatrix.put(" user:bilbo:", Boolean.FALSE);
+    testMatrix.put(" user:bilbo:rx", Boolean.FALSE);
+    testMatrix.put(" user:bilbo:mk", Boolean.FALSE);
+    testMatrix.put(" user::rw", Boolean.FALSE);
+    testMatrix.put("user11:bilbo:rw", Boolean.FALSE);
+    testMatrix.put(" user:::rw", Boolean.FALSE);
+
+    testMatrix.put(" group:hobbit:r", Boolean.TRUE);
+    testMatrix.put(" group:hobbit:w", Boolean.TRUE);
+    testMatrix.put(" group:hobbit:rw", Boolean.TRUE);
+    testMatrix.put(" group:hobbit:wr", Boolean.TRUE);
+    testMatrix.put(" group:*:rw", Boolean.TRUE);
+    testMatrix.put(" group:~!:rw", Boolean.TRUE);
+
+    testMatrix.put(" group:hobbit:", Boolean.FALSE);
+    testMatrix.put(" group:hobbit:rx", Boolean.FALSE);
+    testMatrix.put(" group:hobbit:mk", Boolean.FALSE);
+    testMatrix.put(" group::", Boolean.FALSE);
+    testMatrix.put(" group::rw", Boolean.FALSE);
+    testMatrix.put(" group22:hobbit:", Boolean.FALSE);
+    testMatrix.put(" group:::rw", Boolean.FALSE);
+
+    testMatrix.put("JUNK group:hobbit:r", Boolean.FALSE);
+    testMatrix.put("JUNK group:hobbit:w", Boolean.FALSE);
+    testMatrix.put("JUNK group:hobbit:rw", Boolean.FALSE);
+    testMatrix.put("JUNK group:hobbit:wr", Boolean.FALSE);
+    testMatrix.put("JUNK group:*:rw", Boolean.FALSE);
+    testMatrix.put("JUNK group:~!:rw", Boolean.FALSE);
+
+    testMatrix.put(" world::r", Boolean.TRUE);
+    testMatrix.put(" world::w", Boolean.TRUE);
+    testMatrix.put(" world::rw", Boolean.TRUE);
+    testMatrix.put(" world::wr", Boolean.TRUE);
+
+    testMatrix.put(" world:bilbo:w", Boolean.FALSE);
+    testMatrix.put(" world:bilbo:rw", Boolean.FALSE);
+
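+    // Entries mapped to TRUE must parse cleanly; entries mapped to FALSE
+    // must make parseAcl throw IllegalArgumentException.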
+    Set<String> keys = testMatrix.keySet();
+    for (String key : keys) {
+      if (testMatrix.get(key)) {
+        OzoneAcl.parseAcl(key);
+      } else {
+        try {
+          OzoneAcl.parseAcl(key);
+          // should never get here since parseAcl will throw
+          fail("An exception was expected but did not happen.");
+        } catch (IllegalArgumentException e) {
+          // nothing to do
+        }
+      }
+    }
+  }
+
+  @Test
+  public void testAclValues() {
+    OzoneAcl acl = OzoneAcl.parseAcl("user:bilbo:rw");
+    assertEquals(acl.getName(), "bilbo");
+    assertEquals(OzoneAcl.OzoneACLRights.READ_WRITE, acl.getRights());
+    assertEquals(OzoneAcl.OzoneACLType.USER, acl.getType());
+
+    acl = OzoneAcl.parseAcl("user:bilbo:wr");
+    assertEquals("bilbo", acl.getName());
+    assertEquals(OzoneAcl.OzoneACLRights.READ_WRITE, acl.getRights());
+    assertEquals(OzoneAcl.OzoneACLType.USER, acl.getType());
+
+    acl = OzoneAcl.parseAcl("user:bilbo:r");
+    assertEquals("bilbo", acl.getName());
+    assertEquals(OzoneAcl.OzoneACLRights.READ, acl.getRights());
+    assertEquals(OzoneAcl.OzoneACLType.USER, acl.getType());
+
+    acl = OzoneAcl.parseAcl("user:bilbo:w");
+    assertEquals("bilbo", acl.getName());
+    assertEquals(OzoneAcl.OzoneACLRights.WRITE, acl.getRights());
+    assertEquals(OzoneAcl.OzoneACLType.USER, acl.getType());
+
+    acl = OzoneAcl.parseAcl("group:hobbit:wr");
+    assertEquals(acl.getName(), "hobbit");
+    assertEquals(OzoneAcl.OzoneACLRights.READ_WRITE, acl.getRights());
+    assertEquals(OzoneAcl.OzoneACLType.GROUP, acl.getType());
+
+    acl = OzoneAcl.parseAcl("world::wr");
+    assertEquals(acl.getName(), "");
+    assertEquals(OzoneAcl.OzoneACLRights.READ_WRITE, acl.getRights());
+    assertEquals(OzoneAcl.OzoneACLType.WORLD, acl.getType());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestStateMachine.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestStateMachine.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestStateMachine.java
new file mode 100644
index 0000000..c1470bb
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestStateMachine.java
@@ -0,0 +1,106 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.common;
+
+import org.apache.commons.collections.SetUtils;
+import org.apache.hadoop.ozone.common.statemachine
+    .InvalidStateTransitionException;
+import org.apache.hadoop.ozone.common.statemachine.StateMachine;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.apache.hadoop.ozone.common.TestStateMachine.STATES.CLEANUP;
+import static org.apache.hadoop.ozone.common.TestStateMachine.STATES.CLOSED;
+import static org.apache.hadoop.ozone.common.TestStateMachine.STATES.CREATING;
+import static org.apache.hadoop.ozone.common.TestStateMachine.STATES.FINAL;
+import static org.apache.hadoop.ozone.common.TestStateMachine.STATES.INIT;
+import static org.apache.hadoop.ozone.common.TestStateMachine.STATES
+    .OPERATIONAL;
+
+/**
+ * This class tests the ozone common state machine.
+ */
+public class TestStateMachine {
+
+  /**
+   * STATES used by the test state machine.
+   */
+  public enum STATES {INIT, CREATING, OPERATIONAL, CLOSED, CLEANUP, FINAL}
+
+  /**
+   * EVENTS used by the test state machine.
+   */
+  public enum EVENTS {ALLOCATE, CREATE, UPDATE, CLOSE, DELETE, TIMEOUT}
+
+  @Test
+  public void testStateMachineStates() throws InvalidStateTransitionException {
+    Set<STATES> finals = new HashSet<>();
+    finals.add(FINAL);
+
+    StateMachine<STATES, EVENTS> stateMachine =
+        new StateMachine<>(INIT, finals);
+
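+    // Register the allowed transitions; any (state, event) pair that is
+    // not registered below is an invalid transition.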
+    stateMachine.addTransition(INIT, CREATING, EVENTS.ALLOCATE);
+    stateMachine.addTransition(CREATING, OPERATIONAL, EVENTS.CREATE);
+    stateMachine.addTransition(OPERATIONAL, OPERATIONAL, EVENTS.UPDATE);
+    stateMachine.addTransition(OPERATIONAL, CLEANUP, EVENTS.DELETE);
+    stateMachine.addTransition(OPERATIONAL, CLOSED, EVENTS.CLOSE);
+    stateMachine.addTransition(CREATING, CLEANUP, EVENTS.TIMEOUT);
+
+    // Initial and Final states
+    Assert.assertEquals("Initial State", INIT, stateMachine.getInitialState());
+    Assert.assertTrue("Final States", SetUtils.isEqualSet(finals,
+        stateMachine.getFinalStates()));
+
+    // Valid state transitions
+    Assert.assertEquals("STATE should be OPERATIONAL after being created",
+        OPERATIONAL, stateMachine.getNextState(CREATING, EVENTS.CREATE));
+    Assert.assertEquals("STATE should be OPERATIONAL after being updated",
+        OPERATIONAL, stateMachine.getNextState(OPERATIONAL, EVENTS.UPDATE));
+    Assert.assertEquals("STATE should be CLEANUP after being deleted",
+        CLEANUP, stateMachine.getNextState(OPERATIONAL, EVENTS.DELETE));
+    Assert.assertEquals("STATE should be CLEANUP after being timeout",
+        CLEANUP, stateMachine.getNextState(CREATING, EVENTS.TIMEOUT));
+    Assert.assertEquals("STATE should be CLOSED after being closed",
+        CLOSED, stateMachine.getNextState(OPERATIONAL, EVENTS.CLOSE));
+
+    // Negative cases: each invalid transition must throw. try/catch keeps
+    // the test running so that both cases are exercised.
+    try {
+      stateMachine.getNextState(OPERATIONAL, EVENTS.CREATE);
+      Assert.fail("Invalid transition CREATE should throw");
+    } catch (InvalidStateTransitionException e) {
+      // expected
+    }
+    try {
+      stateMachine.getNextState(CREATING, EVENTS.CLOSE);
+      Assert.fail("Invalid transition CLOSE should throw");
+    } catch (InvalidStateTransitionException e) {
+      // expected
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lease/TestLeaseManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lease/TestLeaseManager.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lease/TestLeaseManager.java
new file mode 100644
index 0000000..517c1a7
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lease/TestLeaseManager.java
@@ -0,0 +1,374 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * A generic lease management API which can be used if a service
+ * needs any kind of lease management.
+ */
+
+package org.apache.hadoop.ozone.lease;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Test class to check functionality and consistency of LeaseManager.
+ */
+public class TestLeaseManager {
+
+  /**
+   * Dummy resource on which leases can be acquired.
+   */
+  private final class DummyResource {
+
+    private final String name;
+
+    private DummyResource(String name) {
+      this.name = name;
+    }
+
+    @Override
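+    // Name-based hashCode/equals lets the LeaseManager treat two
+    // DummyResource instances with the same name as the same resource.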
+    public int hashCode() {
+      return name.hashCode();
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj instanceof DummyResource) {
+        return name.equals(((DummyResource) obj).name);
+      }
+      return false;
+    }
+  }
+
+  @Test
+  public void testLeaseAcquireAndRelease() throws LeaseException {
+    // It is assumed that the test case execution won't take more than 5
+    // seconds; if it takes longer, increase the defaultTimeout value of
+    // LeaseManager.
+    LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
+    manager.start();
+    DummyResource resourceOne = new DummyResource("one");
+    DummyResource resourceTwo = new DummyResource("two");
+    DummyResource resourceThree = new DummyResource("three");
+    Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
+    Lease<DummyResource> leaseTwo = manager.acquire(resourceTwo);
+    Lease<DummyResource> leaseThree = manager.acquire(resourceThree);
+    Assert.assertEquals(leaseOne, manager.get(resourceOne));
+    Assert.assertEquals(leaseTwo, manager.get(resourceTwo));
+    Assert.assertEquals(leaseThree, manager.get(resourceThree));
+    Assert.assertFalse(leaseOne.hasExpired());
+    Assert.assertFalse(leaseTwo.hasExpired());
+    Assert.assertFalse(leaseThree.hasExpired());
+    // The releases below should not throw LeaseNotFoundException.
+    manager.release(resourceOne);
+    manager.release(resourceTwo);
+    manager.release(resourceThree);
+    Assert.assertTrue(leaseOne.hasExpired());
+    Assert.assertTrue(leaseTwo.hasExpired());
+    Assert.assertTrue(leaseThree.hasExpired());
+    manager.shutdown();
+  }
+
+  @Test
+  public void testLeaseAlreadyExist() throws LeaseException {
+    LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
+    manager.start();
+    DummyResource resourceOne = new DummyResource("one");
+    DummyResource resourceTwo = new DummyResource("two");
+    Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
+    Lease<DummyResource> leaseTwo = manager.acquire(resourceTwo);
+    Assert.assertEquals(leaseOne, manager.get(resourceOne));
+    Assert.assertEquals(leaseTwo, manager.get(resourceTwo));
+
+    // Use try/catch so the releases and shutdown below still run.
+    try {
+      manager.acquire(resourceOne);
+      Assert.fail("Acquiring an already leased resource should fail");
+    } catch (LeaseAlreadyExistException e) {
+      Assert.assertTrue(e.getMessage().contains("Resource: " + resourceOne));
+    }
+
+    manager.release(resourceOne);
+    manager.release(resourceTwo);
+    manager.shutdown();
+  }
+
+  @Test
+  public void testLeaseNotFound() throws LeaseException, InterruptedException {
+    LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
+    manager.start();
+    DummyResource resourceOne = new DummyResource("one");
+    DummyResource resourceTwo = new DummyResource("two");
+    DummyResource resourceThree = new DummyResource("three");
+
+    // Case 1: lease was never acquired.
+    try {
+      manager.get(resourceOne);
+      Assert.fail("LeaseNotFoundException was expected");
+    } catch (LeaseNotFoundException e) {
+      Assert.assertTrue(e.getMessage().contains("Resource: " + resourceOne));
+    }
+
+    //Case 2: lease is acquired and released.
+    Lease<DummyResource> leaseTwo = manager.acquire(resourceTwo);
+    Assert.assertEquals(leaseTwo, manager.get(resourceTwo));
+    Assert.assertFalse(leaseTwo.hasExpired());
+    manager.release(resourceTwo);
+    Assert.assertTrue(leaseTwo.hasExpired());
+    try {
+      manager.get(resourceTwo);
+      Assert.fail("LeaseNotFoundException was expected");
+    } catch (LeaseNotFoundException e) {
+      Assert.assertTrue(e.getMessage().contains("Resource: " + resourceTwo));
+    }
+
+    //Case 3: lease acquired and timed out.
+    Lease<DummyResource> leaseThree = manager.acquire(resourceThree);
+    Assert.assertEquals(leaseThree, manager.get(resourceThree));
+    Assert.assertFalse(leaseThree.hasExpired());
+    long sleepTime = leaseThree.getRemainingTime() + 1000;
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException ex) {
+      //even in case of interrupt we have to wait till lease times out.
+      Thread.sleep(sleepTime);
+    }
+    Assert.assertTrue(leaseThree.hasExpired());
+    try {
+      manager.get(resourceThree);
+      Assert.fail("LeaseNotFoundException was expected");
+    } catch (LeaseNotFoundException e) {
+      Assert.assertTrue(e.getMessage().contains("Resource: " + resourceThree));
+    }
+    manager.shutdown();
+  }
+
+  @Test
+  public void testCustomLeaseTimeout() throws LeaseException {
+    LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
+    manager.start();
+    DummyResource resourceOne = new DummyResource("one");
+    DummyResource resourceTwo = new DummyResource("two");
+    DummyResource resourceThree = new DummyResource("three");
+    Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
+    Lease<DummyResource> leaseTwo = manager.acquire(resourceTwo, 10000);
+    Lease<DummyResource> leaseThree = manager.acquire(resourceThree, 50000);
+    Assert.assertEquals(leaseOne, manager.get(resourceOne));
+    Assert.assertEquals(leaseTwo, manager.get(resourceTwo));
+    Assert.assertEquals(leaseThree, manager.get(resourceThree));
+    Assert.assertFalse(leaseOne.hasExpired());
+    Assert.assertFalse(leaseTwo.hasExpired());
+    Assert.assertFalse(leaseThree.hasExpired());
+    Assert.assertEquals(5000, leaseOne.getLeaseLifeTime());
+    Assert.assertEquals(10000, leaseTwo.getLeaseLifeTime());
+    Assert.assertEquals(50000, leaseThree.getLeaseLifeTime());
+    // Leases are released during shutdown, so no explicit release is
+    // needed here.
+    manager.shutdown();
+  }
+
+  @Test
+  public void testLeaseCallback() throws LeaseException, InterruptedException {
+    Map<DummyResource, String> leaseStatus = new HashMap<>();
+    LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
+    manager.start();
+    DummyResource resourceOne = new DummyResource("one");
+    Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
+    leaseStatus.put(resourceOne, "lease in use");
+    leaseOne.registerCallBack(() -> {
+      leaseStatus.put(resourceOne, "lease expired");
+      return null;
+    });
+    // wait for lease to expire
+    long sleepTime = leaseOne.getRemainingTime() + 1000;
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException ex) {
+      //even in case of interrupt we have to wait till lease times out.
+      Thread.sleep(sleepTime);
+    }
+    Assert.assertTrue(leaseOne.hasExpired());
+    // check if callback has been executed
+    Assert.assertEquals("lease expired", leaseStatus.get(resourceOne));
+    // the expired lease must have been removed from the manager
+    try {
+      manager.get(resourceOne);
+      Assert.fail("LeaseNotFoundException was expected");
+    } catch (LeaseNotFoundException e) {
+      Assert.assertTrue(e.getMessage().contains("Resource: " + resourceOne));
+    }
+    manager.shutdown();
+  }
+
+  @Test
+  public void testCallbackExecutionInCaseOfLeaseRelease()
+      throws LeaseException, InterruptedException {
+    // Callbacks should not be executed in case of lease release
+    Map<DummyResource, String> leaseStatus = new HashMap<>();
+    LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
+    manager.start();
+    DummyResource resourceOne = new DummyResource("one");
+    Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
+    leaseStatus.put(resourceOne, "lease in use");
+    leaseOne.registerCallBack(() -> {
+      leaseStatus.put(resourceOne, "lease expired");
+      return null;
+    });
+    leaseStatus.put(resourceOne, "lease released");
+    manager.release(resourceOne);
+    Assert.assertTrue(leaseOne.hasExpired());
+    Assert.assertEquals("lease released", leaseStatus.get(resourceOne));
+    try {
+      manager.get(resourceOne);
+      Assert.fail("LeaseNotFoundException was expected");
+    } catch (LeaseNotFoundException e) {
+      Assert.assertTrue(e.getMessage().contains("Resource: " + resourceOne));
+    }
+    manager.shutdown();
+  }
+
+  @Test
+  public void testLeaseCallbackWithMultipleLeases()
+      throws LeaseException, InterruptedException {
+    Map<DummyResource, String> leaseStatus = new HashMap<>();
+    LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
+    manager.start();
+    DummyResource resourceOne = new DummyResource("one");
+    DummyResource resourceTwo = new DummyResource("two");
+    DummyResource resourceThree = new DummyResource("three");
+    DummyResource resourceFour = new DummyResource("four");
+    DummyResource resourceFive = new DummyResource("five");
+    Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
+    Lease<DummyResource> leaseTwo = manager.acquire(resourceTwo);
+    Lease<DummyResource> leaseThree = manager.acquire(resourceThree);
+    Lease<DummyResource> leaseFour = manager.acquire(resourceFour);
+    Lease<DummyResource> leaseFive = manager.acquire(resourceFive);
+    leaseStatus.put(resourceOne, "lease in use");
+    leaseStatus.put(resourceTwo, "lease in use");
+    leaseStatus.put(resourceThree, "lease in use");
+    leaseStatus.put(resourceFour, "lease in use");
+    leaseStatus.put(resourceFive, "lease in use");
+    leaseOne.registerCallBack(() -> {
+      leaseStatus.put(resourceOne, "lease expired");
+      return null;
+    });
+    leaseTwo.registerCallBack(() -> {
+      leaseStatus.put(resourceTwo, "lease expired");
+      return null;
+    });
+    leaseThree.registerCallBack(() -> {
+      leaseStatus.put(resourceThree, "lease expired");
+      return null;
+    });
+    leaseFour.registerCallBack(() -> {
+      leaseStatus.put(resourceFour, "lease expired");
+      return null;
+    });
+    leaseFive.registerCallBack(() -> {
+      leaseStatus.put(resourceFive, "lease expired");
+      return null;
+    });
+
+    // release lease one, two and three
+    leaseStatus.put(resourceOne, "lease released");
+    manager.release(resourceOne);
+    leaseStatus.put(resourceTwo, "lease released");
+    manager.release(resourceTwo);
+    leaseStatus.put(resourceThree, "lease released");
+    manager.release(resourceThree);
+
+    // wait for other leases to expire
+    long sleepTime = leaseFive.getRemainingTime() + 1000;
+
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException ex) {
+      //even in case of interrupt we have to wait till lease times out.
+      Thread.sleep(sleepTime);
+    }
+    Assert.assertTrue(leaseOne.hasExpired());
+    Assert.assertTrue(leaseTwo.hasExpired());
+    Assert.assertTrue(leaseThree.hasExpired());
+    Assert.assertTrue(leaseFour.hasExpired());
+    Assert.assertTrue(leaseFive.hasExpired());
+
+    Assert.assertEquals("lease released", leaseStatus.get(resourceOne));
+    Assert.assertEquals("lease released", leaseStatus.get(resourceTwo));
+    Assert.assertEquals("lease released", leaseStatus.get(resourceThree));
+    Assert.assertEquals("lease expired", leaseStatus.get(resourceFour));
+    Assert.assertEquals("lease expired", leaseStatus.get(resourceFive));
+    manager.shutdown();
+  }
+
+  @Test
+  public void testReuseReleasedLease() throws LeaseException {
+    LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
+    manager.start();
+    DummyResource resourceOne = new DummyResource("one");
+    Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
+    Assert.assertEquals(leaseOne, manager.get(resourceOne));
+    Assert.assertFalse(leaseOne.hasExpired());
+
+    manager.release(resourceOne);
+    Assert.assertTrue(leaseOne.hasExpired());
+
+    Lease<DummyResource> sameResourceLease = manager.acquire(resourceOne);
+    Assert.assertEquals(sameResourceLease, manager.get(resourceOne));
+    Assert.assertFalse(sameResourceLease.hasExpired());
+
+    manager.release(resourceOne);
+    Assert.assertTrue(sameResourceLease.hasExpired());
+    manager.shutdown();
+  }
+
+  @Test
+  public void testReuseTimedOutLease()
+      throws LeaseException, InterruptedException {
+    LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
+    manager.start();
+    DummyResource resourceOne = new DummyResource("one");
+    Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
+    Assert.assertEquals(leaseOne, manager.get(resourceOne));
+    Assert.assertFalse(leaseOne.hasExpired());
+
+    // wait for lease to expire
+    long sleepTime = leaseOne.getRemainingTime() + 1000;
+    try {
+      Thread.sleep(sleepTime);
+    } catch (InterruptedException ex) {
+      //even in case of interrupt we have to wait till lease times out.
+      Thread.sleep(sleepTime);
+    }
+    Assert.assertTrue(leaseOne.hasExpired());
+
+    Lease<DummyResource> sameResourceLease = manager.acquire(resourceOne);
+    Assert.assertEquals(sameResourceLease, manager.get(resourceOne));
+    Assert.assertFalse(sameResourceLease.hasExpired());
+
+    manager.release(resourceOne);
+    Assert.assertTrue(sameResourceLease.hasExpired());
+    manager.shutdown();
+  }
+
+  @Test
+  public void testRenewLease() throws LeaseException, InterruptedException {
+    LeaseManager<DummyResource> manager = new LeaseManager<>(5000);
+    manager.start();
+    DummyResource resourceOne = new DummyResource("one");
+    Lease<DummyResource> leaseOne = manager.acquire(resourceOne);
+    Assert.assertEquals(leaseOne, manager.get(resourceOne));
+    Assert.assertFalse(leaseOne.hasExpired());
+
+    // add 5 more seconds to the lease
+    leaseOne.renew(5000);
+
+    Thread.sleep(5000);
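+    // Without the renewal, the default 5 second timeout would already
+    // have elapsed by now.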
+
+    // lease should still be active
+    Assert.assertEquals(leaseOne, manager.get(resourceOne));
+    Assert.assertFalse(leaseOne.hasExpired());
+    manager.release(resourceOne);
+    manager.shutdown();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lease/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lease/package-info.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lease/package-info.java
new file mode 100644
index 0000000..1071309
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/lease/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lease;
+/*
+ This package contains lease management unit test classes.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/package-info.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/package-info.java
new file mode 100644
index 0000000..0030d2e
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/package-info.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+/**
+ * Ozone-related test helper classes and tests of common utils.
+ */

