HDFS-8433. Erasure coding: set blockToken in LocatedStripedBlock. Contributed by Walter Su.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ec17ac94 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ec17ac94 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ec17ac94 Branch: refs/heads/HDFS-7285-REBASE Commit: ec17ac942c5c6002d9060337db9d2ffe5845de6a Parents: 943ebc9 Author: Walter Su <[email protected]> Authored: Mon Jul 20 10:18:34 2015 +0800 Committer: Vinayakumar B <[email protected]> Committed: Thu Aug 13 17:31:13 2015 +0530 ---------------------------------------------------------------------- .../hdfs/protocol/LocatedStripedBlock.java | 16 + .../src/main/proto/hdfs.proto | 3 + .../hadoop-hdfs/CHANGES-HDFS-EC-7285.txt | 5 + .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 15 +- .../server/blockmanagement/BlockManager.java | 21 +- .../hadoop/hdfs/util/StripedBlockUtil.java | 13 +- .../blockmanagement/TestBlockTokenWithDFS.java | 422 ++++++++++--------- .../TestBlockTokenWithDFSStriped.java | 119 ++++++ 8 files changed, 410 insertions(+), 204 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec17ac94/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java index dc5a77f..6e62220 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedStripedBlock.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.protocol; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.security.token.Token; import java.util.Arrays; @@ -32,8 +34,10 @@ import java.util.Arrays; @InterfaceStability.Evolving public class LocatedStripedBlock extends LocatedBlock { private static final int[] EMPTY_INDICES = {}; + private static final Token<BlockTokenIdentifier> EMPTY_TOKEN = new Token<>(); private int[] blockIndices; + private Token<BlockTokenIdentifier>[] blockTokens; public LocatedStripedBlock(ExtendedBlock b, DatanodeInfo[] locs, String[] storageIDs, StorageType[] storageTypes, int[] indices, @@ -46,6 +50,10 @@ public class LocatedStripedBlock extends LocatedBlock { this.blockIndices = new int[indices.length]; System.arraycopy(indices, 0, blockIndices, 0, indices.length); } + blockTokens = new Token[blockIndices.length]; + for (int i = 0; i < blockIndices.length; i++) { + blockTokens[i] = EMPTY_TOKEN; + } } @Override @@ -67,4 +75,12 @@ public class LocatedStripedBlock extends LocatedBlock { public boolean isStriped() { return true; } + + public Token<BlockTokenIdentifier>[] getBlockTokens() { + return blockTokens; + } + + public void setBlockTokens(Token<BlockTokenIdentifier>[] tokens) { + this.blockTokens = tokens; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec17ac94/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto index e1f944f..d2cb665 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/proto/hdfs.proto @@ -220,7 +220,10 @@ message LocatedBlockProto { repeated bool isCached = 6 [packed=true]; // if a location in locs is cached repeated StorageTypeProto storageTypes = 7; repeated string storageIDs = 8; + + // striped block related fields repeated uint32 blockIndex = 9; // used for striped block to indicate block index for each storage + repeated hadoop.common.TokenProto blockTokens = 10; // each internal block has a block token } message DataEncryptionKeyProto { http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec17ac94/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt index e2ccd9b..4709388 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-EC-7285.txt @@ -359,3 +359,8 @@ HDFS-8058. Erasure coding: use BlockInfo[] for both striped and contiguous blocks in INodeFile. (Zhe Zhang and Yi Liu via zhz) + + HDFS-8787. Erasure coding: rename BlockInfoContiguousUC and BlockInfoStripedUC + to be consistent with trunk. (zhz) + + HDFS-8433. Erasure coding: set blockToken in LocatedStripedBlock.(waltersu4549) http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec17ac94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 6c645c7..a97e2ff 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -813,9 +813,12 @@ public class PBHelper { builder.addAllStorageIDs(Arrays.asList(storageIDs)); } if (b instanceof LocatedStripedBlock) { - int[] indices = ((LocatedStripedBlock) b).getBlockIndices(); - for (int index : indices) { - builder.addBlockIndex(index); + LocatedStripedBlock sb = (LocatedStripedBlock) b; + int[] indices = sb.getBlockIndices(); + Token<BlockTokenIdentifier>[] blockTokens = sb.getBlockTokens(); + for (int i = 0; i < indices.length; i++) { + builder.addBlockIndex(indices[i]); + builder.addBlockTokens(PBHelper.convert(blockTokens[i])); } } @@ -872,6 +875,12 @@ public class PBHelper { storageIDs, storageTypes, indices, proto.getOffset(), proto.getCorrupt(), cachedLocs.toArray(new DatanodeInfo[cachedLocs.size()])); + List<TokenProto> tokenProtos = proto.getBlockTokensList(); + Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length]; + for (int i = 0; i < indices.length; i++) { + blockTokens[i] = PBHelper.convert(tokenProtos.get(i)); + } + ((LocatedStripedBlock) lb).setBlockTokens(blockTokens); } lb.setBlockToken(PBHelper.convert(proto.getBlockToken())); http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec17ac94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index a815e63..4e866f5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -97,6 +97,7 @@ import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.metrics2.util.MBeans; import org.apache.hadoop.net.Node; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.LightWeightGSet; import org.apache.hadoop.util.Time; @@ -1005,9 +1006,23 @@ public class BlockManager implements BlockStatsMXBean { final AccessMode mode) throws IOException { if (isBlockTokenEnabled()) { // Use cached UGI if serving RPC calls. - b.setBlockToken(blockTokenSecretManager.generateToken( - NameNode.getRemoteUser().getShortUserName(), - b.getBlock(), EnumSet.of(mode))); + if (b.isStriped()) { + LocatedStripedBlock sb = (LocatedStripedBlock) b; + int[] indices = sb.getBlockIndices(); + Token<BlockTokenIdentifier>[] blockTokens = new Token[indices.length]; + ExtendedBlock internalBlock = new ExtendedBlock(b.getBlock()); + for (int i = 0; i < indices.length; i++) { + internalBlock.setBlockId(b.getBlock().getBlockId() + indices[i]); + blockTokens[i] = blockTokenSecretManager.generateToken( + NameNode.getRemoteUser().getShortUserName(), + internalBlock, EnumSet.of(mode)); + } + sb.setBlockTokens(blockTokens); + } else { + b.setBlockToken(blockTokenSecretManager.generateToken( + NameNode.getRemoteUser().getShortUserName(), + b.getBlock(), EnumSet.of(mode))); + } } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec17ac94/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index 6bd5e1f..9b0939c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -30,8 +30,10 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; import com.google.common.base.Preconditions; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.io.erasurecode.ECSchema; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; +import org.apache.hadoop.security.token.Token; import java.nio.ByteBuffer; import java.util.*; @@ -105,17 +107,22 @@ public class StripedBlockUtil { int idxInBlockGroup) { final ExtendedBlock blk = constructInternalBlock( bg.getBlock(), cellSize, dataBlkNum, idxInBlockGroup); - + final LocatedBlock locatedBlock; if (idxInReturnedLocs < bg.getLocations().length) { - return new LocatedBlock(blk, + locatedBlock = new LocatedBlock(blk, new DatanodeInfo[]{bg.getLocations()[idxInReturnedLocs]}, new String[]{bg.getStorageIDs()[idxInReturnedLocs]}, new StorageType[]{bg.getStorageTypes()[idxInReturnedLocs]}, bg.getStartOffset(), bg.isCorrupt(), null); } else { - return new LocatedBlock(blk, null, null, null, + locatedBlock = new LocatedBlock(blk, null, null, null, bg.getStartOffset(), bg.isCorrupt(), null); } + Token<BlockTokenIdentifier>[] blockTokens = bg.getBlockTokens(); + if (idxInBlockGroup < blockTokens.length) { + locatedBlock.setBlockToken(blockTokens[idxInBlockGroup]); + } + return locatedBlock; } /** http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec17ac94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java index 43f2992..26ed1fe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFS.java @@ -24,7 +24,6 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.net.InetSocketAddress; import java.net.Socket; -import java.util.EnumSet; import java.util.List; import java.util.Random; @@ -69,28 +68,32 @@ import org.junit.Test; public class TestBlockTokenWithDFS { - private static final int BLOCK_SIZE = 1024; - private static final int FILE_SIZE = 2 * BLOCK_SIZE; + protected static int BLOCK_SIZE = 1024; + protected static int FILE_SIZE = 2 * BLOCK_SIZE; private static final String FILE_TO_READ = "/fileToRead.dat"; private static final String FILE_TO_WRITE = "/fileToWrite.dat"; private static final String FILE_TO_APPEND = "/fileToAppend.dat"; - private final byte[] rawData = new byte[FILE_SIZE]; { ((Log4JLogger) DFSClient.LOG).getLogger().setLevel(Level.ALL); + } + + public static byte[] generateBytes(int fileSize){ Random r = new Random(); + byte[] rawData = new byte[fileSize]; r.nextBytes(rawData); + return rawData; } - private void createFile(FileSystem fs, Path filename) throws IOException { + private void createFile(FileSystem fs, Path filename, byte[] expected) throws IOException { FSDataOutputStream out = fs.create(filename); - out.write(rawData); + out.write(expected); out.close(); } // read a file using blockSeekTo() - private boolean checkFile1(FSDataInputStream in) { - byte[] toRead = new byte[FILE_SIZE]; + private boolean checkFile1(FSDataInputStream in, byte[] expected) { + byte[] toRead = new byte[expected.length]; int totalRead = 0; int nRead = 0; try { @@ -101,27 +104,27 @@ public class TestBlockTokenWithDFS { return false; } assertEquals("Cannot read file.", toRead.length, totalRead); - return checkFile(toRead); + return checkFile(toRead, expected); } // read a file using fetchBlockByteRange() - private boolean checkFile2(FSDataInputStream in) { - byte[] toRead = new byte[FILE_SIZE]; + private boolean checkFile2(FSDataInputStream in, byte[] expected) { + byte[] toRead = new byte[expected.length]; try { assertEquals("Cannot read file", toRead.length, in.read(0, toRead, 0, toRead.length)); } catch (IOException e) { return false; } - return checkFile(toRead); + return checkFile(toRead, expected); } - private boolean checkFile(byte[] fileToCheck) { - if (fileToCheck.length != rawData.length) { + private boolean checkFile(byte[] fileToCheck, byte[] expected) { + if (fileToCheck.length != expected.length) { return false; } for (int i = 0; i < fileToCheck.length; i++) { - if (fileToCheck[i] != rawData[i]) { + if (fileToCheck[i] != expected[i]) { return false; } } @@ -137,7 +140,7 @@ public class TestBlockTokenWithDFS { } // try reading a block using a BlockReader directly - private static void tryRead(final Configuration conf, LocatedBlock lblock, + protected void tryRead(final Configuration conf, LocatedBlock lblock, boolean shouldSucceed) { InetSocketAddress targetAddr = null; IOException ioe = null; @@ -148,7 +151,7 @@ public class TestBlockTokenWithDFS { targetAddr = NetUtils.createSocketAddr(nodes[0].getXferAddr()); blockReader = new BlockReaderFactory(new DfsClientConf(conf)). - setFileName(BlockReaderFactory.getFileName(targetAddr, + setFileName(BlockReaderFactory.getFileName(targetAddr, "test-blockpoolid", block.getBlockId())). setBlock(block). setBlockToken(lblock.getBlockToken()). @@ -205,7 +208,7 @@ public class TestBlockTokenWithDFS { } // get a conf for testing - private static Configuration getConf(int numDataNodes) { + protected Configuration getConf(int numDataNodes) { Configuration conf = new Configuration(); conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE); @@ -241,16 +244,16 @@ public class TestBlockTokenWithDFS { SecurityTestUtil.setBlockTokenLifetime(sm, 1000L); Path fileToAppend = new Path(FILE_TO_APPEND); FileSystem fs = cluster.getFileSystem(); - + byte[] expected = generateBytes(FILE_SIZE); // write a one-byte file FSDataOutputStream stm = writeFile(fs, fileToAppend, (short) numDataNodes, BLOCK_SIZE); - stm.write(rawData, 0, 1); + stm.write(expected, 0, 1); stm.close(); // open the file again for append stm = fs.append(fileToAppend); - int mid = rawData.length - 1; - stm.write(rawData, 1, mid - 1); + int mid = expected.length - 1; + stm.write(expected, 1, mid - 1); stm.hflush(); /* @@ -267,11 +270,11 @@ public class TestBlockTokenWithDFS { // remove a datanode to force re-establishing pipeline cluster.stopDataNode(0); // append the rest of the file - stm.write(rawData, mid, rawData.length - mid); + stm.write(expected, mid, expected.length - mid); stm.close(); // check if append is successful FSDataInputStream in5 = fs.open(fileToAppend); - assertTrue(checkFile1(in5)); + assertTrue(checkFile1(in5, expected)); } finally { if (cluster != null) { cluster.shutdown(); @@ -303,11 +306,12 @@ public class TestBlockTokenWithDFS { Path fileToWrite = new Path(FILE_TO_WRITE); FileSystem fs = cluster.getFileSystem(); + byte[] expected = generateBytes(FILE_SIZE); FSDataOutputStream stm = writeFile(fs, fileToWrite, (short) numDataNodes, BLOCK_SIZE); // write a partial block - int mid = rawData.length - 1; - stm.write(rawData, 0, mid); + int mid = expected.length - 1; + stm.write(expected, 0, mid); stm.hflush(); /* @@ -324,11 +328,11 @@ public class TestBlockTokenWithDFS { // remove a datanode to force re-establishing pipeline cluster.stopDataNode(0); // write the rest of the file - stm.write(rawData, mid, rawData.length - mid); + stm.write(expected, mid, expected.length - mid); stm.close(); // check if write is successful FSDataInputStream in4 = fs.open(fileToWrite); - assertTrue(checkFile1(in4)); + assertTrue(checkFile1(in4, expected)); } finally { if (cluster != null) { cluster.shutdown(); @@ -346,125 +350,137 @@ public class TestBlockTokenWithDFS { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build(); cluster.waitActive(); assertEquals(numDataNodes, cluster.getDataNodes().size()); + doTestRead(conf, cluster, false); + } finally { + if (cluster != null) { + cluster.shutdown(); + } + } + } - final NameNode nn = cluster.getNameNode(); - final NamenodeProtocols nnProto = nn.getRpcServer(); - final BlockManager bm = nn.getNamesystem().getBlockManager(); - final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager(); + protected void doTestRead(Configuration conf, MiniDFSCluster cluster, + boolean isStriped) throws Exception { + final int numDataNodes = cluster.getDataNodes().size(); + final NameNode nn = cluster.getNameNode(); + final NamenodeProtocols nnProto = nn.getRpcServer(); + final BlockManager bm = nn.getNamesystem().getBlockManager(); + final BlockTokenSecretManager sm = bm.getBlockTokenSecretManager(); - // set a short token lifetime (1 second) initially - SecurityTestUtil.setBlockTokenLifetime(sm, 1000L); + // set a short token lifetime (1 second) initially + SecurityTestUtil.setBlockTokenLifetime(sm, 1000L); - Path fileToRead = new Path(FILE_TO_READ); - FileSystem fs = cluster.getFileSystem(); - createFile(fs, fileToRead); + Path fileToRead = new Path(FILE_TO_READ); + FileSystem fs = cluster.getFileSystem(); + byte[] expected = generateBytes(FILE_SIZE); + createFile(fs, fileToRead, expected); /* * setup for testing expiration handling of cached tokens */ - // read using blockSeekTo(). Acquired tokens are cached in in1 - FSDataInputStream in1 = fs.open(fileToRead); - assertTrue(checkFile1(in1)); - // read using blockSeekTo(). Acquired tokens are cached in in2 - FSDataInputStream in2 = fs.open(fileToRead); - assertTrue(checkFile1(in2)); - // read using fetchBlockByteRange(). Acquired tokens are cached in in3 - FSDataInputStream in3 = fs.open(fileToRead); - assertTrue(checkFile2(in3)); + // read using blockSeekTo(). Acquired tokens are cached in in1 + FSDataInputStream in1 = fs.open(fileToRead); + assertTrue(checkFile1(in1,expected)); + // read using blockSeekTo(). Acquired tokens are cached in in2 + FSDataInputStream in2 = fs.open(fileToRead); + assertTrue(checkFile1(in2,expected)); + // read using fetchBlockByteRange(). Acquired tokens are cached in in3 + FSDataInputStream in3 = fs.open(fileToRead); + assertTrue(checkFile2(in3,expected)); /* * testing READ interface on DN using a BlockReader */ - DFSClient client = null; - try { - client = new DFSClient(new InetSocketAddress("localhost", + DFSClient client = null; + try { + client = new DFSClient(new InetSocketAddress("localhost", cluster.getNameNodePort()), conf); - } finally { - if (client != null) client.close(); - } - List<LocatedBlock> locatedBlocks = nnProto.getBlockLocations( - FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks(); - LocatedBlock lblock = locatedBlocks.get(0); // first block - Token<BlockTokenIdentifier> myToken = lblock.getBlockToken(); - // verify token is not expired - assertFalse(SecurityTestUtil.isBlockTokenExpired(myToken)); - // read with valid token, should succeed - tryRead(conf, lblock, true); + } finally { + if (client != null) client.close(); + } + List<LocatedBlock> locatedBlocks = nnProto.getBlockLocations( + FILE_TO_READ, 0, FILE_SIZE).getLocatedBlocks(); + LocatedBlock lblock = locatedBlocks.get(0); // first block + // verify token is not expired + assertFalse(isBlockTokenExpired(lblock)); + // read with valid token, should succeed + tryRead(conf, lblock, true); /* * wait till myToken and all cached tokens in in1, in2 and in3 expire */ - while (!SecurityTestUtil.isBlockTokenExpired(myToken)) { - try { - Thread.sleep(10); - } catch (InterruptedException ignored) { - } + while (!isBlockTokenExpired(lblock)) { + try { + Thread.sleep(10); + } catch (InterruptedException ignored) { } + } /* * continue testing READ interface on DN using a BlockReader */ - // verify token is expired - assertTrue(SecurityTestUtil.isBlockTokenExpired(myToken)); - // read should fail - tryRead(conf, lblock, false); - // use a valid new token - lblock.setBlockToken(sm.generateToken(lblock.getBlock(), - EnumSet.of(BlockTokenIdentifier.AccessMode.READ))); - // read should succeed - tryRead(conf, lblock, true); - // use a token with wrong blockID - ExtendedBlock wrongBlock = new ExtendedBlock(lblock.getBlock() - .getBlockPoolId(), lblock.getBlock().getBlockId() + 1); - lblock.setBlockToken(sm.generateToken(wrongBlock, - EnumSet.of(BlockTokenIdentifier.AccessMode.READ))); - // read should fail - tryRead(conf, lblock, false); - // use a token with wrong access modes - lblock.setBlockToken(sm.generateToken(lblock.getBlock(), - EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE, - BlockTokenIdentifier.AccessMode.COPY, - BlockTokenIdentifier.AccessMode.REPLACE))); - // read should fail - tryRead(conf, lblock, false); - - // set a long token lifetime for future tokens - SecurityTestUtil.setBlockTokenLifetime(sm, 600 * 1000L); + // verify token is expired + assertTrue(isBlockTokenExpired(lblock)); + // read should fail + tryRead(conf, lblock, false); + // use a valid new token + bm.setBlockToken(lblock, BlockTokenIdentifier.AccessMode.READ); + // read should succeed + tryRead(conf, lblock, true); + // use a token with wrong blockID + long rightId = lblock.getBlock().getBlockId(); + long wrongId = rightId + 1; + lblock.getBlock().setBlockId(wrongId); + bm.setBlockToken(lblock, BlockTokenIdentifier.AccessMode.READ); + lblock.getBlock().setBlockId(rightId); + // read should fail + tryRead(conf, lblock, false); + // use a token with wrong access modes + bm.setBlockToken(lblock, BlockTokenIdentifier.AccessMode.WRITE); + // read should fail + tryRead(conf, lblock, false); + + // set a long token lifetime for future tokens + SecurityTestUtil.setBlockTokenLifetime(sm, 600 * 1000L); /* * testing that when cached tokens are expired, DFSClient will re-fetch * tokens transparently for READ. */ - // confirm all tokens cached in in1 are expired by now - List<LocatedBlock> lblocks = DFSTestUtil.getAllBlocks(in1); - for (LocatedBlock blk : lblocks) { - assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken())); - } - // verify blockSeekTo() is able to re-fetch token transparently - in1.seek(0); - assertTrue(checkFile1(in1)); - - // confirm all tokens cached in in2 are expired by now - List<LocatedBlock> lblocks2 = DFSTestUtil.getAllBlocks(in2); - for (LocatedBlock blk : lblocks2) { - assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken())); - } - // verify blockSeekTo() is able to re-fetch token transparently (testing - // via another interface method) + // confirm all tokens cached in in1 are expired by now + List<LocatedBlock> lblocks = DFSTestUtil.getAllBlocks(in1); + for (LocatedBlock blk : lblocks) { + assertTrue(isBlockTokenExpired(blk)); + } + // verify blockSeekTo() is able to re-fetch token transparently + in1.seek(0); + assertTrue(checkFile1(in1, expected)); + + // confirm all tokens cached in in2 are expired by now + List<LocatedBlock> lblocks2 = DFSTestUtil.getAllBlocks(in2); + for (LocatedBlock blk : lblocks2) { + assertTrue(isBlockTokenExpired(blk)); + } + // verify blockSeekTo() is able to re-fetch token transparently (testing + // via another interface method) + if (isStriped) { + // striped block doesn't support seekToNewSource + in2.seek(0); + } else { assertTrue(in2.seekToNewSource(0)); - assertTrue(checkFile1(in2)); + } + assertTrue(checkFile1(in2,expected)); - // confirm all tokens cached in in3 are expired by now - List<LocatedBlock> lblocks3 = DFSTestUtil.getAllBlocks(in3); - for (LocatedBlock blk : lblocks3) { - assertTrue(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken())); - } - // verify fetchBlockByteRange() is able to re-fetch token transparently - assertTrue(checkFile2(in3)); + // confirm all tokens cached in in3 are expired by now + List<LocatedBlock> lblocks3 = DFSTestUtil.getAllBlocks(in3); + for (LocatedBlock blk : lblocks3) { + assertTrue(isBlockTokenExpired(blk)); + } + // verify fetchBlockByteRange() is able to re-fetch token transparently + assertTrue(checkFile2(in3,expected)); /* * testing that after datanodes are restarted on the same ports, cached @@ -473,37 +489,42 @@ public class TestBlockTokenWithDFS { * new tokens can be fetched from namenode). */ - // restart datanodes on the same ports that they currently use - assertTrue(cluster.restartDataNodes(true)); - cluster.waitActive(); - assertEquals(numDataNodes, cluster.getDataNodes().size()); - cluster.shutdownNameNode(0); + // restart datanodes on the same ports that they currently use + assertTrue(cluster.restartDataNodes(true)); + cluster.waitActive(); + assertEquals(numDataNodes, cluster.getDataNodes().size()); + cluster.shutdownNameNode(0); - // confirm tokens cached in in1 are still valid - lblocks = DFSTestUtil.getAllBlocks(in1); - for (LocatedBlock blk : lblocks) { - assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken())); - } - // verify blockSeekTo() still works (forced to use cached tokens) - in1.seek(0); - assertTrue(checkFile1(in1)); - - // confirm tokens cached in in2 are still valid - lblocks2 = DFSTestUtil.getAllBlocks(in2); - for (LocatedBlock blk : lblocks2) { - assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken())); - } - // verify blockSeekTo() still works (forced to use cached tokens) + // confirm tokens cached in in1 are still valid + lblocks = DFSTestUtil.getAllBlocks(in1); + for (LocatedBlock blk : lblocks) { + assertFalse(isBlockTokenExpired(blk)); + } + // verify blockSeekTo() still works (forced to use cached tokens) + in1.seek(0); + assertTrue(checkFile1(in1,expected)); + + // confirm tokens cached in in2 are still valid + lblocks2 = DFSTestUtil.getAllBlocks(in2); + for (LocatedBlock blk : lblocks2) { + assertFalse(isBlockTokenExpired(blk)); + } + + // verify blockSeekTo() still works (forced to use cached tokens) + if (isStriped) { + in2.seek(0); + } else { in2.seekToNewSource(0); - assertTrue(checkFile1(in2)); + } + assertTrue(checkFile1(in2,expected)); - // confirm tokens cached in in3 are still valid - lblocks3 = DFSTestUtil.getAllBlocks(in3); - for (LocatedBlock blk : lblocks3) { - assertFalse(SecurityTestUtil.isBlockTokenExpired(blk.getBlockToken())); - } - // verify fetchBlockByteRange() still works (forced to use cached tokens) - assertTrue(checkFile2(in3)); + // confirm tokens cached in in3 are still valid + lblocks3 = DFSTestUtil.getAllBlocks(in3); + for (LocatedBlock blk : lblocks3) { + assertFalse(isBlockTokenExpired(blk)); + } + // verify fetchBlockByteRange() still works (forced to use cached tokens) + assertTrue(checkFile2(in3,expected)); /* * testing that when namenode is restarted, cached tokens should still @@ -512,18 +533,23 @@ public class TestBlockTokenWithDFS { * setup for this test depends on the previous test. */ - // restart the namenode and then shut it down for test - cluster.restartNameNode(0); - cluster.shutdownNameNode(0); + // restart the namenode and then shut it down for test + cluster.restartNameNode(0); + cluster.shutdownNameNode(0); - // verify blockSeekTo() still works (forced to use cached tokens) - in1.seek(0); - assertTrue(checkFile1(in1)); - // verify again blockSeekTo() still works (forced to use cached tokens) + // verify blockSeekTo() still works (forced to use cached tokens) + in1.seek(0); + assertTrue(checkFile1(in1,expected)); + // verify again blockSeekTo() still works (forced to use cached tokens) + if (isStriped) { + in2.seek(0); + } else { in2.seekToNewSource(0); - assertTrue(checkFile1(in2)); - // verify fetchBlockByteRange() still works (forced to use cached tokens) - assertTrue(checkFile2(in3)); + } + assertTrue(checkFile1(in2,expected)); + + // verify fetchBlockByteRange() still works (forced to use cached tokens) + assertTrue(checkFile2(in3,expected)); /* * testing that after both namenode and datanodes got restarted (namenode @@ -532,58 +558,60 @@ public class TestBlockTokenWithDFS { * setup of this test depends on the previous test. */ - // restore the cluster and restart the datanodes for test - cluster.restartNameNode(0); - assertTrue(cluster.restartDataNodes(true)); - cluster.waitActive(); - assertEquals(numDataNodes, cluster.getDataNodes().size()); - - // shutdown namenode so that DFSClient can't get new tokens from namenode - cluster.shutdownNameNode(0); - - // verify blockSeekTo() fails (cached tokens become invalid) - in1.seek(0); - assertFalse(checkFile1(in1)); - // verify fetchBlockByteRange() fails (cached tokens become invalid) - assertFalse(checkFile2(in3)); - - // restart the namenode to allow DFSClient to re-fetch tokens - cluster.restartNameNode(0); - // verify blockSeekTo() works again (by transparently re-fetching - // tokens from namenode) - in1.seek(0); - assertTrue(checkFile1(in1)); + // restore the cluster and restart the datanodes for test + cluster.restartNameNode(0); + assertTrue(cluster.restartDataNodes(true)); + cluster.waitActive(); + assertEquals(numDataNodes, cluster.getDataNodes().size()); + + // shutdown namenode so that DFSClient can't get new tokens from namenode + cluster.shutdownNameNode(0); + + // verify blockSeekTo() fails (cached tokens become invalid) + in1.seek(0); + assertFalse(checkFile1(in1,expected)); + // verify fetchBlockByteRange() fails (cached tokens become invalid) + assertFalse(checkFile2(in3,expected)); + + // restart the namenode to allow DFSClient to re-fetch tokens + cluster.restartNameNode(0); + // verify blockSeekTo() works again (by transparently re-fetching + // tokens from namenode) + in1.seek(0); + assertTrue(checkFile1(in1,expected)); + if (isStriped) { + in2.seek(0); + } else { in2.seekToNewSource(0); - assertTrue(checkFile1(in2)); - // verify fetchBlockByteRange() works again (by transparently - // re-fetching tokens from namenode) - assertTrue(checkFile2(in3)); + } + assertTrue(checkFile1(in2,expected)); + // verify fetchBlockByteRange() works again (by transparently + // re-fetching tokens from namenode) + assertTrue(checkFile2(in3,expected)); /* * testing that when datanodes are restarted on different ports, DFSClient * is able to re-fetch tokens transparently to connect to them */ - // restart datanodes on newly assigned ports - assertTrue(cluster.restartDataNodes(false)); - cluster.waitActive(); - assertEquals(numDataNodes, cluster.getDataNodes().size()); - // verify blockSeekTo() is able to re-fetch token transparently - in1.seek(0); - assertTrue(checkFile1(in1)); - // verify blockSeekTo() is able to re-fetch token transparently + // restart datanodes on newly assigned ports + assertTrue(cluster.restartDataNodes(false)); + cluster.waitActive(); + assertEquals(numDataNodes, cluster.getDataNodes().size()); + // verify blockSeekTo() is able to re-fetch token transparently + in1.seek(0); + assertTrue(checkFile1(in1,expected)); + // verify blockSeekTo() is able to re-fetch token transparently + if (isStriped) { + in2.seek(0); + } else { in2.seekToNewSource(0); - assertTrue(checkFile1(in2)); - // verify fetchBlockByteRange() is able to re-fetch token transparently - assertTrue(checkFile2(in3)); - - } finally { - if (cluster != null) { - cluster.shutdown(); - } } - } + assertTrue(checkFile1(in2,expected)); + // verify fetchBlockByteRange() is able to re-fetch token transparently + assertTrue(checkFile2(in3,expected)); + } /** * Integration testing of access token, involving NN, DN, and Balancer */ @@ -593,4 +621,8 @@ public class TestBlockTokenWithDFS { conf.setBoolean(DFSConfigKeys.DFS_BLOCK_ACCESS_TOKEN_ENABLE_KEY, true); new TestBalancer().integrationTest(conf); } + + protected boolean isBlockTokenExpired(LocatedBlock lb) throws IOException { + return SecurityTestUtil.isBlockTokenExpired(lb.getBlockToken()); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/ec17ac94/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java new file mode 100644 index 0000000..e212917 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockTokenWithDFSStriped.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock; +import org.apache.hadoop.hdfs.util.StripedBlockUtil; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +public class TestBlockTokenWithDFSStriped extends TestBlockTokenWithDFS { + + private final static int dataBlocks = HdfsConstants.NUM_DATA_BLOCKS; + private final static int parityBlocks = HdfsConstants.NUM_PARITY_BLOCKS; + private final static int cellSize = HdfsConstants.BLOCK_STRIPED_CELL_SIZE; + private final static int stripesPerBlock = 4; + private final static int numDNs = dataBlocks + parityBlocks + 2; + private static MiniDFSCluster cluster; + private static Configuration conf; + + { + BLOCK_SIZE = cellSize * stripesPerBlock; + FILE_SIZE = BLOCK_SIZE * dataBlocks * 3; + } + + @Before + public void setup() throws IOException { + conf = getConf(); + cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build(); + cluster.getFileSystem().getClient() + .createErasureCodingZone("/", null, cellSize); + cluster.waitActive(); + } + + @After + public void tearDown() { + if (cluster != null) { + cluster.shutdown(); + } + } + + private Configuration getConf() { + Configuration conf = super.getConf(numDNs); + conf.setInt("io.bytes.per.checksum", cellSize); + return conf; + } + + @Test + @Override + public void testRead() throws Exception { + //TODO: DFSStripedInputStream handles token expiration +// doTestRead(conf, cluster, true); + } + + @Test + @Override + public void testWrite() throws Exception { + //TODO: DFSStripedOutputStream handles token expiration + } + + @Test + @Override + public void testAppend() throws Exception { + //TODO: support Append for striped file + } + + @Test + @Override + public void testEnd2End() throws Exception { + //TODO: DFSStripedOutputStream handles token expiration + } + + @Override + protected void tryRead(final Configuration conf, LocatedBlock lblock, + boolean shouldSucceed) { + LocatedStripedBlock lsb = (LocatedStripedBlock) lblock; + LocatedBlock[] internalBlocks = StripedBlockUtil.parseStripedBlockGroup + (lsb, cellSize, dataBlocks, parityBlocks); + for (LocatedBlock internalBlock : internalBlocks) { + super.tryRead(conf, internalBlock, shouldSucceed); + } + } + + @Override + protected boolean isBlockTokenExpired(LocatedBlock lb) throws IOException { + LocatedStripedBlock lsb = (LocatedStripedBlock) lb; + LocatedBlock[] internalBlocks = StripedBlockUtil.parseStripedBlockGroup + (lsb, cellSize, dataBlocks, parityBlocks); + for (LocatedBlock internalBlock : internalBlocks) { + if(super.isBlockTokenExpired(internalBlock)){ + return true; + } + } + return false; + } +}
