[16/50] hadoop git commit: HDFS-12612. DFSStripedOutputStream.close will throw if called a second time with a failed streamer. (Lei (Eddy) Xu)
HDFS-12612. DFSStripedOutputStream.close will throw if called a second time with a failed streamer. (Lei (Eddy) Xu) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f27a4ad0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f27a4ad0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f27a4ad0 Branch: refs/heads/HDFS-7240 Commit: f27a4ad0324aa0b4080a1c4c6bf4cd560c927e20 Parents: 7532339 Author: Lei Xu Authored: Tue Oct 17 15:52:09 2017 -0700 Committer: Lei Xu Committed: Tue Oct 17 15:52:09 2017 -0700 -- .../hadoop/hdfs/DFSStripedOutputStream.java | 40 +++ .../org/apache/hadoop/hdfs/DataStreamer.java| 31 ++-- .../apache/hadoop/hdfs/ExceptionLastSeen.java | 75 +++ .../TestDFSStripedOutputStreamWithFailure.java | 76 4 files changed, 184 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f27a4ad0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 1b83959..39717ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -82,6 +82,12 @@ public class DFSStripedOutputStream extends DFSOutputStream implements StreamCapabilities { private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool(); + /** + * OutputStream level last exception, will be used to indicate the fatal + * exception of this stream, i.e., being aborted. + */ + private final ExceptionLastSeen exceptionLastSeen = new ExceptionLastSeen(); + static class MultipleBlockingQueue { private final List> queues; @@ -971,12 +977,9 @@ public class DFSStripedOutputStream extends DFSOutputStream if (isClosed()) { return; } - for (StripedDataStreamer streamer : streamers) { -streamer.getLastException().set( -new IOException("Lease timeout of " -+ (dfsClient.getConf().getHdfsTimeout() / 1000) -+ " seconds expired.")); - } + exceptionLastSeen.set(new IOException("Lease timeout of " + + (dfsClient.getConf().getHdfsTimeout() / 1000) + + " seconds expired.")); try { closeThreads(true); @@ -1133,18 +1136,26 @@ public class DFSStripedOutputStream extends DFSOutputStream @Override protected synchronized void closeImpl() throws IOException { if (isClosed()) { + exceptionLastSeen.check(true); + + // Writing to at least {dataUnits} replicas can be considered as success, + // and the rest of data can be recovered. + final int minReplication = ecPolicy.getNumDataUnits(); + int goodStreamers = 0; final MultipleIOException.Builder b = new MultipleIOException.Builder(); - for(int i = 0; i < streamers.size(); i++) { -final StripedDataStreamer si = getStripedDataStreamer(i); + for (final StripedDataStreamer si : streamers) { try { si.getLastException().check(true); + goodStreamers++; } catch (IOException e) { b.add(e); } } - final IOException ioe = b.build(); - if (ioe != null) { -throw ioe; + if (goodStreamers < minReplication) { +final IOException ioe = b.build(); +if (ioe != null) { + throw ioe; +} } return; } @@ -1183,9 +1194,10 @@ public class DFSStripedOutputStream extends DFSOutputStream } } finally { // Failures may happen when flushing data/parity data out. Exceptions -// may be thrown if more than 3 streamers fail, or updatePipeline RPC -// fails. Streamers may keep waiting for the new block/GS information. -// Thus need to force closing these threads. +// may be thrown if the number of failed streamers is more than the +// number of parity blocks, or updatePipeline RPC fails. Streamers may +// keep waiting for the new block/GS information. Thus need to force +// closing these threads. closeThreads(true); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f27a4ad0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java -- diff --git a/hadoop-hdfs-project/hado
[32/50] [abbrv] hadoop git commit: HDFS-12612. DFSStripedOutputStream.close will throw if called a second time with a failed streamer. (Lei (Eddy) Xu)
HDFS-12612. DFSStripedOutputStream.close will throw if called a second time with a failed streamer. (Lei (Eddy) Xu) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f27a4ad0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f27a4ad0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f27a4ad0 Branch: refs/heads/YARN-1011 Commit: f27a4ad0324aa0b4080a1c4c6bf4cd560c927e20 Parents: 7532339 Author: Lei Xu Authored: Tue Oct 17 15:52:09 2017 -0700 Committer: Lei Xu Committed: Tue Oct 17 15:52:09 2017 -0700 -- .../hadoop/hdfs/DFSStripedOutputStream.java | 40 +++ .../org/apache/hadoop/hdfs/DataStreamer.java| 31 ++-- .../apache/hadoop/hdfs/ExceptionLastSeen.java | 75 +++ .../TestDFSStripedOutputStreamWithFailure.java | 76 4 files changed, 184 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f27a4ad0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 1b83959..39717ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -82,6 +82,12 @@ public class DFSStripedOutputStream extends DFSOutputStream implements StreamCapabilities { private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool(); + /** + * OutputStream level last exception, will be used to indicate the fatal + * exception of this stream, i.e., being aborted. + */ + private final ExceptionLastSeen exceptionLastSeen = new ExceptionLastSeen(); + static class MultipleBlockingQueue { private final List> queues; @@ -971,12 +977,9 @@ public class DFSStripedOutputStream extends DFSOutputStream if (isClosed()) { return; } - for (StripedDataStreamer streamer : streamers) { -streamer.getLastException().set( -new IOException("Lease timeout of " -+ (dfsClient.getConf().getHdfsTimeout() / 1000) -+ " seconds expired.")); - } + exceptionLastSeen.set(new IOException("Lease timeout of " + + (dfsClient.getConf().getHdfsTimeout() / 1000) + + " seconds expired.")); try { closeThreads(true); @@ -1133,18 +1136,26 @@ public class DFSStripedOutputStream extends DFSOutputStream @Override protected synchronized void closeImpl() throws IOException { if (isClosed()) { + exceptionLastSeen.check(true); + + // Writing to at least {dataUnits} replicas can be considered as success, + // and the rest of data can be recovered. + final int minReplication = ecPolicy.getNumDataUnits(); + int goodStreamers = 0; final MultipleIOException.Builder b = new MultipleIOException.Builder(); - for(int i = 0; i < streamers.size(); i++) { -final StripedDataStreamer si = getStripedDataStreamer(i); + for (final StripedDataStreamer si : streamers) { try { si.getLastException().check(true); + goodStreamers++; } catch (IOException e) { b.add(e); } } - final IOException ioe = b.build(); - if (ioe != null) { -throw ioe; + if (goodStreamers < minReplication) { +final IOException ioe = b.build(); +if (ioe != null) { + throw ioe; +} } return; } @@ -1183,9 +1194,10 @@ public class DFSStripedOutputStream extends DFSOutputStream } } finally { // Failures may happen when flushing data/parity data out. Exceptions -// may be thrown if more than 3 streamers fail, or updatePipeline RPC -// fails. Streamers may keep waiting for the new block/GS information. -// Thus need to force closing these threads. +// may be thrown if the number of failed streamers is more than the +// number of parity blocks, or updatePipeline RPC fails. Streamers may +// keep waiting for the new block/GS information. Thus need to force +// closing these threads. closeThreads(true); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f27a4ad0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java -- diff --git a/hadoop-hdfs-project/hado
[02/50] [abbrv] hadoop git commit: HDFS-12612. DFSStripedOutputStream.close will throw if called a second time with a failed streamer. (Lei (Eddy) Xu)
HDFS-12612. DFSStripedOutputStream.close will throw if called a second time with a failed streamer. (Lei (Eddy) Xu) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6959db9c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6959db9c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6959db9c Branch: refs/heads/resource-types Commit: 6959db9c20217f6adb12e9f3140f5db9a26c38c4 Parents: 81a8686 Author: Lei Xu Authored: Tue Oct 17 15:52:09 2017 -0700 Committer: Lei Xu Committed: Tue Oct 17 15:53:07 2017 -0700 -- .../hadoop/hdfs/DFSStripedOutputStream.java | 40 +++ .../org/apache/hadoop/hdfs/DataStreamer.java| 31 ++-- .../apache/hadoop/hdfs/ExceptionLastSeen.java | 75 +++ .../TestDFSStripedOutputStreamWithFailure.java | 76 4 files changed, 184 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6959db9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 1b83959..39717ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -82,6 +82,12 @@ public class DFSStripedOutputStream extends DFSOutputStream implements StreamCapabilities { private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool(); + /** + * OutputStream level last exception, will be used to indicate the fatal + * exception of this stream, i.e., being aborted. + */ + private final ExceptionLastSeen exceptionLastSeen = new ExceptionLastSeen(); + static class MultipleBlockingQueue { private final List> queues; @@ -971,12 +977,9 @@ public class DFSStripedOutputStream extends DFSOutputStream if (isClosed()) { return; } - for (StripedDataStreamer streamer : streamers) { -streamer.getLastException().set( -new IOException("Lease timeout of " -+ (dfsClient.getConf().getHdfsTimeout() / 1000) -+ " seconds expired.")); - } + exceptionLastSeen.set(new IOException("Lease timeout of " + + (dfsClient.getConf().getHdfsTimeout() / 1000) + + " seconds expired.")); try { closeThreads(true); @@ -1133,18 +1136,26 @@ public class DFSStripedOutputStream extends DFSOutputStream @Override protected synchronized void closeImpl() throws IOException { if (isClosed()) { + exceptionLastSeen.check(true); + + // Writing to at least {dataUnits} replicas can be considered as success, + // and the rest of data can be recovered. + final int minReplication = ecPolicy.getNumDataUnits(); + int goodStreamers = 0; final MultipleIOException.Builder b = new MultipleIOException.Builder(); - for(int i = 0; i < streamers.size(); i++) { -final StripedDataStreamer si = getStripedDataStreamer(i); + for (final StripedDataStreamer si : streamers) { try { si.getLastException().check(true); + goodStreamers++; } catch (IOException e) { b.add(e); } } - final IOException ioe = b.build(); - if (ioe != null) { -throw ioe; + if (goodStreamers < minReplication) { +final IOException ioe = b.build(); +if (ioe != null) { + throw ioe; +} } return; } @@ -1183,9 +1194,10 @@ public class DFSStripedOutputStream extends DFSOutputStream } } finally { // Failures may happen when flushing data/parity data out. Exceptions -// may be thrown if more than 3 streamers fail, or updatePipeline RPC -// fails. Streamers may keep waiting for the new block/GS information. -// Thus need to force closing these threads. +// may be thrown if the number of failed streamers is more than the +// number of parity blocks, or updatePipeline RPC fails. Streamers may +// keep waiting for the new block/GS information. Thus need to force +// closing these threads. closeThreads(true); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6959db9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java -- diff --git a/hadoop-hdfs-project
hadoop git commit: HDFS-12612. DFSStripedOutputStream.close will throw if called a second time with a failed streamer. (Lei (Eddy) Xu)
Repository: hadoop Updated Branches: refs/heads/trunk 75323394f -> f27a4ad03 HDFS-12612. DFSStripedOutputStream.close will throw if called a second time with a failed streamer. (Lei (Eddy) Xu) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f27a4ad0 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f27a4ad0 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f27a4ad0 Branch: refs/heads/trunk Commit: f27a4ad0324aa0b4080a1c4c6bf4cd560c927e20 Parents: 7532339 Author: Lei Xu Authored: Tue Oct 17 15:52:09 2017 -0700 Committer: Lei Xu Committed: Tue Oct 17 15:52:09 2017 -0700 -- .../hadoop/hdfs/DFSStripedOutputStream.java | 40 +++ .../org/apache/hadoop/hdfs/DataStreamer.java| 31 ++-- .../apache/hadoop/hdfs/ExceptionLastSeen.java | 75 +++ .../TestDFSStripedOutputStreamWithFailure.java | 76 4 files changed, 184 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/f27a4ad0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 1b83959..39717ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -82,6 +82,12 @@ public class DFSStripedOutputStream extends DFSOutputStream implements StreamCapabilities { private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool(); + /** + * OutputStream level last exception, will be used to indicate the fatal + * exception of this stream, i.e., being aborted. + */ + private final ExceptionLastSeen exceptionLastSeen = new ExceptionLastSeen(); + static class MultipleBlockingQueue { private final List> queues; @@ -971,12 +977,9 @@ public class DFSStripedOutputStream extends DFSOutputStream if (isClosed()) { return; } - for (StripedDataStreamer streamer : streamers) { -streamer.getLastException().set( -new IOException("Lease timeout of " -+ (dfsClient.getConf().getHdfsTimeout() / 1000) -+ " seconds expired.")); - } + exceptionLastSeen.set(new IOException("Lease timeout of " + + (dfsClient.getConf().getHdfsTimeout() / 1000) + + " seconds expired.")); try { closeThreads(true); @@ -1133,18 +1136,26 @@ public class DFSStripedOutputStream extends DFSOutputStream @Override protected synchronized void closeImpl() throws IOException { if (isClosed()) { + exceptionLastSeen.check(true); + + // Writing to at least {dataUnits} replicas can be considered as success, + // and the rest of data can be recovered. + final int minReplication = ecPolicy.getNumDataUnits(); + int goodStreamers = 0; final MultipleIOException.Builder b = new MultipleIOException.Builder(); - for(int i = 0; i < streamers.size(); i++) { -final StripedDataStreamer si = getStripedDataStreamer(i); + for (final StripedDataStreamer si : streamers) { try { si.getLastException().check(true); + goodStreamers++; } catch (IOException e) { b.add(e); } } - final IOException ioe = b.build(); - if (ioe != null) { -throw ioe; + if (goodStreamers < minReplication) { +final IOException ioe = b.build(); +if (ioe != null) { + throw ioe; +} } return; } @@ -1183,9 +1194,10 @@ public class DFSStripedOutputStream extends DFSOutputStream } } finally { // Failures may happen when flushing data/parity data out. Exceptions -// may be thrown if more than 3 streamers fail, or updatePipeline RPC -// fails. Streamers may keep waiting for the new block/GS information. -// Thus need to force closing these threads. +// may be thrown if the number of failed streamers is more than the +// number of parity blocks, or updatePipeline RPC fails. Streamers may +// keep waiting for the new block/GS information. Thus need to force +// closing these threads. closeThreads(true); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/f27a4ad0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
hadoop git commit: HDFS-12612. DFSStripedOutputStream.close will throw if called a second time with a failed streamer. (Lei (Eddy) Xu)
Repository: hadoop Updated Branches: refs/heads/branch-3.0 81a86860b -> 6959db9c2 HDFS-12612. DFSStripedOutputStream.close will throw if called a second time with a failed streamer. (Lei (Eddy) Xu) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6959db9c Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6959db9c Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6959db9c Branch: refs/heads/branch-3.0 Commit: 6959db9c20217f6adb12e9f3140f5db9a26c38c4 Parents: 81a8686 Author: Lei Xu Authored: Tue Oct 17 15:52:09 2017 -0700 Committer: Lei Xu Committed: Tue Oct 17 15:53:07 2017 -0700 -- .../hadoop/hdfs/DFSStripedOutputStream.java | 40 +++ .../org/apache/hadoop/hdfs/DataStreamer.java| 31 ++-- .../apache/hadoop/hdfs/ExceptionLastSeen.java | 75 +++ .../TestDFSStripedOutputStreamWithFailure.java | 76 4 files changed, 184 insertions(+), 38 deletions(-) -- http://git-wip-us.apache.org/repos/asf/hadoop/blob/6959db9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java -- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 1b83959..39717ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -82,6 +82,12 @@ public class DFSStripedOutputStream extends DFSOutputStream implements StreamCapabilities { private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool(); + /** + * OutputStream level last exception, will be used to indicate the fatal + * exception of this stream, i.e., being aborted. + */ + private final ExceptionLastSeen exceptionLastSeen = new ExceptionLastSeen(); + static class MultipleBlockingQueue { private final List> queues; @@ -971,12 +977,9 @@ public class DFSStripedOutputStream extends DFSOutputStream if (isClosed()) { return; } - for (StripedDataStreamer streamer : streamers) { -streamer.getLastException().set( -new IOException("Lease timeout of " -+ (dfsClient.getConf().getHdfsTimeout() / 1000) -+ " seconds expired.")); - } + exceptionLastSeen.set(new IOException("Lease timeout of " + + (dfsClient.getConf().getHdfsTimeout() / 1000) + + " seconds expired.")); try { closeThreads(true); @@ -1133,18 +1136,26 @@ public class DFSStripedOutputStream extends DFSOutputStream @Override protected synchronized void closeImpl() throws IOException { if (isClosed()) { + exceptionLastSeen.check(true); + + // Writing to at least {dataUnits} replicas can be considered as success, + // and the rest of data can be recovered. + final int minReplication = ecPolicy.getNumDataUnits(); + int goodStreamers = 0; final MultipleIOException.Builder b = new MultipleIOException.Builder(); - for(int i = 0; i < streamers.size(); i++) { -final StripedDataStreamer si = getStripedDataStreamer(i); + for (final StripedDataStreamer si : streamers) { try { si.getLastException().check(true); + goodStreamers++; } catch (IOException e) { b.add(e); } } - final IOException ioe = b.build(); - if (ioe != null) { -throw ioe; + if (goodStreamers < minReplication) { +final IOException ioe = b.build(); +if (ioe != null) { + throw ioe; +} } return; } @@ -1183,9 +1194,10 @@ public class DFSStripedOutputStream extends DFSOutputStream } } finally { // Failures may happen when flushing data/parity data out. Exceptions -// may be thrown if more than 3 streamers fail, or updatePipeline RPC -// fails. Streamers may keep waiting for the new block/GS information. -// Thus need to force closing these threads. +// may be thrown if the number of failed streamers is more than the +// number of parity blocks, or updatePipeline RPC fails. Streamers may +// keep waiting for the new block/GS information. Thus need to force +// closing these threads. closeThreads(true); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/6959db9c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java --