http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataStore.java
new file mode 100644
index 0000000..b90b08f
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataStore.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Interface for key-value store that stores ozone metadata.
+ * Ozone metadata is stored as key value pairs, both key and value
+ * are arbitrary byte arrays.
+ */
+@InterfaceStability.Evolving
+public interface MetadataStore extends Closeable {
+
+  /**
+   * Puts a key-value pair into the store.
+   *
+   * @param key metadata key
+   * @param value metadata value
+   */
+  void put(byte[] key, byte[] value) throws IOException;
+
+  /**
+   * @return true if the metadata store is empty.
+   *
+   * @throws IOException
+   */
+  boolean isEmpty() throws IOException;
+
+  /**
+   * Returns the value mapped to the given key in byte array.
+   *
+   * @param key metadata key
+   * @return value in byte array
+   * @throws IOException
+   */
+  byte[] get(byte[] key) throws IOException;
+
+  /**
+   * Deletes a key from the metadata store.
+   *
+   * @param key metadata key
+   * @throws IOException
+   */
+  void delete(byte[] key) throws IOException;
+
+  /**
+   * Returns a certain range of key-value pairs as a list, based on a
+   * startKey and count. Further, one or more {@link MetadataKeyFilter}s can
+   * be added to filter keys if necessary. To prevent race conditions while
+   * listing entries, this implementation takes a snapshot and lists the
+   * entries from the snapshot. This may, on the other hand, cause the
+   * returned range to differ slightly from the live data if the store is
+   * being updated concurrently.
+   * <p>
+   * If the startKey is specified and found in the store, this key and the
+   * keys after it will be included in the result. If the startKey is null,
+   * all entries will be included as long as other conditions are satisfied.
+   * If the given startKey doesn't exist, an empty list will be returned.
+   * <p>
+   * The count argument limits the total number of entries to return;
+   * it must be an integer greater than 0.
+   * <p>
+   * This method allows specifying one or more {@link MetadataKeyFilter}s
+   * to filter keys by certain conditions. Once given, only the entries
+   * whose keys pass all the filters will be included in the result.
+   *
+   * @param startKey a start key.
+   * @param count max number of entries to return.
+   * @param filters customized one or more {@link MetadataKeyFilter}.
+   * @return a list of entries found in the database or an empty list if the
+   * startKey is invalid.
+   * @throws IOException if there are I/O errors.
+   * @throws IllegalArgumentException if count is less than 0.
+   */
+  List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
+      int count, MetadataKeyFilter... filters)
+      throws IOException, IllegalArgumentException;
+
+  /**
+   * This method is very similar to {@link #getRangeKVs}; the only
+   * difference is that this method returns a sequential range of
+   * elements that match the filters. While iterating the elements,
+   * once an entry fails a filter, iteration stops at that point without
+   * looking for the next match. If no filter is given, this method
+   * behaves just like {@link #getRangeKVs}.
+   *
+   * @param startKey a start key.
+   * @param count max number of entries to return.
+   * @param filters customized one or more {@link MetadataKeyFilter}.
+   * @return a list of entries found in the database.
+   * @throws IOException
+   * @throws IllegalArgumentException
+   */
+  List<Map.Entry<byte[], byte[]>> getSequentialRangeKVs(byte[] startKey,
+      int count, MetadataKeyFilter... filters)
+      throws IOException, IllegalArgumentException;
+
+  /**
+   * A batch of PUT, DELETE operations handled as a single atomic write.
+   *
+   * @throws IOException write fails
+   */
+  void writeBatch(BatchOperation operation) throws IOException;
+
+  /**
+   * Compact the entire database.
+   * @throws IOException
+   */
+  void compactDB() throws IOException;
+
+  /**
+   * Destroys the content of the specified database;
+   * a destroyed database cannot be loaded again.
+   * Be very careful with this method.
+   *
+   * @throws IOException if I/O error happens
+   */
+  void destroy() throws IOException;
+
+  /**
+   * Seeks the database to a certain key and returns the key-value
+   * pair at the given offset from that key. Note that this method
+   * only supports offsets -1 (left), 0 (current) and 1 (right);
+   * any other offset will cause an {@link IllegalArgumentException}.
+   *
+   * @param offset offset to the key
+   * @param from from which key
+   * @return a key-value pair
+   * @throws IOException
+   */
+  ImmutablePair<byte[], byte[]> peekAround(int offset, byte[] from)
+      throws IOException, IllegalArgumentException;
+
+  /**
+   * Iterates over entries in the database, starting from a certain key.
+   * Applies the given {@link EntryConsumer} to the key and value of
+   * each entry; the consumer returns a boolean result which is used
+   * as the criterion for exiting the iteration.
+   *
+   * @param from the start key
+   * @param consumer
+   *   an {@link EntryConsumer} applied to each key and value. If the consumer
+   *   returns true, the iteration continues to the next entry; otherwise the
+   *   iteration exits.
+   * @throws IOException
+   */
+  void iterate(byte[] from, EntryConsumer consumer)
+      throws IOException;
+}
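A minimal usage sketch of the MetadataStore contract above (illustrative only; the key and value contents are hypothetical, and the store instance is assumed to come from the MetadataStoreBuilder shown in the next file):

    import org.apache.hadoop.utils.MetadataStore;

    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import java.util.Map;

    /** Illustrative helper exercising the MetadataStore contract above. */
    public final class MetadataStoreUsage {

      /** Writes one entry and lists up to 100 entries starting from it. */
      static void putAndList(MetadataStore store) throws Exception {
        byte[] key = "key-0001".getBytes(StandardCharsets.UTF_8);     // hypothetical key
        byte[] value = "value-0001".getBytes(StandardCharsets.UTF_8); // hypothetical value
        store.put(key, value);

        // No filters given: every entry from startKey onwards passes, up to count.
        List<Map.Entry<byte[], byte[]>> range = store.getRangeKVs(key, 100);
        for (Map.Entry<byte[], byte[]> e : range) {
          System.out.println(new String(e.getKey(), StandardCharsets.UTF_8));
        }
        store.delete(key);
      }

      private MetadataStoreUsage() { }
    }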

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java
new file mode 100644
index 0000000..9e9c32a
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/MetadataStoreBuilder.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.iq80.leveldb.Options;
+import org.rocksdb.BlockBasedTableConfig;
+import org.rocksdb.Statistics;
+import org.rocksdb.StatsLevel;
+
+import java.io.File;
+import java.io.IOException;
+
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_METADATA_STORE_IMPL_LEVELDB;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_METADATA_STORE_IMPL_ROCKSDB;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_METADATA_STORE_ROCKSDB_STATISTICS;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_METADATA_STORE_ROCKSDB_STATISTICS_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys
+    .OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF;
+
+/**
+ * Builder for metadata store.
+ */
+public class MetadataStoreBuilder {
+
+  private File dbFile;
+  private long cacheSize;
+  private boolean createIfMissing = true;
+  private Configuration conf;
+
+  public static MetadataStoreBuilder newBuilder() {
+    return new MetadataStoreBuilder();
+  }
+
+  public MetadataStoreBuilder setDbFile(File dbPath) {
+    this.dbFile = dbPath;
+    return this;
+  }
+
+  public MetadataStoreBuilder setCacheSize(long cache) {
+    this.cacheSize = cache;
+    return this;
+  }
+
+  public MetadataStoreBuilder setCreateIfMissing(boolean doCreate) {
+    this.createIfMissing = doCreate;
+    return this;
+  }
+
+  public MetadataStoreBuilder setConf(Configuration configuration) {
+    this.conf = configuration;
+    return this;
+  }
+
+  public MetadataStore build() throws IOException {
+    if (dbFile == null) {
+      throw new IllegalArgumentException("Failed to build metadata store, "
+          + "dbFile is required but not found");
+    }
+
+    // Build db store based on configuration
+    MetadataStore store = null;
+    String impl = conf == null ?
+        OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_DEFAULT :
+        conf.getTrimmed(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
+            OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_DEFAULT);
+    if (OZONE_METADATA_STORE_IMPL_LEVELDB.equals(impl)) {
+      Options options = new Options();
+      options.createIfMissing(createIfMissing);
+      if (cacheSize > 0) {
+        options.cacheSize(cacheSize);
+      }
+      store = new LevelDBStore(dbFile, options);
+    } else if (OZONE_METADATA_STORE_IMPL_ROCKSDB.equals(impl)) {
+      org.rocksdb.Options opts = new org.rocksdb.Options();
+      opts.setCreateIfMissing(createIfMissing);
+
+      if (cacheSize > 0) {
+        BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
+        tableConfig.setBlockCacheSize(cacheSize);
+        opts.setTableFormatConfig(tableConfig);
+      }
+
+      String rocksDbStat = conf == null ?
+          OZONE_METADATA_STORE_ROCKSDB_STATISTICS_DEFAULT :
+          conf.getTrimmed(OZONE_METADATA_STORE_ROCKSDB_STATISTICS,
+              OZONE_METADATA_STORE_ROCKSDB_STATISTICS_DEFAULT);
+
+      if (!rocksDbStat.equals(OZONE_METADATA_STORE_ROCKSDB_STATISTICS_OFF)) {
+        Statistics statistics = new Statistics();
+        statistics.setStatsLevel(StatsLevel.valueOf(rocksDbStat));
+        opts = opts.setStatistics(statistics);
+
+      }
+      store = new RocksDBStore(dbFile, opts);
+    } else {
+      throw new IllegalArgumentException("Invalid argument for "
+          + OzoneConfigKeys.OZONE_METADATA_STORE_IMPL
+          + ". Expecting " + OZONE_METADATA_STORE_IMPL_LEVELDB
+          + " or " + OZONE_METADATA_STORE_IMPL_ROCKSDB
+          + ", but met " + impl);
+    }
+    return store;
+  }
+}
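A minimal sketch of how the builder above might be used. The file path, cache size and configuration values are hypothetical; OZONE_METADATA_STORE_IMPL selects the LevelDB or RocksDB implementation, exactly as build() shows:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ozone.OzoneConfigKeys;
    import org.apache.hadoop.utils.MetadataStore;
    import org.apache.hadoop.utils.MetadataStoreBuilder;

    import java.io.File;

    public final class MetadataStoreBuilderExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Pick the RocksDB implementation explicitly (hypothetical choice).
        conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
            OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_ROCKSDB);

        MetadataStore store = MetadataStoreBuilder.newBuilder()
            .setConf(conf)
            .setDbFile(new File("/tmp/container.db"))   // hypothetical location
            .setCacheSize(64L * 1024 * 1024)            // 64 MB block cache
            .setCreateIfMissing(true)
            .build();
        try {
          System.out.println("empty? " + store.isEmpty());
        } finally {
          store.close();
        }
      }
    }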

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
new file mode 100644
index 0000000..a60e98d
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStore.java
@@ -0,0 +1,382 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.metrics2.util.MBeans;
+import org.apache.ratis.shaded.com.google.common.annotations.VisibleForTesting;
+import org.rocksdb.DbPath;
+import org.rocksdb.Options;
+import org.rocksdb.RocksDB;
+import org.rocksdb.RocksDBException;
+import org.rocksdb.RocksIterator;
+import org.rocksdb.WriteBatch;
+import org.rocksdb.WriteOptions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.management.ObjectName;
+import java.io.File;
+import java.io.IOException;
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * RocksDB implementation of ozone metadata store.
+ */
+public class RocksDBStore implements MetadataStore {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(RocksDBStore.class);
+
+  private RocksDB db = null;
+  private File dbLocation;
+  private WriteOptions writeOptions;
+  private Options dbOptions;
+  private ObjectName statMBeanName;
+
+  public RocksDBStore(File dbFile, Options options)
+      throws IOException {
+    Preconditions.checkNotNull(dbFile, "DB file location cannot be null");
+    RocksDB.loadLibrary();
+    dbOptions = options;
+    dbLocation = dbFile;
+    writeOptions = new WriteOptions();
+    try {
+
+      db = RocksDB.open(dbOptions, dbLocation.getAbsolutePath());
+      if (dbOptions.statistics() != null) {
+
+        Map<String, String> jmxProperties = new HashMap<String, String>();
+        jmxProperties.put("dbName", dbFile.getName());
+        statMBeanName = MBeans.register("Ozone", "RocksDbStore", jmxProperties,
+            new RocksDBStoreMBean(dbOptions.statistics()));
+        if (statMBeanName == null) {
+          LOG.warn("jmx registration failed during RocksDB init, db path :{}",
+              dbFile.getAbsolutePath());
+        }
+      }
+    } catch (RocksDBException e) {
+      throw new IOException(
+          "Failed to init RocksDB, db path : " + dbFile.getAbsolutePath(), e);
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("RocksDB successfully opened.");
+      LOG.debug("[Option] dbLocation= {}", dbLocation.getAbsolutePath());
+      LOG.debug("[Option] createIfMissing = {}", options.createIfMissing());
+      LOG.debug("[Option] compactionStyle= {}", options.compactionStyle());
+      LOG.debug("[Option] compressionType= {}", options.compressionType());
+      LOG.debug("[Option] maxOpenFiles= {}", options.maxOpenFiles());
+      LOG.debug("[Option] writeBufferSize= {}", options.writeBufferSize());
+    }
+  }
+
+  private IOException toIOException(String msg, RocksDBException e) {
+    String statusCode = e.getStatus() == null ? "N/A" :
+        e.getStatus().getCodeString();
+    String errMessage = e.getMessage() == null ? "Unknown error" :
+        e.getMessage();
+    String output = msg + "; status : " + statusCode
+        + "; message : " + errMessage;
+    return new IOException(output, e);
+  }
+
+  @Override
+  public void put(byte[] key, byte[] value) throws IOException {
+    try {
+      db.put(writeOptions, key, value);
+    } catch (RocksDBException e) {
+      throw toIOException("Failed to put key-value to metadata store", e);
+    }
+  }
+
+  @Override
+  public boolean isEmpty() throws IOException {
+    RocksIterator it = null;
+    try {
+      it = db.newIterator();
+      it.seekToFirst();
+      return !it.isValid();
+    } finally {
+      if (it != null) {
+        it.close();
+      }
+    }
+  }
+
+  @Override
+  public byte[] get(byte[] key) throws IOException {
+    try {
+      return db.get(key);
+    } catch (RocksDBException e) {
+      throw toIOException("Failed to get the value for the given key", e);
+    }
+  }
+
+  @Override
+  public void delete(byte[] key) throws IOException {
+    try {
+      db.delete(key);
+    } catch (RocksDBException e) {
+      throw toIOException("Failed to delete the given key", e);
+    }
+  }
+
+  @Override
+  public List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
+      int count, MetadataKeyFilters.MetadataKeyFilter... filters)
+      throws IOException, IllegalArgumentException {
+    return getRangeKVs(startKey, count, false, filters);
+  }
+
+  @Override
+  public List<Map.Entry<byte[], byte[]>> getSequentialRangeKVs(byte[] startKey,
+      int count, MetadataKeyFilters.MetadataKeyFilter... filters)
+      throws IOException, IllegalArgumentException {
+    return getRangeKVs(startKey, count, true, filters);
+  }
+
+  private List<Map.Entry<byte[], byte[]>> getRangeKVs(byte[] startKey,
+      int count, boolean sequential,
+      MetadataKeyFilters.MetadataKeyFilter... filters)
+      throws IOException, IllegalArgumentException {
+    List<Map.Entry<byte[], byte[]>> result = new ArrayList<>();
+    long start = System.currentTimeMillis();
+    if (count < 0) {
+      throw new IllegalArgumentException(
+          "Invalid count given " + count + ", count must be greater than 0");
+    }
+    RocksIterator it = null;
+    try {
+      it = db.newIterator();
+      if (startKey == null) {
+        it.seekToFirst();
+      } else {
+        if(get(startKey) == null) {
+          // Key not found, return empty list
+          return result;
+        }
+        it.seek(startKey);
+      }
+      while(it.isValid() && result.size() < count) {
+        byte[] currentKey = it.key();
+        byte[] currentValue = it.value();
+
+        it.prev();
+        final byte[] prevKey = it.isValid() ? it.key() : null;
+
+        it.seek(currentKey);
+        it.next();
+        final byte[] nextKey = it.isValid() ? it.key() : null;
+
+        if (filters == null) {
+          result.add(new AbstractMap.SimpleImmutableEntry<>(currentKey,
+              currentValue));
+        } else {
+          if (Arrays.asList(filters).stream()
+              .allMatch(entry -> entry.filterKey(prevKey,
+                  currentKey, nextKey))) {
+            result.add(new AbstractMap.SimpleImmutableEntry<>(currentKey,
+                currentValue));
+          } else {
+            if (result.size() > 0 && sequential) {
+              // if the caller asks for a sequential range of results,
+              // and we meet a mismatch, abort the iteration here.
+              // if the result is empty, we continue to look for the first match.
+              break;
+            }
+          }
+        }
+      }
+    } finally {
+      if (it != null) {
+        it.close();
+      }
+      long end = System.currentTimeMillis();
+      long timeConsumed = end - start;
+      if (LOG.isDebugEnabled()) {
+        if (filters != null) {
+          for (MetadataKeyFilters.MetadataKeyFilter filter : filters) {
+            int scanned = filter.getKeysScannedNum();
+            int hinted = filter.getKeysHintedNum();
+            if (scanned > 0 || hinted > 0) {
+              LOG.debug(
+                  "getRangeKVs ({}) numOfKeysScanned={}, numOfKeysHinted={}",
+                  filter.getClass().getSimpleName(), filter.getKeysScannedNum(),
+                  filter.getKeysHintedNum());
+            }
+          }
+        }
+        LOG.debug("Time consumed for getRangeKVs() is {}ms,"
+            + " result length is {}.", timeConsumed, result.size());
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public void writeBatch(BatchOperation operation)
+      throws IOException {
+    List<BatchOperation.SingleOperation> operations =
+        operation.getOperations();
+    if (!operations.isEmpty()) {
+      try (WriteBatch writeBatch = new WriteBatch()) {
+        for (BatchOperation.SingleOperation opt : operations) {
+          switch (opt.getOpt()) {
+          case DELETE:
+            writeBatch.remove(opt.getKey());
+            break;
+          case PUT:
+            writeBatch.put(opt.getKey(), opt.getValue());
+            break;
+          default:
+            throw new IllegalArgumentException("Invalid operation "
+                + opt.getOpt());
+          }
+        }
+        db.write(writeOptions, writeBatch);
+      } catch (RocksDBException e) {
+        throw toIOException("Batch write operation failed", e);
+      }
+    }
+  }
+
+  @Override
+  public void compactDB() throws IOException {
+    if (db != null) {
+      try {
+        db.compactRange();
+      } catch (RocksDBException e) {
+        throw toIOException("Failed to compact db", e);
+      }
+    }
+  }
+
+  private void deleteQuietly(File fileOrDir) {
+    if (fileOrDir != null && fileOrDir.exists()) {
+      try {
+        FileUtils.forceDelete(fileOrDir);
+      } catch (IOException e) {
+        LOG.warn("Failed to delete dir {}", fileOrDir.getAbsolutePath(), e);
+      }
+    }
+  }
+
+  @Override
+  public void destroy() throws IOException {
+    // Make sure db is closed.
+    close();
+
+    // There is no destroyDB Java API available;
+    // equivalently, we can delete all db directories.
+    deleteQuietly(dbLocation);
+    deleteQuietly(new File(dbOptions.dbLogDir()));
+    deleteQuietly(new File(dbOptions.walDir()));
+    List<DbPath> dbPaths = dbOptions.dbPaths();
+    if (dbPaths != null) {
+      dbPaths.forEach(dbPath -> {
+        deleteQuietly(new File(dbPath.toString()));
+      });
+    }
+  }
+
+  @Override
+  public ImmutablePair<byte[], byte[]> peekAround(int offset,
+      byte[] from) throws IOException, IllegalArgumentException {
+    RocksIterator it = null;
+    try {
+      it = db.newIterator();
+      if (from == null) {
+        it.seekToFirst();
+      } else {
+        it.seek(from);
+      }
+      if (!it.isValid()) {
+        return null;
+      }
+
+      switch (offset) {
+      case 0:
+        break;
+      case 1:
+        it.next();
+        break;
+      case -1:
+        it.prev();
+        break;
+      default:
+        throw new IllegalArgumentException(
+            "Position can only be -1, 0 " + "or 1, but found " + offset);
+      }
+      return it.isValid() ? new ImmutablePair<>(it.key(), it.value()) : null;
+    } finally {
+      if (it != null) {
+        it.close();
+      }
+    }
+  }
+
+  @Override
+  public void iterate(byte[] from, EntryConsumer consumer)
+      throws IOException {
+    RocksIterator it = null;
+    try {
+      it = db.newIterator();
+      if (from != null) {
+        it.seek(from);
+      } else {
+        it.seekToFirst();
+      }
+      while (it.isValid()) {
+        if (!consumer.consume(it.key(), it.value())) {
+          break;
+        }
+        it.next();
+      }
+    } finally {
+      if (it != null) {
+        it.close();
+      }
+    }
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (statMBeanName != null) {
+      MBeans.unregister(statMBeanName);
+    }
+    if (db != null) {
+      db.close();
+    }
+
+  }
+
+  @VisibleForTesting
+  protected ObjectName getStatMBeanName() {
+    return statMBeanName;
+  }
+
+}
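A short sketch of the iterate() contract implemented above. It assumes EntryConsumer (defined elsewhere in this patch, not shown here) is a functional interface whose consume(byte[], byte[]) returns whether iteration should continue, as the call inside iterate() suggests:

    import org.apache.hadoop.utils.MetadataStore;

    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.atomic.AtomicInteger;

    public final class RocksDBStoreIterateExample {

      /** Prints and counts at most the first ten entries of the store. */
      static int countFirstTen(MetadataStore store) throws Exception {
        AtomicInteger seen = new AtomicInteger();
        // Start from the first key (from == null) and stop after ten entries.
        store.iterate(null, (key, value) -> {
          System.out.println(new String(key, StandardCharsets.UTF_8));
          return seen.incrementAndGet() < 10;   // returning false stops the iteration
        });
        return seen.get();
      }

      private RocksDBStoreIterateExample() { }
    }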

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStoreMBean.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStoreMBean.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStoreMBean.java
new file mode 100644
index 0000000..88c093e
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/RocksDBStoreMBean.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.hadoop.utils;
+
+import org.rocksdb.HistogramData;
+import org.rocksdb.HistogramType;
+import org.rocksdb.Statistics;
+import org.rocksdb.TickerType;
+
+import javax.management.Attribute;
+import javax.management.AttributeList;
+import javax.management.AttributeNotFoundException;
+import javax.management.DynamicMBean;
+import javax.management.InvalidAttributeValueException;
+import javax.management.MBeanAttributeInfo;
+import javax.management.MBeanException;
+import javax.management.MBeanInfo;
+import javax.management.ReflectionException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Adapter JMX bean to publish all the Rocksdb metrics.
+ */
+public class RocksDBStoreMBean implements DynamicMBean {
+
+  private Statistics statistics;
+
+  private Set<String> histogramAttributes = new HashSet<>();
+
+  public RocksDBStoreMBean(Statistics statistics) {
+    this.statistics = statistics;
+    histogramAttributes.add("Average");
+    histogramAttributes.add("Median");
+    histogramAttributes.add("Percentile95");
+    histogramAttributes.add("Percentile99");
+    histogramAttributes.add("StandardDeviation");
+  }
+
+  @Override
+  public Object getAttribute(String attribute)
+      throws AttributeNotFoundException, MBeanException, ReflectionException {
+    for (String histogramAttribute : histogramAttributes) {
+      if (attribute.endsWith("_" + histogramAttribute.toUpperCase())) {
+        String keyName = attribute
+            .substring(0, attribute.length() - histogramAttribute.length() - 1);
+        try {
+          HistogramData histogram =
+              statistics.getHistogramData(HistogramType.valueOf(keyName));
+          try {
+            Method method =
+                HistogramData.class.getMethod("get" + histogramAttribute);
+            return method.invoke(histogram);
+          } catch (Exception e) {
+            throw new ReflectionException(e,
+                "Can't read attribute " + attribute);
+          }
+        } catch (IllegalArgumentException exception) {
+          throw new AttributeNotFoundException(
+              "No such attribute in RocksDB stats: " + attribute);
+        }
+      }
+    }
+    try {
+      return statistics.getTickerCount(TickerType.valueOf(attribute));
+    } catch (IllegalArgumentException ex) {
+      throw new AttributeNotFoundException(
+          "No such attribute in RocksDB stats: " + attribute);
+    }
+  }
+
+  @Override
+  public void setAttribute(Attribute attribute)
+      throws AttributeNotFoundException, InvalidAttributeValueException,
+      MBeanException, ReflectionException {
+
+  }
+
+  @Override
+  public AttributeList getAttributes(String[] attributes) {
+    AttributeList result = new AttributeList();
+    for (String attributeName : attributes) {
+      try {
+        Object value = getAttribute(attributeName);
+        result.add(new Attribute(attributeName, value));
+      } catch (Exception e) {
+        //TODO
+      }
+    }
+    return result;
+  }
+
+  @Override
+  public AttributeList setAttributes(AttributeList attributes) {
+    return null;
+  }
+
+  @Override
+  public Object invoke(String actionName, Object[] params, String[] signature)
+      throws MBeanException, ReflectionException {
+    return null;
+  }
+
+  @Override
+  public MBeanInfo getMBeanInfo() {
+
+    List<MBeanAttributeInfo> attributes = new ArrayList<>();
+    for (TickerType tickerType : TickerType.values()) {
+      attributes.add(new MBeanAttributeInfo(tickerType.name(), "long",
+          "RocksDBStat: " + tickerType.name(), true, false, false));
+    }
+    for (HistogramType histogramType : HistogramType.values()) {
+      for (String histogramAttribute : histogramAttributes) {
+        attributes.add(new MBeanAttributeInfo(
+            histogramType.name() + "_" + histogramAttribute.toUpperCase(),
+            "long", "RocksDBStat: " + histogramType.name(), true, false,
+            false));
+      }
+    }
+
+    return new MBeanInfo("", "RocksDBStat",
+        attributes.toArray(new MBeanAttributeInfo[0]), null, null, null);
+
+  }
+}
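A small sketch of reading the MBean above directly, without JMX registration. The ticker and histogram names come from the RocksDB Java API; BYTES_READ and DB_GET are assumed to exist in the RocksDB version bundled here, and a real deployment would use the Statistics object attached to the opened DB options:

    import org.apache.hadoop.utils.RocksDBStoreMBean;
    import org.rocksdb.RocksDB;
    import org.rocksdb.Statistics;

    public final class RocksDBStoreMBeanExample {
      public static void main(String[] args) throws Exception {
        RocksDB.loadLibrary();
        Statistics statistics = new Statistics();   // normally set on the DB Options
        RocksDBStoreMBean mbean = new RocksDBStoreMBean(statistics);

        // Ticker attributes use the TickerType name directly.
        Object bytesRead = mbean.getAttribute("BYTES_READ");
        // Histogram attributes are <HistogramType>_<ATTRIBUTE>, e.g. DB_GET_AVERAGE.
        Object getLatencyAvg = mbean.getAttribute("DB_GET_AVERAGE");

        System.out.println("BYTES_READ=" + bytesRead
            + ", DB_GET_AVERAGE=" + getLatencyAvg);
      }
    }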

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/package-info.java
new file mode 100644
index 0000000..4466337
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/utils/package-info.java
@@ -0,0 +1,18 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.utils;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
new file mode 100644
index 0000000..3a55831
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/RatisHelper.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package org.apache.ratis;
+
+import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.shaded.com.google.protobuf.ByteString;
+import org.apache.ratis.util.SizeInBytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * Ratis helper methods.
+ */
+public interface RatisHelper {
+  Logger LOG = LoggerFactory.getLogger(RatisHelper.class);
+
+  static String toRaftPeerIdString(DatanodeDetails id) {
+    return id.getUuidString() + "_" + id.getRatisPort();
+  }
+
+  static String toRaftPeerAddressString(DatanodeDetails id) {
+    return id.getIpAddress() + ":" + id.getRatisPort();
+  }
+
+  static RaftPeerId toRaftPeerId(DatanodeDetails id) {
+    return RaftPeerId.valueOf(toRaftPeerIdString(id));
+  }
+
+  static RaftPeer toRaftPeer(DatanodeDetails id) {
+    return new RaftPeer(toRaftPeerId(id), toRaftPeerAddressString(id));
+  }
+
+  static List<RaftPeer> toRaftPeers(Pipeline pipeline) {
+    return toRaftPeers(pipeline.getMachines());
+  }
+
+  static <E extends DatanodeDetails> List<RaftPeer> toRaftPeers(
+      List<E> datanodes) {
+    return datanodes.stream().map(RatisHelper::toRaftPeer)
+        .collect(Collectors.toList());
+  }
+
+  /* TODO: use a dummy id for all groups for the moment.
+   *       It should be changed to a unique id for each group.
+   */
+  RaftGroupId DUMMY_GROUP_ID =
+      RaftGroupId.valueOf(ByteString.copyFromUtf8("AOzoneRatisGroup"));
+
+  RaftGroup EMPTY_GROUP = new RaftGroup(DUMMY_GROUP_ID,
+      Collections.emptyList());
+
+  static RaftGroup emptyRaftGroup() {
+    return EMPTY_GROUP;
+  }
+
+  static RaftGroup newRaftGroup(List<DatanodeDetails> datanodes) {
+    final List<RaftPeer> newPeers = datanodes.stream()
+        .map(RatisHelper::toRaftPeer)
+        .collect(Collectors.toList());
+    return RatisHelper.newRaftGroup(newPeers);
+  }
+
+  static RaftGroup newRaftGroup(Collection<RaftPeer> peers) {
+    return peers.isEmpty()? emptyRaftGroup()
+        : new RaftGroup(DUMMY_GROUP_ID, peers);
+  }
+
+  static RaftGroup newRaftGroup(Pipeline pipeline) {
+    return newRaftGroup(toRaftPeers(pipeline));
+  }
+
+  static RaftClient newRaftClient(RpcType rpcType, Pipeline pipeline) {
+    return newRaftClient(rpcType, toRaftPeerId(pipeline.getLeader()),
+        newRaftGroup(pipeline));
+  }
+
+  static RaftClient newRaftClient(RpcType rpcType, RaftPeer leader) {
+    return newRaftClient(rpcType, leader.getId(),
+        newRaftGroup(new ArrayList<>(Arrays.asList(leader))));
+  }
+
+  static RaftClient newRaftClient(
+      RpcType rpcType, RaftPeerId leader, RaftGroup group) {
+    LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, leader, group);
+    final RaftProperties properties = new RaftProperties();
+    RaftConfigKeys.Rpc.setType(properties, rpcType);
+    GrpcConfigKeys.setMessageSizeMax(properties,
+        SizeInBytes.valueOf(OzoneConfigKeys.DFS_CONTAINER_CHUNK_MAX_SIZE));
+
+    return RaftClient.newBuilder()
+        .setRaftGroup(group)
+        .setLeaderId(leader)
+        .setProperties(properties)
+        .build();
+  }
+}
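A sketch of creating a Raft client with the helpers above. The RaftPeer and RaftPeerId construction mirrors what toRaftPeer() does; SupportedRpcType.GRPC is assumed to be the RpcType implementation available in this Ratis version, and the peer id and address are hypothetical:

    import org.apache.ratis.RatisHelper;
    import org.apache.ratis.client.RaftClient;
    import org.apache.ratis.protocol.RaftPeer;
    import org.apache.ratis.protocol.RaftPeerId;
    import org.apache.ratis.rpc.SupportedRpcType;

    public final class RatisHelperExample {
      public static void main(String[] args) throws Exception {
        // A single-node group; in Ozone the peers come from a Pipeline's datanodes.
        RaftPeer leader = new RaftPeer(
            RaftPeerId.valueOf("dn-1_9858"),      // "<uuid>_<ratisPort>" style id
            "127.0.0.1:9858");                    // hypothetical address

        try (RaftClient client =
                 RatisHelper.newRaftClient(SupportedRpcType.GRPC, leader)) {
          // client.send(...) / client.sendReadOnly(...) would go here.
          System.out.println("Created Raft client for " + leader);
        }
      }
    }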

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/ratis/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/package-info.java
new file mode 100644
index 0000000..c13c20c
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis;
+
+/**
+ * This package contains classes related to Apache Ratis.
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/ratis/shaded/com/google/protobuf/ShadedProtoUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/shaded/com/google/protobuf/ShadedProtoUtil.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/shaded/com/google/protobuf/ShadedProtoUtil.java
new file mode 100644
index 0000000..29242ad
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/shaded/com/google/protobuf/ShadedProtoUtil.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package org.apache.ratis.shaded.com.google.protobuf;
+
+/** Utilities for the shaded protobuf in Ratis. */
+public interface ShadedProtoUtil {
+  /**
+   * @param bytes the bytes to wrap
+   * @return the wrapped shaded {@link ByteString} (no copying).
+   */
+  static ByteString asShadedByteString(byte[] bytes) {
+    return ByteString.wrap(bytes);
+  }
+
+  /**
+   * @param shaded the shaded ByteString to convert
+   * @return a {@link com.google.protobuf.ByteString} (requires copying).
+   */
+  static com.google.protobuf.ByteString asByteString(ByteString shaded) {
+    return com.google.protobuf.ByteString.copyFrom(
+        shaded.asReadOnlyByteBuffer());
+  }
+}
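A tiny round-trip sketch using the two conversions above (the payload bytes are arbitrary):

    import org.apache.ratis.shaded.com.google.protobuf.ByteString;
    import org.apache.ratis.shaded.com.google.protobuf.ShadedProtoUtil;

    import java.nio.charset.StandardCharsets;

    public final class ShadedProtoUtilExample {
      public static void main(String[] args) {
        byte[] raw = "chunk-data".getBytes(StandardCharsets.UTF_8);

        // Wrap into the Ratis-shaded ByteString without copying.
        ByteString shaded = ShadedProtoUtil.asShadedByteString(raw);

        // Convert to the regular (non-shaded) protobuf ByteString; this copies.
        com.google.protobuf.ByteString unshaded = ShadedProtoUtil.asByteString(shaded);

        System.out.println(unshaded.toStringUtf8());   // prints "chunk-data"
      }
    }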

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java
new file mode 100644
index 0000000..032dd96
--- /dev/null
+++ b/hadoop-hdds/common/src/main/java/org/apache/ratis/shaded/com/google/protobuf/package-info.java
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.ratis.shaded.com.google.protobuf;
+
+/**
+ * This package contains classes related to the shaded protobuf in Apache Ratis.
+ */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
new file mode 100644
index 0000000..a6270ef
--- /dev/null
+++ b/hadoop-hdds/common/src/main/proto/DatanodeContainerProtocol.proto
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and Unstable.
+ * Please see http://hadoop.apache.org/docs/stable/hadoop-project-dist/hadoop-common/InterfaceClassification.html
+ * for what changes are allowed for a *Unstable* .proto interface.
+ */
+
+// This file contains protocol buffers that are used to transfer data
+// to and from the datanode.
+option java_package = "org.apache.hadoop.hdds.protocol.proto";
+option java_outer_classname = "ContainerProtos";
+option java_generate_equals_and_hash = true;
+package hadoop.hdds;
+import "hdfs.proto";
+import "hdds.proto";
+
+/**
+ * Commands that are used to manipulate the state of containers on a datanode.
+ *
+ * These commands allow us to work against the datanode - from
+ * StorageContainer Manager as well as clients.
+ *
+ *  1. CreateContainer - This call is usually made by Storage Container
+ *     manager, when we need to create a new container on a given datanode.
+ *
+ *  2. ReadContainer - Allows end user to stat a container. For example
+ *     this allows us to return the metadata of a container.
+ *
+ *  3. UpdateContainer - Updates a container's metadata.
+ *
+ *  4. DeleteContainer - This call is made to delete a container.
+ *
+ *  5. ListContainer - Returns the list of containers on this
+ *     datanode. This will be used by tests and tools.
+ *
+ *  6. PutKey - Given a valid container, creates a key.
+ *
+ *  7. GetKey - Allows user to read the metadata of a Key.
+ *
+ *  8. DeleteKey - Deletes a given key.
+ *
+ *  9. ListKey - Returns a list of keys that are present inside
+ *      a given container.
+ *
+ *  10. ReadChunk - Allows us to read a chunk.
+ *
+ *  11. DeleteChunk - Delete an unused chunk.
+ *
+ *  12. WriteChunk - Allows us to write a chunk.
+ *
+ *  13. ListChunk - Given a Container/Key returns the list of Chunks.
+ *
+ *  14. CompactChunk - Re-writes a chunk based on Offsets.
+ *
+ *  15. PutSmallFile - A single RPC that combines both putKey and WriteChunk.
+ *
+ *  16. GetSmallFile - A single RPC that combines both getKey and ReadChunk.
+ *
+ *  17. CloseContainer - Closes an open container and makes it immutable.
+ *
+ *  18. CopyContainer - Copies a container from a remote machine.
+ */
+
+enum Type {
+  CreateContainer = 1;
+  ReadContainer = 2;
+  UpdateContainer = 3;
+  DeleteContainer = 4;
+  ListContainer = 5;
+
+  PutKey = 6;
+  GetKey = 7;
+  DeleteKey = 8;
+  ListKey = 9;
+
+  ReadChunk = 10;
+  DeleteChunk = 11;
+  WriteChunk = 12;
+  ListChunk = 13;
+  CompactChunk = 14;
+
+  /** Combines Key and Chunk Operation into Single RPC. */
+  PutSmallFile = 15;
+  GetSmallFile = 16;
+  CloseContainer = 17;
+
+}
+
+
+enum Result {
+  SUCCESS = 1;
+  UNSUPPORTED_REQUEST = 2;
+  MALFORMED_REQUEST = 3;
+  CONTAINER_INTERNAL_ERROR = 4;
+  INVALID_CONFIG = 5;
+  INVALID_FILE_HASH_FOUND = 6;
+  CONTAINER_EXISTS = 7;
+  NO_SUCH_ALGORITHM = 8;
+  CONTAINER_NOT_FOUND = 9;
+  IO_EXCEPTION = 10;
+  UNABLE_TO_READ_METADATA_DB = 11;
+  NO_SUCH_KEY = 12;
+  OVERWRITE_FLAG_REQUIRED = 13;
+  UNABLE_TO_FIND_DATA_DIR = 14;
+  INVALID_WRITE_SIZE = 15;
+  CHECKSUM_MISMATCH = 16;
+  UNABLE_TO_FIND_CHUNK = 17;
+  PROTOC_DECODING_ERROR = 18;
+  INVALID_ARGUMENT = 19;
+  PUT_SMALL_FILE_ERROR = 20;
+  GET_SMALL_FILE_ERROR = 21;
+  CLOSED_CONTAINER_IO = 22;
+  ERROR_CONTAINER_NOT_EMPTY = 23;
+  ERROR_IN_COMPACT_DB = 24;
+  UNCLOSED_CONTAINER_IO = 25;
+  DELETE_ON_OPEN_CONTAINER = 26;
+  CLOSED_CONTAINER_RETRY = 27;
+}
+
+message ContainerCommandRequestProto {
+  required Type cmdType = 1; // Type of the command
+
+  // A string that identifies this command. We generate a trace ID in the
+  // Ozone frontend, and this allows us to trace the command all over Ozone.
+  optional string traceID = 2;
+
+  // One of the following command is available when the corresponding
+  // cmdType is set. At the protocol level we allow only
+  // one command in each packet.
+  // TODO : Upgrade to Protobuf 2.6 or later.
+  optional   CreateContainerRequestProto createContainer = 3;
+  optional   ReadContainerRequestProto readContainer = 4;
+  optional   UpdateContainerRequestProto updateContainer = 5;
+  optional   DeleteContainerRequestProto deleteContainer = 6;
+  optional   ListContainerRequestProto listContainer = 7;
+
+  optional   PutKeyRequestProto putKey = 8;
+  optional   GetKeyRequestProto getKey = 9;
+  optional   DeleteKeyRequestProto deleteKey = 10;
+  optional   ListKeyRequestProto listKey = 11;
+
+  optional   ReadChunkRequestProto readChunk = 12;
+  optional   WriteChunkRequestProto writeChunk = 13;
+  optional   DeleteChunkRequestProto deleteChunk = 14;
+  optional   ListChunkRequestProto listChunk = 15;
+
+  optional   PutSmallFileRequestProto putSmallFile = 16;
+  optional   GetSmallFileRequestProto getSmallFile = 17;
+  optional   CloseContainerRequestProto closeContainer = 18;
+  required   string datanodeUuid = 19;
+}
+
+message ContainerCommandResponseProto {
+  required Type cmdType = 1;
+  optional string traceID = 2;
+
+  optional   CreateContainerResponseProto createContainer = 3;
+  optional   ReadContainerResponseProto readContainer = 4;
+  optional   UpdateContainerResponseProto updateContainer = 5;
+  optional   DeleteContainerResponseProto deleteContainer = 6;
+  optional   ListContainerResponseProto listContainer = 7;
+
+  optional   PutKeyResponseProto putKey = 8;
+  optional   GetKeyResponseProto getKey = 9;
+  optional   DeleteKeyResponseProto deleteKey = 10;
+  optional   ListKeyResponseProto listKey = 11;
+
+  optional  WriteChunkResponseProto writeChunk = 12;
+  optional  ReadChunkResponseProto readChunk = 13;
+  optional  DeleteChunkResponseProto deleteChunk = 14;
+  optional  ListChunkResponseProto listChunk = 15;
+
+  required Result result = 17;
+  optional string message = 18;
+
+  optional PutSmallFileResponseProto putSmallFile = 19;
+  optional GetSmallFileResponseProto getSmallFile = 20;
+  optional CloseContainerResponseProto closeContainer = 21;
+
+}
+
+message ContainerData {
+  required string name = 1;
+  repeated KeyValue metadata = 2;
+  optional string dbPath = 3;
+  optional string containerPath = 4;
+  optional string hash = 6;
+  optional int64 bytesUsed = 7;
+  optional int64 size = 8;
+  optional int64 keyCount = 9;
+  //TODO: change required after we switch container ID from string to long
+  optional int64 containerID = 10;
+  optional LifeCycleState state = 11 [default = OPEN];
+}
+
+message ContainerMeta {
+  required string fileName = 1;
+  required string hash = 2;
+}
+
+// Container Messages.
+message  CreateContainerRequestProto {
+  required Pipeline pipeline = 1;
+  required ContainerData containerData = 2;
+}
+
+message  CreateContainerResponseProto {
+}
+
+message  ReadContainerRequestProto {
+  required Pipeline pipeline = 1;
+  required string name = 2;
+}
+
+message  ReadContainerResponseProto {
+  optional ContainerData containerData = 2;
+}
+
+message  UpdateContainerRequestProto {
+  required Pipeline pipeline = 1;
+  required ContainerData containerData = 2;
+  optional bool forceUpdate = 3 [default = false];
+}
+
+message  UpdateContainerResponseProto {
+}
+
+message  DeleteContainerRequestProto {
+  required Pipeline pipeline = 1;
+  required string name = 2;
+  optional bool forceDelete = 3 [default = false];
+}
+
+message  DeleteContainerResponseProto {
+}
+
+message  ListContainerRequestProto {
+  required Pipeline pipeline = 1;
+  optional string prefix = 2;
+  required uint32 count = 3; // Max Results to return
+  optional string prevKey = 4;  // if this is not set query from start.
+}
+
+message  ListContainerResponseProto {
+  repeated ContainerData containerData = 1;
+}
+
+message CloseContainerRequestProto {
+  required Pipeline pipeline = 1;
+}
+
+message CloseContainerResponseProto {
+  optional Pipeline pipeline = 1;
+  optional string hash = 2;
+}
+
+message KeyData {
+  required string containerName = 1;
+  required string name = 2;
+  optional int64 flags = 3; // for future use.
+  repeated KeyValue metadata = 4;
+  repeated ChunkInfo chunks = 5;
+}
+
+// Key Messages.
+message  PutKeyRequestProto {
+  required Pipeline pipeline = 1;
+  required KeyData keyData = 2;
+}
+
+message  PutKeyResponseProto {
+}
+
+message  GetKeyRequestProto  {
+  required Pipeline pipeline = 1;
+  required KeyData keyData = 2;
+}
+
+message  GetKeyResponseProto  {
+  required KeyData keyData = 1;
+}
+
+
+message  DeleteKeyRequestProto {
+  required Pipeline pipeline = 1;
+  required string name = 2;
+}
+
+message   DeleteKeyResponseProto {
+}
+
+message  ListKeyRequestProto {
+  required Pipeline pipeline = 1;
+  optional string prefix = 2; // if specified returns keys that match prefix.
+  required string prevKey = 3;
+  required uint32 count = 4;
+
+}
+
+message  ListKeyResponseProto {
+  repeated KeyData keyData = 1;
+}
+
+// Chunk Operations
+
+message ChunkInfo {
+  required string chunkName = 1;
+  required uint64 offset = 2;
+  required uint64 len = 3;
+  optional string checksum = 4;
+  repeated KeyValue metadata = 5;
+}
+
+enum Stage {
+    WRITE_DATA = 1;
+    COMMIT_DATA = 2;
+    COMBINED = 3;
+}
+
+message  WriteChunkRequestProto  {
+  required Pipeline pipeline = 1;
+  required string keyName = 2;
+  required ChunkInfo chunkData = 3;
+  optional bytes data = 4;
+  optional Stage stage = 5 [default = COMBINED];
+}
+
+message  WriteChunkResponseProto {
+}
+
+message  ReadChunkRequestProto  {
+  required Pipeline pipeline = 1;
+  required string keyName = 2;
+  required ChunkInfo chunkData = 3;
+}
+
+message  ReadChunkResponseProto {
+  required Pipeline pipeline = 1;
+  required ChunkInfo chunkData = 2;
+  required bytes data = 3;
+}
+
+message  DeleteChunkRequestProto {
+  required Pipeline pipeline = 1;
+  required string keyName = 2;
+  required ChunkInfo chunkData = 3;
+}
+
+message  DeleteChunkResponseProto {
+}
+
+message  ListChunkRequestProto {
+  required Pipeline pipeline = 1;
+  required string keyName = 2;
+  required string prevChunkName = 3;
+  required uint32 count = 4;
+}
+
+message  ListChunkResponseProto {
+  repeated ChunkInfo chunkData = 1;
+}
+
+/** For small file access, combines WriteChunk and PutKey into a single
+RPC. */
+
+message PutSmallFileRequestProto {
+  required PutKeyRequestProto key = 1;
+  required ChunkInfo chunkInfo = 2;
+  required bytes data = 3;
+}
+
+
+message PutSmallFileResponseProto {
+
+}
+
+message GetSmallFileRequestProto {
+  required GetKeyRequestProto key = 1;
+}
+
+message GetSmallFileResponseProto {
+  required ReadChunkResponseProto data = 1;
+}
+
+message CopyContainerRequestProto {
+  required string containerName = 1;
+  required uint64 readOffset = 2;
+  optional uint64 len = 3;
+}
+
+message CopyContainerResponseProto {
+  required string archiveName = 1;
+  required uint64 readOffset = 2;
+  required uint64 len = 3;
+  required bool eof = 4;
+  repeated bytes data = 5;
+  optional int64 checksum = 6;
+}
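As an illustration of how this protocol is consumed from Java, the sketch below builds chunk-level messages with the classes protoc generates from the java_package/java_outer_classname options above; the container, key and chunk names and sizes are hypothetical:

    import org.apache.hadoop.hdds.protocol.proto.ContainerProtos;

    public final class ContainerProtoExample {
      public static void main(String[] args) {
        // Describe a single 4 KB chunk of a key (names are made up).
        ContainerProtos.ChunkInfo chunk = ContainerProtos.ChunkInfo.newBuilder()
            .setChunkName("key1.chunk.0")
            .setOffset(0)
            .setLen(4096)
            .build();

        // KeyData ties the chunk list to a key inside a container.
        ContainerProtos.KeyData keyData = ContainerProtos.KeyData.newBuilder()
            .setContainerName("container-1")
            .setName("key1")
            .addChunks(chunk)
            .build();

        System.out.println(keyData);
      }
    }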

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
new file mode 100644
index 0000000..38d2e16
--- /dev/null
+++ b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and unstable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for a *unstable* .proto interface.
+ */
+
+option java_package = "org.apache.hadoop.hdds.protocol.proto";
+option java_outer_classname = "ScmBlockLocationProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.hdds;
+
+import "hdfs.proto";
+import "hdds.proto";
+
+
+// SCM Block protocol
+/**
+ * keys - batch of block keys to find
+ */
+message GetScmBlockLocationsRequestProto {
+  repeated string keys = 1;
+}
+
+/**
+ * locatedBlocks - for each requested hash, nodes that currently host the
+ *     container for that object key hash
+ */
+message GetScmBlockLocationsResponseProto {
+  repeated ScmLocatedBlockProto locatedBlocks = 1;
+}
+
+/**
+ * Holds the nodes that currently host the blocks for a key.
+ */
+message ScmLocatedBlockProto {
+  required string key = 1;
+  required hadoop.hdds.Pipeline pipeline = 2;
+}
+
+/**
+* Request sent to SCM asking it to allocate a block of the specified size.
+*/
+message AllocateScmBlockRequestProto {
+  required uint64 size = 1;
+  required ReplicationType type = 2;
+  required hadoop.hdds.ReplicationFactor factor = 3;
+  required string owner = 4;
+
+}
+
+/**
+ * A delete key request sent by KSM to SCM; it contains
+ * multiple keys (and their blocks).
+ */
+message DeleteScmKeyBlocksRequestProto {
+  repeated KeyBlocks keyBlocks = 1;
+}
+
+/**
+ * An object key and all its associated blocks.
+ * We need to encapsulate the object key name plus the blocks in this protocol
+ * because SCM needs to respond to KSM with the keys it has deleted.
+ * If the response only contained blocks, it would be very expensive for
+ * KSM to figure out which keys have been deleted.
+ */
+message KeyBlocks {
+  required string key = 1;
+  repeated string blocks = 2;
+}
+
+/**
+ * A delete key response from SCM to KSM; it contains multiple child results.
+ * Each child result represents one key deletion result; a key is considered
+ * successfully deleted only if all of its blocks are successfully deleted.
+ */
+message DeleteScmKeyBlocksResponseProto {
+  repeated DeleteKeyBlocksResultProto results = 1;
+}
+
+/**
+ * A key deletion result. It contains all the block deletion results.
+ */
+message DeleteKeyBlocksResultProto {
+  required string objectKey = 1;
+  repeated DeleteScmBlockResult blockResults = 2;
+}
+
+message DeleteScmBlockResult {
+  enum Result {
+    success = 1;
+    chillMode = 2;
+    errorNotFound = 3;
+    unknownFailure = 4;
+  }
+  required Result result = 1;
+  required string key = 2;
+}
+
+/**
+ * Reply from SCM for a block allocation request.
+ */
+message AllocateScmBlockResponseProto {
+  enum Error {
+    success = 1;
+    errorNotEnoughSpace = 2;
+    errorSizeTooBig = 3;
+    unknownFailure = 4;
+  }
+  required Error errorCode = 1;
+  required string key = 2;
+  required hadoop.hdds.Pipeline pipeline = 3;
+  required bool createContainer = 4;
+  optional string errorMessage = 5;
+}
+
+/**
+ * Protocol used from KeySpaceManager to StorageContainerManager.
+ * See request and response messages for details of the RPC calls.
+ */
+service ScmBlockLocationProtocolService {
+
+  /**
+   * Find the set of nodes that currently host the block, as
+   * identified by the key.  This method supports batch lookup by
+   * passing multiple keys.
+   */
+  rpc getScmBlockLocations(GetScmBlockLocationsRequestProto)
+      returns (GetScmBlockLocationsResponseProto);
+
+  /**
+   * Creates a block entry in SCM.
+   */
+  rpc allocateScmBlock(AllocateScmBlockRequestProto)
+      returns (AllocateScmBlockResponseProto);
+
+  /**
+   * Deletes blocks for a set of object keys from SCM.
+   */
+  rpc deleteScmKeyBlocks(DeleteScmKeyBlocksRequestProto)
+      returns (DeleteScmKeyBlocksResponseProto);
+
+  /**
+   * Gets the scmInfo from SCM.
+   */
+  rpc getScmInfo(hadoop.hdds.GetScmInfoRequestProto)
+      returns (hadoop.hdds.GetScmInfoRespsonseProto);
+}
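
Because the generated classes follow the java_package and java_outer_classname options declared at the top of this file, a KSM-side caller would assemble these messages with the standard protobuf builders. The following is a minimal sketch, not code from this patch; the size, owner, and replication settings are illustrative values.

  // Sketch only: builds a block-allocation request and reads back the result.
  import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
  import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockRequestProto;
  import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto;

  public class AllocateBlockSketch {

    static AllocateScmBlockRequestProto buildRequest() {
      return AllocateScmBlockRequestProto.newBuilder()
          .setSize(256L * 1024 * 1024)                    // requested block size in bytes (illustrative)
          .setType(HddsProtos.ReplicationType.RATIS)
          .setFactor(HddsProtos.ReplicationFactor.THREE)
          .setOwner("ozone")                              // illustrative owner string
          .build();
    }

    /** Only a 'success' error code carries a usable block key and pipeline. */
    static String allocatedKeyOrNull(AllocateScmBlockResponseProto response) {
      if (response.getErrorCode() != AllocateScmBlockResponseProto.Error.success) {
        return null;                                      // errorMessage, if set, explains the failure
      }
      return response.getKey();
    }
  }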

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto 
b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
new file mode 100644
index 0000000..d7540a3
--- /dev/null
+++ b/hadoop-hdds/common/src/main/proto/StorageContainerLocationProtocol.proto
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and unstable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for an *unstable* .proto interface.
+ */
+
+option java_package = "org.apache.hadoop.hdds.protocol.proto";
+option java_outer_classname = "StorageContainerLocationProtocolProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.hdds;
+
+import "hdfs.proto";
+import "hdds.proto";
+
+/**
+* Request sent to SCM asking where the container should be created.
+*/
+message ContainerRequestProto {
+  required string containerName = 1;
+  // Ozone only supports a replication factor of either 1 or 3.
+  required ReplicationFactor replicationFactor = 2;
+  required ReplicationType  replicationType = 3;
+  required string owner = 4;
+
+}
+
+/**
+ * Reply from SCM for a container allocation request.
+ */
+message ContainerResponseProto {
+  enum Error {
+    success = 1;
+    errorContainerAlreadyExists = 2;
+    errorContainerMissing = 3;
+  }
+  required Error errorCode = 1;
+  required Pipeline pipeline = 2;
+  optional string errorMessage = 3;
+}
+
+message GetContainerRequestProto {
+  required string containerName = 1;
+}
+
+message GetContainerResponseProto {
+  required Pipeline pipeline = 1;
+}
+
+message SCMListContainerRequestProto {
+  required uint32 count = 1;
+  optional string startName = 2;
+  optional string prefixName = 3;
+}
+
+message SCMListContainerResponseProto {
+  repeated SCMContainerInfo containers = 1;
+}
+
+message SCMDeleteContainerRequestProto {
+  required string containerName = 1;
+}
+
+message SCMDeleteContainerResponseProto {
+  // Empty response
+}
+
+message ObjectStageChangeRequestProto {
+  enum Type {
+    container = 1;
+    pipeline = 2;
+  }
+  // delete/copy operation may be added later
+  enum Op {
+    create = 1;
+    close = 2;
+  }
+  enum Stage {
+    begin = 1;
+    complete = 2;
+  }
+  required string name = 1;
+  required Type type = 2;
+  required Op op = 3;
+  required Stage stage = 4;
+}
+
+message ObjectStageChangeResponseProto {
+  // Empty response
+}
+
+/*
+ NodeQueryRequest asks SCM to send back a list of the nodes that
+ match the NodeState values that we are requesting.
+*/
+message NodeQueryRequestProto {
+
+
+  // Repeated, so we can specify more than one status type.
+  // These NodeState types are additive for now, in the sense that
+  // if you specify HEALTHY and FREE_NODE members,
+  // then you get all healthy nodes which are not Raft members.
+  //
+  // If you specify HEALTHY and DEAD nodes, you will get nothing
+  // back. The server does not dictate which combinations make sense;
+  // that is entirely up to the caller.
+  // TODO: Support operators like OR and NOT. Currently it is always an
+  // implied AND.
+
+  repeated NodeState query = 1;
+  required QueryScope scope = 2;
+  optional string poolName = 3; // if scope is pool, then pool name is needed.
+}
+
+message NodeQueryResponseProto {
+  required NodePool datanodes = 1;
+}
+
+/**
+  Request to create a replication pipeline.
+ */
+message PipelineRequestProto {
+  required ReplicationType replicationType = 1;
+  required ReplicationFactor replicationFactor = 2;
+
+  // if datanodes are specified then pipelines are created using those
+  // datanodes.
+  optional NodePool nodePool = 3;
+  optional string pipelineID = 4;
+}
+
+message  PipelineResponseProto {
+  enum Error {
+    success = 1;
+    errorPipelineAlreadyExists = 2;
+  }
+  required Error errorCode = 1;
+  optional Pipeline  pipeline = 2;
+  optional string errorMessage = 3;
+}
+
+/**
+ * Protocol used from an HDFS node to StorageContainerManager.  See the request
+ * and response messages for details of the RPC calls.
+ */
+service StorageContainerLocationProtocolService {
+
+  /**
+   * Creates a container entry in SCM.
+   */
+  rpc allocateContainer(ContainerRequestProto) returns (ContainerResponseProto);
+
+  /**
+   * Returns the pipeline for a given container.
+   */
+  rpc getContainer(GetContainerRequestProto) returns (GetContainerResponseProto);
+
+  rpc listContainer(SCMListContainerRequestProto) returns (SCMListContainerResponseProto);
+
+  /**
+   * Deletes a container in SCM.
+   */
+  rpc deleteContainer(SCMDeleteContainerRequestProto) returns (SCMDeleteContainerResponseProto);
+
+  /**
+  * Returns a set of nodes that meet the given criteria.
+  */
+  rpc queryNode(NodeQueryRequestProto) returns (NodeQueryResponseProto);
+
+  /**
+  * Notification from the client when container or pipeline operations begin
+  * or finish on datanodes.
+  */
+  rpc notifyObjectStageChange(ObjectStageChangeRequestProto) returns (ObjectStageChangeResponseProto);
+
+  /*
+  * APIs that manage pipelines.
+  *
+  * Pipelines are abstractions offered by SCM and the datanodes that allow
+  * users to create a replication pipeline.
+  *
+  * The following APIs allow command line programs like the SCM CLI to list
+  * and manage pipelines.
+  */
+
+  /**
+  *  Creates a replication pipeline.
+  */
+  rpc allocatePipeline(PipelineRequestProto)
+      returns (PipelineResponseProto);
+
+  /**
+  *  Returns information about SCM.
+  */
+  rpc getScmInfo(GetScmInfoRequestProto)
+      returns (GetScmInfoRespsonseProto);
+}
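
The implied-AND semantics described in the NodeQueryRequestProto comment are easiest to see from the caller's side. Here is a small sketch of building such a query with the generated builders; it is illustrative only, and the choice of CLUSTER scope is an assumption.

  // Sketch only: asks for nodes that are both HEALTHY and FREE_NODE
  // (i.e. healthy nodes that are not Raft members), per the implied AND.
  import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
  import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto;

  public class NodeQuerySketch {

    static NodeQueryRequestProto healthyNonRaftNodes() {
      return NodeQueryRequestProto.newBuilder()
          .addQuery(HddsProtos.NodeState.HEALTHY)
          .addQuery(HddsProtos.NodeState.FREE_NODE)
          .setScope(HddsProtos.QueryScope.CLUSTER)   // poolName is only needed for POOL scope
          .build();
    }
  }

A contradictory combination such as HEALTHY plus DEAD would simply come back empty, since the server does not validate which combinations make sense.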

http://git-wip-us.apache.org/repos/asf/hadoop/blob/651a05a1/hadoop-hdds/common/src/main/proto/hdds.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdds/common/src/main/proto/hdds.proto 
b/hadoop-hdds/common/src/main/proto/hdds.proto
new file mode 100644
index 0000000..f7b2f72
--- /dev/null
+++ b/hadoop-hdds/common/src/main/proto/hdds.proto
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * These .proto interfaces are private and unstable.
+ * Please see http://wiki.apache.org/hadoop/Compatibility
+ * for what changes are allowed for an *unstable* .proto interface.
+ */
+
+option java_package = "org.apache.hadoop.hdds.protocol.proto";
+option java_outer_classname = "HddsProtos";
+option java_generic_services = true;
+option java_generate_equals_and_hash = true;
+package hadoop.hdds;
+
+message DatanodeDetailsProto {
+    // TODO: make the port a separate proto message and use it here
+    required string uuid = 1;  // UUID assigned to the Datanode.
+    required string ipAddress = 2;     // IP address
+    required string hostName = 3;      // hostname
+    optional uint32 infoPort = 4;      // datanode http port
+    optional uint32 infoSecurePort = 5 [default = 0]; // datanode https port
+    optional uint32 containerPort = 6 [default = 0];  // Ozone stand_alone protocol
+    optional uint32 ratisPort = 7 [default = 0];       // Ozone Ratis port
+    optional uint32 ozoneRestPort = 8 [default = 0];
+}
+
+message PipelineChannel {
+    required string leaderID = 1;
+    repeated DatanodeDetailsProto members = 2;
+    optional LifeCycleState state = 3 [default = OPEN];
+    optional ReplicationType type = 4 [default = STAND_ALONE];
+    optional ReplicationFactor factor = 5 [default = ONE];
+    optional string name = 6;
+}
+
+// A pipeline is composed of a PipelineChannel (Ratis/StandAlone) that backs
+// a container.
+message Pipeline {
+    required string containerName = 1;
+    required PipelineChannel pipelineChannel = 2;
+}
+
+message KeyValue {
+    required string key = 1;
+    optional string value = 2;
+}
+
+/**
+ * Type of the node.
+ */
+enum NodeType {
+    KSM = 1;
+    SCM = 2;
+    DATANODE = 3;
+}
+
+// Should we rename NodeState to DatanodeState?
+/**
+ * Enum that represents the Node State. This is used in calls to getNodeList
+ * and getNodeCount.
+ */
+enum NodeState {
+    HEALTHY             = 1;
+    STALE               = 2;
+    DEAD                = 3;
+    DECOMMISSIONING     = 4;
+    DECOMMISSIONED      = 5;
+    RAFT_MEMBER         = 6;
+    FREE_NODE           = 7; // Not a member in raft.
+    INVALID             = 8;
+}
+
+enum QueryScope {
+    CLUSTER = 1;
+    POOL = 2;
+}
+
+message Node {
+    required DatanodeDetailsProto nodeID = 1;
+    repeated NodeState nodeStates = 2;
+}
+
+message NodePool {
+    repeated Node nodes = 1;
+}
+
+/**
+ * LifeCycleState for SCM object creation state machine:
+ *    ->Allocated: allocated on SCM, but the client has not started creating it yet.
+ *    ->Creating: allocated and assigned to a client to create, but not ack-ed yet.
+ *    ->Open: allocated on SCM, created on datanodes, and ack-ed by a client.
+ *    ->Close: container closed because its space is fully used or due to an error.
+ *    ->Timeout: container failed to be created on datanodes, or was not ack-ed by the client in time.
+ *    ->Deleting(TBD): container will be deleted after a timeout.
+ * 1. ALLOCATE-ed containers on SCM can't serve key/block related operations
+ *    until ack-ed explicitly, which changes the state to OPEN.
+ * 2. Only OPEN/CLOSED containers can serve key/block related operations.
+ * 3. ALLOCATE-ed containers that are not ack-ed in time will be TIMEOUT-ed and
+ *    cleaned up asynchronously.
+ */
+
+enum LifeCycleState {
+    ALLOCATED = 1;
+    CREATING = 2; // Used for a container allocated/created by a different client.
+    OPEN = 3; // Mostly an update to SCM via HB or a client call.
+    CLOSING = 4;
+    CLOSED = 5; // !!States after this one have not been used yet.
+    DELETING = 6;
+    DELETED = 7; // object is deleted.
+}
+
+enum LifeCycleEvent {
+    CREATE = 1; // A request to client to create this object
+    CREATED = 2;
+    FINALIZE = 3;
+    CLOSE = 4; // !!Events after this one have not been used yet.
+    UPDATE = 5;
+    TIMEOUT = 6; // creation has timed out from SCM's view.
+    DELETE = 7;
+    CLEANUP = 8;
+}
+
+message SCMContainerInfo {
+    // TODO : Remove the container name from pipeline.
+    required string containerName = 1;
+    required LifeCycleState state = 2;
+    required Pipeline pipeline = 3;
+    // This is not the total size of the container, but the space allocated
+    // by SCM for clients to write blocks.
+    required uint64 allocatedBytes = 4;
+    required uint64 usedBytes = 5;
+    required uint64 numberOfKeys = 6;
+    optional int64 stateEnterTime = 7;
+    required string owner = 8;
+    required int64 containerID = 9;
+}
+
+message GetScmInfoRequestProto {
+}
+
+message GetScmInfoRespsonseProto {
+    required string clusterId = 1;
+    required string scmId = 2;
+}
+
+
+enum ReplicationType {
+    RATIS = 1;
+    STAND_ALONE = 2;
+    CHAINED = 3;
+}
+
+enum ReplicationFactor {
+    ONE = 1;
+    THREE = 3;
+}
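
To tie the common types together, here is a minimal sketch of assembling a single-datanode STAND_ALONE pipeline from the messages in this file using the HddsProtos builders; every identifier, address, and port below is an illustrative value, not a default from the codebase.

  // Sketch only: builds DatanodeDetailsProto -> PipelineChannel -> Pipeline.
  import org.apache.hadoop.hdds.protocol.proto.HddsProtos;

  public class PipelineSketch {

    static HddsProtos.Pipeline singleNodePipeline() {
      HddsProtos.DatanodeDetailsProto dn = HddsProtos.DatanodeDetailsProto.newBuilder()
          .setUuid("11111111-2222-3333-4444-555555555555") // illustrative UUID
          .setIpAddress("10.0.0.1")
          .setHostName("dn1.example.com")
          .setContainerPort(9859)                          // illustrative stand_alone protocol port
          .build();

      HddsProtos.PipelineChannel channel = HddsProtos.PipelineChannel.newBuilder()
          .setLeaderID(dn.getUuid())                       // the single member doubles as the leader
          .addMembers(dn)
          .setType(HddsProtos.ReplicationType.STAND_ALONE)
          .setFactor(HddsProtos.ReplicationFactor.ONE)
          .setState(HddsProtos.LifeCycleState.OPEN)
          .build();

      return HddsProtos.Pipeline.newBuilder()
          .setContainerName("container-demo")              // illustrative container name
          .setPipelineChannel(channel)
          .build();
    }
  }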

