HADOOP-15292. Distcp's use of pread is slowing it down. Contributed by Virajith Jalaparti.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3bd6b1fd Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3bd6b1fd Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3bd6b1fd Branch: refs/heads/HDFS-7240 Commit: 3bd6b1fd85c44354c777ef4fda6415231505b2a4 Parents: b451889 Author: Steve Loughran <ste...@apache.org> Authored: Thu Mar 8 11:15:46 2018 +0000 Committer: Steve Loughran <ste...@apache.org> Committed: Thu Mar 8 11:15:46 2018 +0000 ---------------------------------------------------------------------- .../tools/mapred/RetriableFileCopyCommand.java | 24 ++++++---- .../hadoop/tools/util/ThrottledInputStream.java | 48 +++++++++++--------- .../hadoop/tools/mapred/TestCopyMapper.java | 24 +++++++++- 3 files changed, 66 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bd6b1fd/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java index 21f621a..0311061 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/RetriableFileCopyCommand.java @@ -260,7 +260,8 @@ public class RetriableFileCopyCommand extends RetriableCommand { boolean finished = false; try { inStream = getInputStream(source, context.getConfiguration()); - int bytesRead = readBytes(inStream, buf, sourceOffset); + seekIfRequired(inStream, sourceOffset); + int bytesRead = readBytes(inStream, buf); while (bytesRead >= 0) { if (chunkLength > 0 && (totalBytesRead + bytesRead) >= chunkLength) { @@ -276,7 +277,7 @@ public class RetriableFileCopyCommand extends RetriableCommand { if (finished) { break; } - bytesRead = readBytes(inStream, buf, sourceOffset); + bytesRead = readBytes(inStream, buf); } outStream.close(); outStream = null; @@ -299,13 +300,20 @@ public class RetriableFileCopyCommand extends RetriableCommand { context.setStatus(message.toString()); } - private static int readBytes(ThrottledInputStream inStream, byte buf[], - long position) throws IOException { + private static int readBytes(ThrottledInputStream inStream, byte buf[]) + throws IOException { + try { + return inStream.read(buf); + } catch (IOException e) { + throw new CopyReadException(e); + } + } + + private static void seekIfRequired(ThrottledInputStream inStream, + long sourceOffset) throws IOException { try { - if (position == 0) { - return inStream.read(buf); - } else { - return inStream.read(position, buf, 0, buf.length); + if (sourceOffset != inStream.getPos()) { + inStream.seek(sourceOffset); } } catch (IOException e) { throw new CopyReadException(e); http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bd6b1fd/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java index 2d2f10c..4d3676a 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/util/ThrottledInputStream.java @@ -18,7 +18,7 @@ package org.apache.hadoop.tools.util; -import org.apache.hadoop.fs.PositionedReadable; +import org.apache.hadoop.fs.Seekable; import java.io.IOException; import java.io.InputStream; @@ -33,7 +33,7 @@ import java.io.InputStream; * (Thus, while the read-rate might exceed the maximum for a given short interval, * the average tends towards the specified maximum, overall.) */ -public class ThrottledInputStream extends InputStream { +public class ThrottledInputStream extends InputStream implements Seekable { private final InputStream rawStream; private final float maxBytesPerSec; @@ -95,25 +95,6 @@ public class ThrottledInputStream extends InputStream { return readLen; } - /** - * Read bytes starting from the specified position. This requires rawStream is - * an instance of {@link PositionedReadable}. - */ - public int read(long position, byte[] buffer, int offset, int length) - throws IOException { - if (!(rawStream instanceof PositionedReadable)) { - throw new UnsupportedOperationException( - "positioned read is not supported by the internal stream"); - } - throttle(); - int readLen = ((PositionedReadable) rawStream).read(position, buffer, - offset, length); - if (readLen != -1) { - bytesRead += readLen; - } - return readLen; - } - private void throttle() throws IOException { while (getBytesPerSec() > maxBytesPerSec) { try { @@ -165,4 +146,29 @@ public class ThrottledInputStream extends InputStream { ", totalSleepTime=" + totalSleepTime + '}'; } + + private void checkSeekable() throws IOException { + if (!(rawStream instanceof Seekable)) { + throw new UnsupportedOperationException( + "seek operations are unsupported by the internal stream"); + } + } + + @Override + public void seek(long pos) throws IOException { + checkSeekable(); + ((Seekable) rawStream).seek(pos); + } + + @Override + public long getPos() throws IOException { + checkSeekable(); + return ((Seekable) rawStream).getPos(); + } + + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + checkSeekable(); + return ((Seekable) rawStream).seekToNewSource(targetPos); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bd6b1fd/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java index fd998c8..da51326 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyMapper.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.tools.CopyListingFileStatus; @@ -55,6 +56,10 @@ import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; + public class TestCopyMapper { private static final Log LOG = LogFactory.getLog(TestCopyMapper.class); private static List<Path> pathList = new ArrayList<Path>(); @@ -248,7 +253,11 @@ public class TestCopyMapper { // do the distcp again with -update and -append option CopyMapper copyMapper = new CopyMapper(); - StubContext stubContext = new StubContext(getConfiguration(), null, 0); + Configuration conf = getConfiguration(); + // set the buffer size to 1/10th the size of the file. + conf.setInt(DistCpOptionSwitch.COPY_BUFFER_SIZE.getConfigLabel(), + DEFAULT_FILE_SIZE/10); + StubContext stubContext = new StubContext(conf, null, 0); Mapper<Text, CopyListingFileStatus, Text, Text>.Context context = stubContext.getContext(); // Enable append @@ -257,6 +266,10 @@ public class TestCopyMapper { copyMapper.setup(context); int numFiles = 0; + MetricsRecordBuilder rb = + getMetrics(cluster.getDataNodes().get(0).getMetrics().name()); + String readCounter = "ReadsFromLocalClient"; + long readsFromClient = getLongCounter(readCounter, rb); for (Path path: pathList) { if (fs.getFileStatus(path).isFile()) { numFiles++; @@ -274,6 +287,15 @@ public class TestCopyMapper { .getValue()); Assert.assertEquals(numFiles, stubContext.getReporter(). getCounter(CopyMapper.Counter.COPY).getValue()); + rb = getMetrics(cluster.getDataNodes().get(0).getMetrics().name()); + /* + * added as part of HADOOP-15292 to ensure that multiple readBlock() + * operations are not performed to read a block from a single Datanode. + * assert assumes that there is only one block per file, and that the number + * of files appended to in appendSourceData() above is captured by the + * variable numFiles. + */ + assertCounter(readCounter, readsFromClient + numFiles, rb); } private void testCopy(boolean preserveChecksum) throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org