HDFS-10675. Datanode support to read from external stores.

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/970028f0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/970028f0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/970028f0

Branch: refs/heads/HDFS-9806
Commit: 970028f04bafd9b2aac52ee8969c42a8fb6f6b25
Parents: 60f95fb
Author: Virajith Jalaparti <viraj...@apache.org>
Authored: Wed Mar 29 14:29:28 2017 -0700
Committer: Virajith Jalaparti <viraj...@apache.org>
Committed: Fri Dec 1 18:16:57 2017 -0800

----------------------------------------------------------------------
 .../java/org/apache/hadoop/fs/StorageType.java  |   3 +-
 .../org/apache/hadoop/fs/shell/TestCount.java   |   3 +-
 .../hadoop/hdfs/protocol/HdfsConstants.java     |   4 +
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  |   4 +
 .../src/main/proto/hdfs.proto                   |   1 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |  15 +
 .../hadoop/hdfs/server/common/BlockAlias.java   |  29 +
 .../hadoop/hdfs/server/common/BlockFormat.java  |  82 +++
 .../hadoop/hdfs/server/common/FileRegion.java   | 121 +++++
 .../hdfs/server/common/FileRegionProvider.java  |  37 ++
 .../hadoop/hdfs/server/common/Storage.java      |  71 ++-
 .../hadoop/hdfs/server/common/StorageInfo.java  |   6 +
 .../server/common/TextFileRegionFormat.java     | 442 ++++++++++++++++
 .../server/common/TextFileRegionProvider.java   |  88 ++++
 .../server/datanode/BlockPoolSliceStorage.java  |  21 +-
 .../hdfs/server/datanode/DataStorage.java       |  44 +-
 .../hdfs/server/datanode/DirectoryScanner.java  |  19 +-
 .../datanode/FinalizedProvidedReplica.java      |  91 ++++
 .../hdfs/server/datanode/ProvidedReplica.java   | 248 +++++++++
 .../hdfs/server/datanode/ReplicaBuilder.java    | 100 +++-
 .../hdfs/server/datanode/ReplicaInfo.java       |  20 +-
 .../hdfs/server/datanode/StorageLocation.java   |  26 +-
 .../server/datanode/fsdataset/FsDatasetSpi.java |   4 +-
 .../server/datanode/fsdataset/FsVolumeSpi.java  |  32 +-
 .../fsdataset/impl/DefaultProvidedVolumeDF.java |  58 ++
 .../datanode/fsdataset/impl/FsDatasetImpl.java  |  40 +-
 .../datanode/fsdataset/impl/FsDatasetUtil.java  |  25 +-
 .../datanode/fsdataset/impl/FsVolumeImpl.java   |  19 +-
 .../fsdataset/impl/FsVolumeImplBuilder.java     |   6 +
 .../fsdataset/impl/ProvidedVolumeDF.java        |  34 ++
 .../fsdataset/impl/ProvidedVolumeImpl.java      | 526 +++++++++++++++++++
 .../apache/hadoop/hdfs/server/mover/Mover.java  |   2 +-
 .../server/namenode/FSImageCompression.java     |   2 +-
 .../hadoop/hdfs/server/namenode/NNStorage.java  |  10 +-
 .../src/main/resources/hdfs-default.xml         |  78 +++
 .../org/apache/hadoop/hdfs/TestDFSRollback.java |   6 +-
 .../hadoop/hdfs/TestDFSStartupVersions.java     |   2 +-
 .../org/apache/hadoop/hdfs/TestDFSUpgrade.java  |   4 +-
 .../apache/hadoop/hdfs/UpgradeUtilities.java    |  16 +-
 .../hdfs/server/common/TestTextBlockFormat.java | 160 ++++++
 .../server/datanode/SimulatedFSDataset.java     |   6 +-
 .../extdataset/ExternalDatasetImpl.java         |   5 +-
 .../fsdataset/impl/TestFsDatasetImpl.java       |  17 +-
 .../fsdataset/impl/TestProvidedImpl.java        | 426 +++++++++++++++
 .../hdfs/server/namenode/TestClusterId.java     |   5 +-
 45 files changed, 2873 insertions(+), 85 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java
 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java
index 0948801..2ecd206 100644
--- 
a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java
+++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StorageType.java
@@ -37,7 +37,8 @@ public enum StorageType {
   RAM_DISK(true),
   SSD(false),
   DISK(false),
-  ARCHIVE(false);
+  ARCHIVE(false),
+  PROVIDED(false);
 
   private final boolean isTransient;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCount.java
----------------------------------------------------------------------
diff --git 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCount.java
 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCount.java
index a782958..b5adfcf 100644
--- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCount.java
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/shell/TestCount.java
@@ -285,7 +285,7 @@ public class TestCount {
         // <----13---> <-------17------> <----13-----> <------17------->
         "    SSD_QUOTA     REM_SSD_QUOTA    DISK_QUOTA    REM_DISK_QUOTA " +
         // <----13---> <-------17------>
-        "ARCHIVE_QUOTA REM_ARCHIVE_QUOTA " +
+        "ARCHIVE_QUOTA REM_ARCHIVE_QUOTA PROVIDED_QUOTA REM_PROVIDED_QUOTA " +
         "PATHNAME";
     verify(out).println(withStorageTypeHeader);
     verifyNoMoreInteractions(out);
@@ -340,6 +340,7 @@ public class TestCount {
         "    SSD_QUOTA     REM_SSD_QUOTA " +
         "   DISK_QUOTA    REM_DISK_QUOTA " +
         "ARCHIVE_QUOTA REM_ARCHIVE_QUOTA " +
+        "PROVIDED_QUOTA REM_PROVIDED_QUOTA " +
         "PATHNAME";
     verify(out).println(withStorageTypeHeader);
     verifyNoMoreInteractions(out);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
index 8245d1b..e9e6103 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
@@ -47,6 +47,10 @@ public final class HdfsConstants {
   public static final String WARM_STORAGE_POLICY_NAME = "WARM";
   public static final byte COLD_STORAGE_POLICY_ID = 2;
   public static final String COLD_STORAGE_POLICY_NAME = "COLD";
+  // branch HDFS-9806 XXX temporary until HDFS-7076
+  public static final byte PROVIDED_STORAGE_POLICY_ID = 1;
+  public static final String PROVIDED_STORAGE_POLICY_NAME = "PROVIDED";
+
 
   public static final int DEFAULT_DATA_SOCKET_SIZE = 0;
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index fbc6dbf..460112e 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -405,6 +405,8 @@ public class PBHelperClient {
       return StorageTypeProto.ARCHIVE;
     case RAM_DISK:
       return StorageTypeProto.RAM_DISK;
+    case PROVIDED:
+      return StorageTypeProto.PROVIDED;
     default:
       throw new IllegalStateException(
           "BUG: StorageType not found, type=" + type);
@@ -421,6 +423,8 @@ public class PBHelperClient {
       return StorageType.ARCHIVE;
     case RAM_DISK:
       return StorageType.RAM_DISK;
+    case PROVIDED:
+      return StorageType.PROVIDED;
     default:
       throw new IllegalStateException(
           "BUG: StorageTypeProto not found, type=" + type);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index a423a4b..06578ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -205,6 +205,7 @@ enum StorageTypeProto {
   SSD = 2;
   ARCHIVE = 3;
   RAM_DISK = 4;
+  PROVIDED = 5;
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 97b8b1a..ca753ce 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -328,6 +328,21 @@ public class DFSConfigKeys extends CommonConfigurationKeys 
{
       "dfs.namenode.edits.asynclogging";
   public static final boolean DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT = true;
 
+  public static final String DFS_PROVIDER_CLASS = "dfs.provider.class";
+  public static final String DFS_PROVIDER_DF_CLASS = "dfs.provided.df.class";
+  public static final String DFS_PROVIDER_STORAGEUUID = 
"dfs.provided.storage.id";
+  public static final String DFS_PROVIDER_STORAGEUUID_DEFAULT =  "DS-PROVIDED";
+  public static final String DFS_PROVIDER_BLK_FORMAT_CLASS = 
"dfs.provided.blockformat.class";
+
+  public static final String DFS_PROVIDED_BLOCK_MAP_DELIMITER = 
"dfs.provided.textprovider.delimiter";
+  public static final String DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT = ",";
+
+  public static final String DFS_PROVIDED_BLOCK_MAP_READ_PATH = 
"dfs.provided.textprovider.read.path";
+  public static final String DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT = 
"file:///tmp/blocks.csv";
+
+  public static final String DFS_PROVIDED_BLOCK_MAP_CODEC = 
"dfs.provided.textprovider.read.codec";
+  public static final String DFS_PROVIDED_BLOCK_MAP_WRITE_PATH  = 
"dfs.provided.textprovider.write.path";
+
   public static final String  DFS_LIST_LIMIT = "dfs.ls.limit";
   public static final int     DFS_LIST_LIMIT_DEFAULT = 1000;
   public static final String  DFS_CONTENT_SUMMARY_LIMIT_KEY = 
"dfs.content-summary.limit";

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockAlias.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockAlias.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockAlias.java
new file mode 100644
index 0000000..b2fac97
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockAlias.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * Interface used to load provided blocks.
+ * Each alias exposes the {@link Block} it refers to through
+ * {@link #getBlock()}.
+ */
+public interface BlockAlias {
+
+  /**
+   * @return the block that this alias refers to.
+   */
+  Block getBlock();
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockFormat.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockFormat.java
new file mode 100644
index 0000000..66e7fdf
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/BlockFormat.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * An abstract class used to read and write block maps for provided blocks.
+ * Implementations hand out a {@link Reader} and a {@link Writer} for
+ * aliases of type {@code T}.
+ *
+ * @param <T> the {@link BlockAlias} type stored in the block map.
+ */
+public abstract class BlockFormat<T extends BlockAlias>  {
+
+  /**
+   * An abstract class that is used to read {@link BlockAlias}es
+   * for provided blocks. A reader is both {@link Iterable} over its
+   * aliases and {@link Closeable}.
+   *
+   * @param <U> the {@link BlockAlias} type produced by this reader.
+   */
+  public static abstract class Reader<U extends BlockAlias>
+      implements Iterable<U>, Closeable {
+
+    /**
+     * reader options.
+     * Marker interface: it declares no methods.
+     */
+    public interface Options { }
+
+    /**
+     * Looks up the alias for a single block.
+     * NOTE(review): the result when the block map has no entry for
+     * {@code ident} (null vs. exception) is not specified here; confirm
+     * against the concrete implementations.
+     *
+     * @param ident the block to look up.
+     * @return the alias associated with {@code ident}.
+     * @throws IOException if the lookup fails.
+     */
+    public abstract U resolve(Block ident) throws IOException;
+
+  }
+
+  /**
+   * Returns the reader for the provided block map.
+   * @param opts reader options
+   * @return {@link Reader} to the block map.
+   * @throws IOException if the reader cannot be created.
+   */
+  public abstract Reader<T> getReader(Reader.Options opts) throws IOException;
+
+  /**
+   * An abstract class used as a writer for the provided block map.
+   * A writer is {@link Closeable}.
+   *
+   * @param <U> the {@link BlockAlias} type accepted by this writer.
+   */
+  public static abstract class Writer<U extends BlockAlias>
+      implements Closeable {
+    /**
+     * writer options.
+     * Marker interface: it declares no methods.
+     */
+    public interface Options { }
+
+    /**
+     * Stores one alias through this writer.
+     *
+     * @param token the alias to store.
+     * @throws IOException if the alias cannot be written.
+     */
+    public abstract void store(U token) throws IOException;
+
+  }
+
+  /**
+   * Returns the writer for the provided block map.
+   * @param opts writer options.
+   * @return {@link Writer} to the block map.
+   * @throws IOException if the writer cannot be created.
+   */
+  public abstract Writer<T> getWriter(Writer.Options opts) throws IOException;
+
+  /**
+   * Refresh based on the underlying block map.
+   * @throws IOException if the refresh fails.
+   */
+  public abstract void refresh() throws IOException;
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegion.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegion.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegion.java
new file mode 100644
index 0000000..c568b90
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegion.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+
+/**
+ * This class is used to represent provided blocks that are file regions,
+ * i.e., can be described using (path, offset, length).
+ * All fields are final and assigned once, in the constructor.
+ */
+public class FileRegion implements BlockAlias {
+
+  private final Path path;
+  private final long offset;
+  private final long length;
+  private final long blockId;
+  private final String bpid;
+  private final long genStamp;
+
+  /**
+   * Creates a fully specified file region.
+   *
+   * @param blockId id of the block described by this region.
+   * @param path path of the file that contains the region.
+   * @param offset offset of the region within {@code path}.
+   * @param length length of the region; also used as the block length
+   *               by {@link #getBlock()}.
+   * @param bpid block pool id; may be null.
+   * @param genStamp generation stamp of the block.
+   */
+  public FileRegion(long blockId, Path path, long offset,
+      long length, String bpid, long genStamp) {
+    this.path = path;
+    this.offset = offset;
+    this.length = length;
+    this.blockId = blockId;
+    this.bpid = bpid;
+    this.genStamp = genStamp;
+  }
+
+  /**
+   * Creates a file region whose generation stamp is
+   * {@link HdfsConstants#GRANDFATHER_GENERATION_STAMP}.
+   *
+   * @param blockId id of the block described by this region.
+   * @param path path of the file that contains the region.
+   * @param offset offset of the region within {@code path}.
+   * @param length length of the region.
+   * @param bpid block pool id; may be null.
+   */
+  public FileRegion(long blockId, Path path, long offset,
+      long length, String bpid) {
+    this(blockId, path, offset, length, bpid,
+        HdfsConstants.GRANDFATHER_GENERATION_STAMP);
+  }
+
+  /**
+   * Creates a file region with a null block pool id.
+   *
+   * @param blockId id of the block described by this region.
+   * @param path path of the file that contains the region.
+   * @param offset offset of the region within {@code path}.
+   * @param length length of the region.
+   * @param genStamp generation stamp of the block.
+   */
+  public FileRegion(long blockId, Path path, long offset,
+      long length, long genStamp) {
+    this(blockId, path, offset, length, null, genStamp);
+  }
+
+  /**
+   * Creates a file region with a null block pool id and the generation
+   * stamp {@link HdfsConstants#GRANDFATHER_GENERATION_STAMP}.
+   *
+   * @param blockId id of the block described by this region.
+   * @param path path of the file that contains the region.
+   * @param offset offset of the region within {@code path}.
+   * @param length length of the region.
+   */
+  public FileRegion(long blockId, Path path, long offset, long length) {
+    this(blockId, path, offset, length, null);
+  }
+
+  /**
+   * @return a new {@link Block} built from this region's block id,
+   *         length and generation stamp.
+   */
+  @Override
+  public Block getBlock() {
+    return new Block(blockId, length, genStamp);
+  }
+
+  /**
+   * Two regions are equal when block id, offset, length, generation stamp
+   * and path all match. The block pool id is not part of the comparison.
+   */
+  @Override
+  public boolean equals(Object other) {
+    if (this == other) {
+      return true;
+    }
+    if (!(other instanceof FileRegion)) {
+      return false;
+    }
+    FileRegion o = (FileRegion) other;
+    return blockId == o.blockId
+      && offset == o.offset
+      && length == o.length
+      && genStamp == o.genStamp
+      // null-safe: a region built with a null path must not throw here
+      && (path == null ? o.path == null : path.equals(o.path));
+  }
+
+  /**
+   * Hash derived from the block id only, which keeps it consistent with
+   * {@link #equals(Object)}.
+   */
+  @Override
+  public int hashCode() {
+    // Fold the high and low 32 bits of the id. Masking with
+    // Integer.MIN_VALUE kept only bit 31 after the int cast, so every
+    // region hashed to either 0 or Integer.MIN_VALUE.
+    return (int) (blockId ^ (blockId >>> 32));
+  }
+
+  /** @return path of the file that contains this region. */
+  public Path getPath() {
+    return path;
+  }
+
+  /** @return offset of this region within the file. */
+  public long getOffset() {
+    return offset;
+  }
+
+  /** @return length of this region. */
+  public long getLength() {
+    return length;
+  }
+
+  /** @return generation stamp of the block described by this region. */
+  public long getGenerationStamp() {
+    return genStamp;
+  }
+
+  @Override
+  public String toString() {
+    // Build the Block once and reuse it for every field taken from it.
+    Block block = getBlock();
+    StringBuilder sb = new StringBuilder();
+    sb.append("{ block=\"").append(block).append("\"");
+    sb.append(", path=\"").append(getPath()).append("\"");
+    sb.append(", off=\"").append(getOffset()).append("\"");
+    sb.append(", len=\"").append(block.getNumBytes()).append("\"");
+    sb.append(", genStamp=\"").append(block
+        .getGenerationStamp()).append("\"");
+    sb.append(", bpid=\"").append(bpid).append("\"");
+    sb.append(" }");
+    return sb.toString();
+  }
+
+  /** @return block pool id of this region; may be null. */
+  public String getBlockPoolId() {
+    return this.bpid;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java
new file mode 100644
index 0000000..2e94239
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/FileRegionProvider.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+/**
+ * Stub provider of {@link FileRegion}s read from the block map.
+ * This implementation exposes no regions and its refresh does nothing.
+ */
+public class FileRegionProvider implements Iterable<FileRegion> {
+
+  /**
+   * @return an iterator over the file regions; always empty in this stub.
+   */
+  @Override
+  public Iterator<FileRegion> iterator() {
+    final Iterator<FileRegion> noRegions = Collections.emptyListIterator();
+    return noRegions;
+  }
+
+  /**
+   * Refreshes the provider; there is nothing to reload in this stub.
+   *
+   * @throws IOException not thrown by this stub.
+   */
+  public void refresh() throws IOException {
+    // intentionally empty
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
index 414d3a7..9ad61d7 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
@@ -40,6 +40,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
@@ -196,7 +197,10 @@ public abstract class Storage extends StorageInfo {
     Iterator<StorageDirectory> it =
       (dirType == null) ? dirIterator() : dirIterator(dirType);
     for ( ;it.hasNext(); ) {
-      list.add(new File(it.next().getCurrentDir(), fileName));
+      File currentDir = it.next().getCurrentDir();
+      if (currentDir != null) {
+        list.add(new File(currentDir, fileName));
+      }
     }
     return list;
   }
@@ -328,10 +332,20 @@ public abstract class Storage extends StorageInfo {
      */
     public StorageDirectory(String bpid, StorageDirType dirType,
         boolean isShared, StorageLocation location) {
-      this(new File(location.getBpURI(bpid, STORAGE_DIR_CURRENT)), dirType,
+      this(getBlockPoolCurrentDir(bpid, location), dirType,
           isShared, location);
     }
 
+    /**
+     * Resolves the directory of a block pool under a storage location,
+     * using {@code location.getBpURI(bpid, STORAGE_DIR_CURRENT)}.
+     *
+     * @param bpid block pool id.
+     * @param location the storage location; may be null.
+     * @return the directory, or null when there is no location or the
+     *         location is of type {@link StorageType#PROVIDED}.
+     */
+    private static File getBlockPoolCurrentDir(String bpid,
+        StorageLocation location) {
+      if (location == null) {
+        return null;
+      }
+      if (location.getStorageType() == StorageType.PROVIDED) {
+        return null;
+      }
+      return new File(location.getBpURI(bpid, STORAGE_DIR_CURRENT));
+    }
+
     private StorageDirectory(File dir, StorageDirType dirType,
         boolean isShared, StorageLocation location) {
       this.root = dir;
@@ -347,7 +361,8 @@ public abstract class Storage extends StorageInfo {
     }
 
     private static File getStorageLocationFile(StorageLocation location) {
-      if (location == null) {
+      if (location == null ||
+          location.getStorageType() == StorageType.PROVIDED) {
         return null;
       }
       try {
@@ -406,6 +421,10 @@ public abstract class Storage extends StorageInfo {
      */
     public void clearDirectory() throws IOException {
       File curDir = this.getCurrentDir();
+      if (curDir == null) {
+        //if the directory is null, there is nothing to do.
+        return;
+      }
       if (curDir.exists()) {
         File[] files = FileUtil.listFiles(curDir);
         LOG.info("Will remove files: " + Arrays.toString(files));
@@ -423,6 +442,9 @@ public abstract class Storage extends StorageInfo {
      * @return the directory path
      */
     public File getCurrentDir() {
+      if (root == null) {
+        return null;
+      }
       return new File(root, STORAGE_DIR_CURRENT);
     }
 
@@ -443,6 +465,9 @@ public abstract class Storage extends StorageInfo {
      * @return the version file path
      */
     public File getVersionFile() {
+      if (root == null) {
+        return null;
+      }
       return new File(new File(root, STORAGE_DIR_CURRENT), 
STORAGE_FILE_VERSION);
     }
 
@@ -452,6 +477,9 @@ public abstract class Storage extends StorageInfo {
      * @return the previous version file path
      */
     public File getPreviousVersionFile() {
+      if (root == null) {
+        return null;
+      }
       return new File(new File(root, STORAGE_DIR_PREVIOUS), 
STORAGE_FILE_VERSION);
     }
 
@@ -462,6 +490,9 @@ public abstract class Storage extends StorageInfo {
      * @return the directory path
      */
     public File getPreviousDir() {
+      if (root == null) {
+        return null;
+      }
       return new File(root, STORAGE_DIR_PREVIOUS);
     }
 
@@ -476,6 +507,9 @@ public abstract class Storage extends StorageInfo {
      * @return the directory path
      */
     public File getPreviousTmp() {
+      if (root == null) {
+        return null;
+      }
       return new File(root, STORAGE_TMP_PREVIOUS);
     }
 
@@ -490,6 +524,9 @@ public abstract class Storage extends StorageInfo {
      * @return the directory path
      */
     public File getRemovedTmp() {
+      if (root == null) {
+        return null;
+      }
       return new File(root, STORAGE_TMP_REMOVED);
     }
 
@@ -503,6 +540,9 @@ public abstract class Storage extends StorageInfo {
      * @return the directory path
      */
     public File getFinalizedTmp() {
+      if (root == null) {
+        return null;
+      }
       return new File(root, STORAGE_TMP_FINALIZED);
     }
 
@@ -517,6 +557,9 @@ public abstract class Storage extends StorageInfo {
      * @return the directory path
      */
     public File getLastCheckpointTmp() {
+      if (root == null) {
+        return null;
+      }
       return new File(root, STORAGE_TMP_LAST_CKPT);
     }
 
@@ -530,6 +573,9 @@ public abstract class Storage extends StorageInfo {
      * @return the directory path
      */
     public File getPreviousCheckpoint() {
+      if (root == null) {
+        return null;
+      }
       return new File(root, STORAGE_PREVIOUS_CKPT);
     }
 
@@ -543,7 +589,7 @@ public abstract class Storage extends StorageInfo {
     private void checkEmptyCurrent() throws InconsistentFSStateException,
         IOException {
       File currentDir = getCurrentDir();
-      if(!currentDir.exists()) {
+      if(currentDir == null || !currentDir.exists()) {
         // if current/ does not exist, it's safe to format it.
         return;
       }
@@ -589,6 +635,13 @@ public abstract class Storage extends StorageInfo {
     public StorageState analyzeStorage(StartupOption startOpt, Storage storage,
         boolean checkCurrentIsEmpty)
         throws IOException {
+
+      if (location != null &&
+          location.getStorageType() == StorageType.PROVIDED) {
+        //currently we assume that PROVIDED storages are always NORMAL
+        return StorageState.NORMAL;
+      }
+
       assert root != null : "root is null";
       boolean hadMkdirs = false;
       String rootPath = root.getCanonicalPath();
@@ -710,6 +763,10 @@ public abstract class Storage extends StorageInfo {
      */
     public void doRecover(StorageState curState) throws IOException {
       File curDir = getCurrentDir();
+      if (curDir == null || root == null) {
+        //at this point, we do not support recovery on PROVIDED storages
+        return;
+      }
       String rootPath = root.getCanonicalPath();
       switch(curState) {
       case COMPLETE_UPGRADE:  // mv previous.tmp -> previous
@@ -883,7 +940,8 @@ public abstract class Storage extends StorageInfo {
     
     @Override
     public String toString() {
-      return "Storage Directory " + this.root;
+      return "Storage Directory root= " + this.root +
+          "; location= " + this.location;
     }
 
     /**
@@ -1153,6 +1211,9 @@ public abstract class Storage extends StorageInfo {
   }
   
   public void writeProperties(File to, StorageDirectory sd) throws IOException 
{
+    if (to == null) {
+      return;
+    }
     Properties props = new Properties();
     setPropertiesFromFields(props, sd);
     writeProperties(to, props);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
index 50363c9..28871e5 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
@@ -152,6 +152,9 @@ public class StorageInfo {
    */
   protected void setFieldsFromProperties(
       Properties props, StorageDirectory sd) throws IOException {
+    if (props == null) {
+      return;
+    }
     setLayoutVersion(props, sd);
     setNamespaceID(props, sd);
     setcTime(props, sd);
@@ -241,6 +244,9 @@ public class StorageInfo {
   }
 
   public static Properties readPropertiesFile(File from) throws IOException {
+    if (from == null) {
+      return null;
+    }
     RandomAccessFile file = new RandomAccessFile(from, "rws");
     FileInputStream in = null;
     Properties props = new Properties();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java
new file mode 100644
index 0000000..eacd08f
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionFormat.java
@@ -0,0 +1,442 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.BufferedReader;
+import java.io.BufferedWriter;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.OutputStream;
+import java.io.OutputStreamWriter;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Collections;
+import java.util.IdentityHashMap;
+import java.util.NoSuchElementException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class is used for block maps stored as text files,
+ * with a specified delimiter.
+ */
+public class TextFileRegionFormat
+    extends BlockFormat<FileRegion> implements Configurable {
+
+  private Configuration conf;
+  private ReaderOptions readerOpts = TextReader.defaults();
+  private WriterOptions writerOpts = TextWriter.defaults();
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(TextFileRegionFormat.class);
+  /**
+   * Hands the configuration to the default reader and writer options and
+   * keeps a reference to it for {@link #getConf()}.
+   *
+   * @param conf configuration forwarded to the default options objects.
+   */
+  @Override
+  public void setConf(Configuration conf) {
+    readerOpts.setConf(conf);
+    writerOpts.setConf(conf);
+    this.conf = conf;
+  }
+
+  /**
+   * @return the configuration last passed to {@link #setConf}, or null if
+   *     none has been set.
+   */
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Opens a reader for the block map described by the given options.
+   *
+   * @param opts reader options; null selects the defaults held by this
+   *     format. Must be a {@link ReaderOptions} instance.
+   * @return a reader over the file named by the options.
+   * @throws IllegalArgumentException if {@code opts} is of another type.
+   * @throws IOException if the reader cannot be created.
+   */
+  @Override
+  public Reader<FileRegion> getReader(Reader.Options opts)
+      throws IOException {
+    final Reader.Options effective = (opts != null) ? opts : readerOpts;
+    if (!(effective instanceof ReaderOptions)) {
+      throw new IllegalArgumentException(
+          "Invalid options " + effective.getClass());
+    }
+    final ReaderOptions ro = (ReaderOptions) effective;
+    // options that were never configured fall back to a fresh Configuration
+    Configuration readerConf = ro.getConf();
+    if (readerConf == null) {
+      readerConf = new Configuration();
+    }
+    return createReader(ro.file, ro.delim, readerConf);
+  }
+
+  /**
+   * Builds a {@link TextReader} for {@code file}. The codec is chosen by
+   * {@link CompressionCodecFactory#getCodec(Path)}; for a
+   * {@link LocalFileSystem} the raw file system is used instead.
+   *
+   * @param file block map to read.
+   * @param delim field delimiter of the block map.
+   * @param cfg configuration used to resolve the file system and codec.
+   * @return a reader over {@code file}.
+   * @throws IOException if the file system cannot be resolved.
+   */
+  @VisibleForTesting
+  TextReader createReader(Path file, String delim, Configuration cfg)
+      throws IOException {
+    final FileSystem resolved = file.getFileSystem(cfg);
+    final FileSystem target = (resolved instanceof LocalFileSystem)
+        ? ((LocalFileSystem) resolved).getRaw()
+        : resolved;
+    final CompressionCodec codec =
+        new CompressionCodecFactory(cfg).getCodec(file);
+    return new TextReader(target, file, codec, delim);
+  }
+
+  /**
+   * Opens a writer for the block map described by the given options.
+   * When a codec name is set, the codec's default extension is appended to
+   * the file name stored in the options before the file is created.
+   *
+   * @param opts writer options; null selects the defaults held by this
+   *     format. Must be a {@link WriterOptions} instance.
+   * @return a writer for the file named by the options.
+   * @throws IllegalArgumentException if {@code opts} is of another type or
+   *     names a codec that cannot be resolved.
+   * @throws IOException if the output file cannot be created.
+   */
+  @Override
+  public Writer<FileRegion> getWriter(Writer.Options opts) throws IOException {
+    if (null == opts) {
+      opts = writerOpts;
+    }
+    if (!(opts instanceof WriterOptions)) {
+      throw new IllegalArgumentException("Invalid options " + opts.getClass());
+    }
+    WriterOptions o = (WriterOptions) opts;
+    Configuration cfg = (null == o.getConf())
+        ? new Configuration()
+        : o.getConf();
+    if (o.codec != null) {
+      CompressionCodecFactory factory = new CompressionCodecFactory(cfg);
+      CompressionCodec codec = factory.getCodecByName(o.codec);
+      if (null == codec) {
+        // fail with a clear message instead of an NPE a few lines below
+        throw new IllegalArgumentException(
+            "Unknown compression codec " + o.codec);
+      }
+      String name = o.file.getName() + codec.getDefaultExtension();
+      o.filename(new Path(o.file.getParent(), name));
+      return createWriter(o.file, codec, o.delim, cfg);
+    }
+    // use the resolved cfg: the format's own conf field is null until
+    // setConf is called and would break file system resolution
+    return createWriter(o.file, null, o.delim, cfg);
+  }
+
+  /**
+   * Creates {@code file} and wraps it in a UTF-8 {@link TextWriter},
+   * compressing the output when a codec is given. For a
+   * {@link LocalFileSystem} the raw file system is used instead.
+   *
+   * @param file block map to create.
+   * @param codec codec used to compress the output, or null for plain text.
+   * @param delim field delimiter written between columns.
+   * @param cfg configuration used to resolve the file system.
+   * @return a writer that owns the created stream.
+   * @throws IOException if the file cannot be created or wrapped.
+   */
+  @VisibleForTesting
+  TextWriter createWriter(Path file, CompressionCodec codec, String delim,
+      Configuration cfg) throws IOException {
+    FileSystem fs = file.getFileSystem(cfg);
+    if (fs instanceof LocalFileSystem) {
+      fs = ((LocalFileSystem)fs).getRaw();
+    }
+    OutputStream tmp = fs.create(file);
+    try {
+      java.io.Writer out = new BufferedWriter(new OutputStreamWriter(
+          (null == codec) ? tmp : codec.createOutputStream(tmp), "UTF-8"));
+      return new TextWriter(out, delim);
+    } catch (IOException | RuntimeException e) {
+      // nobody else holds the freshly created stream yet; close it here
+      // so a failed wrap does not leak it
+      try {
+        tmp.close();
+      } catch (IOException closeFailure) {
+        e.addSuppressed(closeFailure);
+      }
+      throw e;
+    }
+  }
+
+  /**
+   * Reader-side settings for {@link TextFileRegionFormat}: the location of
+   * the block map and the delimiter separating its fields.
+   */
+  public static class ReaderOptions
+      implements TextReader.Options, Configurable {
+
+    private Configuration conf;
+    private String delim =
+        DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT;
+    private Path file = new Path(
+        new File(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT)
+        .toURI().toString());
+
+    /**
+     * Reloads the read path and the delimiter from {@code conf}, falling
+     * back to the defaults in {@link DFSConfigKeys}.
+     */
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+      final String readPath = conf.get(
+          DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_READ_PATH,
+          DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT);
+      this.file = new Path(readPath);
+      this.delim = conf.get(
+          DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER,
+          DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT);
+      LOG.info("TextFileRegionFormat: read path " + readPath);
+    }
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    /** Overrides the block map location; returns this for chaining. */
+    @Override
+    public ReaderOptions filename(Path file) {
+      this.file = file;
+      return this;
+    }
+
+    /** Overrides the field delimiter; returns this for chaining. */
+    @Override
+    public ReaderOptions delimiter(String delim) {
+      this.delim = delim;
+      return this;
+    }
+  }
+
+  /**
+   * Writer-side settings for {@link TextFileRegionFormat}: the destination
+   * of the block map, the field delimiter and an optional codec name.
+   */
+  public static class WriterOptions
+      implements TextWriter.Options, Configurable {
+
+    private Configuration conf;
+    private String codec = null;
+    private Path file =
+        new Path(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_PATH_DEFAULT);
+    private String delim =
+        DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT;
+
+    /**
+     * Reloads the write path, codec name and delimiter from {@code conf}.
+     */
+    @Override
+    public void setConf(Configuration conf) {
+      this.conf = conf;
+      // the path currently held doubles as the default for the write path
+      final String target = conf.get(
+          DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_WRITE_PATH, file.toString());
+      this.file = new Path(target);
+      this.codec = conf.get(DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_CODEC);
+      this.delim = conf.get(
+          DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER,
+          DFSConfigKeys.DFS_PROVIDED_BLOCK_MAP_DELIMITER_DEFAULT);
+    }
+
+    @Override
+    public Configuration getConf() {
+      return conf;
+    }
+
+    /** Overrides the output location; returns this for chaining. */
+    @Override
+    public WriterOptions filename(Path file) {
+      this.file = file;
+      return this;
+    }
+
+    /** @return the configured codec name, or null when none is set. */
+    public String getCodec() {
+      return codec;
+    }
+
+    /** @return the output location currently held by these options. */
+    public Path getFile() {
+      return file;
+    }
+
+    /** Overrides the codec name; returns this for chaining. */
+    @Override
+    public WriterOptions codec(String codec) {
+      this.codec = codec;
+      return this;
+    }
+
+    /** Overrides the field delimiter; returns this for chaining. */
+    @Override
+    public WriterOptions delimiter(String delim) {
+      this.delim = delim;
+      return this;
+    }
+
+  }
+
+  /**
+   * This class is used as a reader for block maps which
+   * are stored as delimited text files.
+   */
+  public static class TextReader extends Reader<FileRegion> {
+
+    /**
+     * Options for {@link TextReader}: the block map to read and the
+     * delimiter separating its fields. Both setters return the options
+     * object.
+     */
+    public interface Options extends Reader.Options {
+      Options filename(Path file);
+      Options delimiter(String delim);
+    }
+
+    /** @return a new {@link ReaderOptions} carrying its field defaults. */
+    static ReaderOptions defaults() {
+      return new ReaderOptions();
+    }
+
+    private final Path file;
+    private final String delim;
+    private final FileSystem fs;
+    private final CompressionCodec codec;
+    private final Map<FRIterator, BufferedReader> iterators;
+
+    /**
+     * Creates a reader that tracks its open iterators in a new
+     * {@link IdentityHashMap}.
+     */
+    protected TextReader(FileSystem fs, Path file, CompressionCodec codec,
+        String delim) {
+      this(fs, file, codec, delim,
+          new IdentityHashMap<FRIterator, BufferedReader>());
+    }
+
+    /**
+     * Creates a reader over {@code file} on {@code fs}.
+     *
+     * @param fs file system used to open the block map.
+     * @param file location of the block map.
+     * @param codec codec used to decode the stream, or null to read it as is.
+     * @param delim field delimiter used to split each line.
+     * @param iterators registry of open iterators and their readers; it is
+     *     wrapped in a synchronized view before use.
+     */
+    TextReader(FileSystem fs, Path file, CompressionCodec codec, String delim,
+        Map<FRIterator, BufferedReader> iterators) {
+      this.fs = fs;
+      this.file = file;
+      this.codec = codec;
+      this.delim = delim;
+      this.iterators = Collections.synchronizedMap(iterators);
+    }
+
+    /**
+     * Looks up the region of a block with a linear scan over the block map.
+     *
+     * @param ident block to look for.
+     * @return the first region whose block equals {@code ident}, or null if
+     *     the block map contains none.
+     * @throws IOException if closing the scan's reader fails.
+     */
+    @Override
+    public FileRegion resolve(Block ident) throws IOException {
+      // consider layering index w/ composable format
+      final Iterator<FileRegion> scan = iterator();
+      try {
+        FileRegion match = null;
+        while (match == null && scan.hasNext()) {
+          final FileRegion candidate = scan.next();
+          if (candidate.getBlock().equals(ident)) {
+            match = candidate;
+          }
+        }
+        return match;
+      } finally {
+        // the entry is already gone when the scan ran to the end of the file
+        final BufferedReader open = iterators.remove(scan);
+        if (open != null) {
+          open.close();
+        }
+      }
+    }
+
+    /**
+     * Iterator over the regions of one open stream. It keeps the next region
+     * in {@code pending} and fetches the following one through
+     * {@code nextInternal} on every call to {@link #next()}.
+     */
+    class FRIterator implements Iterator<FileRegion> {
+
+      // one-element look-ahead; null once the input is exhausted
+      private FileRegion pending;
+
+      @Override
+      public boolean hasNext() {
+        return pending != null;
+      }
+
+      @Override
+      public FileRegion next() {
+        if (null == pending) {
+          throw new NoSuchElementException();
+        }
+        FileRegion ret = pending;
+        try {
+          pending = nextInternal(this);
+        } catch (IOException e) {
+          // Iterator#next cannot declare checked exceptions
+          throw new RuntimeException(e);
+        }
+        return ret;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    }
+
+    /**
+     * Reads and parses the next line for the given iterator. Each line must
+     * hold six fields in the order written by {@link TextWriter#store}:
+     * block id, path, offset, length, generation stamp, block pool id.
+     * Note that {@link String#split(String)} treats the delimiter as a
+     * regular expression.
+     *
+     * @param i iterator whose reader should be advanced.
+     * @return the parsed region, or null at the end of the input; in that
+     *     case the reader is unregistered and closed.
+     * @throws IllegalStateException if no reader is registered for {@code i}.
+     * @throws IOException if reading fails or a line does not have exactly
+     *     six fields.
+     */
+    private FileRegion nextInternal(Iterator<FileRegion> i)
+        throws IOException {
+      BufferedReader r = iterators.get(i);
+      if (null == r) {
+        throw new IllegalStateException();
+      }
+      String line = r.readLine();
+      if (null == line) {
+        // once the reader leaves the registry nothing else can close it,
+        // so release the underlying stream here
+        iterators.remove(i);
+        r.close();
+        return null;
+      }
+      String[] f = line.split(delim);
+      if (f.length != 6) {
+        throw new IOException("Invalid line: " + line);
+      }
+      return new FileRegion(Long.parseLong(f[0]), new Path(f[1]),
+          Long.parseLong(f[2]), Long.parseLong(f[3]), f[5],
+          Long.parseLong(f[4]));
+    }
+
+    /**
+     * Opens the block map file.
+     *
+     * @return the raw stream, or a decoding stream when a codec is set.
+     * @throws IOException if the file cannot be opened or wrapped.
+     */
+    public InputStream createStream() throws IOException {
+      final InputStream raw = fs.open(file);
+      return (codec == null) ? raw : codec.createInputStream(raw);
+    }
+
+    /**
+     * Opens a new stream over the block map and returns an iterator
+     * positioned on its first region. The reader is registered with this
+     * {@code TextReader} so that {@link #close()} can release it.
+     *
+     * @return an iterator over the regions in the block map.
+     * @throws RuntimeException wrapping the {@link IOException} if the
+     *     stream cannot be opened or the first line cannot be read.
+     */
+    @Override
+    public Iterator<FileRegion> iterator() {
+      FRIterator i = new FRIterator();
+      BufferedReader r = null;
+      try {
+        r = new BufferedReader(
+            new InputStreamReader(createStream(), "UTF-8"));
+        iterators.put(i, r);
+        i.pending = nextInternal(i);
+      } catch (IOException e) {
+        iterators.remove(i);
+        if (r != null) {
+          // the reader was just unregistered; close it here or it leaks
+          try {
+            r.close();
+          } catch (IOException closeFailure) {
+            e.addSuppressed(closeFailure);
+          }
+        }
+        throw new RuntimeException(e);
+      }
+      return i;
+    }
+
+    /**
+     * Closes the readers of all iterators that are still registered and
+     * empties the registry. Every reader is attempted even if an earlier
+     * one fails to close.
+     *
+     * @throws IOException built from the collected failures by
+     *     {@link MultipleIOException#createIOException}.
+     */
+    @Override
+    public void close() throws IOException {
+      ArrayList<IOException> ex = new ArrayList<>();
+      // iterating a synchronized map view requires holding its lock
+      synchronized (iterators) {
+        for (Iterator<BufferedReader> i = iterators.values().iterator();
+             i.hasNext();) {
+          try {
+            BufferedReader r = i.next();
+            r.close();
+          } catch (IOException e) {
+            ex.add(e);
+          } finally {
+            // drop the entry whether or not the close succeeded
+            i.remove();
+          }
+        }
+        iterators.clear();
+      }
+      if (!ex.isEmpty()) {
+        throw MultipleIOException.createIOException(ex);
+      }
+    }
+
+  }
+
+  /**
+   * Writer that stores each {@link FileRegion} as one line of delimited
+   * text.
+   */
+  public static class TextWriter extends Writer<FileRegion> {
+
+    /**
+     * Options understood by {@link TextWriter}: codec name, output file and
+     * field delimiter.
+     */
+    public interface Options extends Writer.Options {
+      Options codec(String codec);
+      Options filename(Path file);
+      Options delimiter(String delim);
+    }
+
+    /** @return a new {@link WriterOptions} carrying its field defaults. */
+    public static WriterOptions defaults() {
+      return new WriterOptions();
+    }
+
+    private final String delim;
+    private final java.io.Writer out;
+
+    /**
+     * @param out destination of the text; closed by {@link #close()}.
+     * @param delim separator written between fields.
+     */
+    public TextWriter(java.io.Writer out, String delim) {
+      this.out = out;
+      this.delim = delim;
+    }
+
+    /**
+     * Appends one line with, in order: block id, path, offset, length,
+     * generation stamp and block pool id.
+     */
+    @Override
+    public void store(FileRegion token) throws IOException {
+      out.append(String.valueOf(token.getBlock().getBlockId())).append(delim)
+          .append(token.getPath().toString()).append(delim)
+          .append(Long.toString(token.getOffset())).append(delim)
+          .append(Long.toString(token.getLength())).append(delim)
+          .append(Long.toString(token.getGenerationStamp())).append(delim)
+          .append(token.getBlockPoolId()).append("\n");
+    }
+
+    /** Closes the underlying writer. */
+    @Override
+    public void close() throws IOException {
+      out.close();
+    }
+
+  }
+
+  /**
+   * No-op: this format keeps no state that needs refreshing.
+   */
+  @Override
+  public void refresh() throws IOException {
+    //nothing to do;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java
new file mode 100644
index 0000000..0fa667e
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/TextFileRegionProvider.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.common;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * Provider of file regions backed by a {@link BlockFormat}. The format
+ * class is read from {@link DFSConfigKeys#DFS_PROVIDER_BLK_FORMAT_CLASS}
+ * and defaults to {@link TextFileRegionFormat}, i.e. block maps stored as
+ * delimited text.
+ */
+public class TextFileRegionProvider
+    extends FileRegionProvider implements Configurable {
+
+  private Configuration conf;
+  private BlockFormat<FileRegion> fmt;
+
+  /**
+   * Instantiates the configured block format and keeps the configuration.
+   *
+   * @param conf configuration naming the block format class.
+   */
+  @SuppressWarnings("unchecked")
+  @Override
+  public void setConf(Configuration conf) {
+    // ReflectionUtils.newInstance already calls setConf on instances that
+    // implement Configurable, so no explicit cast is needed here; a format
+    // that is not Configurable no longer fails with a ClassCastException.
+    fmt = ReflectionUtils.newInstance(
+        conf.getClass(DFSConfigKeys.DFS_PROVIDER_BLK_FORMAT_CLASS,
+            TextFileRegionFormat.class,
+            BlockFormat.class),
+        conf);
+    this.conf = conf;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return conf;
+  }
+
+  /**
+   * Opens a reader with the format's default options and iterates over its
+   * regions. The reader is not closed by the returned iterator.
+   *
+   * @return a read-only iterator over the provided file regions.
+   * @throws RuntimeException wrapping the {@link IOException} if the
+   *     reader cannot be opened.
+   */
+  @Override
+  public Iterator<FileRegion> iterator() {
+    try {
+      final BlockFormat.Reader<FileRegion> r = fmt.getReader(null);
+      return new Iterator<FileRegion>() {
+
+        private final Iterator<FileRegion> inner = r.iterator();
+
+        @Override
+        public boolean hasNext() {
+          return inner.hasNext();
+        }
+
+        @Override
+        public FileRegion next() {
+          return inner.next();
+        }
+
+        @Override
+        public void remove() {
+          throw new UnsupportedOperationException();
+        }
+      };
+    } catch (IOException e) {
+      throw new RuntimeException("Failed to read provided blocks", e);
+    }
+  }
+
+  /** Delegates to {@link BlockFormat#refresh()}. */
+  @Override
+  public void refresh() throws IOException {
+    fmt.refresh();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index bc41715..012d1f5 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
@@ -360,6 +361,9 @@ public class BlockPoolSliceStorage extends Storage {
   private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
       StartupOption startOpt, List<Callable<StorageDirectory>> callables,
       Configuration conf) throws IOException {
+    if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
+      return false; // regular startup for PROVIDED storage directories
+    }
     if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) {
       Preconditions.checkState(!getTrashRootDir(sd).exists(),
           sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " 
+
@@ -439,6 +443,10 @@ public class BlockPoolSliceStorage extends Storage {
         LayoutVersion.Feature.FEDERATION, layoutVersion)) {
       return;
     }
+    //no upgrades for storage directories that are PROVIDED
+    if (bpSd.getRoot() == null) {
+      return;
+    }
     final int oldLV = getLayoutVersion();
     LOG.info("Upgrading block pool storage directory " + bpSd.getRoot()
         + ".\n   old LV = " + oldLV
@@ -589,8 +597,9 @@ public class BlockPoolSliceStorage extends Storage {
       throws IOException {
     File prevDir = bpSd.getPreviousDir();
     // regular startup if previous dir does not exist
-    if (!prevDir.exists())
+    if (prevDir == null || !prevDir.exists()) {
       return;
+    }
     // read attributes out of the VERSION file of previous directory
     BlockPoolSliceStorage prevInfo = new BlockPoolSliceStorage();
     prevInfo.readPreviousVersionProperties(bpSd);
@@ -631,6 +640,10 @@ public class BlockPoolSliceStorage extends Storage {
    * that holds the snapshot.
    */
   void doFinalize(File dnCurDir) throws IOException {
+    LOG.info("doFinalize: " + dnCurDir);
+    if (dnCurDir == null) {
+      return; //we do nothing if the directory is null
+    }
     File bpRoot = getBpRoot(blockpoolID, dnCurDir);
     StorageDirectory bpSd = new StorageDirectory(bpRoot);
     // block pool level previous directory
@@ -841,6 +854,9 @@ public class BlockPoolSliceStorage extends Storage {
   public void setRollingUpgradeMarkers(List<StorageDirectory> dnStorageDirs)
       throws IOException {
     for (StorageDirectory sd : dnStorageDirs) {
+      if (sd.getCurrentDir() == null) {
+        // no local directory for this storage: skip it, but keep
+        // processing the remaining storage directories
+        continue;
+      }
       File bpRoot = getBpRoot(blockpoolID, sd.getCurrentDir());
       File markerFile = new File(bpRoot, ROLLING_UPGRADE_MARKER_FILE);
       if (!storagesWithRollingUpgradeMarker.contains(bpRoot.toString())) {
@@ -863,6 +879,9 @@ public class BlockPoolSliceStorage extends Storage {
   public void clearRollingUpgradeMarkers(List<StorageDirectory> dnStorageDirs)
       throws IOException {
     for (StorageDirectory sd : dnStorageDirs) {
+      if (sd.getCurrentDir() == null) {
+        continue;
+      }
       File bpRoot = getBpRoot(blockpoolID, sd.getCurrentDir());
       File markerFile = new File(bpRoot, ROLLING_UPGRADE_MARKER_FILE);
       if (!storagesWithoutRollingUpgradeMarker.contains(bpRoot.toString())) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 6d6e96a..a1bde31 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.HardLink;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.protocol.Block;
@@ -129,22 +130,31 @@ public class DataStorage extends Storage {
     this.datanodeUuid = newDatanodeUuid;
   }
 
-  private static boolean createStorageID(StorageDirectory sd, int lv) {
+  private static boolean createStorageID(StorageDirectory sd, int lv,
+      Configuration conf) {
     // Clusters previously upgraded from layout versions earlier than
     // ADD_DATANODE_AND_STORAGE_UUIDS failed to correctly generate a
     // new storage ID. We check for that and fix it now.
     final boolean haveValidStorageId = DataNodeLayoutVersion.supports(
         LayoutVersion.Feature.ADD_DATANODE_AND_STORAGE_UUIDS, lv)
         && DatanodeStorage.isValidStorageId(sd.getStorageUuid());
-    return createStorageID(sd, !haveValidStorageId);
+    return createStorageID(sd, !haveValidStorageId, conf);
   }
 
   /** Create an ID for this storage.
    * @return true if a new storage ID was generated.
    * */
   public static boolean createStorageID(
-      StorageDirectory sd, boolean regenerateStorageIds) {
+      StorageDirectory sd, boolean regenerateStorageIds, Configuration conf) {
     final String oldStorageID = sd.getStorageUuid();
+    if (sd.getStorageLocation() != null &&
+        sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
+      // We only support one provided storage per datanode for now.
+      // TODO support multiple provided storage ids per datanode.
+      sd.setStorageUuid(conf.get(DFSConfigKeys.DFS_PROVIDER_STORAGEUUID,
+          DFSConfigKeys.DFS_PROVIDER_STORAGEUUID_DEFAULT));
+      return false;
+    }
     if (oldStorageID == null || regenerateStorageIds) {
       sd.setStorageUuid(DatanodeStorage.generateUuid());
       LOG.info("Generated new storageID " + sd.getStorageUuid() +
@@ -273,7 +283,7 @@ public class DataStorage extends Storage {
         LOG.info("Storage directory with location " + location
             + " is not formatted for namespace " + nsInfo.getNamespaceID()
             + ". Formatting...");
-        format(sd, nsInfo, datanode.getDatanodeUuid());
+        format(sd, nsInfo, datanode.getDatanodeUuid(), datanode.getConf());
         break;
       default:  // recovery part is common
         sd.doRecover(curState);
@@ -547,15 +557,15 @@ public class DataStorage extends Storage {
   }
 
   void format(StorageDirectory sd, NamespaceInfo nsInfo,
-              String datanodeUuid) throws IOException {
+              String newDatanodeUuid, Configuration conf) throws IOException {
     sd.clearDirectory(); // create directory
     this.layoutVersion = HdfsServerConstants.DATANODE_LAYOUT_VERSION;
     this.clusterID = nsInfo.getClusterID();
     this.namespaceID = nsInfo.getNamespaceID();
     this.cTime = 0;
-    setDatanodeUuid(datanodeUuid);
+    setDatanodeUuid(newDatanodeUuid);
 
-    createStorageID(sd, false);
+    createStorageID(sd, false, conf);
     writeProperties(sd);
   }
 
@@ -600,6 +610,9 @@ public class DataStorage extends Storage {
 
   private void setFieldsFromProperties(Properties props, StorageDirectory sd,
       boolean overrideLayoutVersion, int toLayoutVersion) throws IOException {
+    if (props == null) {
+      return;
+    }
     if (overrideLayoutVersion) {
       this.layoutVersion = toLayoutVersion;
     } else {
@@ -694,6 +707,10 @@ public class DataStorage extends Storage {
   private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
       StartupOption startOpt, List<Callable<StorageDirectory>> callables,
       Configuration conf) throws IOException {
+    if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED) {
+      createStorageID(sd, layoutVersion, conf);
+      return false; // regular start up for PROVIDED storage directories
+    }
     if (startOpt == StartupOption.ROLLBACK) {
       doRollback(sd, nsInfo); // rollback if applicable
     }
@@ -724,7 +741,7 @@ public class DataStorage extends Storage {
 
     // regular start up.
     if (this.layoutVersion == HdfsServerConstants.DATANODE_LAYOUT_VERSION) {
-      createStorageID(sd, layoutVersion);
+      createStorageID(sd, layoutVersion, conf);
       return false; // need to write properties
     }
 
@@ -733,7 +750,7 @@ public class DataStorage extends Storage {
       if (federationSupported) {
         // If the existing on-disk layout version supports federation,
         // simply update the properties.
-        upgradeProperties(sd);
+        upgradeProperties(sd, conf);
       } else {
         doUpgradePreFederation(sd, nsInfo, callables, conf);
       }
@@ -829,15 +846,16 @@ public class DataStorage extends Storage {
 
     // 4. Write version file under <SD>/current
     clusterID = nsInfo.getClusterID();
-    upgradeProperties(sd);
+    upgradeProperties(sd, conf);
     
     // 5. Rename <SD>/previous.tmp to <SD>/previous
     rename(tmpDir, prevDir);
     LOG.info("Upgrade of " + sd.getRoot()+ " is complete");
   }
 
-  void upgradeProperties(StorageDirectory sd) throws IOException {
-    createStorageID(sd, layoutVersion);
+  void upgradeProperties(StorageDirectory sd, Configuration conf)
+      throws IOException {
+    createStorageID(sd, layoutVersion, conf);
     LOG.info("Updating layout version from " + layoutVersion
         + " to " + HdfsServerConstants.DATANODE_LAYOUT_VERSION
         + " for storage " + sd.getRoot());
@@ -989,7 +1007,7 @@ public class DataStorage extends Storage {
     // then finalize it. Else finalize the corresponding BP.
     for (StorageDirectory sd : getStorageDirs()) {
       File prevDir = sd.getPreviousDir();
-      if (prevDir.exists()) {
+      if (prevDir != null && prevDir.exists()) {
         // data node level storage finalize
         doFinalize(sd);
       } else {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
index 966bcb0..3b6d06c 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java
@@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
@@ -105,7 +106,7 @@ public class DirectoryScanner implements Runnable {
    * @param b whether to retain diffs
    */
   @VisibleForTesting
-  void setRetainDiffs(boolean b) {
+  public void setRetainDiffs(boolean b) {
     retainDiffs = b;
   }
 
@@ -215,7 +216,8 @@ public class DirectoryScanner implements Runnable {
    * @param dataset the dataset to scan
    * @param conf the Configuration object
    */
-  DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset, Configuration 
conf) {
+  public DirectoryScanner(DataNode datanode, FsDatasetSpi<?> dataset,
+      Configuration conf) {
     this.datanode = datanode;
     this.dataset = dataset;
     int interval = (int) conf.getTimeDuration(
@@ -369,15 +371,14 @@ public class DirectoryScanner implements Runnable {
    * Reconcile differences between disk and in-memory blocks
    */
   @VisibleForTesting
-  void reconcile() throws IOException {
+  public void reconcile() throws IOException {
     scan();
     for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
       String bpid = entry.getKey();
       LinkedList<ScanInfo> diff = entry.getValue();
       
       for (ScanInfo info : diff) {
-        dataset.checkAndUpdate(bpid, info.getBlockId(), info.getBlockFile(),
-            info.getMetaFile(), info.getVolume());
+        dataset.checkAndUpdate(bpid, info);
       }
     }
     if (!retainDiffs) clear();
@@ -429,11 +430,12 @@ public class DirectoryScanner implements Runnable {
           }
           // Block file and/or metadata file exists on the disk
           // Block exists in memory
-          if (info.getBlockFile() == null) {
+          if (info.getVolume().getStorageType() != StorageType.PROVIDED &&
+              info.getBlockFile() == null) {
             // Block metadata file exits and block file is missing
             addDifference(diffRecord, statsRecord, info);
           } else if (info.getGenStamp() != memBlock.getGenerationStamp()
-              || info.getBlockFileLength() != memBlock.getNumBytes()) {
+              || info.getBlockLength() != memBlock.getNumBytes()) {
             // Block metadata file is missing or has wrong generation stamp,
             // or block file length is different than expected
             statsRecord.mismatchBlocks++;
@@ -611,6 +613,9 @@ public class DirectoryScanner implements Runnable {
       for (String bpid : bpList) {
         LinkedList<ScanInfo> report = new LinkedList<>();
 
+        perfTimer.reset().start();
+        throttleTimer.reset().start();
+
         try {
           result.put(bpid, volume.compileReport(bpid, report, this));
         } catch (InterruptedException ex) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java
new file mode 100644
index 0000000..722d573
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FinalizedProvidedReplica.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+
+/**
+ * A {@link ProvidedReplica} in the {@link ReplicaState#FINALIZED} state.
+ *
+ * Every byte of the replica is reported both as on disk and as visible.
+ * Recovery-related operations are not supported and throw
+ * {@link UnsupportedOperationException}.
+ */
+public class FinalizedProvidedReplica extends ProvidedReplica {
+
+  /**
+   * Constructor; all arguments are forwarded to {@link ProvidedReplica}.
+   * @param blockId block id
+   * @param fileURI URI the block is read from
+   * @param fileOffset offset of the block within the URI
+   * @param blockLen length of the block
+   * @param genStamp generation stamp of the block
+   * @param volume volume the block belongs to
+   * @param conf configuration handed to the superclass
+   */
+  public FinalizedProvidedReplica(long blockId, URI fileURI,
+      long fileOffset, long blockLen, long genStamp,
+      FsVolumeSpi volume, Configuration conf) {
+    super(blockId, fileURI, fileOffset, blockLen, genStamp, volume, conf);
+  }
+
+  /**
+   * Builds the exception thrown by every operation this replica type
+   * rejects.
+   * @param operation name of the rejected operation
+   * @return the exception to throw
+   */
+  private UnsupportedOperationException unsupported(String operation) {
+    return new UnsupportedOperationException("Replica of type " + getState() +
+        " does not support " + operation);
+  }
+
+  /** @return always {@link ReplicaState#FINALIZED}. */
+  @Override
+  public ReplicaState getState() {
+    return ReplicaState.FINALIZED;
+  }
+
+  /** @return the full block length. */
+  @Override
+  public long getBytesOnDisk() {
+    return getNumBytes();
+  }
+
+  /** @return the full block length; all bytes are visible. */
+  @Override
+  public long getVisibleLength() {
+    return getNumBytes();
+  }
+
+  @Override  // Object
+  public boolean equals(Object o) {
+    return super.equals(o);
+  }
+
+  @Override  // Object
+  public int hashCode() {
+    return super.hashCode();
+  }
+
+  @Override
+  public String toString() {
+    return super.toString();
+  }
+
+  /** Not supported; always throws. */
+  @Override
+  public ReplicaInfo getOriginalReplica() {
+    throw unsupported("getOriginalReplica");
+  }
+
+  /** Not supported; always throws. */
+  @Override
+  public long getRecoveryID() {
+    throw unsupported("getRecoveryID");
+  }
+
+  /** Not supported; always throws. */
+  @Override
+  public void setRecoveryID(long recoveryId) {
+    throw unsupported("setRecoveryID");
+  }
+
+  /** Not supported; always throws. */
+  @Override
+  public ReplicaRecoveryInfo createInfo() {
+    throw unsupported("createInfo");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
new file mode 100644
index 0000000..b021ea2
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ProvidedReplica.java
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.server.common.FileRegion;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetUtil;
+import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This abstract class is used as a base class for provided replicas.
+ *
+ * A provided replica is backed by a region of a file identified by a URI,
+ * starting at a given offset, instead of by local block and metadata files.
+ * Block data is read through a Hadoop {@link FileSystem}; metadata is served
+ * from the shared {@link #NULL_CHECKSUM_ARRAY}. Most operations that would
+ * modify the replica throw {@link UnsupportedOperationException}.
+ */
+public abstract class ProvidedReplica extends ReplicaInfo {
+
+  public static final Logger LOG =
+      LoggerFactory.getLogger(ProvidedReplica.class);
+
+  // Null checksum information for provided replicas.
+  // Shared across all replicas.
+  static final byte[] NULL_CHECKSUM_ARRAY =
+      FsDatasetUtil.createNullChecksumByteArray();
+  // URI of the file that holds this replica's data.
+  private final URI fileURI;
+  // Offset within fileURI at which this replica's data starts.
+  private final long fileOffset;
+  private final Configuration conf;
+  // Null when the filesystem for fileURI could not be obtained.
+  private final FileSystem remoteFS;
+
+  /**
+   * Constructor.
+   * @param blockId block id
+   * @param fileURI remote URI this block is to be read from
+   * @param fileOffset the offset in the remote URI
+   * @param blockLen the length of the block
+   * @param genStamp the generation stamp of the block
+   * @param volume the volume this block belongs to
+   * @param conf the configuration used to obtain the filesystem for
+   *             {@code fileURI}
+   */
+  public ProvidedReplica(long blockId, URI fileURI, long fileOffset,
+      long blockLen, long genStamp, FsVolumeSpi volume, Configuration conf) {
+    super(volume, blockId, blockLen, genStamp);
+    this.fileURI = fileURI;
+    this.fileOffset = fileOffset;
+    this.conf = conf;
+    FileSystem fs = null;
+    try {
+      fs = FileSystem.get(fileURI, this.conf);
+    } catch (IOException e) {
+      // Keep the replica usable for bookkeeping; reads fail later with an
+      // IOException. Log the cause so the failure can be diagnosed.
+      LOG.warn("Failed to obtain filesystem for " + fileURI, e);
+    }
+    this.remoteFS = fs;
+  }
+
+  /**
+   * Copy constructor.
+   * @param r the replica to copy from
+   */
+  public ProvidedReplica(ProvidedReplica r) {
+    super(r);
+    this.fileURI = r.fileURI;
+    this.fileOffset = r.fileOffset;
+    this.conf = r.conf;
+    // Share the source replica's filesystem handle. Creating a fresh one
+    // with FileSystem.newInstance for every copy would allocate an instance
+    // that nothing in this class ever closes.
+    this.remoteFS = r.remoteFS;
+  }
+
+  /**
+   * @return the URI of the file holding this replica's data
+   */
+  @Override
+  public URI getBlockURI() {
+    return this.fileURI;
+  }
+
+  /**
+   * Opens the backing file and positions the stream inside this replica.
+   * @param seekOffset offset relative to the start of the replica's data;
+   *                   the stream is positioned at
+   *                   {@code fileOffset + seekOffset}
+   * @return a stream positioned at the requested offset
+   * @throws IOException if no filesystem is available for the replica, or
+   *                     if opening or seeking the file fails
+   */
+  @Override
+  public InputStream getDataInputStream(long seekOffset) throws IOException {
+    if (remoteFS == null) {
+      throw new IOException("Remote filesystem for provided replica " + this +
+          " does not exist");
+    }
+    FSDataInputStream ins = remoteFS.open(new Path(fileURI));
+    try {
+      ins.seek(fileOffset + seekOffset);
+    } catch (IOException | RuntimeException e) {
+      // Do not leak the open stream when positioning fails.
+      try {
+        ins.close();
+      } catch (IOException closeFailure) {
+        e.addSuppressed(closeFailure);
+      }
+      throw e;
+    }
+    return new FSDataInputStream(ins);
+  }
+
+  @Override
+  public OutputStream getDataOutputStream(boolean append) throws IOException {
+    throw new UnsupportedOperationException(
+        "OutputDataStream is not implemented for ProvidedReplica");
+  }
+
+  /**
+   * @return always null; no metadata URI is tracked for provided replicas
+   */
+  @Override
+  public URI getMetadataURI() {
+    return null;
+  }
+
+  /**
+   * @return always null
+   */
+  // NOTE(review): the other write paths in this class throw
+  // UnsupportedOperationException; confirm callers tolerate a null stream.
+  @Override
+  public OutputStream getMetadataOutputStream(boolean append)
+      throws IOException {
+    return null;
+  }
+
+  /**
+   * Checks whether the file named by the replica's URI exists.
+   * @return true if the file exists; false if it does not, if no filesystem
+   *         is available, or if the existence check itself fails
+   */
+  @Override
+  public boolean blockDataExists() {
+    if (remoteFS == null) {
+      return false;
+    }
+    try {
+      return remoteFS.exists(new Path(fileURI));
+    } catch (IOException e) {
+      // Best effort: a failed lookup is reported as "does not exist".
+      return false;
+    }
+  }
+
+  @Override
+  public boolean deleteBlockData() {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not support deleting block data");
+  }
+
+  /**
+   * @return the block length recorded for this replica
+   */
+  @Override
+  public long getBlockDataLength() {
+    return this.getNumBytes();
+  }
+
+  /**
+   * Returns a stream over the shared null-checksum bytes.
+   * @param offset not used by this implementation
+   * @return a stream over {@link #NULL_CHECKSUM_ARRAY}, with its length
+   */
+  @Override
+  public LengthInputStream getMetadataInputStream(long offset)
+      throws IOException {
+    return new LengthInputStream(new ByteArrayInputStream(NULL_CHECKSUM_ARRAY),
+        NULL_CHECKSUM_ARRAY.length);
+  }
+
+  /**
+   * @return true if the shared null-checksum array is available
+   */
+  @Override
+  public boolean metadataExists() {
+    return NULL_CHECKSUM_ARRAY != null;
+  }
+
+  @Override
+  public boolean deleteMetadata() {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not support deleting metadata");
+  }
+
+  /**
+   * @return the length of the shared null-checksum array, or 0 if it is
+   *         not available
+   */
+  @Override
+  public long getMetadataLength() {
+    return NULL_CHECKSUM_ARRAY == null ? 0 : NULL_CHECKSUM_ARRAY.length;
+  }
+
+  @Override
+  public boolean renameMeta(URI destURI) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not support renaming metadata");
+  }
+
+  @Override
+  public boolean renameData(URI destURI) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not support renaming data");
+  }
+
+  /**
+   * @return always false
+   */
+  @Override
+  public boolean getPinning(LocalFileSystem localFS) throws IOException {
+    return false;
+  }
+
+  @Override
+  public void setPinning(LocalFileSystem localFS) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not support pinning");
+  }
+
+  @Override
+  public void bumpReplicaGS(long newGS) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not yet support writes");
+  }
+
+  /**
+   * @return always false
+   */
+  @Override
+  public boolean breakHardLinksIfNeeded() throws IOException {
+    return false;
+  }
+
+  @Override
+  public ReplicaRecoveryInfo createInfo()
+      throws UnsupportedOperationException {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not yet support writes");
+  }
+
+  /**
+   * Compares this replica with a scan result.
+   * @param info the scan result to compare against
+   * @return 0 if the scan's file region equals the region described by this
+   *         replica; otherwise the sign of the comparison between the
+   *         scanned block length and this replica's length
+   */
+  @Override
+  public int compareWith(ScanInfo info) {
+    //local scanning cannot find any provided blocks.
+    if (info.getFileRegion().equals(
+        new FileRegion(this.getBlockId(), new Path(fileURI),
+            fileOffset, this.getNumBytes(), this.getGenerationStamp()))) {
+      return 0;
+    }
+    // Long.compare avoids narrowing the long difference to int, which
+    // could flip the sign or produce 0 for lengths that actually differ.
+    return Long.compare(info.getBlockLength(), getNumBytes());
+  }
+
+  @Override
+  public void truncateBlock(long newLength) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not yet support truncate");
+  }
+
+  @Override
+  public void updateWithReplica(StorageLocation replicaLocation) {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not yet support update");
+  }
+
+  @Override
+  public void copyMetadata(URI destination) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not yet support copy metadata");
+  }
+
+  @Override
+  public void copyBlockdata(URI destination) throws IOException {
+    throw new UnsupportedOperationException(
+        "ProvidedReplica does not yet support copy data");
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
index 280aaa0..639467f 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaBuilder.java
@@ -18,9 +18,13 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import java.io.File;
+import java.net.URI;
 
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.common.FileRegion;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
 
 /**
@@ -42,11 +46,20 @@ public class ReplicaBuilder {
 
   private ReplicaInfo fromReplica;
 
+  private URI uri;
+  private long offset;
+  private Configuration conf;
+  private FileRegion fileRegion;
+
   public ReplicaBuilder(ReplicaState state) {
     volume = null;
     writer = null;
     block = null;
     length = -1;
+    fileRegion = null;
+    conf = null;
+    fromReplica = null;
+    uri = null;
     this.state = state;
   }
 
@@ -105,6 +118,26 @@ public class ReplicaBuilder {
     return this;
   }
 
+  /**
+   * Sets the URI used when a provided finalized replica is built without
+   * a {@link FileRegion}.
+   * @param uri the URI to record
+   * @return this builder
+   */
+  public ReplicaBuilder setURI(URI uri) {
+    this.uri = uri;
+    return this;
+  }
+
+  /**
+   * Sets the configuration that is passed to the provided replica's
+   * constructor.
+   * @param conf the configuration to record
+   * @return this builder
+   */
+  public ReplicaBuilder setConf(Configuration conf) {
+    this.conf = conf;
+    return this;
+  }
+
+  /**
+   * Sets the offset used, together with the URI, when a provided finalized
+   * replica is built without a {@link FileRegion}.
+   * @param offset the offset to record
+   * @return this builder
+   */
+  public ReplicaBuilder setOffset(long offset) {
+    this.offset = offset;
+    return this;
+  }
+
+  /**
+   * Sets the file region describing a provided replica. When a region is
+   * set, a provided finalized replica takes its block id, path, offset,
+   * length and generation stamp from the region rather than from the
+   * builder's individual fields.
+   * @param fileRegion the file region to record
+   * @return this builder
+   */
+  public ReplicaBuilder setFileRegion(FileRegion fileRegion) {
+    this.fileRegion = fileRegion;
+    return this;
+  }
+
   public LocalReplicaInPipeline buildLocalReplicaInPipeline()
       throws IllegalArgumentException {
     LocalReplicaInPipeline info = null;
@@ -176,7 +209,7 @@ public class ReplicaBuilder {
     }
   }
 
-  private ReplicaInfo buildFinalizedReplica() throws IllegalArgumentException {
+  private LocalReplica buildFinalizedReplica() throws IllegalArgumentException 
{
     if (null != fromReplica &&
         fromReplica.getState() == ReplicaState.FINALIZED) {
       return new FinalizedReplica((FinalizedReplica)fromReplica);
@@ -193,7 +226,7 @@ public class ReplicaBuilder {
     }
   }
 
-  private ReplicaInfo buildRWR() throws IllegalArgumentException {
+  private LocalReplica buildRWR() throws IllegalArgumentException {
 
     if (null != fromReplica && fromReplica.getState() == ReplicaState.RWR) {
       return new ReplicaWaitingToBeRecovered(
@@ -211,7 +244,7 @@ public class ReplicaBuilder {
     }
   }
 
-  private ReplicaInfo buildRUR() throws IllegalArgumentException {
+  private LocalReplica buildRUR() throws IllegalArgumentException {
     if (null == fromReplica) {
       throw new IllegalArgumentException(
           "Missing a valid replica to recover from");
@@ -228,8 +261,53 @@ public class ReplicaBuilder {
     }
   }
 
-  public ReplicaInfo build() throws IllegalArgumentException {
-    ReplicaInfo info = null;
+  /**
+   * Builds a finalized PROVIDED replica, either from the configured
+   * {@link FileRegion} or, when none is set, from the builder's block id,
+   * URI, offset, length and generation stamp.
+   * @return the new replica
+   * @throws IllegalArgumentException if a source replica was supplied, or
+   *         if neither a file region nor a URI is set
+   */
+  private ProvidedReplica buildProvidedFinalizedReplica()
+      throws IllegalArgumentException {
+    if (null != fromReplica) {
+      throw new IllegalArgumentException("Finalized PROVIDED replica " +
+          "cannot be constructed from another replica");
+    }
+    final boolean hasRegion = fileRegion != null;
+    if (!hasRegion && uri == null) {
+      throw new IllegalArgumentException(
+          "Trying to construct a provided replica on " + volume +
+          " without enough information");
+    }
+    if (hasRegion) {
+      // The region carries everything needed to describe the block.
+      return new FinalizedProvidedReplica(
+          fileRegion.getBlock().getBlockId(),
+          fileRegion.getPath().toUri(),
+          fileRegion.getOffset(),
+          fileRegion.getBlock().getNumBytes(),
+          fileRegion.getBlock().getGenerationStamp(),
+          volume, conf);
+    }
+    return new FinalizedProvidedReplica(blockId, uri, offset,
+        length, genStamp, volume, conf);
+  }
+
+  /**
+   * Builds a PROVIDED replica for the builder's state. Only the FINALIZED
+   * state is handled.
+   * @return the new replica
+   * @throws IllegalArgumentException for any state other than FINALIZED
+   */
+  private ProvidedReplica buildProvidedReplica()
+      throws IllegalArgumentException {
+    switch (this.state) {
+    case FINALIZED:
+      return buildProvidedFinalizedReplica();
+    default:
+      // RWR, RUR, RBW, TEMPORARY and anything else are rejected.
+      throw new IllegalArgumentException("Unknown replica state " +
+          state + " for PROVIDED replica");
+    }
+  }
+
+  private LocalReplica buildLocalReplica()
+      throws IllegalArgumentException {
+    LocalReplica info = null;
     switch(this.state) {
     case FINALIZED:
       info = buildFinalizedReplica();
@@ -249,4 +327,16 @@ public class ReplicaBuilder {
     }
     return info;
   }
+
+  /**
+   * Builds the replica described by this builder. A builder whose volume
+   * has storage type PROVIDED yields a provided replica; any other builder,
+   * including one with no volume, yields a local replica.
+   * @return the new replica
+   * @throws IllegalArgumentException if the builder's settings do not
+   *         describe a replica that can be built
+   */
+  public ReplicaInfo build() throws IllegalArgumentException {
+    final boolean onProvidedVolume =
+        volume != null && volume.getStorageType() == StorageType.PROVIDED;
+    if (onProvidedVolume) {
+      return buildProvidedReplica();
+    }
+    return buildLocalReplica();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
index 65e9ba7..3718799 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInfo.java
@@ -50,6 +50,17 @@ abstract public class ReplicaInfo extends Block
       new FileIoProvider(null, null);
 
   /**
+   * Constructor.
+   * @param block a block; its id, length and generation stamp are passed
+   *              on to the primary constructor
+   * @param vol volume where replica is located
+   */
+  ReplicaInfo(Block block, FsVolumeSpi vol) {
+    this(vol, block.getBlockId(), block.getNumBytes(),
+        block.getGenerationStamp());
+  }
+
+  /**
   * Constructor
   * @param vol volume where replica is located
   * @param blockId block id
@@ -62,7 +73,14 @@ abstract public class ReplicaInfo extends Block
   }
   
   /**
-   * Get the volume where this replica is located on disk.
+   * Copy constructor.
+   * @param from where to copy from
+   */
+  ReplicaInfo(ReplicaInfo from) {
+    this(from, from.getVolume());
+  }
+
+  /**
    * @return the volume where this replica is located on disk
    */
   public FsVolumeSpi getVolume() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
index b4d5794..fb7acfd 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
@@ -98,6 +98,16 @@ public class StorageLocation
 
   public boolean matchesStorageDirectory(StorageDirectory sd,
       String bpid) throws IOException {
+    if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED &&
+        storageType == StorageType.PROVIDED) {
+      return matchesStorageDirectory(sd);
+    }
+    if (sd.getStorageLocation().getStorageType() == StorageType.PROVIDED ||
+        storageType == StorageType.PROVIDED) {
+      //only one of these is PROVIDED; so it cannot be a match!
+      return false;
+    }
+    //both storage directories are local
     return this.getBpURI(bpid, Storage.STORAGE_DIR_CURRENT).normalize()
         .equals(sd.getRoot().toURI().normalize());
   }
@@ -197,6 +207,10 @@ public class StorageLocation
     if (conf == null) {
       conf = new HdfsConfiguration();
     }
+    if (storageType == StorageType.PROVIDED) {
+      //skip creation if the storage type is PROVIDED
+      return;
+    }
 
     LocalFileSystem localFS = FileSystem.getLocal(conf);
     FsPermission permission = new FsPermission(conf.get(
@@ -213,10 +227,14 @@ public class StorageLocation
 
   @Override  // Checkable
   public VolumeCheckResult check(CheckContext context) throws IOException {
-    DiskChecker.checkDir(
-        context.localFileSystem,
-        new Path(baseURI),
-        context.expectedPermission);
+    //we assume provided storage locations are always healthy,
+    //and check only for local storages.
+    if (storageType != StorageType.PROVIDED) {
+      DiskChecker.checkDir(
+          context.localFileSystem,
+          new Path(baseURI),
+          context.expectedPermission);
+    }
     return VolumeCheckResult.HEALTHY;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/970028f0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 7be42e8..f4bf839 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
 import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.UnexpectedReplicaStateException;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.ScanInfo;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import 
org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -252,8 +253,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> 
extends FSDatasetMBean {
    * and, in case that they are not matched, update the record or mark it
    * as corrupted.
    */
-  void checkAndUpdate(String bpid, long blockId, File diskFile,
-      File diskMetaFile, FsVolumeSpi vol) throws IOException;
+  void checkAndUpdate(String bpid, ScanInfo info) throws IOException;
 
   /**
    * @param b - the block


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to