Repository: hbase
Updated Branches:
  refs/heads/branch-1.1 941ad56e7 -> b3d69bc02


HBASE-16429 FSHLog: deadlock if rollWriter called when ring buffer filled with 
appends


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b3d69bc0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b3d69bc0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b3d69bc0

Branch: refs/heads/branch-1.1
Commit: b3d69bc029a3b8a4db29b3c266c16643108938dd
Parents: 941ad56
Author: Yu Li <l...@apache.org>
Authored: Thu Aug 18 09:59:36 2016 +0800
Committer: Yu Li <l...@apache.org>
Committed: Thu Aug 18 11:16:54 2016 +0800

----------------------------------------------------------------------
 .../hadoop/hbase/regionserver/wal/FSHLog.java   | 33 +++++++--
 .../hadoop/hbase/regionserver/TestHRegion.java  | 70 ++++++++++++++++++++
 2 files changed, 98 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/b3d69bc0/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 4b4f6b4..53545ed 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -947,8 +947,20 @@ public class FSHLog implements WAL {
     // constructor BEFORE the ring buffer is set running so it is null on 
first time through
     // here; allow for that.
     SyncFuture syncFuture = null;
-    SafePointZigZagLatch zigzagLatch = (this.ringBufferEventHandler == null)?
-      null: this.ringBufferEventHandler.attainSafePoint();
+    SafePointZigZagLatch zigzagLatch = null;
+    long sequence = -1L;
+    if (this.ringBufferEventHandler != null) {
+      // Get sequence first to avoid dead lock when ring buffer is full
+      // Considering below sequence
+      // 1. replaceWriter is called and zigzagLatch is initialized
+      // 2. ringBufferEventHandler#onEvent is called and arrives at 
#attainSafePoint(long) then wait
+      // on safePointReleasedLatch
+      // 3. Since ring buffer is full, if we get sequence when publish sync, 
the replaceWriter
+      // thread will wait for the ring buffer to be consumed, but the only 
consumer is waiting
+      // replaceWriter thread to release safePointReleasedLatch, which causes 
a deadlock
+      sequence = getSequenceOnRingBuffer();
+      zigzagLatch = this.ringBufferEventHandler.attainSafePoint();
+    }
     afterCreatingZigZagLatch();
     TraceScope scope = Trace.startSpan("FSHFile.replaceWriter");
     try {
@@ -959,8 +971,11 @@ public class FSHLog implements WAL {
       // to come back.  Cleanup this syncFuture down below after we are ready 
to run again.
       try {
         if (zigzagLatch != null) {
+          // use assert to make sure no change breaks the logic that
+          // sequence and zigzagLatch will be set together
+          assert sequence > 0L : "Failed to get sequence from ring buffer";
           Trace.addTimelineAnnotation("awaiting safepoint");
-          syncFuture = zigzagLatch.waitSafePoint(publishSyncOnRingBuffer());
+          syncFuture = 
zigzagLatch.waitSafePoint(publishSyncOnRingBuffer(sequence));
         }
       } catch (FailedSyncBeforeLogCloseException e) {
         // If unflushed/unsynced entries on close, it is reason to abort.
@@ -1484,12 +1499,20 @@ public class FSHLog implements WAL {
     return logRollNeeded;
   }
 
-  private SyncFuture publishSyncOnRingBuffer() {
-    return publishSyncOnRingBuffer(null);
+  private SyncFuture publishSyncOnRingBuffer(long sequence) {
+    return publishSyncOnRingBuffer(sequence, null);
+  }
+
+  private long getSequenceOnRingBuffer() {
+    return this.disruptor.getRingBuffer().next();
   }
 
   private SyncFuture publishSyncOnRingBuffer(Span span) {
     long sequence = this.disruptor.getRingBuffer().next();
+    return publishSyncOnRingBuffer(sequence, span);
+  }
+
+  private SyncFuture publishSyncOnRingBuffer(long sequence, Span span) {
     SyncFuture syncFuture = getSyncFuture(sequence, span);
     try {
       RingBufferTruck truck = this.disruptor.getRingBuffer().get(sequence);

http://git-wip-us.apache.org/repos/asf/hbase/blob/b3d69bc0/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
----------------------------------------------------------------------
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index 67e2350..8947eaf 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -6485,4 +6485,74 @@ public class TestHRegion {
     return initHRegion(tableName, callingMethod, HBaseConfiguration.create(),
         families);
   }
+
+  /**
+   * HBASE-16429 Make sure no stuck if roll writer when ring buffer is filled 
with appends
+   * @throws IOException if IO error occurred during test
+   */
+  @Test(timeout = 60000)
+  public void testWritesWhileRollWriter() throws IOException {
+    int testCount = 10;
+    int numRows = 1024;
+    int numFamilies = 2;
+    int numQualifiers = 2;
+    final byte[][] families = new byte[numFamilies][];
+    for (int i = 0; i < numFamilies; i++) {
+      families[i] = Bytes.toBytes("family" + i);
+    }
+    final byte[][] qualifiers = new byte[numQualifiers][];
+    for (int i = 0; i < numQualifiers; i++) {
+      qualifiers[i] = Bytes.toBytes("qual" + i);
+    }
+
+    String method = "testWritesWhileRollWriter";
+    CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 2);
+    this.region = initHRegion(tableName, method, CONF, families);
+    try {
+      List<Thread> threads = new ArrayList<Thread>();
+      for (int i = 0; i < numRows; i++) {
+        final int count = i;
+        Thread t = new Thread(new Runnable() {
+
+          @Override
+          public void run() {
+            byte[] row = Bytes.toBytes("row" + count);
+            Put put = new Put(row);
+            put.setDurability(Durability.SYNC_WAL);
+            byte[] value = Bytes.toBytes(String.valueOf(count));
+            for (byte[] family : families) {
+              for (byte[] qualifier : qualifiers) {
+                put.addColumn(family, qualifier, (long) count, value);
+              }
+            }
+            try {
+              region.put(put);
+            } catch (IOException e) {
+              throw new RuntimeException(e);
+            }
+          }
+        });
+        threads.add(t);
+      }
+      for (Thread t : threads) {
+        t.start();
+      }
+
+      for (int i = 0; i < testCount; i++) {
+        region.getWAL().rollWriter();
+        Thread.yield();
+      }
+    } finally {
+      try {
+        HBaseTestingUtility.closeRegionAndWAL(this.region);
+        CONF.setInt("hbase.regionserver.wal.disruptor.event.count", 16 * 1024);
+      } catch (DroppedSnapshotException dse) {
+        // We could get this on way out because we interrupt the background 
flusher and it could
+        // fail anywhere causing a DSE over in the background flusher... only 
it is not properly
+        // dealt with so could still be memory hanging out when we get to here 
-- memory we can't
+        // flush because the accounting is 'off' since original DSE.
+      }
+      this.region = null;
+    }
+  }
 }

Reply via email to