HDFS-11163. Mover should move the file blocks to default storage once policy is unset. Contributed by Surendra Singh Lilhore.
(cherry picked from commit 00ed21a6fedb45a7c8992b8d45adaa83f14af34c)
(cherry picked from commit d5e2bd4096bf2b4d8a5a22042145a08905f93cd4)

Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c4bf5043
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c4bf5043
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c4bf5043

Branch: refs/heads/branch-2.8
Commit: c4bf504395d0594c6496439bfe59d78a606e16de
Parents: 32475a7
Author: Chris Nauroth <cnaur...@apache.org>
Authored: Tue Apr 11 15:01:49 2017 -0700
Committer: Chris Nauroth <cnaur...@apache.org>
Committed: Tue Apr 11 21:55:28 2017 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/fs/FsServerDefaults.java  | 21 +++++-
 .../hadoop/hdfs/protocolPB/PBHelperClient.java  |  4 +-
 .../src/main/proto/hdfs.proto                   |  1 +
 .../apache/hadoop/hdfs/server/mover/Mover.java  | 11 ++-
 .../hdfs/server/namenode/FSNamesystem.java      |  5 +-
 .../apache/hadoop/hdfs/TestFileCreation.java    |  1 +
 .../hadoop/hdfs/server/mover/TestMover.java     | 74 ++++++++++++++++++++
 7 files changed, 109 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4bf5043/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java
index 469243c..9933e5d 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsServerDefaults.java
@@ -55,6 +55,7 @@ public class FsServerDefaults implements Writable {
   private long trashInterval;
   private DataChecksum.Type checksumType;
   private String keyProviderUri;
+  private byte storagepolicyId;

   public FsServerDefaults() {
   }
@@ -62,8 +63,17 @@ public class FsServerDefaults implements Writable {
   public FsServerDefaults(long blockSize, int bytesPerChecksum,
       int writePacketSize, short replication, int fileBufferSize,
       boolean encryptDataTransfer, long trashInterval,
-      DataChecksum.Type checksumType,
-      String keyProviderUri) {
+      DataChecksum.Type checksumType, String keyProviderUri) {
+    this(blockSize, bytesPerChecksum, writePacketSize, replication,
+        fileBufferSize, encryptDataTransfer, trashInterval, checksumType,
+        keyProviderUri, (byte) 0);
+  }
+
+  public FsServerDefaults(long blockSize, int bytesPerChecksum,
+      int writePacketSize, short replication, int fileBufferSize,
+      boolean encryptDataTransfer, long trashInterval,
+      DataChecksum.Type checksumType, String keyProviderUri,
+      byte storagepolicy) {
     this.blockSize = blockSize;
     this.bytesPerChecksum = bytesPerChecksum;
     this.writePacketSize = writePacketSize;
@@ -73,6 +83,7 @@ public class FsServerDefaults implements Writable {
     this.trashInterval = trashInterval;
     this.checksumType = checksumType;
     this.keyProviderUri = keyProviderUri;
+    this.storagepolicyId = storagepolicy;
   }

   public long getBlockSize() {
@@ -115,6 +126,10 @@ public class FsServerDefaults implements Writable {
     return keyProviderUri;
   }

+  public byte getDefaultStoragePolicyId() {
+    return storagepolicyId;
+  }
+
   // /////////////////////////////////////////
   // Writable
   // /////////////////////////////////////////
@@ -127,6 +142,7 @@ public class FsServerDefaults implements Writable {
     out.writeShort(replication);
     out.writeInt(fileBufferSize);
     WritableUtils.writeEnum(out, checksumType);
+    out.writeByte(storagepolicyId);
   }

   @Override
@@ -138,5 +154,6 @@ public class FsServerDefaults implements Writable {
     replication = in.readShort();
     fileBufferSize = in.readInt();
     checksumType = WritableUtils.readEnum(in, DataChecksum.Type.class);
+    storagepolicyId = in.readByte();
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4bf5043/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
index 42ac9b1..000e211 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelperClient.java
@@ -1592,7 +1592,8 @@ public class PBHelperClient {
         fs.getEncryptDataTransfer(),
         fs.getTrashInterval(),
         convert(fs.getChecksumType()),
-        fs.hasKeyProviderUri() ? fs.getKeyProviderUri() : null);
+        fs.hasKeyProviderUri() ? fs.getKeyProviderUri() : null,
+        (byte) fs.getPolicyId());
   }

   public static List<CryptoProtocolVersionProto> convert(
@@ -1744,6 +1745,7 @@ public class PBHelperClient {
       .setTrashInterval(fs.getTrashInterval())
       .setChecksumType(convert(fs.getChecksumType()))
       .setKeyProviderUri(fs.getKeyProviderUri())
+      .setPolicyId(fs.getDefaultStoragePolicyId())
       .build();
   }


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4bf5043/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
index 6661060..7a465be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto
@@ -374,6 +374,7 @@ message FsServerDefaultsProto {
   optional uint64 trashInterval = 7 [default = 0];
   optional ChecksumTypeProto checksumType = 8 [default = CHECKSUM_CRC32];
   optional string keyProviderUri = 9;
+  optional uint32 policyId = 10 [default = 0];
 }



http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4bf5043/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 2e1b8e2..df42c4f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -351,10 +351,15 @@ public class Mover {
     /** @return true if it is necessary to run another round of migration */
     private void processFile(String fullPath, HdfsLocatedFileStatus status,
         Result result) {
-      final byte policyId = status.getStoragePolicy();
-      // currently we ignore files with unspecified storage policy
+      byte policyId = status.getStoragePolicy();
       if (policyId == HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED) {
-        return;
+        try {
+          // get default policy from namenode
+          policyId = dfs.getServerDefaults().getDefaultStoragePolicyId();
+        } catch (IOException e) {
+          LOG.warn("Failed to get default policy for " + fullPath, e);
+          return;
+        }
       }
       final BlockStoragePolicy policy = blockStoragePolicies[policyId];
       if (policy == null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4bf5043/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
index 68ac80e..1df5275 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
@@ -792,8 +792,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
           conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT),
           checksumType,
           conf.getTrimmed(
-                CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
-                ""));
+              CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
+              ""),
+          blockManager.getStoragePolicySuite().getDefaultPolicy().getId());

       this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY,
                                        DFS_NAMENODE_MAX_OBJECTS_DEFAULT);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4bf5043/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
index e47c8b1..ef91f1d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java
@@ -171,6 +171,7 @@ public class TestFileCreation {
       assertEquals(DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT, serverDefaults.getWritePacketSize());
       assertEquals(DFS_REPLICATION_DEFAULT + 1, serverDefaults.getReplication());
       assertEquals(IO_FILE_BUFFER_SIZE_DEFAULT, serverDefaults.getFileBufferSize());
+      assertEquals(7, serverDefaults.getDefaultStoragePolicyId());
     } finally {
       fs.close();
       cluster.shutdown();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c4bf5043/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 1c47f43..c7d04d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -22,7 +22,9 @@ import java.net.URI;
 import java.util.*;
 import java.util.concurrent.atomic.AtomicInteger;

+import com.google.common.base.Supplier;
 import com.google.common.collect.Maps;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
@@ -45,9 +47,12 @@ import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.ToolRunner;
 import org.junit.Assert;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;

 public class TestMover {
+  private static final Logger LOG = LoggerFactory.getLogger(TestMover.class);

   static final int DEFAULT_BLOCK_SIZE = 100;

   static {
@@ -409,4 +414,73 @@ public class TestMover {
       cluster.shutdown();
     }
   }
+
+  @Test(timeout = 300000)
+  public void testMoverWhenStoragePolicyUnset() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(1)
+        .storageTypes(
+            new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE}})
+        .build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final String file = "/testMoverWhenStoragePolicyUnset";
+      // write to DISK
+      DFSTestUtil.createFile(dfs, new Path(file), 1L, (short) 1, 0L);
+
+      // move to ARCHIVE
+      dfs.setStoragePolicy(new Path(file), "COLD");
+      int rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[] {"-p", file.toString()});
+      Assert.assertEquals("Movement to ARCHIVE should be successful", 0, rc);
+
+      // Wait till namenode notified about the block location details
+      waitForLocatedBlockWithArchiveStorageType(dfs, file, 1);
+
+      // verify before unset policy
+      LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+      Assert.assertTrue(StorageType.ARCHIVE == (lb.getStorageTypes())[0]);
+
+      // unset storage policy
+      dfs.unsetStoragePolicy(new Path(file));
+      rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[] {"-p", file.toString()});
+      Assert.assertEquals("Movement to DISK should be successful", 0, rc);
+
+      lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+      Assert.assertTrue(StorageType.DISK == (lb.getStorageTypes())[0]);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private void waitForLocatedBlockWithArchiveStorageType(
+      final DistributedFileSystem dfs, final String file,
+      final int expectedArchiveCount) throws Exception {
+    GenericTestUtils.waitFor(new Supplier<Boolean>() {
+      @Override
+      public Boolean get() {
+        LocatedBlock lb = null;
+        try {
+          lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+        } catch (IOException e) {
+          LOG.error("Exception while getting located blocks", e);
+          return false;
+        }
+        int archiveCount = 0;
+        for (StorageType storageType : lb.getStorageTypes()) {
+          if (StorageType.ARCHIVE == storageType) {
+            archiveCount++;
+          }
+        }
+        LOG.info("Archive replica count, expected={} and actual={}",
+            expectedArchiveCount, archiveCount);
+        return expectedArchiveCount == archiveCount;
+      }
+    }, 100, 3000);
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org