HDFS-12612. DFSStripedOutputStream.close will throw if called a second time 
with a failed streamer. (Lei (Eddy) Xu)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f27a4ad0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f27a4ad0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f27a4ad0

Branch: refs/heads/YARN-1011
Commit: f27a4ad0324aa0b4080a1c4c6bf4cd560c927e20
Parents: 7532339
Author: Lei Xu <l...@apache.org>
Authored: Tue Oct 17 15:52:09 2017 -0700
Committer: Lei Xu <l...@apache.org>
Committed: Tue Oct 17 15:52:09 2017 -0700

----------------------------------------------------------------------
 .../hadoop/hdfs/DFSStripedOutputStream.java     | 40 +++++++----
 .../org/apache/hadoop/hdfs/DataStreamer.java    | 31 ++------
 .../apache/hadoop/hdfs/ExceptionLastSeen.java   | 75 +++++++++++++++++++
 .../TestDFSStripedOutputStreamWithFailure.java  | 76 ++++++++++++++++++++
 4 files changed, 184 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f27a4ad0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
index 1b83959..39717ef 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java
@@ -82,6 +82,12 @@ public class DFSStripedOutputStream extends DFSOutputStream
     implements StreamCapabilities {
   private static final ByteBufferPool BUFFER_POOL = new 
ElasticByteBufferPool();
 
+  /**
+   * Stream-level last exception: a fatal error affecting the whole stream,
+   * e.g. the lease timeout recorded on abort; rethrown once by a later close.
+   */
+  private final ExceptionLastSeen exceptionLastSeen = new ExceptionLastSeen();
+
   static class MultipleBlockingQueue<T> {
     private final List<BlockingQueue<T>> queues;
 
@@ -971,12 +977,9 @@ public class DFSStripedOutputStream extends DFSOutputStream
       if (isClosed()) {
         return;
       }
-      for (StripedDataStreamer streamer : streamers) {
-        streamer.getLastException().set(
-            new IOException("Lease timeout of "
-                + (dfsClient.getConf().getHdfsTimeout() / 1000)
-                + " seconds expired."));
-      }
+      exceptionLastSeen.set(new IOException("Lease timeout of "
+          + (dfsClient.getConf().getHdfsTimeout() / 1000)
+          + " seconds expired."));
 
       try {
         closeThreads(true);
@@ -1133,18 +1136,26 @@ public class DFSStripedOutputStream extends 
DFSOutputStream
   @Override
   protected synchronized void closeImpl() throws IOException {
     if (isClosed()) {
+      exceptionLastSeen.check(true);
+
+      // Writing to at least {dataUnits} replicas can be considered as success,
+      // and the rest of data can be recovered.
+      final int minReplication = ecPolicy.getNumDataUnits();
+      int goodStreamers = 0;
       final MultipleIOException.Builder b = new MultipleIOException.Builder();
-      for(int i = 0; i < streamers.size(); i++) {
-        final StripedDataStreamer si = getStripedDataStreamer(i);
+      for (final StripedDataStreamer si : streamers) {
         try {
           si.getLastException().check(true);
+          goodStreamers++;
         } catch (IOException e) {
           b.add(e);
         }
       }
-      final IOException ioe = b.build();
-      if (ioe != null) {
-        throw ioe;
+      if (goodStreamers < minReplication) {
+        final IOException ioe = b.build();
+        if (ioe != null) {
+          throw ioe;
+        }
       }
       return;
     }
@@ -1183,9 +1194,10 @@ public class DFSStripedOutputStream extends 
DFSOutputStream
         }
       } finally {
         // Failures may happen when flushing data/parity data out. Exceptions
-        // may be thrown if more than 3 streamers fail, or updatePipeline RPC
-        // fails. Streamers may keep waiting for the new block/GS information.
-        // Thus need to force closing these threads.
+        // may be thrown if the number of failed streamers is more than the
+        // number of parity blocks, or updatePipeline RPC fails. Streamers may
+        // keep waiting for the new block/GS information. Thus need to force
+        // closing these threads.
         closeThreads(true);
       }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f27a4ad0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 99fa5f3..c1473dd 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -285,39 +285,22 @@ class DataStreamer extends Daemon {
     packets.clear();
   }
 
-  class LastExceptionInStreamer {
-    private IOException thrown;
-
-    synchronized void set(Throwable t) {
-      assert t != null;
-      this.thrown = t instanceof IOException ?
-          (IOException) t : new IOException(t);
-    }
-
-    synchronized void clear() {
-      thrown = null;
-    }
-
-    /** Check if there already is an exception. */
+  class LastExceptionInStreamer extends ExceptionLastSeen {
+    /**
+     * Rethrow the recorded exception, if any, trace-logging it first.
+     */
+    @Override
     synchronized void check(boolean resetToNull) throws IOException {
+      final IOException thrown = get();
       if (thrown != null) {
         if (LOG.isTraceEnabled()) {
           // wrap and print the exception to know when the check is called
           LOG.trace("Got Exception while checking, " + DataStreamer.this,
               new Throwable(thrown));
         }
-        final IOException e = thrown;
-        if (resetToNull) {
-          thrown = null;
-        }
-        throw e;
+        super.check(resetToNull);
       }
     }
-
-    synchronized void throwException4Close() throws IOException {
-      check(false);
-      throw new ClosedChannelException();
-    }
   }
 
   enum ErrorType {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f27a4ad0/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExceptionLastSeen.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExceptionLastSeen.java
 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExceptionLastSeen.java
new file mode 100644
index 0000000..06bc5d2
--- /dev/null
+++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ExceptionLastSeen.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+import java.io.IOException;
+import java.nio.channels.ClosedChannelException;
+
+/**
+ * The exception last seen by the {@link DataStreamer} or
+ * {@link DFSOutputStream}.
+ */
+@InterfaceAudience.Private
+class ExceptionLastSeen {
+  private IOException thrown;
+
+  /** @return the last seen exception, or null if none is recorded. */
+  synchronized protected IOException get() {
+    return thrown;
+  }
+
+  /**
+   * Record t as the last seen exception, wrapping non-IOExceptions.
+   * @param t the exception; must not be null.
+   */
+  synchronized void set(Throwable t) {
+    assert t != null;
+    this.thrown = t instanceof IOException ?
+        (IOException) t : new IOException(t);
+  }
+
+  /** Clear the last seen exception. */
+  synchronized void clear() {
+    thrown = null;
+  }
+
+  /**
+   * Throw the last seen exception, if one has been recorded.
+   *
+   * @param resetToNull if true, clear the recorded exception before
+   *                    throwing it.
+   * @throws IOException the recorded exception, if any.
+   */
+  synchronized void check(boolean resetToNull) throws IOException {
+    if (thrown != null) {
+      final IOException e = thrown;
+      if (resetToNull) {
+        thrown = null;
+      }
+      throw e;
+    }
+  }
+
+  synchronized void throwException4Close() throws IOException {
+    check(false);  // rethrow the recorded exception, if any
+    throw new ClosedChannelException();  // otherwise report "closed"
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f27a4ad0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
----------------------------------------------------------------------
diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
index 57da439..e7fa278 100644
--- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
+++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedOutputStreamWithFailure.java
@@ -319,6 +319,82 @@ public class TestDFSStripedOutputStreamWithFailure {
     }
   }
 
+  private void testCloseWithExceptionsInStreamer(
+      int numFailures, boolean shouldFail) throws Exception {
+    assertTrue(numFailures <=
+        ecPolicy.getNumDataUnits() + ecPolicy.getNumParityUnits());
+    final Path dirFile = new Path(dir, "ecfile-" + numFailures);
+    try (FSDataOutputStream out = dfs.create(dirFile, true)) {
+      out.write("idempotent close".getBytes());
+
+      // The first close() is expected to throw an IOException; whether a
+      // later close() throws depends on the failures injected below.
+      LambdaTestUtils.intercept(IOException.class,
+          out::close);
+
+      assertTrue(out.getWrappedStream() instanceof DFSStripedOutputStream);
+      DFSStripedOutputStream stripedOut =
+          (DFSStripedOutputStream) out.getWrappedStream();
+      for (int i = 0; i < numFailures; i++) {
+        // Inject a failure into the i-th streamer.
+        stripedOut.getStripedDataStreamer(i).getLastException().set(
+            new IOException("injected failure")
+        );
+      }
+      if (shouldFail) {
+        LambdaTestUtils.intercept(IOException.class, out::close);
+      }
+
+      // Close again, explicitly and via try-with-resources; neither call
+      // may throw.
+      out.close();
+    }
+  }
+
+  // HDFS-12612: repeated close() calls with failed streamers.
+  @Test
+  public void testIdempotentCloseWithFailedStreams() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    try {
+      setup(conf);
+      // Stop datanodes until fewer than dataBlocks remain, so a full set of
+      // data blocks cannot be placed and the helper's first close() fails.
+      while (cluster.getDataNodes().size() >= dataBlocks) {
+        cluster.stopDataNode(0);
+      }
+      cluster.restartNameNodes();
+      cluster.triggerHeartbeats();
+
+      testCloseWithExceptionsInStreamer(1, false);
+      testCloseWithExceptionsInStreamer(ecPolicy.getNumParityUnits(), false);
+      testCloseWithExceptionsInStreamer(ecPolicy.getNumParityUnits() + 1, 
true);
+      testCloseWithExceptionsInStreamer(ecPolicy.getNumDataUnits(), true);
+    } finally {
+      tearDown();
+    }
+  }
+
+  @Test
+  public void testCloseAfterAbort() throws Exception {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    try {
+      setup(conf);
+
+      final Path dirFile = new Path(dir, "ecfile");
+      FSDataOutputStream out = dfs.create(dirFile, true);
+      assertTrue(out.getWrappedStream() instanceof DFSStripedOutputStream);
+      DFSStripedOutputStream stripedOut =
+          (DFSStripedOutputStream) out.getWrappedStream();
+      stripedOut.abort();  // close() must then report the lease timeout
+      LambdaTestUtils.intercept(IOException.class,
+          "Lease timeout", stripedOut::close);
+    } finally {
+      tearDown();
+    }
+  }
+
   @Test(timeout = 90000)
   public void testAddBlockWhenNoSufficientParityNumOfNodes()
       throws IOException {


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to