This is an automated email from the ASF dual-hosted git repository. surendralilhore pushed a commit to branch branch-3.2 in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.2 by this push: new a0cdebc HDFS-14373. EC : Decoding is failing when block group last incomplete cell fall in to AlignedStripe. Contributed by Surendra Singh Lilhore. a0cdebc is described below commit a0cdebc409bb252e99a3ae2482f8543ba5d044d0 Author: Surendra Singh Lilhore <surendralilh...@apache.org> AuthorDate: Tue Oct 8 00:14:30 2019 +0530 HDFS-14373. EC : Decoding is failing when block group last incomplete cell fall in to AlignedStripe. Contributed by Surendra Singh Lilhore. (cherry picked from commit 382967be51052d59e31d8d05713645b8d3c2325b) --- .../java/org/apache/hadoop/hdfs/StripeReader.java | 4 ++ .../apache/hadoop/hdfs/util/StripedBlockUtil.java | 20 +++++++-- .../hadoop/hdfs/TestDFSStripedInputStream.java | 47 ++++++++++++++++++++++ 3 files changed, 68 insertions(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java index e90af84..8fd38bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripeReader.java @@ -248,6 +248,8 @@ abstract class StripeReader { DFSClient.LOG.warn("Found Checksum error for " + currentBlock + " from " + currentNode + " at " + ce.getPos()); + //Clear buffer to make next decode success + strategy.getReadBuffer().clear(); // we want to remember which block replicas we have tried corruptedBlocks.addCorruptedBlock(currentBlock, currentNode); throw ce; @@ -255,6 +257,8 @@ abstract class StripeReader { DFSClient.LOG.warn("Exception while reading from " + currentBlock + " of " + dfsStripedInputStream.getSrc() + " from " + currentNode, e); + //Clear buffer to make next decode success + strategy.getReadBuffer().clear(); throw e; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java index 2245757..718c51d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/util/StripedBlockUtil.java @@ -356,7 +356,8 @@ public class StripedBlockUtil { cells); // Step 3: merge into stripes - AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges); + AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges, + blockGroup, cellSize); // Step 4: calculate each chunk's position in destination buffer. Since the // whole read range is within a single stripe, the logic is simpler here. @@ -417,7 +418,8 @@ public class StripedBlockUtil { cells); // Step 3: merge into at most 5 stripes - AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges); + AlignedStripe[] stripes = mergeRangesForInternalBlocks(ecPolicy, ranges, + blockGroup, cellSize); // Step 4: calculate each chunk's position in destination buffer calcualteChunkPositionsInBuf(cellSize, stripes, cells, buf); @@ -513,7 +515,8 @@ public class StripedBlockUtil { * {@link AlignedStripe} instances. */ private static AlignedStripe[] mergeRangesForInternalBlocks( - ErasureCodingPolicy ecPolicy, VerticalRange[] ranges) { + ErasureCodingPolicy ecPolicy, VerticalRange[] ranges, + LocatedStripedBlock blockGroup, int cellSize) { int dataBlkNum = ecPolicy.getNumDataUnits(); int parityBlkNum = ecPolicy.getNumParityUnits(); List<AlignedStripe> stripes = new ArrayList<>(); @@ -525,6 +528,17 @@ public class StripedBlockUtil { } } + // Add block group last cell offset in stripePoints if it is fall in to read + // offset range. + int lastCellIdxInBG = (int) (blockGroup.getBlockSize() / cellSize); + int idxInInternalBlk = lastCellIdxInBG / ecPolicy.getNumDataUnits(); + long lastCellEndOffset = (idxInInternalBlk * (long)cellSize) + + (blockGroup.getBlockSize() % cellSize); + if (stripePoints.first() < lastCellEndOffset + && stripePoints.last() > lastCellEndOffset) { + stripePoints.add(lastCellEndOffset); + } + long prev = -1; for (long point : stripePoints) { if (prev >= 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java index 5734171..3355691 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java @@ -19,8 +19,11 @@ package org.apache.hadoop.hdfs; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.hdfs.protocol.LocatedBlock; @@ -561,6 +564,50 @@ public class TestDFSStripedInputStream { } } + @Test + public void testReadWhenLastIncompleteCellComeInToDecodeAlignedStripe() + throws IOException { + DataNodeProperties stopDataNode = null; + try { + cluster.waitActive(); + ErasureCodingPolicy policy = getEcPolicy(); + DistributedFileSystem filesystem = cluster.getFileSystem(); + filesystem.enableErasureCodingPolicy(policy.getName()); + Path dir = new Path("/tmp"); + filesystem.mkdirs(dir); + filesystem.getClient().setErasureCodingPolicy(dir.toString(), + policy.getName()); + Path f = new Path(dir, "file"); + + //1. File with one stripe, last data cell should be half filed. + long fileLength = (policy.getCellSize() * policy.getNumDataUnits()) + - (policy.getCellSize() / 2); + DFSTestUtil.createFile(filesystem, f, fileLength, (short) 1, 0); + + //2. Stop first DN from stripe. + LocatedBlocks lbs = cluster.getNameNodeRpc().getBlockLocations( + f.toString(), 0, fileLength); + LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0)); + final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(bg, + cellSize, dataBlocks, parityBlocks); + cluster.stopDataNode(blocks[0].getLocations()[0].getName()); + + //3. Do pread for fist cell, reconstruction should happen + try (FSDataInputStream in = filesystem.open(f)) { + DFSStripedInputStream stripedIn = (DFSStripedInputStream) in + .getWrappedStream(); + byte[] b = new byte[policy.getCellSize()]; + stripedIn.read(0, b, 0, policy.getCellSize()); + } + } catch (HadoopIllegalArgumentException e) { + fail(e.getMessage()); + } finally { + if (stopDataNode != null) { + cluster.restartDataNode(stopDataNode, true); + } + } + } + /** * Empties the pool for the specified buffer type, for the current ecPolicy. * <p> --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org