http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java
new file mode 100644
index 0000000..fa5df11
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/container/common/helpers/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Helper classes for the container protocol communication.
+ */
+package org.apache.hadoop.ozone.container.common.helpers;
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java
new file mode 100644
index 0000000..dfa9315
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/Lease.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lease;
+
+import org.apache.hadoop.util.Time;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/**
+ * This class represents a lease created on a resource. Callbacks can be
+ * registered on the lease; they will be executed in case of timeout.
+ *
+ * @param <T> Resource type for which the lease can be associated
+ */
+public class Lease<T> {
+
+  /**
+   * The resource for which this lease is created.
+   */
+  private final T resource;
+
+  private final long creationTime;
+
+  /**
+   * Lease lifetime in milliseconds.
+   */
+  private volatile long leaseTimeout;
+
+  private boolean expired;
+
+  /**
+   * Functions to be called in case of timeout.
+   */
+  private List<Callable<Void>> callbacks;
+
+
+  /**
+   * Creates a lease on the specified resource with the given timeout.
+   *
+   * @param resource
+   *        Resource for which the lease has to be created
+   * @param timeout
+   *        Lease lifetime in milliseconds
+   */
+  public Lease(T resource, long timeout) {
+    this.resource = resource;
+    this.leaseTimeout = timeout;
+    this.callbacks = Collections.synchronizedList(new ArrayList<>());
+    this.creationTime = Time.monotonicNow();
+    this.expired = false;
+  }
+
+  /**
+   * Returns true if the lease has expired, else false.
+   *
+   * @return true if expired, else false
+   */
+  public boolean hasExpired() {
+    return expired;
+  }
+
+  /**
+   * Registers a callback which will be executed in case of timeout. Callbacks
+   * are executed in a separate Thread.
+   *
+   * @param callback
+   *        The Callable which has to be executed
+   * @throws LeaseExpiredException
+   *         If the lease has already timed out
+   */
+  public void registerCallBack(Callable<Void> callback)
+      throws LeaseExpiredException {
+    if(hasExpired()) {
+      throw new LeaseExpiredException("Resource: " + resource);
+    }
+    callbacks.add(callback);
+  }
+
+  /**
+   * Returns the time elapsed since the creation of lease.
+   *
+   * @return elapsed time in milliseconds
+   * @throws LeaseExpiredException
+   *         If the lease has already timed out
+   */
+  public long getElapsedTime() throws LeaseExpiredException {
+    if(hasExpired()) {
+      throw new LeaseExpiredException("Resource: " + resource);
+    }
+    return Time.monotonicNow() - creationTime;
+  }
+
+  /**
+   * Returns the time available before timeout.
+   *
+   * @return remaining time in milliseconds
+   * @throws LeaseExpiredException
+   *         If the lease has already timed out
+   */
+  public long getRemainingTime() throws LeaseExpiredException {
+    if(hasExpired()) {
+      throw new LeaseExpiredException("Resource: " + resource);
+    }
+    return leaseTimeout - getElapsedTime();
+  }
+
+  /**
+   * Returns total lease lifetime.
+   *
+   * @return total lifetime of lease in milliseconds
+   * @throws LeaseExpiredException
+   *         If the lease has already timed out
+   */
+  public long getLeaseLifeTime() throws LeaseExpiredException {
+    if(hasExpired()) {
+      throw new LeaseExpiredException("Resource: " + resource);
+    }
+    return leaseTimeout;
+  }
+
+  /**
+   * Renews the lease timeout period.
+   *
+   * @param timeout
+   *        Time to be added to the lease in milliseconds
+   * @throws LeaseExpiredException
+   *         If the lease has already timed out
+   */
+  public void renew(long timeout) throws LeaseExpiredException {
+    if(hasExpired()) {
+      throw new LeaseExpiredException("Resource: " + resource);
+    }
+    leaseTimeout += timeout;
+  }
+
+  @Override
+  public int hashCode() {
+    return resource.hashCode();
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if(obj instanceof Lease) {
+      return resource.equals(((Lease) obj).resource);
+    }
+    return false;
+  }
+
+  @Override
+  public String toString() {
+    return "Lease<" + resource.toString() + ">";
+  }
+
+  /**
+   * Returns the callbacks to be executed for the lease in case of timeout.
+   *
+   * @return callbacks to be executed
+   */
+  List<Callable<Void>> getCallbacks() {
+    return callbacks;
+  }
+
+  /**
+   * Expires/Invalidates the lease.
+   */
+  void invalidate() {
+    callbacks = null;
+    expired = true;
+  }
+
+}
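
For illustration, a minimal usage sketch of the Lease API above; the resource
value and the callback body are made up, and checked LeaseExpiredException
handling is omitted. Note that expiry itself is driven by the LeaseManager
introduced later in this patch, since invalidate() is package-private:

    Lease<String> lease = new Lease<>("container-1", 10000); // 10 s lifetime
    lease.registerCallBack(() -> {
      // Runs on a callback-executor thread if the lease times out.
      System.out.println("lease on container-1 expired");
      return null;                                // Callable<Void> contract
    });
    long remainingMs = lease.getRemainingTime();  // <= 10000
    lease.renew(5000);                            // lifetime becomes 15 s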

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseAlreadyExistException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseAlreadyExistException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseAlreadyExistException.java
new file mode 100644
index 0000000..a39ea22
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseAlreadyExistException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lease;
+
+/**
+ * This exception indicates that there is already a lease acquired on the
+ * same resource.
+ */
+public class LeaseAlreadyExistException extends LeaseException {
+
+  /**
+   * Constructs a {@code LeaseAlreadyExistException} with {@code null}
+   * as its error detail message.
+   */
+  public LeaseAlreadyExistException() {
+    super();
+  }
+
+  /**
+   * Constructs a {@code LeaseAlreadyExistException} with the specified
+   * detail message.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   */
+  public LeaseAlreadyExistException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseCallbackExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseCallbackExecutor.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseCallbackExecutor.java
new file mode 100644
index 0000000..1b7391b
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseCallbackExecutor.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lease;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.Callable;
+
+/**
+ * This class is responsible for executing the callbacks of a lease in case of
+ * timeout.
+ */
+public class LeaseCallbackExecutor<T> implements Runnable {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LeaseCallbackExecutor.class);
+
+  private final T resource;
+  private final List<Callable<Void>> callbacks;
+
+  /**
+   * Constructs LeaseCallbackExecutor instance with list of callbacks.
+   *
+   * @param resource
+   *        The resource for which the callbacks are executed
+   * @param callbacks
+   *        Callbacks to be executed by this executor
+   */
+  public LeaseCallbackExecutor(T resource, List<Callable<Void>> callbacks) {
+    this.resource = resource;
+    this.callbacks = callbacks;
+  }
+
+  @Override
+  public void run() {
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Executing callbacks for lease on {}", resource);
+    }
+    for(Callable<Void> callback : callbacks) {
+      try {
+        callback.call();
+      } catch (Exception e) {
+        LOG.warn("Exception while executing callback for lease on {}",
+            resource, e);
+      }
+    }
+  }
+
+}
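
A quick behavioral sketch of the executor above: it runs each callback in
order and only logs failures, so one failing callback cannot prevent the rest
from running. The values are illustrative; java.util imports are assumed.

    List<Callable<Void>> callbacks = new ArrayList<>();
    callbacks.add(() -> { System.out.println("first callback"); return null; });
    callbacks.add(() -> { throw new Exception("boom"); }); // logged, not thrown
    callbacks.add(() -> { System.out.println("still runs"); return null; });
    new Thread(new LeaseCallbackExecutor<>("container-1", callbacks)).start();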

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseException.java
new file mode 100644
index 0000000..418f412
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseException.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lease;
+
+/**
+ * This is the base class for all lease-related exceptions.
+ */
+public class LeaseException extends Exception {
+
+  /**
+   * Constructs a {@code LeaseException} with {@code null}
+   * as its error detail message.
+   */
+  public LeaseException() {
+    super();
+  }
+
+  /**
+   * Constructs a {@code LeaseException} with the specified
+   * detail message.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   */
+  public LeaseException(String message) {
+    super(message);
+  }
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseExpiredException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseExpiredException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseExpiredException.java
new file mode 100644
index 0000000..440a023
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseExpiredException.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lease;
+
+/**
+ * This exception indicates that the lease being accessed has expired.
+ */
+public class LeaseExpiredException extends LeaseException {
+
+  /**
+   * Constructs a {@code LeaseExpiredException} with {@code null}
+   * as its error detail message.
+   */
+  public LeaseExpiredException() {
+    super();
+  }
+
+  /**
+   * Constructs a {@code LeaseExpiredException} with the specified
+   * detail message.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   */
+  public LeaseExpiredException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java
new file mode 100644
index 0000000..b8390dd
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManager.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lease;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+/**
+ * LeaseManager creates leases on resources and is responsible for their
+ * lifecycle. A lease can be returned to the manager before it expires. The
+ * resource for which a lease is created should have a proper {@code equals}
+ * implementation; resource equality is checked when the lease is created.
+ *
+ * @param <T> Type of the resource for which leases are created
+ */
+public class LeaseManager<T> {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LeaseManager.class);
+
+  private final long defaultTimeout;
+  private Map<T, Lease<T>> activeLeases;
+  private LeaseMonitor leaseMonitor;
+  private Thread leaseMonitorThread;
+  private boolean isRunning;
+
+  /**
+   * Creates an instance of lease manager.
+   *
+   * @param defaultTimeout
+   *        Default timeout in milliseconds to be used for lease creation.
+   */
+  public LeaseManager(long defaultTimeout) {
+    this.defaultTimeout = defaultTimeout;
+  }
+
+  /**
+   * Starts the lease manager service.
+   */
+  public void start() {
+    LOG.debug("Starting LeaseManager service");
+    activeLeases = new ConcurrentHashMap<>();
+    leaseMonitor = new LeaseMonitor();
+    leaseMonitorThread = new Thread(leaseMonitor);
+    leaseMonitorThread.setName("LeaseManager#LeaseMonitor");
+    leaseMonitorThread.setDaemon(true);
+    leaseMonitorThread.setUncaughtExceptionHandler((thread, throwable) -> {
+      // Let us just restart this thread after logging an error.
+      // if this thread is not running we cannot handle Lease expiry.
+      LOG.error("LeaseMonitor thread encountered an error. Thread: {}",
+          thread.toString(), throwable);
+      leaseMonitorThread.start();
+    });
+    LOG.debug("Starting LeaseManager#LeaseMonitor Thread");
+    leaseMonitorThread.start();
+    isRunning = true;
+  }
+
+  /**
+   * Returns a lease for the specified resource with default timeout.
+   *
+   * @param resource
+   *        Resource for which lease has to be created
+   * @throws LeaseAlreadyExistException
+   *         If there is already a lease on the resource
+   */
+  public synchronized Lease<T> acquire(T resource)
+      throws LeaseAlreadyExistException {
+    return acquire(resource, defaultTimeout);
+  }
+
+  /**
+   * Returns a lease for the specified resource with the timeout provided.
+   *
+   * @param resource
+   *        Resource for which lease has to be created
+   * @param timeout
+   *        The timeout in milliseconds which has to be set on the lease
+   * @throws LeaseAlreadyExistException
+   *         If there is already a lease on the resource
+   */
+  public synchronized Lease<T> acquire(T resource, long timeout)
+      throws LeaseAlreadyExistException {
+    checkStatus();
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Acquiring lease on {} for {} milliseconds", resource, 
timeout);
+    }
+    if(activeLeases.containsKey(resource)) {
+      throw new LeaseAlreadyExistException("Resource: " + resource);
+    }
+    Lease<T> lease = new Lease<>(resource, timeout);
+    activeLeases.put(resource, lease);
+    leaseMonitorThread.interrupt();
+    return lease;
+  }
+
+  /**
+   * Returns a lease associated with the specified resource.
+   *
+   * @param resource
+   *        Resource for which the lease has to be returned
+   * @throws LeaseNotFoundException
+   *         If there is no active lease on the resource
+   */
+  public Lease<T> get(T resource) throws LeaseNotFoundException {
+    checkStatus();
+    Lease<T> lease = activeLeases.get(resource);
+    if(lease != null) {
+      return lease;
+    }
+    throw new LeaseNotFoundException("Resource: " + resource);
+  }
+
+  /**
+   * Releases the lease associated with the specified resource.
+   *
+   * @param resource
+   *        The resource for which the lease has to be released
+   * @throws LeaseNotFoundException
+   *         If there is no active lease on the resource
+   */
+  public synchronized void release(T resource)
+      throws LeaseNotFoundException {
+    checkStatus();
+    if(LOG.isDebugEnabled()) {
+      LOG.debug("Releasing lease on {}", resource);
+    }
+    Lease<T> lease = activeLeases.remove(resource);
+    if(lease == null) {
+      throw new LeaseNotFoundException("Resource: " + resource);
+    }
+    lease.invalidate();
+  }
+
+  /**
+   * Shuts down the LeaseManager and releases its resources. All the active
+   * {@link Lease}s will be released (callbacks on leases will not be
+   * executed).
+   */
+  public void shutdown() {
+    checkStatus();
+    LOG.debug("Shutting down LeaseManager service");
+    leaseMonitor.disable();
+    leaseMonitorThread.interrupt();
+    for(T resource : activeLeases.keySet()) {
+      try {
+        release(resource);
+      } catch (LeaseNotFoundException ex) {
+        //Ignore the exception, someone might have released the lease
+      }
+    }
+    isRunning = false;
+  }
+
+  /**
+   * Throws {@link LeaseManagerNotRunningException} if the service is not
+   * running.
+   */
+  private void checkStatus() {
+    if(!isRunning) {
+      throw new LeaseManagerNotRunningException("LeaseManager not running.");
+    }
+  }
+
+  /**
+   * Monitors the leases and expires them based on the timeout; it is also
+   * responsible for executing the callbacks of expired leases.
+   */
+  private final class LeaseMonitor implements Runnable {
+
+    private boolean monitor = true;
+    private ExecutorService executorService;
+
+    private LeaseMonitor() {
+      this.monitor = true;
+      this.executorService = Executors.newCachedThreadPool();
+    }
+
+    @Override
+    public void run() {
+      while(monitor) {
+        LOG.debug("LeaseMonitor: checking for lease expiry");
+        long sleepTime = Long.MAX_VALUE;
+
+        for (T resource : activeLeases.keySet()) {
+          try {
+            Lease<T> lease = get(resource);
+            long remainingTime = lease.getRemainingTime();
+            if (remainingTime <= 0) {
+              //Lease has timed out
+              List<Callable<Void>> leaseCallbacks = lease.getCallbacks();
+              release(resource);
+              executorService.execute(
+                  new LeaseCallbackExecutor(resource, leaseCallbacks));
+            } else {
+              sleepTime = remainingTime > sleepTime ?
+                  sleepTime : remainingTime;
+            }
+          } catch (LeaseNotFoundException | LeaseExpiredException ex) {
+            //Ignore the exception, someone might have released the lease
+          }
+        }
+
+        try {
+          if(!Thread.interrupted()) {
+            Thread.sleep(sleepTime);
+          }
+        } catch (InterruptedException ignored) {
+          // This means a new lease is added to activeLeases.
+        }
+      }
+    }
+
+    /**
+     * Disables the lease monitor; the next interrupt call on the thread
+     * will stop the lease monitor.
+     */
+    public void disable() {
+      monitor = false;
+    }
+  }
+
+}
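
An end-to-end sketch of the LeaseManager API above; the timeout and resource
values are illustrative, and the checked lease exceptions are collapsed into
the base LeaseException:

    void example() throws LeaseException {
      LeaseManager<String> manager = new LeaseManager<>(10000); // 10 s default
      manager.start();                       // starts the LeaseMonitor thread
      Lease<String> lease = manager.acquire("block-123");
      lease.registerCallBack(() -> {
        System.out.println("lease on block-123 expired");
        return null;
      });
      manager.release("block-123");  // returned early, callbacks never run
      manager.shutdown();
    }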

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManagerNotRunningException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManagerNotRunningException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManagerNotRunningException.java
new file mode 100644
index 0000000..ced31de
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseManagerNotRunningException.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lease;
+
+/**
+ * This exception indicates that the LeaseManager service is not running.
+ */
+public class LeaseManagerNotRunningException extends RuntimeException {
+
+  /**
+   * Constructs a {@code LeaseManagerNotRunningException} with {@code null}
+   * as its error detail message.
+   */
+  public LeaseManagerNotRunningException() {
+    super();
+  }
+
+  /**
+   * Constructs a {@code LeaseManagerNotRunningException} with the specified
+   * detail message.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   */
+  public LeaseManagerNotRunningException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseNotFoundException.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseNotFoundException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseNotFoundException.java
new file mode 100644
index 0000000..c292d33
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/LeaseNotFoundException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.lease;
+
+/**
+ * This exception indicates that the lease being accessed does not
+ * exist.
+ */
+public class LeaseNotFoundException extends LeaseException {
+
+  /**
+   * Constructs a {@code LeaseNotFoundException} with {@code null}
+   * as its error detail message.
+   */
+  public LeaseNotFoundException() {
+    super();
+  }
+
+  /**
+   * Constructs a {@code LeaseNotFoundException} with the specified
+   * detail message.
+   *
+   * @param message
+   *        The detail message (which is saved for later retrieval
+   *        by the {@link #getMessage()} method)
+   */
+  public LeaseNotFoundException(String message) {
+    super(message);
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/package-info.java
new file mode 100644
index 0000000..48ee2e1
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/lease/package-info.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/**
+ * A generic lease management API which can be used if a service
+ * needs any kind of lease management.
+ */
+
+package org.apache.hadoop.ozone.lease;
+/*
+ This package contains lease management related classes.
+ */
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/package-info.java
new file mode 100644
index 0000000..db399db
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/package-info.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone;
+
+/**
+ This package contains classes that support the Ozone implementation on the
+ datanode side.
+
+ Main parts of Ozone on the datanode are:
+
+ 1. REST Interface - This code lives under the web directory and listens to
+ the WebHDFS port.
+
+ 2. Datanode container classes: These support persistence of Ozone objects
+ on the datanode. They live under the container directory.
+
+ 3. Client and Shell: We also support an Ozone REST client library; it lives
+ under web/client and web/ozShell.
+
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
new file mode 100644
index 0000000..fa79341
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java
@@ -0,0 +1,170 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocolPB;
+
+import com.google.common.collect.Sets;
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
+import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
+    .AllocateScmBlockRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
+    .AllocateScmBlockResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
+    .DeleteKeyBlocksResultProto;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
+    .DeleteScmKeyBlocksRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
+    .DeleteScmKeyBlocksResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
+    .GetScmBlockLocationsRequestProto;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
+    .GetScmBlockLocationsResponseProto;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
+    .ScmLocatedBlockProto;
+import org.apache.hadoop.ozone.common.BlockGroup;
+import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class is the server-side translator that forwards requests received on
+ * {@link ScmBlockLocationProtocolPB} to the
+ * {@link ScmBlockLocationProtocol} server implementation.
+ */
+@InterfaceAudience.Private
+public final class ScmBlockLocationProtocolServerSideTranslatorPB
+    implements ScmBlockLocationProtocolPB {
+
+  private final ScmBlockLocationProtocol impl;
+
+  /**
+   * Creates a new ScmBlockLocationProtocolServerSideTranslatorPB.
+   *
+   * @param impl {@link ScmBlockLocationProtocol} server implementation
+   */
+  public ScmBlockLocationProtocolServerSideTranslatorPB(
+      ScmBlockLocationProtocol impl) throws IOException {
+    this.impl = impl;
+  }
+
+
+  @Override
+  public GetScmBlockLocationsResponseProto getScmBlockLocations(
+      RpcController controller, GetScmBlockLocationsRequestProto req)
+      throws ServiceException {
+    Set<String> keys = Sets.newLinkedHashSetWithExpectedSize(
+        req.getKeysCount());
+    for (String key : req.getKeysList()) {
+      keys.add(key);
+    }
+    final Set<AllocatedBlock> blocks;
+    try {
+      blocks = impl.getBlockLocations(keys);
+    } catch (IOException ex) {
+      throw new ServiceException(ex);
+    }
+    GetScmBlockLocationsResponseProto.Builder resp =
+        GetScmBlockLocationsResponseProto.newBuilder();
+    for (AllocatedBlock block: blocks) {
+      ScmLocatedBlockProto.Builder locatedBlock =
+          ScmLocatedBlockProto.newBuilder()
+              .setKey(block.getKey())
+              .setPipeline(block.getPipeline().getProtobufMessage());
+      resp.addLocatedBlocks(locatedBlock.build());
+    }
+    return resp.build();
+  }
+
+  @Override
+  public AllocateScmBlockResponseProto allocateScmBlock(
+      RpcController controller, AllocateScmBlockRequestProto request)
+      throws ServiceException {
+    try {
+      AllocatedBlock allocatedBlock =
+          impl.allocateBlock(request.getSize(), request.getType(),
+              request.getFactor(), request.getOwner());
+      if (allocatedBlock != null) {
+        return
+            AllocateScmBlockResponseProto.newBuilder()
+                .setKey(allocatedBlock.getKey())
+                .setPipeline(allocatedBlock.getPipeline().getProtobufMessage())
+                .setCreateContainer(allocatedBlock.getCreateContainer())
+                .setErrorCode(AllocateScmBlockResponseProto.Error.success)
+                .build();
+      } else {
+        return AllocateScmBlockResponseProto.newBuilder()
+            .setErrorCode(AllocateScmBlockResponseProto.Error.unknownFailure)
+            .build();
+      }
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public DeleteScmKeyBlocksResponseProto deleteScmKeyBlocks(
+      RpcController controller, DeleteScmKeyBlocksRequestProto req)
+      throws ServiceException {
+    DeleteScmKeyBlocksResponseProto.Builder resp =
+        DeleteScmKeyBlocksResponseProto.newBuilder();
+    try {
+      List<BlockGroup> infoList = req.getKeyBlocksList().stream()
+          .map(BlockGroup::getFromProto).collect(Collectors.toList());
+      final List<DeleteBlockGroupResult> results =
+          impl.deleteKeyBlocks(infoList);
+      for (DeleteBlockGroupResult result: results) {
+        DeleteKeyBlocksResultProto.Builder deleteResult =
+            DeleteKeyBlocksResultProto
+            .newBuilder()
+            .setObjectKey(result.getObjectKey())
+            .addAllBlockResults(result.getBlockResultProtoList());
+        resp.addResults(deleteResult.build());
+      }
+    } catch (IOException ex) {
+      throw new ServiceException(ex);
+    }
+    return resp.build();
+  }
+
+  @Override
+  public HddsProtos.GetScmInfoRespsonseProto getScmInfo(
+      RpcController controller, HddsProtos.GetScmInfoRequestProto req)
+      throws ServiceException {
+    ScmInfo scmInfo;
+    try {
+      scmInfo = impl.getScmInfo();
+    } catch (IOException ex) {
+      throw new ServiceException(ex);
+    }
+    return HddsProtos.GetScmInfoRespsonseProto.newBuilder()
+        .setClusterId(scmInfo.getClusterId())
+        .setScmId(scmInfo.getScmId())
+        .build();
+  }
+}
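
For context, a hedged sketch of how such a translator is typically plugged
into Hadoop's ProtobufRpcEngine; the generated service class name, bind
address and port below are assumptions, not part of this patch:

    RPC.setProtocolEngine(conf, ScmBlockLocationProtocolPB.class,
        ProtobufRpcEngine.class);
    BlockingService service =
        ScmBlockLocationProtocolProtos.ScmBlockLocationProtocolService
            .newReflectiveBlockingService(
                new ScmBlockLocationProtocolServerSideTranslatorPB(impl));
    RPC.Server server = new RPC.Builder(conf)
        .setProtocol(ScmBlockLocationProtocolPB.class)
        .setInstance(service)
        .setBindAddress("0.0.0.0")
        .setPort(9863)                      // assumed SCM block client port
        .build();
    server.start();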

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
new file mode 100644
index 0000000..4974268
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/StorageContainerLocationProtocolServerSideTranslatorPB.java
@@ -0,0 +1,212 @@
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.ozone.protocolPB;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.scm.ScmInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
+import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.ContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.ContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.GetContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.GetContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.ObjectStageChangeResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.PipelineRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.PipelineResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.SCMDeleteContainerResponseProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.SCMListContainerRequestProto;
+import org.apache.hadoop.hdds.protocol.proto
+    .StorageContainerLocationProtocolProtos.SCMListContainerResponseProto;
+
+import java.io.IOException;
+import java.util.EnumSet;
+import java.util.List;
+
+/**
+ * This class is the server-side translator that forwards requests received on
+ * {@link StorageContainerLocationProtocolPB} to the
+ * {@link StorageContainerLocationProtocol} server implementation.
+ */
+@InterfaceAudience.Private
+public final class StorageContainerLocationProtocolServerSideTranslatorPB
+    implements StorageContainerLocationProtocolPB {
+
+  private final StorageContainerLocationProtocol impl;
+
+  /**
+   * Creates a new StorageContainerLocationProtocolServerSideTranslatorPB.
+   *
+   * @param impl {@link StorageContainerLocationProtocol} server implementation
+   */
+  public StorageContainerLocationProtocolServerSideTranslatorPB(
+      StorageContainerLocationProtocol impl) throws IOException {
+    this.impl = impl;
+  }
+
+  @Override
+  public ContainerResponseProto allocateContainer(RpcController unused,
+      ContainerRequestProto request) throws ServiceException {
+    try {
+      Pipeline pipeline = impl.allocateContainer(request.getReplicationType(),
+          request.getReplicationFactor(), request.getContainerName(),
+          request.getOwner());
+      return ContainerResponseProto.newBuilder()
+          .setPipeline(pipeline.getProtobufMessage())
+          .setErrorCode(ContainerResponseProto.Error.success)
+          .build();
+
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public GetContainerResponseProto getContainer(
+      RpcController controller, GetContainerRequestProto request)
+      throws ServiceException {
+    try {
+      Pipeline pipeline = impl.getContainer(request.getContainerName());
+      return GetContainerResponseProto.newBuilder()
+          .setPipeline(pipeline.getProtobufMessage())
+          .build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public SCMListContainerResponseProto listContainer(RpcController controller,
+      SCMListContainerRequestProto request) throws ServiceException {
+    try {
+      String startName = null;
+      String prefixName = null;
+      int count = -1;
+
+      // Arguments check.
+      if (request.hasPrefixName()) {
+        // Prefix container name is given.
+        prefixName = request.getPrefixName();
+      }
+      if (request.hasStartName()) {
+        // Start container name is given.
+        startName = request.getStartName();
+      }
+
+      count = request.getCount();
+      List<ContainerInfo> containerList =
+          impl.listContainer(startName, prefixName, count);
+      SCMListContainerResponseProto.Builder builder =
+          SCMListContainerResponseProto.newBuilder();
+      for (ContainerInfo container : containerList) {
+        builder.addContainers(container.getProtobuf());
+      }
+      return builder.build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public SCMDeleteContainerResponseProto deleteContainer(
+      RpcController controller, SCMDeleteContainerRequestProto request)
+      throws ServiceException {
+    try {
+      impl.deleteContainer(request.getContainerName());
+      return SCMDeleteContainerResponseProto.newBuilder().build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public StorageContainerLocationProtocolProtos.NodeQueryResponseProto
+      queryNode(RpcController controller,
+      StorageContainerLocationProtocolProtos.NodeQueryRequestProto request)
+      throws ServiceException {
+    try {
+      EnumSet<HddsProtos.NodeState> nodeStateEnumSet = EnumSet.copyOf(request
+          .getQueryList());
+      HddsProtos.NodePool datanodes = impl.queryNode(nodeStateEnumSet,
+          request.getScope(), request.getPoolName());
+      return StorageContainerLocationProtocolProtos
+          .NodeQueryResponseProto.newBuilder()
+          .setDatanodes(datanodes)
+          .build();
+    } catch (Exception e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public ObjectStageChangeResponseProto notifyObjectStageChange(
+      RpcController controller, ObjectStageChangeRequestProto request)
+      throws ServiceException {
+    try {
+      impl.notifyObjectStageChange(request.getType(), request.getName(),
+          request.getOp(), request.getStage());
+      return ObjectStageChangeResponseProto.newBuilder().build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
+  @Override
+  public PipelineResponseProto allocatePipeline(
+      RpcController controller, PipelineRequestProto request)
+      throws ServiceException {
+    // TODO : Wiring this up requires one more patch.
+    return null;
+  }
+
+  @Override
+  public HddsProtos.GetScmInfoRespsonseProto getScmInfo(
+      RpcController controller, HddsProtos.GetScmInfoRequestProto req)
+      throws ServiceException {
+    try {
+      ScmInfo scmInfo = impl.getScmInfo();
+      return HddsProtos.GetScmInfoRespsonseProto.newBuilder()
+          .setClusterId(scmInfo.getClusterId())
+          .setScmId(scmInfo.getScmId())
+          .build();
+    } catch (IOException ex) {
+      throw new ServiceException(ex);
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/package-info.java
new file mode 100644
index 0000000..860386d
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/package-info.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.protocolPB;
+
+/**
+ * This package contains classes for the Protocol Buffers binding of Ozone
+ * protocols.
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/web/utils/JsonUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/web/utils/JsonUtils.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/web/utils/JsonUtils.java
new file mode 100644
index 0000000..af56da3
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/web/utils/JsonUtils.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.ozone.web.utils;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.fasterxml.jackson.databind.ObjectWriter;
+import com.fasterxml.jackson.databind.type.CollectionType;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * JSON Utility functions used in ozone.
+ */
+public final class JsonUtils {
+
+  // Reuse ObjectMapper instance for improving performance.
+  // ObjectMapper is thread safe as long as we always configure the
+  // instance before use.
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+  private static final ObjectReader READER = MAPPER.readerFor(Object.class);
+  private static final ObjectWriter WRITER =
+      MAPPER.writerWithDefaultPrettyPrinter();
+
+  private JsonUtils() {
+    // Never constructed
+  }
+
+  public static String toJsonStringWithDefaultPrettyPrinter(String jsonString)
+      throws IOException {
+    Object json = READER.readValue(jsonString);
+    return WRITER.writeValueAsString(json);
+  }
+
+  public static String toJsonString(Object obj) throws IOException {
+    return MAPPER.writeValueAsString(obj);
+  }
+
+  /**
+   * Deserializes a list of elements from a given JSON string;
+   * each element in the list is of the given type.
+   *
+   * @param str JSON string.
+   * @param elementType element type.
+   * @return List of elements of type elementType
+   * @throws IOException if the string cannot be parsed
+   */
+  public static List<?> toJsonList(String str, Class<?> elementType)
+      throws IOException {
+    CollectionType type = MAPPER.getTypeFactory()
+        .constructCollectionType(List.class, elementType);
+    return MAPPER.readValue(str, type);
+  }
+}
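
A small usage sketch for the helpers above; the data is illustrative and
java.util.Arrays is assumed imported:

    List<String> names = Arrays.asList("volume1", "volume2");
    String json = JsonUtils.toJsonString(names);        // ["volume1","volume2"]
    String pretty = JsonUtils.toJsonStringWithDefaultPrettyPrinter(json);
    List<?> parsed = JsonUtils.toJsonList(json, String.class);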

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundService.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundService.java
new file mode 100644
index 0000000..431da64
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundService.java
@@ -0,0 +1,161 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An abstract class for a background service in Ozone.
+ * A background service schedules multiple child tasks in parallel
+ * at a fixed interval. In each interval, it waits until all the tasks
+ * finish execution before scheduling the next interval.
+ */
+public abstract class BackgroundService {
+
+  @VisibleForTesting
+  public static final Logger LOG =
+      LoggerFactory.getLogger(BackgroundService.class);
+
+  // Executor to launch child tasks
+  private final ScheduledExecutorService exec;
+  private final ThreadGroup threadGroup;
+  private final ThreadFactory threadFactory;
+  private final String serviceName;
+  private final long interval;
+  private final long serviceTimeout;
+  private final TimeUnit unit;
+  private final PeriodicalTask service;
+
+  public BackgroundService(String serviceName, long interval,
+      TimeUnit unit, int threadPoolSize, long serviceTimeout) {
+    this.interval = interval;
+    this.unit = unit;
+    this.serviceName = serviceName;
+    this.serviceTimeout = serviceTimeout;
+    threadGroup = new ThreadGroup(serviceName);
+    ThreadFactory tf = r -> new Thread(threadGroup, r);
+    threadFactory = new ThreadFactoryBuilder()
+        .setThreadFactory(tf)
+        .setDaemon(true)
+        .setNameFormat(serviceName + "#%d")
+        .build();
+    exec = Executors.newScheduledThreadPool(threadPoolSize, threadFactory);
+    service = new PeriodicalTask();
+  }
+
+  protected ExecutorService getExecutorService() {
+    return this.exec;
+  }
+
+  @VisibleForTesting
+  public int getThreadCount() {
+    return threadGroup.activeCount();
+  }
+
+  @VisibleForTesting
+  public void triggerBackgroundTaskForTesting() {
+    service.run();
+  }
+
+  // start service
+  public void start() {
+    exec.scheduleWithFixedDelay(service, 0, interval, unit);
+  }
+
+  public abstract BackgroundTaskQueue getTasks();
+
+  /**
+   * Runs one or more background tasks concurrently and
+   * waits until all tasks return their results.
+   */
+  public class PeriodicalTask implements Runnable {
+    @Override
+    public synchronized void run() {
+      LOG.debug("Running background service : {}", serviceName);
+      BackgroundTaskQueue tasks = getTasks();
+      if (tasks.isEmpty()) {
+        // No tasks found, or there was a problem initializing them;
+        // return and retry in the next interval.
+        return;
+      }
+
+      LOG.debug("Number of background tasks to execute : {}", tasks.size());
+      CompletionService<BackgroundTaskResult> taskCompletionService =
+          new ExecutorCompletionService<>(exec);
+
+      List<Future<BackgroundTaskResult>> results = Lists.newArrayList();
+      while (tasks.size() > 0) {
+        BackgroundTask task = tasks.poll();
+        Future<BackgroundTaskResult> result =
+            taskCompletionService.submit(task);
+        results.add(result);
+      }
+
+      results.parallelStream().forEach(taskResultFuture -> {
+        try {
+          // Collect task results
+          BackgroundTaskResult result = serviceTimeout > 0
+              ? taskResultFuture.get(serviceTimeout, TimeUnit.MILLISECONDS)
+              : taskResultFuture.get();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("task execution result size {}", result.getSize());
+          }
+        } catch (InterruptedException | ExecutionException e) {
+          LOG.warn(
+              "Background task fails to execute, "
+                  + "retrying in next interval", e);
+        } catch (TimeoutException e) {
+          LOG.warn("Background task executes timed out, "
+              + "retrying in next interval", e);
+        }
+      });
+    }
+  }
+
+  // Shut down and make sure all threads are properly released.
+  public void shutdown() {
+    LOG.info("Shutting down service {}", this.serviceName);
+    exec.shutdown();
+    try {
+      if (!exec.awaitTermination(60, TimeUnit.SECONDS)) {
+        exec.shutdownNow();
+      }
+    } catch (InterruptedException e) {
+      exec.shutdownNow();
+      // Preserve the interrupt status for callers further up the stack.
+      Thread.currentThread().interrupt();
+    }
+    if (threadGroup.activeCount() == 0 && !threadGroup.isDestroyed()) {
+      threadGroup.destroy();
+    }
+  }
+}
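
To make the abstract contract concrete, a minimal subclass sketch
(illustrative only, not part of this patch; the service name, interval, and
task body are invented; serviceTimeout is in milliseconds, matching the
Future.get call above):

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.utils.BackgroundService;
    import org.apache.hadoop.utils.BackgroundTask;
    import org.apache.hadoop.utils.BackgroundTaskQueue;
    import org.apache.hadoop.utils.BackgroundTaskResult;
    import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;

    public class ExampleCleanupService extends BackgroundService {

      public ExampleCleanupService() {
        // Run every 60 seconds, 2 worker threads, 5000 ms per-task timeout.
        super("ExampleCleanupService", 60, TimeUnit.SECONDS, 2, 5000);
      }

      @Override
      public BackgroundTaskQueue getTasks() {
        BackgroundTaskQueue queue = new BackgroundTaskQueue();
        queue.add(new BackgroundTask<BackgroundTaskResult>() {
          @Override
          public int getPriority() {
            return 0;
          }

          @Override
          public BackgroundTaskResult call() {
            // The periodic work goes here.
            return EmptyTaskResult.newResult();
          }
        });
        return queue;
      }
    }

A caller invokes start() once and shutdown() on teardown; PeriodicalTask
then drains the queue returned by getTasks() every interval.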

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTask.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTask.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTask.java
new file mode 100644
index 0000000..47e8ebc
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTask.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import java.util.concurrent.Callable;
+
+/**
+ * A task to be run by {@link BackgroundService}.
+ */
+public interface BackgroundTask<T> extends Callable<T> {
+
+  int getPriority();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java
new file mode 100644
index 0000000..b56ef0c
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskQueue.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import java.util.PriorityQueue;
+
+/**
+ * A priority queue that stores {@link BackgroundTask} instances,
+ * ordered by ascending priority value.
+ */
+public class BackgroundTaskQueue {
+
+  private final PriorityQueue<BackgroundTask> tasks;
+
+  public BackgroundTaskQueue() {
+    // Integer.compare avoids the overflow risk of subtracting priorities.
+    tasks = new PriorityQueue<>((task1, task2)
+        -> Integer.compare(task1.getPriority(), task2.getPriority()));
+  }
+
+  /**
+   * @return the head task of this queue, or null if the queue is empty.
+   */
+  public synchronized BackgroundTask poll() {
+    return tasks.poll();
+  }
+
+  /**
+   * Adds a {@link BackgroundTask} to the queue;
+   * tasks are ordered by their priority.
+   *
+   * @param task the task to add.
+   */
+  public synchronized void add(BackgroundTask task) {
+    tasks.add(task);
+  }
+
+  /**
+   * @return true if the queue contains no task, false otherwise.
+   */
+  public synchronized boolean isEmpty() {
+    return tasks.isEmpty();
+  }
+
+  /**
+   * @return the size of the queue.
+   */
+  public synchronized int size() {
+    return tasks.size();
+  }
+}
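
A small ordering sketch (illustrative; the task names and priorities are
invented) showing that lower priority values are polled first, per the
comparator above:

    import org.apache.hadoop.utils.BackgroundTask;
    import org.apache.hadoop.utils.BackgroundTaskQueue;
    import org.apache.hadoop.utils.BackgroundTaskResult;
    import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;

    public class QueueOrderingExample {

      private static BackgroundTask<BackgroundTaskResult> task(
          String name, int priority) {
        return new BackgroundTask<BackgroundTaskResult>() {
          @Override
          public int getPriority() {
            return priority;
          }

          @Override
          public BackgroundTaskResult call() {
            System.out.println("running " + name);
            return EmptyTaskResult.newResult();
          }
        };
      }

      public static void main(String[] args) throws Exception {
        BackgroundTaskQueue queue = new BackgroundTaskQueue();
        queue.add(task("compaction", 10));
        queue.add(task("deletion", 1));
        // "deletion" (priority 1) is polled before "compaction" (10).
        while (!queue.isEmpty()) {
          queue.poll().call();
        }
      }
    }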

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
new file mode 100644
index 0000000..198300f
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BackgroundTaskResult.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership.  The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations 
under
+ * the License.
+ */
+
+package org.apache.hadoop.utils;
+
+/**
+ * Result of a {@link BackgroundTask}.
+ */
+public interface BackgroundTaskResult {
+
+  /**
+   * Returns the number of entries included in this result.
+   */
+  int getSize();
+
+  /**
+   * An empty task result implementation.
+   */
+  class EmptyTaskResult implements BackgroundTaskResult {
+
+    public static EmptyTaskResult newResult() {
+      return new EmptyTaskResult();
+    }
+
+    @Override
+    public int getSize() {
+      return 0;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BatchOperation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BatchOperation.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BatchOperation.java
new file mode 100644
index 0000000..47699eb
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/BatchOperation.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import com.google.common.collect.Lists;
+
+import java.util.List;
+
+/**
+ * A utility class to store a batch of DB write operations.
+ */
+public class BatchOperation {
+
+  /**
+   * Enum for write operations.
+   */
+  public enum Operation {
+    DELETE, PUT
+  }
+
+  private List<SingleOperation> operations =
+      Lists.newArrayList();
+
+  /**
+   * Add a PUT operation into the batch.
+   */
+  public void put(byte[] key, byte[] value) {
+    operations.add(new SingleOperation(Operation.PUT, key, value));
+  }
+
+  /**
+   * Add a DELETE operation into the batch.
+   */
+  public void delete(byte[] key) {
+    operations.add(new SingleOperation(Operation.DELETE, key, null));
+  }
+
+  public List<SingleOperation> getOperations() {
+    return operations;
+  }
+
+  /**
+   * A SingleOperation represents a PUT or DELETE operation
+   * and the data the operation needs to manipulate.
+   */
+  public static class SingleOperation {
+
+    private Operation opt;
+    private byte[] key;
+    private byte[] value;
+
+    public SingleOperation(Operation opt, byte[] key, byte[] value) {
+      this.opt = opt;
+      if (key == null) {
+        throw new IllegalArgumentException("key cannot be null");
+      }
+      this.key = key.clone();
+      this.value = value == null ? null : value.clone();
+    }
+
+    public Operation getOpt() {
+      return opt;
+    }
+
+    public byte[] getKey() {
+      return key.clone();
+    }
+
+    public byte[] getValue() {
+      return value == null ? null : value.clone();
+    }
+  }
+}
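
A short construction sketch (illustrative; the keys and values are
invented). Note the batch itself performs no I/O; it only records
operations for a later writeBatch call on a store:

    import static java.nio.charset.StandardCharsets.UTF_8;

    import org.apache.hadoop.utils.BatchOperation;

    public class BatchOperationExample {
      public static void main(String[] args) {
        BatchOperation batch = new BatchOperation();
        batch.put("key1".getBytes(UTF_8), "value1".getBytes(UTF_8));
        batch.delete("staleKey".getBytes(UTF_8));
        // Inspect the recorded operations; a MetadataStore applies them
        // atomically via writeBatch(batch).
        for (BatchOperation.SingleOperation op : batch.getOperations()) {
          System.out.println(op.getOpt() + " "
              + new String(op.getKey(), UTF_8));
        }
      }
    }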

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/EntryConsumer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/EntryConsumer.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/EntryConsumer.java
new file mode 100644
index 0000000..c407398
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/EntryConsumer.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import java.io.IOException;
+
+/**
+ * A consumer for metadata store key-value entries.
+ * Used by {@link MetadataStore} class.
+ */
+@FunctionalInterface
+public interface EntryConsumer {
+
+  /**
+   * Consumes a key-value entry and reports whether iteration should
+   * continue.
+   * @param key key
+   * @param value value
+   * @return true to keep consuming entries, false to stop iteration.
+   * @throws IOException if an I/O error occurs.
+   */
+  boolean consume(byte[] key, byte[] value) throws IOException;
+}
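
Since the interface is functional, a consumer can be written as a lambda.
A tiny sketch (illustrative; the stop condition is invented); returning
false stops iteration, per the iterate() loop in LevelDBStore below:

    import org.apache.hadoop.utils.EntryConsumer;

    public class EntryConsumerExample {
      // Prints entry sizes and stops once an empty value is seen.
      static final EntryConsumer PRINTER = (key, value) -> {
        System.out.println(key.length + " byte key, "
            + value.length + " byte value");
        return value.length > 0;
      };
    }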

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
new file mode 100644
index 0000000..83ca83d
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/LevelDBStore.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
+import org.fusesource.leveldbjni.JniDBFactory;
+import org.iq80.leveldb.DB;
+import org.iq80.leveldb.DBIterator;
+import org.iq80.leveldb.Options;
+import org.iq80.leveldb.ReadOptions;
+import org.iq80.leveldb.Snapshot;
+import org.iq80.leveldb.WriteBatch;
+import org.iq80.leveldb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * LevelDB interface.
+ */
+public class LevelDBStore implements MetadataStore {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(LevelDBStore.class);
+
+  private DB db;
+  private final File dbFile;
+  private final Options dbOptions;
+  private final WriteOptions writeOptions;
+
+  public LevelDBStore(File dbPath, boolean createIfMissing)
+      throws IOException {
+    dbOptions = new Options();
+    dbOptions.createIfMissing(createIfMissing);
+    this.dbFile = dbPath;
+    this.writeOptions = new WriteOptions().sync(true);
+    openDB(dbPath, dbOptions);
+  }
+
+  /**
+   * Opens a DB file with the given options.
+   *
+   * @param dbPath  - DB File path
+   * @param options - LevelDB options to open the DB with.
+   * @throws IOException if the database cannot be opened.
+   */
+  public LevelDBStore(File dbPath, Options options)
+      throws IOException {
+    dbOptions = options;
+    this.dbFile = dbPath;
+    this.writeOptions = new WriteOptions().sync(true);
+    openDB(dbPath, dbOptions);
+  }
+
+  private void openDB(File dbPath, Options options) throws IOException {
+    dbPath.getParentFile().mkdirs();
+    db = JniDBFactory.factory.open(dbPath, options);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("LevelDB successfully opened");
+      LOG.debug("[Option] cacheSize = {}", options.cacheSize());
+      LOG.debug("[Option] createIfMissing = {}", options.createIfMissing());
+      LOG.debug("[Option] blockSize = {}", options.blockSize());
+      LOG.debug("[Option] compressionType = {}", options.compressionType());
+      LOG.debug("[Option] maxOpenFiles = {}", options.maxOpenFiles());
+      LOG.debug("[Option] writeBufferSize = {}", options.writeBufferSize());
+    }
+  }
+
+  /**
+   * Puts a key-value pair into the DB.
+   *
+   * @param key   - key
+   * @param value - value
+   */
+  @Override
+  public void put(byte[] key, byte[] value) {
+    db.put(key, value, writeOptions);
+  }
+
+  /**
+   * Get Key.
+   *
+   * @param key key
+   * @return value
+   */
+  @Override
+  public byte[] get(byte[] key) {
+    return db.get(key);
+  }
+
+  /**
+   * Delete Key.
+   *
+   * @param key - Key
+   */
+  @Override
+  public void delete(byte[] key) {
+    db.delete(key);
+  }
+
+  /**
+   * Closes the DB.
+   *
+   * @throws IOException
+   */
+  @Override
+  public void close() throws IOException {
+    if (db != null) {
+      db.close();
+    }
+  }
+
+  /**
+   * Returns true if the DB is empty.
+   *
+   * @return boolean
+   * @throws IOException
+   */
+  @Override
+  public boolean isEmpty() throws IOException {
+    try (DBIterator iter = db.iterator()) {
+      iter.seekToFirst();
+      // The DB is empty iff the iterator has no first entry.
+      return !iter.hasNext();
+    }
+  }
+
+  /**
+   * Returns the actual levelDB object.
+   * @return DB handle.
+   */
+  public DB getDB() {
+    return db;
+  }
+
+  /**
+   * Returns an iterator on all the key-value pairs in the DB.
+   * @return an iterator on DB entries.
+   */
+  public DBIterator getIterator() {
+    return db.iterator();
+  }
+
+
+  @Override
+  public void destroy() throws IOException {
+    close();
+    JniDBFactory.factory.destroy(dbFile, dbOptions);
+  }
+
+  @Override
+  public ImmutablePair<byte[], byte[]> peekAround(int offset,
+      byte[] from) throws IOException, IllegalArgumentException {
+    try (DBIterator it = db.iterator()) {
+      if (from == null) {
+        it.seekToFirst();
+      } else {
+        it.seek(from);
+      }
+      if (!it.hasNext()) {
+        return null;
+      }
+      switch (offset) {
+      case 0:
+        Entry<byte[], byte[]> current = it.next();
+        return new ImmutablePair<>(current.getKey(), current.getValue());
+      case 1:
+        if (it.next() != null && it.hasNext()) {
+          Entry<byte[], byte[]> next = it.peekNext();
+          return new ImmutablePair<>(next.getKey(), next.getValue());
+        }
+        break;
+      case -1:
+        if (it.hasPrev()) {
+          Entry<byte[], byte[]> prev = it.peekPrev();
+          return new ImmutablePair<>(prev.getKey(), prev.getValue());
+        }
+        break;
+      default:
+        throw new IllegalArgumentException(
+            "Position can only be -1, 0 or 1, but found " + offset);
+      }
+    }
+    return null;
+  }
+
+  @Override
+  public void iterate(byte[] from, EntryConsumer consumer)
+      throws IOException {
+    try (DBIterator iter = db.iterator()) {
+      if (from != null) {
+        iter.seek(from);
+      } else {
+        iter.seekToFirst();
+      }
+      while (iter.hasNext()) {
+        Entry<byte[], byte[]> current = iter.next();
+        if (!consumer.consume(current.getKey(),
+            current.getValue())) {
+          break;
+        }
+      }
+    }
+  }
+
+  /**
+   * Compacts the DB by removing deleted keys etc.
+   * @throws IOException if there is an error.
+   */
+  @Override
+  public void compactDB() throws IOException {
+    if (db != null) {
+      // From LevelDB docs : begin == null and end == null means the whole DB.
+      db.compactRange(null, null);
+    }
+  }
+
+  @Override
+  public void writeBatch(BatchOperation operation) throws IOException {
+    List<BatchOperation.SingleOperation> operations =
+        operation.getOperations();
+    if (!operations.isEmpty()) {
+      try (WriteBatch writeBatch = db.createWriteBatch()) {
+        for (BatchOperation.SingleOperation opt : operations) {
+          switch (opt.getOpt()) {
+          case DELETE:
+            writeBatch.delete(opt.getKey());
+            break;
+          case PUT:
+            writeBatch.put(opt.getKey(), opt.getValue());
+            break;
+          default:
+            throw new IllegalArgumentException("Invalid operation "
+                + opt.getOpt());
+          }
+        }
+        db.write(writeBatch);
+      }
+    }
+  }
+
+  @Override
+  public List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
+      int count, MetadataKeyFilters.MetadataKeyFilter... filters)
+      throws IOException, IllegalArgumentException {
+    return getRangeKVs(startKey, count, false, filters);
+  }
+
+  @Override
+  public List<Map.Entry<byte[], byte[]>> getSequentialRangeKVs(byte[] startKey,
+      int count, MetadataKeyFilters.MetadataKeyFilter... filters)
+      throws IOException, IllegalArgumentException {
+    return getRangeKVs(startKey, count, true, filters);
+  }
+
+  /**
+   * Returns a range of key-value pairs as a list, based on a
+   * startKey and count. Additionally, one or more
+   * {@link MetadataKeyFilter}s can be supplied to filter keys if necessary.
+   * To prevent race conditions while listing entries, this implementation
+   * takes a snapshot and lists the entries from the snapshot. As a
+   * consequence, the result may differ slightly from the live data if it
+   * is being updated concurrently.
+   * <p>
+   * If the startKey is specified and found in levelDB, this key and the keys
+   * after this key will be included in the result. If the startKey is null
+   * all entries will be included as long as other conditions are satisfied.
+   * If the given startKey doesn't exist, an empty list will be returned.
+   * <p>
+   * The count argument limits the total number of entries to return;
+   * it must not be negative.
+   * <p>
+   * When one or more {@link MetadataKeyFilter}s are given, only the entries
+   * whose key passes all the filters will be included in the result.
+   *
+   * @param startKey a start key.
+   * @param count max number of entries to return.
+   * @param filters zero or more {@link MetadataKeyFilter}s.
+   * @return a list of entries found in the database or an empty list if the
+   * startKey is invalid.
+   * @throws IOException if there are I/O errors.
+   * @throws IllegalArgumentException if count is negative.
+   */
+  private List<Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
+      int count, boolean sequential, MetadataKeyFilter... filters)
+      throws IOException {
+    List<Entry<byte[], byte[]>> result = new ArrayList<>();
+    long start = System.currentTimeMillis();
+    if (count < 0) {
+      throw new IllegalArgumentException(
+          "Invalid count given " + count + ", count must not be negative");
+    }
+    Snapshot snapShot = null;
+    DBIterator dbIter = null;
+    try {
+      snapShot = db.getSnapshot();
+      ReadOptions readOptions = new ReadOptions().snapshot(snapShot);
+      dbIter = db.iterator(readOptions);
+      if (startKey == null) {
+        dbIter.seekToFirst();
+      } else {
+        if (db.get(startKey) == null) {
+          // Key not found, return empty list
+          return result;
+        }
+        dbIter.seek(startKey);
+      }
+      while (dbIter.hasNext() && result.size() < count) {
+        byte[] preKey = dbIter.hasPrev() ? dbIter.peekPrev().getKey() : null;
+        byte[] nextKey = dbIter.hasNext() ? dbIter.peekNext().getKey() : null;
+        Entry<byte[], byte[]> current = dbIter.next();
+
+        if (filters == null) {
+          result.add(current);
+        } else {
+          if (Arrays.stream(filters).allMatch(
+              entry -> entry.filterKey(preKey, current.getKey(), nextKey))) {
+            result.add(current);
+          } else {
+            if (result.size() > 0 && sequential) {
+              // If the caller asks for a sequential range of results
+              // and we hit a mismatch, stop iterating here;
+              // if the result is still empty, keep looking for a first match.
+              break;
+            }
+          }
+        }
+      }
+    } finally {
+      if (snapShot != null) {
+        snapShot.close();
+      }
+      if (dbIter != null) {
+        dbIter.close();
+      }
+      if (LOG.isDebugEnabled()) {
+        if (filters != null) {
+          for (MetadataKeyFilters.MetadataKeyFilter filter : filters) {
+            int scanned = filter.getKeysScannedNum();
+            int hinted = filter.getKeysHintedNum();
+            if (scanned > 0 || hinted > 0) {
+              LOG.debug(
+                  "getRangeKVs ({}) numOfKeysScanned={}, numOfKeysHinted={}",
+                  filter.getClass().getSimpleName(), scanned, hinted);
+            }
+          }
+        }
+        long end = System.currentTimeMillis();
+        long timeConsumed = end - start;
+        LOG.debug("Time consumed for getRangeKVs() is {}ms,"
+            + " result length is {}.", timeConsumed, result.size());
+      }
+    }
+    return result;
+  }
+}
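
An end-to-end usage sketch (illustrative only; the DB path and keys are
invented) combining the store with BatchOperation:

    import static java.nio.charset.StandardCharsets.UTF_8;

    import java.io.File;
    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.utils.BatchOperation;
    import org.apache.hadoop.utils.LevelDBStore;

    public class LevelDBStoreExample {
      public static void main(String[] args) throws Exception {
        LevelDBStore store =
            new LevelDBStore(new File("/tmp/example-db"), true);
        try {
          // A single synchronous write, then an atomic batch.
          store.put("k1".getBytes(UTF_8), "v1".getBytes(UTF_8));
          BatchOperation batch = new BatchOperation();
          batch.put("k2".getBytes(UTF_8), "v2".getBytes(UTF_8));
          batch.delete("k1".getBytes(UTF_8));
          store.writeBatch(batch);

          // Range scan from the first key, at most 10 entries, no filters.
          List<Map.Entry<byte[], byte[]>> range = store.getRangeKVs(null, 10);
          System.out.println("entries: " + range.size());
        } finally {
          store.close();
        }
      }
    }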

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java
 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java
new file mode 100644
index 0000000..3ff0a94
--- /dev/null
+++ 
b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataKeyFilters.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.hadoop.utils;
+
+import com.google.common.base.Strings;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.ozone.OzoneConsts;
+
+/**
+ * A utility class to filter levelDB keys.
+ */
+public final class MetadataKeyFilters {
+
+  private static KeyPrefixFilter deletingKeyFilter =
+      new MetadataKeyFilters.KeyPrefixFilter(OzoneConsts.DELETING_KEY_PREFIX);
+
+  private static KeyPrefixFilter normalKeyFilter =
+      new MetadataKeyFilters.KeyPrefixFilter(OzoneConsts.DELETING_KEY_PREFIX,
+          true);
+
+  private MetadataKeyFilters() {
+  }
+
+  public static KeyPrefixFilter getDeletingKeyFilter() {
+    return deletingKeyFilter;
+  }
+
+  public static KeyPrefixFilter getNormalKeyFilter() {
+    return normalKeyFilter;
+  }
+
+  /**
+   * Interface for levelDB key filters.
+   */
+  public interface MetadataKeyFilter {
+    /**
+     * Filter levelDB key with a certain condition.
+     *
+     * @param preKey     previous key.
+     * @param currentKey current key.
+     * @param nextKey    next key.
+     * @return true if the filter condition is satisfied, false otherwise.
+     */
+    boolean filterKey(byte[] preKey, byte[] currentKey, byte[] nextKey);
+
+    default int getKeysScannedNum() {
+      return 0;
+    }
+
+    default int getKeysHintedNum() {
+      return 0;
+    }
+  }
+
+  /**
+   * Utility class to filter keys by a string prefix. This filter
+   * assumes keys can be parsed to a string.
+   */
+  public static class KeyPrefixFilter implements MetadataKeyFilter {
+
+    private String keyPrefix = null;
+    private int keysScanned = 0;
+    private int keysHinted = 0;
+    private boolean negative;
+
+    public KeyPrefixFilter(String keyPrefix) {
+      this(keyPrefix, false);
+    }
+
+    public KeyPrefixFilter(String keyPrefix, boolean negative) {
+      this.keyPrefix = keyPrefix;
+      this.negative = negative;
+    }
+
+    @Override
+    public boolean filterKey(byte[] preKey, byte[] currentKey,
+        byte[] nextKey) {
+      keysScanned++;
+      boolean accept;
+      if (Strings.isNullOrEmpty(keyPrefix)) {
+        accept = true;
+      } else if (currentKey != null &&
+          DFSUtil.bytes2String(currentKey).startsWith(keyPrefix)) {
+        keysHinted++;
+        accept = true;
+      } else {
+        accept = false;
+      }
+      return negative ? !accept : accept;
+    }
+
+    @Override
+    public int getKeysScannedNum() {
+      return keysScanned;
+    }
+
+    @Override
+    public int getKeysHintedNum() {
+      return keysHinted;
+    }
+  }
+}
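
A filtering sketch (illustrative; the "#block#" prefix is invented, and
real callers would typically go through the factory methods or OzoneConsts
constants):

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.utils.LevelDBStore;
    import org.apache.hadoop.utils.MetadataKeyFilters;
    import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;

    public class KeyFilterExample {

      // Lists up to 100 entries whose keys start with a hypothetical prefix.
      static List<Map.Entry<byte[], byte[]>> listBlocks(LevelDBStore store)
          throws Exception {
        KeyPrefixFilter filter = new KeyPrefixFilter("#block#");
        List<Map.Entry<byte[], byte[]>> result =
            store.getRangeKVs(null, 100, filter);
        System.out.println("scanned=" + filter.getKeysScannedNum()
            + " hinted=" + filter.getKeysHintedNum());
        return result;
      }

      // Entries not marked for deletion, via the provided factory method.
      static List<Map.Entry<byte[], byte[]>> listLive(LevelDBStore store)
          throws Exception {
        return store.getRangeKVs(null, 100,
            MetadataKeyFilters.getNormalKeyFilter());
      }
    }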

