Repository: hadoop
Updated Branches:
  refs/heads/branch-2.9 b756beb67 -> 3516ef45f


HDFS-12754. Lease renewal can hit a deadlock. Contributed by Kuhu Shukla.

(cherry picked from commit 45f59bde60a21138fdb0fb846588db422d1c97a5)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3516ef45
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3516ef45
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3516ef45

Branch: refs/heads/branch-2.9
Commit: 3516ef45f3bb7aaf6e928283d27ad27a8c3c2ada
Parents: b756beb
Author: Kihwal Lee <kih...@apache.org>
Authored: Mon Nov 27 17:01:34 2017 -0600
Committer: Kihwal Lee <kih...@apache.org>
Committed: Mon Nov 27 17:01:34 2017 -0600

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hdfs/DFSClient.java  | 15 +++-
 .../hadoop/hdfs/DFSClientFaultInjector.java     |  2 +
 .../hadoop/hdfs/client/impl/LeaseRenewer.java   | 55 +++++---------
 .../hdfs/client/impl/TestLeaseRenewer.java      | 23 ++----
 .../hadoop/hdfs/TestDFSClientRetries.java       | 80 ++++++++++++++++++++
 .../hadoop/hdfs/TestDistributedFileSystem.java  |  5 +-
 6 files changed, 122 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/3516ef45/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 75128ae..3fd8220 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -484,12 +484,21 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   /** Get a lease and start automatic renewal */
   private void beginFileLease(final long inodeId, final DFSOutputStream out)
       throws IOException {
-    getLeaseRenewer().put(inodeId, out, this);
+    synchronized (filesBeingWritten) {
+      putFileBeingWritten(inodeId, out);
+      getLeaseRenewer().put(this);
+    }
   }
 
   /** Stop renewal of lease for the file. */
   void endFileLease(final long inodeId) {
-    getLeaseRenewer().closeFile(inodeId, this);
+    synchronized (filesBeingWritten) {
+      removeFileBeingWritten(inodeId);
+      // remove client from renewer if no files are open
+      if (filesBeingWritten.isEmpty()) {
+        getLeaseRenewer().closeClient(this);
+      }
+    }
   }
 
 
@@ -615,9 +624,9 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   @Override
   public synchronized void close() throws IOException {
     if(clientRunning) {
+      // lease renewal stops when all files are closed
       closeAllFilesBeingWritten(false);
       clientRunning = false;
-      getLeaseRenewer().closeClient(this);
       // close connections to the namenode
       closeConnectionToNamenode();
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3516ef45/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
index b58cf16..d36c058 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClientFaultInjector.java
@@ -63,4 +63,6 @@ public class DFSClientFaultInjector {
   }
 
   public void sleepBeforeHedgedGet() {}
+
+  public void delayWhenRenewLeaseTimeout() {}
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3516ef45/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
index 6faf133..e33d024 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
@@ -30,7 +30,7 @@ import java.util.Map;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSClient;
-import org.apache.hadoop.hdfs.DFSOutputStream;
+import org.apache.hadoop.hdfs.DFSClientFaultInjector;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Daemon;
@@ -76,7 +76,7 @@ import org.slf4j.LoggerFactory;
 public class LeaseRenewer {
   static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);
 
-  static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L;
+  private static long leaseRenewerGraceDefault = 60*1000L;
   static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
 
   /** Get a {@link LeaseRenewer} instance */
@@ -156,9 +156,7 @@ public class LeaseRenewer {
       final LeaseRenewer stored = renewers.get(r.factorykey);
       //Since a renewer may expire, the stored renewer can be different.
       if (r == stored) {
-        if (!r.clientsRunning()) {
-          renewers.remove(r.factorykey);
-        }
+        renewers.remove(r.factorykey);
       }
     }
   }
@@ -201,7 +199,7 @@ public class LeaseRenewer {
 
   private LeaseRenewer(Factory.Key factorykey) {
     this.factorykey = factorykey;
-    unsyncSetGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT);
+    unsyncSetGraceSleepPeriod(leaseRenewerGraceDefault);
 
     if (LOG.isTraceEnabled()) {
       instantiationTrace = StringUtils.stringifyException(
@@ -293,8 +291,7 @@ public class LeaseRenewer {
         && Time.monotonicNow() - emptyTime > gracePeriod;
   }
 
-  public synchronized void put(final long inodeId, final DFSOutputStream out,
-      final DFSClient dfsc) {
+  public synchronized void put(final DFSClient dfsc) {
     if (dfsc.isClientRunning()) {
       if (!isRunning() || isRenewerExpired()) {
         //start a new deamon with a new id.
@@ -328,7 +325,6 @@ public class LeaseRenewer {
         });
         daemon.start();
       }
-      dfsc.putFileBeingWritten(inodeId, out);
       emptyTime = Long.MAX_VALUE;
     }
   }
@@ -338,28 +334,6 @@ public class LeaseRenewer {
     emptyTime = time;
   }
 
-  /** Close a file. */
-  public void closeFile(final long inodeId, final DFSClient dfsc) {
-    dfsc.removeFileBeingWritten(inodeId);
-
-    synchronized(this) {
-      if (dfsc.isFilesBeingWrittenEmpty()) {
-        dfsclients.remove(dfsc);
-      }
-      //update emptyTime if necessary
-      if (emptyTime == Long.MAX_VALUE) {
-        for(DFSClient c : dfsclients) {
-          if (!c.isFilesBeingWrittenEmpty()) {
-            //found a non-empty file-being-written map
-            return;
-          }
-        }
-        //discover the first time that all file-being-written maps are empty.
-        emptyTime = Time.monotonicNow();
-      }
-    }
-  }
-
   /** Close the given client. */
   public synchronized void closeClient(final DFSClient dfsc) {
     dfsclients.remove(dfsc);
@@ -447,14 +421,17 @@ public class LeaseRenewer {
         } catch (SocketTimeoutException ie) {
           LOG.warn("Failed to renew lease for " + clientsString() + " for "
               + (elapsed/1000) + " seconds.  Aborting ...", ie);
+          List<DFSClient> dfsclientsCopy;
           synchronized (this) {
-            while (!dfsclients.isEmpty()) {
-              DFSClient dfsClient = dfsclients.get(0);
-              dfsClient.closeAllFilesBeingWritten(true);
-              closeClient(dfsClient);
-            }
+            DFSClientFaultInjector.get().delayWhenRenewLeaseTimeout();
+            dfsclientsCopy = new ArrayList<>(dfsclients);
+            dfsclients.clear();
             //Expire the current LeaseRenewer thread.
             emptyTime = 0;
+            Factory.INSTANCE.remove(LeaseRenewer.this);
+          }
+          for (DFSClient dfsClient : dfsclientsCopy) {
+            dfsClient.closeAllFilesBeingWritten(true);
           }
           break;
         } catch (IOException ie) {
@@ -511,4 +488,10 @@ public class LeaseRenewer {
       return b.append("]").toString();
     }
   }
+
+  @VisibleForTesting
+  public static void setLeaseRenewerGraceDefault(
+      long leaseRenewerGraceDefault) {
+    LeaseRenewer.leaseRenewerGraceDefault = leaseRenewerGraceDefault;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3516ef45/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java
index eb10e96..f73ea6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/test/java/org/apache/hadoop/hdfs/client/impl/TestLeaseRenewer.java
@@ -109,7 +109,7 @@ public class TestLeaseRenewer {
     // Set up a file so that we start renewing our lease.
     DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
     long fileId = 123L;
-    renewer.put(fileId, mockStream, MOCK_DFSCLIENT);
+    renewer.put(MOCK_DFSCLIENT);
 
     // Wait for lease to get renewed
     long failTime = Time.monotonicNow() + 5000;
@@ -121,7 +121,7 @@ public class TestLeaseRenewer {
       Assert.fail("Did not renew lease at all!");
     }
 
-    renewer.closeFile(fileId, MOCK_DFSCLIENT);
+    renewer.closeClient(MOCK_DFSCLIENT);
   }
 
   /**
@@ -136,11 +136,8 @@ public class TestLeaseRenewer {
     Mockito.doReturn(false).when(mockClient1).renewLease();
     assertSame(renewer, LeaseRenewer.getInstance(
         FAKE_AUTHORITY, FAKE_UGI_A, mockClient1));
-
-    // Set up a file so that we start renewing our lease.
-    DFSOutputStream mockStream1 = Mockito.mock(DFSOutputStream.class);
     long fileId = 456L;
-    renewer.put(fileId, mockStream1, mockClient1);
+    renewer.put(mockClient1);
 
     // Second DFSClient does renew lease
     final DFSClient mockClient2 = createMockClient();
@@ -148,9 +145,7 @@ public class TestLeaseRenewer {
     assertSame(renewer, LeaseRenewer.getInstance(
         FAKE_AUTHORITY, FAKE_UGI_A, mockClient2));
 
-    // Set up a file so that we start renewing our lease.
-    DFSOutputStream mockStream2 = Mockito.mock(DFSOutputStream.class);
-    renewer.put(fileId, mockStream2, mockClient2);
+    renewer.put(mockClient2);
 
 
     // Wait for lease to get renewed
@@ -171,19 +166,17 @@ public class TestLeaseRenewer {
       }
     }, 100, 10000);
 
-    renewer.closeFile(fileId, mockClient1);
-    renewer.closeFile(fileId, mockClient2);
+    renewer.closeClient(mockClient1);
+    renewer.closeClient(mockClient2);
   }
 
   @Test
   public void testThreadName() throws Exception {
-    DFSOutputStream mockStream = Mockito.mock(DFSOutputStream.class);
-    long fileId = 789L;
     Assert.assertFalse("Renewer not initially running",
         renewer.isRunning());
 
     // Pretend to open a file
-    renewer.put(fileId, mockStream, MOCK_DFSCLIENT);
+    renewer.put(MOCK_DFSCLIENT);
 
     Assert.assertTrue("Renewer should have started running",
         renewer.isRunning());
@@ -193,7 +186,7 @@ public class TestLeaseRenewer {
     Assert.assertEquals("LeaseRenewer:myuser@hdfs://nn1/", threadName);
 
     // Pretend to close the file
-    renewer.closeFile(fileId, MOCK_DFSCLIENT);
+    renewer.closeClient(MOCK_DFSCLIENT);
     renewer.setEmptyTime(Time.monotonicNow());
 
     // Should stop the renewer running within a few seconds

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3516ef45/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
index cf93943..e9985b9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSClientRetries.java
@@ -47,6 +47,7 @@ import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
@@ -1236,4 +1237,83 @@ public class TestDFSClientRetries {
       cluster.shutdown();
     }
   }
+
+  @Test(timeout=120000)
+  public void testLeaseRenewAndDFSOutputStreamDeadLock() throws Exception {
+    final CountDownLatch testLatch = new CountDownLatch(1);
+    DFSClientFaultInjector.set(new DFSClientFaultInjector() {
+      public void delayWhenRenewLeaseTimeout() {
+        try {
+          testLatch.await();
+        } catch (InterruptedException e) {
+          e.printStackTrace();
+        }
+      }
+    });
+    String file1 = "/testFile1";
+    // Set short retry timeouts so this test runs faster
+    conf.setInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, 1000);
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    try {
+      cluster.waitActive();
+      final NamenodeProtocols spyNN = spy(cluster.getNameNodeRpc());
+
+      doAnswer(new SleepFixedTimeAnswer(1500, testLatch)).when(spyNN).complete(
+          anyString(), anyString(), any(ExtendedBlock.class), anyLong());
+      DFSClient client = new DFSClient(null, spyNN, conf, null);
+      // Get hold of the lease renewer instance used by the client
+      LeaseRenewer leaseRenewer = client.getLeaseRenewer();
+      leaseRenewer.setRenewalTime(100);
+      final OutputStream out1 = client.create(file1, false);
+
+      out1.write(new byte[256]);
+
+      Thread closeThread = new Thread(new Runnable() {
+        @Override public void run() {
+          try {
+            //1. trigger get LeaseRenewer lock
+            Mockito.doThrow(new SocketTimeoutException()).when(spyNN)
+                .renewLease(Mockito.anyString());
+          } catch (IOException e) {
+            e.printStackTrace();
+          }
+        }
+      });
+      closeThread.start();
+
+      //2. trigger get DFSOutputStream lock
+      out1.close();
+
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
+  private static class SleepFixedTimeAnswer implements Answer<Object> {
+    private final int sleepTime;
+    private final CountDownLatch testLatch;
+
+    SleepFixedTimeAnswer(int sleepTime, CountDownLatch latch) {
+      this.sleepTime = sleepTime;
+      this.testLatch = latch;
+    }
+
+    @Override
+    public Object answer(InvocationOnMock invocation) throws Throwable {
+      boolean interrupted = false;
+      try {
+        Thread.sleep(sleepTime);
+      } catch (InterruptedException ie) {
+        interrupted = true;
+      }
+      try {
+        return invocation.callRealMethod();
+      } finally {
+        testLatch.countDown();
+        if (interrupted) {
+          Thread.currentThread().interrupt();
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/3516ef45/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
index 0cff7d4..d92a67d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
@@ -290,6 +290,7 @@ public class TestDistributedFileSystem {
     Configuration conf = getTestConfiguration();
     final long grace = 1000L;
     MiniDFSCluster cluster = null;
+    LeaseRenewer.setLeaseRenewerGraceDefault(grace);
 
     try {
       cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
@@ -302,10 +303,6 @@ public class TestDistributedFileSystem {
 
       {
         final DistributedFileSystem dfs = cluster.getFileSystem();
-        Method setMethod = dfs.dfs.getLeaseRenewer().getClass()
-            .getDeclaredMethod("setGraceSleepPeriod", long.class);
-        setMethod.setAccessible(true);
-        setMethod.invoke(dfs.dfs.getLeaseRenewer(), grace);
         Method checkMethod = dfs.dfs.getLeaseRenewer().getClass()
             .getDeclaredMethod("isRunning");
         checkMethod.setAccessible(true);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to