This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 0d0952261a8 HBASE-28190 Add slow sync log rolling test in TestAsyncLogRolling. (#5583)
0d0952261a8 is described below

commit 0d0952261a8bc2a0ffbe1b6efc918afb5f197d88
Author: Fantasy-Jay <13631435...@163.com>
AuthorDate: Tue Dec 19 11:56:11 2023 +0800

    HBASE-28190 Add slow sync log rolling test in TestAsyncLogRolling. (#5583)
    
    Signed-off-by: Duo Zhang <zhang...@apache.org>
    (cherry picked from commit c668cbcb8acc25a059ee102557e02a959cbcada7)
---
 .../hbase/regionserver/wal/AbstractFSWAL.java      |   4 +
 .../hadoop/hbase/regionserver/wal/FSHLog.java      |   8 -
 .../regionserver/wal/AbstractTestLogRolling.java   | 106 +++++++++-
 .../regionserver/wal/TestAsyncLogRolling.java      |  65 ++++++
 .../hadoop/hbase/regionserver/wal/TestFSHLog.java  |  12 +-
 .../hbase/regionserver/wal/TestLogRolling.java     | 234 +++++----------------
 6 files changed, 225 insertions(+), 204 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index c9ff80f69c6..0beffbd1dd7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -1395,6 +1395,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     WALSplitter.split(baseDir, p, archiveDir, fs, conf, 
WALFactory.getInstance(conf));
   }
 
+  W getWriter() {
+    return this.writer;
+  }
+
   private static void usage() {
     System.err.println("Usage: AbstractFSWAL <ARGS>");
     System.err.println("Arguments:");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 6afe2e06794..42f0235f67b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1208,12 +1208,4 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     }
     return new DatanodeInfo[0];
   }
-
-  Writer getWriter() {
-    return this.writer;
-  }
-
-  void setWriter(Writer writer) {
-    this.writer = writer;
-  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
index 722f5ce5bc2..b06fbb51b74 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
@@ -20,9 +20,13 @@ package org.apache.hadoop.hbase.regionserver.wal;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -31,6 +35,7 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Get;
@@ -48,8 +53,10 @@ import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -59,6 +66,8 @@ import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import 
org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Test log deletion as logs are rolled.
  */
@@ -74,6 +83,10 @@ public abstract class AbstractTestLogRolling {
   protected static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
   @Rule
   public final TestName name = new TestName();
+  protected static int syncLatencyMillis;
+  private static int rowNum = 1;
+  private static final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
+  protected static ScheduledExecutorService EXECUTOR;
 
   public AbstractTestLogRolling() {
     this.server = null;
@@ -118,6 +131,17 @@ public abstract class AbstractTestLogRolling {
     // disable low replication check for log roller to get a more stable result
     // TestWALOpenAfterDNRollingStart will test this option.
     conf.setLong("hbase.regionserver.hlog.check.lowreplication.interval", 24L 
* 60 * 60 * 1000);
+
+    // For slow sync threshold test: roll after 5 slow syncs in 10 seconds
+    conf.setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
+    conf.setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
+    // For slow sync threshold test: roll once after a sync above this 
threshold
+    conf.setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
+
+    // Slow sync executor.
+    EXECUTOR = Executors
+      .newSingleThreadScheduledExecutor(new 
ThreadFactoryBuilder().setNameFormat("Slow-sync-%d")
+        
.setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
   }
 
   @Before
@@ -139,6 +163,11 @@ public abstract class AbstractTestLogRolling {
     TEST_UTIL.shutdownMiniCluster();
   }
 
+  @AfterClass
+  public static void tearDownAfterClass() {
+    EXECUTOR.shutdownNow();
+  }
+
   private void startAndWriteData() throws IOException, InterruptedException {
     this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
 
@@ -158,6 +187,74 @@ public abstract class AbstractTestLogRolling {
     }
   }
 
+  private static void setSyncLatencyMillis(int latency) {
+    syncLatencyMillis = latency;
+  }
+
+  protected final AbstractFSWAL<?> getWALAndRegisterSlowSyncHook(RegionInfo 
region)
+    throws IOException {
+    // Get a reference to the wal.
+    final AbstractFSWAL<?> log = (AbstractFSWAL<?>) server.getWAL(region);
+
+    // Register a WALActionsListener to observe if a SLOW_SYNC roll is 
requested
+    log.registerWALActionsListener(new WALActionsListener() {
+      @Override
+      public void logRollRequested(RollRequestReason reason) {
+        switch (reason) {
+          case SLOW_SYNC:
+            slowSyncHookCalled.lazySet(true);
+            break;
+          default:
+            break;
+        }
+      }
+    });
+    return log;
+  }
+
+  protected final void checkSlowSync(AbstractFSWAL<?> log, Table table, int 
slowSyncLatency,
+    int writeCount, boolean slowSync) throws Exception {
+    if (slowSyncLatency > 0) {
+      setSyncLatencyMillis(slowSyncLatency);
+      setSlowLogWriter(log.conf);
+    } else {
+      setDefaultLogWriter(log.conf);
+    }
+
+    // Set up for test
+    log.rollWriter(true);
+    slowSyncHookCalled.set(false);
+
+    final WALProvider.WriterBase oldWriter = log.getWriter();
+
+    // Write some data
+    for (int i = 0; i < writeCount; i++) {
+      writeData(table, rowNum++);
+    }
+
+    if (slowSync) {
+      TEST_UTIL.waitFor(10000, 100, new 
Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return log.getWriter() != oldWriter;
+        }
+
+        @Override
+        public String explainFailure() throws Exception {
+          return "Waited too long for our test writer to get rolled out";
+        }
+      });
+
+      assertTrue("Should have triggered log roll due to SLOW_SYNC", 
slowSyncHookCalled.get());
+    } else {
+      assertFalse("Should not have triggered log roll due to SLOW_SYNC", 
slowSyncHookCalled.get());
+    }
+  }
+
+  protected abstract void setSlowLogWriter(Configuration conf);
+
+  protected abstract void setDefaultLogWriter(Configuration conf);
+
   /**
    * Tests that log rolling doesn't hang when no data is written.
    */
@@ -239,12 +336,10 @@ public abstract class AbstractTestLogRolling {
    */
   @Test
   public void testCompactionRecordDoesntBlockRolling() throws Exception {
-    Table table = null;
 
     // When the hbase:meta table can be opened, the region servers are running
-    Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
-    try {
-      table = createTestTable(getName());
+    try (Table t = 
TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
+      Table table = createTestTable(getName())) {
 
       server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
       HRegion region = server.getRegions(table.getName()).get(0);
@@ -289,9 +384,6 @@ public abstract class AbstractTestLogRolling {
       log.rollWriter(); // Now 2nd WAL is deleted and 3rd is added.
       assertEquals("Should have 1 WALs at the end", 1,
         AbstractFSWALProvider.getNumRolledLogFiles(log));
-    } finally {
-      if (t != null) t.close();
-      if (table != null) table.close();
     }
   }
 
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java
index 9dc27a693a7..804e93eb8f5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java
@@ -20,10 +20,17 @@ package org.apache.hadoop.hbase.regionserver.wal;
 import static org.junit.Assert.assertEquals;
 
 import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.RegionInfo;
 import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
 import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests;
@@ -36,6 +43,9 @@ import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
 
+import org.apache.hbase.thirdparty.io.netty.channel.Channel;
+import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
+
 @Category({ VerySlowRegionServerTests.class, LargeTests.class })
 public class TestAsyncLogRolling extends AbstractTestLogRolling {
 
@@ -51,6 +61,61 @@ public class TestAsyncLogRolling extends AbstractTestLogRolling {
     AbstractTestLogRolling.setUpBeforeClass();
   }
 
+  public static class SlowSyncLogWriter extends AsyncProtobufLogWriter {
+
+    public SlowSyncLogWriter(EventLoopGroup eventLoopGroup, Class<? extends 
Channel> channelClass) {
+      super(eventLoopGroup, channelClass);
+    }
+
+    @Override
+    public CompletableFuture<Long> sync(boolean forceSync) {
+      CompletableFuture<Long> future = new CompletableFuture<>();
+      super.sync(forceSync).whenCompleteAsync((lengthAfterFlush, error) -> {
+        EXECUTOR.schedule(() -> {
+          if (error != null) {
+            future.completeExceptionally(error);
+          } else {
+            future.complete(lengthAfterFlush);
+          }
+        }, syncLatencyMillis, TimeUnit.MILLISECONDS);
+      });
+      return future;
+    }
+  }
+
+  @Override
+  protected void setSlowLogWriter(Configuration conf) {
+    conf.set(AsyncFSWALProvider.WRITER_IMPL, 
SlowSyncLogWriter.class.getName());
+  }
+
+  @Override
+  protected void setDefaultLogWriter(Configuration conf) {
+    conf.set(AsyncFSWALProvider.WRITER_IMPL, 
AsyncProtobufLogWriter.class.getName());
+  }
+
+  @Test
+  public void testSlowSyncLogRolling() throws Exception {
+    // Create the test table
+    TableDescriptor desc = 
TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
+      
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
+    admin.createTable(desc);
+    try (Table table = 
TEST_UTIL.getConnection().getTable(desc.getTableName())) {
+      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
+      RegionInfo region = 
server.getRegions(desc.getTableName()).get(0).getRegionInfo();
+      final AbstractFSWAL<?> log = getWALAndRegisterSlowSyncHook(region);
+
+      // Set default log writer, no additional latency to any sync on the hlog.
+      checkSlowSync(log, table, -1, 10, false);
+
+      // Adds 5000 ms of latency to any sync on the hlog. This will trip the 
other threshold.
+      // Write some data. Should only take one sync.
+      checkSlowSync(log, table, 5000, 1, true);
+
+      // Set default log writer, no additional latency to any sync on the hlog.
+      checkSlowSync(log, table, -1, 10, false);
+    }
+  }
+
   @Test
   public void testLogRollOnDatanodeDeath() throws IOException, 
InterruptedException {
     dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 3, true, null, 
null);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index a8a88553141..0b317c2c74a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -196,9 +197,10 @@ public class TestFSHLog extends AbstractTestFSWAL {
     }
 
     final String name = this.name.getMethodName();
-    try (CustomFSHLog log = new CustomFSHLog(FS, 
CommonFSUtils.getRootDir(CONF), name,
-      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
-      log.setWriter(new FailingWriter());
+    Configuration conf = new Configuration(CONF);
+    conf.set(FSHLogProvider.WRITER_IMPL, FailingWriter.class.getName());
+    try (CustomFSHLog log = new CustomFSHLog(FS, 
CommonFSUtils.getRootDir(conf), name,
+      HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null)) {
       Field ringBufferEventHandlerField = 
FSHLog.class.getDeclaredField("ringBufferEventHandler");
       ringBufferEventHandlerField.setAccessible(true);
       FSHLog.RingBufferEventHandler ringBufferEventHandler =
@@ -208,7 +210,7 @@ public class TestFSHLog extends AbstractTestFSWAL {
       try {
         SyncFuture future0 = log.publishSyncOnRingBuffer();
         // Wait for the sync to be done.
-        Waiter.waitFor(CONF, TEST_TIMEOUT_MS, future0::isDone);
+        Waiter.waitFor(conf, TEST_TIMEOUT_MS, future0::isDone);
         // Publish another sync from the same thread, this should not 
overwrite the done sync.
         SyncFuture future1 = log.publishSyncOnRingBuffer();
         assertFalse(future1.isDone());
@@ -217,7 +219,7 @@ public class TestFSHLog extends AbstractTestFSWAL {
         // Wait for the safe point to be reached.
         // With the deadlock in HBASE-25984, this is never possible, thus 
blocking the sync
         // pipeline.
-        Waiter.waitFor(CONF, TEST_TIMEOUT_MS, latch::isSafePointAttained);
+        Waiter.waitFor(conf, TEST_TIMEOUT_MS, latch::isSafePointAttained);
       } finally {
         // Force release the safe point, for the clean up.
         latch.releaseSafePoint();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
index c098140fbe9..8065d86cac1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -56,10 +55,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hbase.wal.WALStreamReader;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -98,192 +96,30 @@ public class TestLogRolling extends AbstractTestLogRolling {
     conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
     conf.set(WALFactory.WAL_PROVIDER, "filesystem");
     AbstractTestLogRolling.setUpBeforeClass();
-
-    // For slow sync threshold test: roll after 5 slow syncs in 10 seconds
-    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
-    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 
* 1000);
-    // For slow sync threshold test: roll once after a sync above this 
threshold
-    TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
   }
 
-  @Test
-  public void testSlowSyncLogRolling() throws Exception {
-    // Create the test table
-    TableDescriptor desc = 
TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
-      
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
-    admin.createTable(desc);
-    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
-    int row = 1;
-    try {
-      // Get a reference to the FSHLog
-      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
-      RegionInfo region = 
server.getRegions(desc.getTableName()).get(0).getRegionInfo();
-      final FSHLog log = (FSHLog) server.getWAL(region);
-
-      // Register a WALActionsListener to observe if a SLOW_SYNC roll is 
requested
-
-      final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
-      log.registerWALActionsListener(new WALActionsListener() {
-        @Override
-        public void logRollRequested(WALActionsListener.RollRequestReason 
reason) {
-          switch (reason) {
-            case SLOW_SYNC:
-              slowSyncHookCalled.lazySet(true);
-              break;
-            default:
-              break;
-          }
-        }
-      });
-
-      // Write some data
-
-      for (int i = 0; i < 10; i++) {
-        writeData(table, row++);
-      }
-
-      assertFalse("Should not have triggered log roll due to SLOW_SYNC", 
slowSyncHookCalled.get());
-
-      // Set up for test
-      slowSyncHookCalled.set(false);
-
-      // Wrap the current writer with the anonymous class below that adds 200 
ms of
-      // latency to any sync on the hlog. This should be more than sufficient 
to trigger
-      // slow sync warnings.
-      final Writer oldWriter1 = log.getWriter();
-      final Writer newWriter1 = new Writer() {
-        @Override
-        public void close() throws IOException {
-          oldWriter1.close();
-        }
-
-        @Override
-        public void sync(boolean forceSync) throws IOException {
-          try {
-            Thread.sleep(200);
-          } catch (InterruptedException e) {
-            InterruptedIOException ex = new InterruptedIOException();
-            ex.initCause(e);
-            throw ex;
-          }
-          oldWriter1.sync(forceSync);
-        }
-
-        @Override
-        public void append(Entry entry) throws IOException {
-          oldWriter1.append(entry);
-        }
-
-        @Override
-        public long getLength() {
-          return oldWriter1.getLength();
-        }
-
-        @Override
-        public long getSyncedLength() {
-          return oldWriter1.getSyncedLength();
-        }
-      };
-      log.setWriter(newWriter1);
-
-      // Write some data.
-      // We need to write at least 5 times, but double it. We should only 
request
-      // a SLOW_SYNC roll once in the current interval.
-      for (int i = 0; i < 10; i++) {
-        writeData(table, row++);
-      }
-
-      // Wait for our wait injecting writer to get rolled out, as needed.
-
-      TEST_UTIL.waitFor(10000, 100, new 
Waiter.ExplainingPredicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return log.getWriter() != newWriter1;
-        }
-
-        @Override
-        public String explainFailure() throws Exception {
-          return "Waited too long for our test writer to get rolled out";
-        }
-      });
-
-      assertTrue("Should have triggered log roll due to SLOW_SYNC", 
slowSyncHookCalled.get());
-
-      // Set up for test
-      slowSyncHookCalled.set(false);
-
-      // Wrap the current writer with the anonymous class below that adds 5000 
ms of
-      // latency to any sync on the hlog.
-      // This will trip the other threshold.
-      final Writer oldWriter2 = (Writer) log.getWriter();
-      final Writer newWriter2 = new Writer() {
-        @Override
-        public void close() throws IOException {
-          oldWriter2.close();
-        }
-
-        @Override
-        public void sync(boolean forceSync) throws IOException {
-          try {
-            Thread.sleep(5000);
-          } catch (InterruptedException e) {
-            InterruptedIOException ex = new InterruptedIOException();
-            ex.initCause(e);
-            throw ex;
-          }
-          oldWriter2.sync(forceSync);
-        }
-
-        @Override
-        public void append(Entry entry) throws IOException {
-          oldWriter2.append(entry);
-        }
-
-        @Override
-        public long getLength() {
-          return oldWriter2.getLength();
-        }
-
-        @Override
-        public long getSyncedLength() {
-          return oldWriter2.getSyncedLength();
-        }
-      };
-      log.setWriter(newWriter2);
-
-      // Write some data. Should only take one sync.
-
-      writeData(table, row++);
-
-      // Wait for our wait injecting writer to get rolled out, as needed.
-
-      TEST_UTIL.waitFor(10000, 100, new 
Waiter.ExplainingPredicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return log.getWriter() != newWriter2;
-        }
-
-        @Override
-        public String explainFailure() throws Exception {
-          return "Waited too long for our test writer to get rolled out";
-        }
-      });
-
-      assertTrue("Should have triggered log roll due to SLOW_SYNC", 
slowSyncHookCalled.get());
-
-      // Set up for test
-      slowSyncHookCalled.set(false);
-
-      // Write some data
-      for (int i = 0; i < 10; i++) {
-        writeData(table, row++);
+  public static class SlowSyncLogWriter extends ProtobufLogWriter {
+    @Override
+    public void sync(boolean forceSync) throws IOException {
+      try {
+        Thread.sleep(syncLatencyMillis);
+      } catch (InterruptedException e) {
+        InterruptedIOException ex = new InterruptedIOException();
+        ex.initCause(e);
+        throw ex;
       }
+      super.sync(forceSync);
+    }
+  }
 
-      assertFalse("Should not have triggered log roll due to SLOW_SYNC", 
slowSyncHookCalled.get());
+  @Override
+  protected void setSlowLogWriter(Configuration conf) {
+    conf.set(FSHLogProvider.WRITER_IMPL, SlowSyncLogWriter.class.getName());
+  }
 
-    } finally {
-      table.close();
-    }
+  @Override
+  protected void setDefaultLogWriter(Configuration conf) {
+    conf.set(FSHLogProvider.WRITER_IMPL, ProtobufLogWriter.class.getName());
   }
 
   void batchWriteAndWait(Table table, final FSHLog log, int start, boolean 
expect, int timeout)
@@ -313,6 +149,36 @@ public class TestLogRolling extends AbstractTestLogRolling {
     }
   }
 
+  @Test
+  public void testSlowSyncLogRolling() throws Exception {
+    // Create the test table
+    TableDescriptor desc = 
TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
+      
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
+    admin.createTable(desc);
+    try (Table table = 
TEST_UTIL.getConnection().getTable(desc.getTableName())) {
+      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
+      RegionInfo region = 
server.getRegions(desc.getTableName()).get(0).getRegionInfo();
+      final AbstractFSWAL<?> log = getWALAndRegisterSlowSyncHook(region);
+
+      // Set default log writer, no additional latency to any sync on the hlog.
+      checkSlowSync(log, table, -1, 10, false);
+
+      // Adds 200 ms of latency to any sync on the hlog. This should be more 
than sufficient to
+      // trigger slow sync warnings.
+      // Write some data.
+      // We need to write at least 5 times, but double it. We should only 
request
+      // a SLOW_SYNC roll once in the current interval.
+      checkSlowSync(log, table, 200, 10, true);
+
+      // Adds 5000 ms of latency to any sync on the hlog. This will trip the 
other threshold.
+      // Write some data. Should only take one sync.
+      checkSlowSync(log, table, 5000, 1, true);
+
+      // Set default log writer, no additional latency to any sync on the hlog.
+      checkSlowSync(log, table, -1, 10, false);
+    }
+  }
+
   /**
    * Tests that logs are rolled upon detecting datanode death Requires an HDFS 
jar with HDFS-826 &
    * syncFs() support (HDFS-200)

Reply via email to