This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git

The following commit(s) were added to refs/heads/branch-2.6 by this push:
     new 0d0952261a8  HBASE-28190 Add slow sync log rolling test in TestAsyncLogRolling. (#5583)
0d0952261a8 is described below

commit 0d0952261a8bc2a0ffbe1b6efc918afb5f197d88
Author: Fantasy-Jay <13631435...@163.com>
AuthorDate: Tue Dec 19 11:56:11 2023 +0800

    HBASE-28190 Add slow sync log rolling test in TestAsyncLogRolling. (#5583)

    Signed-off-by: Duo Zhang <zhang...@apache.org>
    (cherry picked from commit c668cbcb8acc25a059ee102557e02a959cbcada7)
---
 .../hbase/regionserver/wal/AbstractFSWAL.java      |   4 +
 .../hadoop/hbase/regionserver/wal/FSHLog.java      |   8 -
 .../regionserver/wal/AbstractTestLogRolling.java   | 106 +++++++++-
 .../regionserver/wal/TestAsyncLogRolling.java      |  65 ++++++
 .../hadoop/hbase/regionserver/wal/TestFSHLog.java  |  12 +-
 .../hbase/regionserver/wal/TestLogRolling.java     | 234 +++++----------
 6 files changed, 225 insertions(+), 204 deletions(-)

diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index c9ff80f69c6..0beffbd1dd7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -1395,6 +1395,10 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
     WALSplitter.split(baseDir, p, archiveDir, fs, conf, WALFactory.getInstance(conf));
   }
 
+  W getWriter() {
+    return this.writer;
+  }
+
   private static void usage() {
     System.err.println("Usage: AbstractFSWAL <ARGS>");
     System.err.println("Arguments:");
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
index 6afe2e06794..42f0235f67b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java
@@ -1208,12 +1208,4 @@ public class FSHLog extends AbstractFSWAL<Writer> {
     }
     return new DatanodeInfo[0];
   }
-
-  Writer getWriter() {
-    return this.writer;
-  }
-
-  void setWriter(Writer writer) {
-    this.writer = writer;
-  }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
index 722f5ce5bc2..b06fbb51b74 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java
@@ -20,9 +20,13 @@ package org.apache.hadoop.hbase.regionserver.wal;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThan;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
@@ -31,6 +35,7 @@ import org.apache.hadoop.hbase.MiniHBaseCluster;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.StartMiniClusterOption;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Get;
@@ -48,8 +53,10 @@ import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALProvider;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
@@ -59,6 +66,8 @@ import org.junit.rules.TestName;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * Test log deletion as logs are rolled.
  */
@@ -74,6 +83,10 @@ public abstract class AbstractTestLogRolling {
   protected static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
   @Rule
   public final TestName name = new TestName();
+  protected static int syncLatencyMillis;
+  private static int rowNum = 1;
+  private static final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
+  protected static ScheduledExecutorService EXECUTOR;
 
   public AbstractTestLogRolling() {
     this.server = null;
@@ -118,6 +131,17 @@
     // disable low replication check for log roller to get a more stable result
     // TestWALOpenAfterDNRollingStart will test this option.
     conf.setLong("hbase.regionserver.hlog.check.lowreplication.interval", 24L * 60 * 60 * 1000);
+
+    // For slow sync threshold test: roll after 5 slow syncs in 10 seconds
+    conf.setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
+    conf.setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
+    // For slow sync threshold test: roll once after a sync above this threshold
+    conf.setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
+
+    // Slow sync executor.
+    EXECUTOR = Executors
+      .newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat("Slow-sync-%d")
+        .setDaemon(true).setUncaughtExceptionHandler(Threads.LOGGING_EXCEPTION_HANDLER).build());
   }
 
   @Before
@@ -139,6 +163,11 @@
     TEST_UTIL.shutdownMiniCluster();
   }
 
+  @AfterClass
+  public static void tearDownAfterClass() {
+    EXECUTOR.shutdownNow();
+  }
+
   private void startAndWriteData() throws IOException, InterruptedException {
     this.server = cluster.getRegionServerThreads().get(0).getRegionServer();
@@ -158,6 +187,74 @@
     }
   }
 
+  private static void setSyncLatencyMillis(int latency) {
+    syncLatencyMillis = latency;
+  }
+
+  protected final AbstractFSWAL<?> getWALAndRegisterSlowSyncHook(RegionInfo region)
+    throws IOException {
+    // Get a reference to the wal.
+    final AbstractFSWAL<?> log = (AbstractFSWAL<?>) server.getWAL(region);
+
+    // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested
+    log.registerWALActionsListener(new WALActionsListener() {
+      @Override
+      public void logRollRequested(RollRequestReason reason) {
+        switch (reason) {
+          case SLOW_SYNC:
+            slowSyncHookCalled.lazySet(true);
+            break;
+          default:
+            break;
+        }
+      }
+    });
+    return log;
+  }
+
+  protected final void checkSlowSync(AbstractFSWAL<?> log, Table table, int slowSyncLatency,
+    int writeCount, boolean slowSync) throws Exception {
+    if (slowSyncLatency > 0) {
+      setSyncLatencyMillis(slowSyncLatency);
+      setSlowLogWriter(log.conf);
+    } else {
+      setDefaultLogWriter(log.conf);
+    }
+
+    // Set up for test
+    log.rollWriter(true);
+    slowSyncHookCalled.set(false);
+
+    final WALProvider.WriterBase oldWriter = log.getWriter();
+
+    // Write some data
+    for (int i = 0; i < writeCount; i++) {
+      writeData(table, rowNum++);
+    }
+
+    if (slowSync) {
+      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return log.getWriter() != oldWriter;
+        }
+
+        @Override
+        public String explainFailure() throws Exception {
+          return "Waited too long for our test writer to get rolled out";
+        }
+      });
+
+      assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());
+    } else {
+      assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());
+    }
+  }
+
+  protected abstract void setSlowLogWriter(Configuration conf);
+
+  protected abstract void setDefaultLogWriter(Configuration conf);
+
   /**
    * Tests that log rolling doesn't hang when no data is written.
    */
@@ -239,12 +336,10 @@
    */
   @Test
   public void testCompactionRecordDoesntBlockRolling() throws Exception {
-    Table table = null;
     // When the hbase:meta table can be opened, the region servers are running
-    Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
-    try {
-      table = createTestTable(getName());
+    try (Table t = TEST_UTIL.getConnection().getTable(TableName.META_TABLE_NAME);
+      Table table = createTestTable(getName())) {
       server = TEST_UTIL.getRSForFirstRegionInTable(table.getName());
       HRegion region = server.getRegions(table.getName()).get(0);
@@ -289,9 +384,6 @@
     log.rollWriter(); // Now 2nd WAL is deleted and 3rd is added.
assertEquals("Should have 1 WALs at the end", 1, AbstractFSWALProvider.getNumRolledLogFiles(log)); - } finally { - if (t != null) t.close(); - if (table != null) table.close(); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java index 9dc27a693a7..804e93eb8f5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestAsyncLogRolling.java @@ -20,10 +20,17 @@ package org.apache.hadoop.hbase.regionserver.wal; import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.client.Table; +import org.apache.hadoop.hbase.client.TableDescriptor; +import org.apache.hadoop.hbase.client.TableDescriptorBuilder; import org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper; import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.VerySlowRegionServerTests; @@ -36,6 +43,9 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.apache.hbase.thirdparty.io.netty.channel.Channel; +import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; + @Category({ VerySlowRegionServerTests.class, LargeTests.class }) public class TestAsyncLogRolling extends AbstractTestLogRolling { @@ -51,6 +61,61 @@ public class TestAsyncLogRolling extends AbstractTestLogRolling { AbstractTestLogRolling.setUpBeforeClass(); } + public static class SlowSyncLogWriter extends AsyncProtobufLogWriter { + + public SlowSyncLogWriter(EventLoopGroup eventLoopGroup, Class<? 
+      super(eventLoopGroup, channelClass);
+    }
+
+    @Override
+    public CompletableFuture<Long> sync(boolean forceSync) {
+      CompletableFuture<Long> future = new CompletableFuture<>();
+      super.sync(forceSync).whenCompleteAsync((lengthAfterFlush, error) -> {
+        EXECUTOR.schedule(() -> {
+          if (error != null) {
+            future.completeExceptionally(error);
+          } else {
+            future.complete(lengthAfterFlush);
+          }
+        }, syncLatencyMillis, TimeUnit.MILLISECONDS);
+      });
+      return future;
+    }
+  }
+
+  @Override
+  protected void setSlowLogWriter(Configuration conf) {
+    conf.set(AsyncFSWALProvider.WRITER_IMPL, SlowSyncLogWriter.class.getName());
+  }
+
+  @Override
+  protected void setDefaultLogWriter(Configuration conf) {
+    conf.set(AsyncFSWALProvider.WRITER_IMPL, AsyncProtobufLogWriter.class.getName());
+  }
+
+  @Test
+  public void testSlowSyncLogRolling() throws Exception {
+    // Create the test table
+    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
+    admin.createTable(desc);
+    try (Table table = TEST_UTIL.getConnection().getTable(desc.getTableName())) {
+      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
+      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
+      final AbstractFSWAL<?> log = getWALAndRegisterSlowSyncHook(region);
+
+      // Set default log writer, no additional latency to any sync on the hlog.
+      checkSlowSync(log, table, -1, 10, false);
+
+      // Adds 5000 ms of latency to any sync on the hlog. This will trip the other threshold.
+      // Write some data. Should only take one sync.
+      checkSlowSync(log, table, 5000, 1, true);
+
+      // Set default log writer, no additional latency to any sync on the hlog.
+      checkSlowSync(log, table, -1, 10, false);
+    }
+  }
+
   @Test
   public void testLogRollOnDatanodeDeath() throws IOException, InterruptedException {
     dfsCluster.startDataNodes(TEST_UTIL.getConfiguration(), 3, true, null, null);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
index a8a88553141..0b317c2c74a 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestFSHLog.java
@@ -53,6 +53,7 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CommonFSUtils;
 import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WAL;
 import org.apache.hadoop.hbase.wal.WALEdit;
 import org.apache.hadoop.hbase.wal.WALKey;
@@ -196,9 +197,10 @@
     }
 
     final String name = this.name.getMethodName();
-    try (CustomFSHLog log = new CustomFSHLog(FS, CommonFSUtils.getRootDir(CONF), name,
-      HConstants.HREGION_OLDLOGDIR_NAME, CONF, null, true, null, null)) {
-      log.setWriter(new FailingWriter());
+    Configuration conf = new Configuration(CONF);
+    conf.set(FSHLogProvider.WRITER_IMPL, FailingWriter.class.getName());
+    try (CustomFSHLog log = new CustomFSHLog(FS, CommonFSUtils.getRootDir(conf), name,
+      HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null)) {
       Field ringBufferEventHandlerField = FSHLog.class.getDeclaredField("ringBufferEventHandler");
       ringBufferEventHandlerField.setAccessible(true);
       FSHLog.RingBufferEventHandler ringBufferEventHandler =
@@ -208,7 +210,7 @@
       try {
         SyncFuture future0 = log.publishSyncOnRingBuffer();
         // Wait for the sync to be done.
-        Waiter.waitFor(CONF, TEST_TIMEOUT_MS, future0::isDone);
+        Waiter.waitFor(conf, TEST_TIMEOUT_MS, future0::isDone);
         // Publish another sync from the same thread, this should not overwrite the done sync.
         SyncFuture future1 = log.publishSyncOnRingBuffer();
         assertFalse(future1.isDone());
@@ -217,7 +219,7 @@
       // Wait for the safe point to be reached.
       // With the deadlock in HBASE-25984, this is never possible, thus blocking the sync
       // pipeline.
-      Waiter.waitFor(CONF, TEST_TIMEOUT_MS, latch::isSafePointAttained);
+      Waiter.waitFor(conf, TEST_TIMEOUT_MS, latch::isSafePointAttained);
     } finally {
       // Force release the safe point, for the clean up.
       latch.releaseSafePoint();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
index c098140fbe9..8065d86cac1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestLogRolling.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.HBaseClassTestRule;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RegionInfo;
@@ -56,10 +55,9 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.JVMClusterUtil;
 import org.apache.hadoop.hbase.util.RecoverLeaseFSUtils;
 import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.apache.hadoop.hbase.wal.FSHLogProvider;
 import org.apache.hadoop.hbase.wal.WAL;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hbase.wal.WALStreamReader;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
@@ -98,192 +96,30 @@ public class TestLogRolling extends AbstractTestLogRolling {
     conf.setInt("hbase.regionserver.hlog.lowreplication.rolllimit", 3);
     conf.set(WALFactory.WAL_PROVIDER, "filesystem");
     AbstractTestLogRolling.setUpBeforeClass();
-
-    // For slow sync threshold test: roll after 5 slow syncs in 10 seconds
-    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_THRESHOLD, 5);
-    TEST_UTIL.getConfiguration().setInt(FSHLog.SLOW_SYNC_ROLL_INTERVAL_MS, 10 * 1000);
-    // For slow sync threshold test: roll once after a sync above this threshold
-    TEST_UTIL.getConfiguration().setInt(FSHLog.ROLL_ON_SYNC_TIME_MS, 5000);
   }
 
-  @Test
-  public void testSlowSyncLogRolling() throws Exception {
-    // Create the test table
-    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
-      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
-    admin.createTable(desc);
-    Table table = TEST_UTIL.getConnection().getTable(desc.getTableName());
-    int row = 1;
-    try {
-      // Get a reference to the FSHLog
-      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
-      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
-      final FSHLog log = (FSHLog) server.getWAL(region);
-
-      // Register a WALActionsListener to observe if a SLOW_SYNC roll is requested
-
-      final AtomicBoolean slowSyncHookCalled = new AtomicBoolean();
-      log.registerWALActionsListener(new WALActionsListener() {
-        @Override
-        public void logRollRequested(WALActionsListener.RollRequestReason reason) {
-          switch (reason) {
-            case SLOW_SYNC:
-              slowSyncHookCalled.lazySet(true);
-              break;
-            default:
-              break;
-          }
-        }
-      });
-
-      // Write some data
-
-      for (int i = 0; i < 10; i++) {
-        writeData(table, row++);
-      }
-
-      assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());
-
-      // Set up for test
-      slowSyncHookCalled.set(false);
-
-      // Wrap the current writer with the anonymous class below that adds 200 ms of
-      // latency to any sync on the hlog. This should be more than sufficient to trigger
-      // slow sync warnings.
-      final Writer oldWriter1 = log.getWriter();
-      final Writer newWriter1 = new Writer() {
-        @Override
-        public void close() throws IOException {
-          oldWriter1.close();
-        }
-
-        @Override
-        public void sync(boolean forceSync) throws IOException {
-          try {
-            Thread.sleep(200);
-          } catch (InterruptedException e) {
-            InterruptedIOException ex = new InterruptedIOException();
-            ex.initCause(e);
-            throw ex;
-          }
-          oldWriter1.sync(forceSync);
-        }
-
-        @Override
-        public void append(Entry entry) throws IOException {
-          oldWriter1.append(entry);
-        }
-
-        @Override
-        public long getLength() {
-          return oldWriter1.getLength();
-        }
-
-        @Override
-        public long getSyncedLength() {
-          return oldWriter1.getSyncedLength();
-        }
-      };
-      log.setWriter(newWriter1);
-
-      // Write some data.
-      // We need to write at least 5 times, but double it. We should only request
-      // a SLOW_SYNC roll once in the current interval.
-      for (int i = 0; i < 10; i++) {
-        writeData(table, row++);
-      }
-
-      // Wait for our wait injecting writer to get rolled out, as needed.
-
-      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return log.getWriter() != newWriter1;
-        }
-
-        @Override
-        public String explainFailure() throws Exception {
-          return "Waited too long for our test writer to get rolled out";
-        }
-      });
-
-      assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());
-
-      // Set up for test
-      slowSyncHookCalled.set(false);
-
-      // Wrap the current writer with the anonymous class below that adds 5000 ms of
-      // latency to any sync on the hlog.
-      // This will trip the other threshold.
-      final Writer oldWriter2 = (Writer) log.getWriter();
-      final Writer newWriter2 = new Writer() {
-        @Override
-        public void close() throws IOException {
-          oldWriter2.close();
-        }
-
-        @Override
-        public void sync(boolean forceSync) throws IOException {
-          try {
-            Thread.sleep(5000);
-          } catch (InterruptedException e) {
-            InterruptedIOException ex = new InterruptedIOException();
-            ex.initCause(e);
-            throw ex;
-          }
-          oldWriter2.sync(forceSync);
-        }
-
-        @Override
-        public void append(Entry entry) throws IOException {
-          oldWriter2.append(entry);
-        }
-
-        @Override
-        public long getLength() {
-          return oldWriter2.getLength();
-        }
-
-        @Override
-        public long getSyncedLength() {
-          return oldWriter2.getSyncedLength();
-        }
-      };
-      log.setWriter(newWriter2);
-
-      // Write some data. Should only take one sync.
-
-      writeData(table, row++);
-
-      // Wait for our wait injecting writer to get rolled out, as needed.
-
-      TEST_UTIL.waitFor(10000, 100, new Waiter.ExplainingPredicate<Exception>() {
-        @Override
-        public boolean evaluate() throws Exception {
-          return log.getWriter() != newWriter2;
-        }
-
-        @Override
-        public String explainFailure() throws Exception {
-          return "Waited too long for our test writer to get rolled out";
-        }
-      });
-
-      assertTrue("Should have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());
-
-      // Set up for test
-      slowSyncHookCalled.set(false);
-
-      // Write some data
-      for (int i = 0; i < 10; i++) {
-        writeData(table, row++);
+  public static class SlowSyncLogWriter extends ProtobufLogWriter {
+    @Override
+    public void sync(boolean forceSync) throws IOException {
+      try {
+        Thread.sleep(syncLatencyMillis);
+      } catch (InterruptedException e) {
+        InterruptedIOException ex = new InterruptedIOException();
+        ex.initCause(e);
+        throw ex;
       }
+      super.sync(forceSync);
+    }
+  }
 
-      assertFalse("Should not have triggered log roll due to SLOW_SYNC", slowSyncHookCalled.get());
+  @Override
+  protected void setSlowLogWriter(Configuration conf) {
+    conf.set(FSHLogProvider.WRITER_IMPL, SlowSyncLogWriter.class.getName());
+  }
 
-    } finally {
-      table.close();
-    }
+  @Override
+  protected void setDefaultLogWriter(Configuration conf) {
+    conf.set(FSHLogProvider.WRITER_IMPL, ProtobufLogWriter.class.getName());
   }
 
   void batchWriteAndWait(Table table, final FSHLog log, int start, boolean expect, int timeout)
@@ -313,6 +149,36 @@
     }
   }
 
+  @Test
+  public void testSlowSyncLogRolling() throws Exception {
+    // Create the test table
+    TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf(getName()))
+      .setColumnFamily(ColumnFamilyDescriptorBuilder.of(HConstants.CATALOG_FAMILY)).build();
+    admin.createTable(desc);
+    try (Table table = TEST_UTIL.getConnection().getTable(desc.getTableName())) {
+      server = TEST_UTIL.getRSForFirstRegionInTable(desc.getTableName());
+      RegionInfo region = server.getRegions(desc.getTableName()).get(0).getRegionInfo();
+      final AbstractFSWAL<?> log = getWALAndRegisterSlowSyncHook(region);
+
+      // Set default log writer, no additional latency to any sync on the hlog.
+      checkSlowSync(log, table, -1, 10, false);
+
+      // Adds 200 ms of latency to any sync on the hlog. This should be more than sufficient to
+      // trigger slow sync warnings.
+      // Write some data.
+      // We need to write at least 5 times, but double it. We should only request
+      // a SLOW_SYNC roll once in the current interval.
+      checkSlowSync(log, table, 200, 10, true);
+
+      // Adds 5000 ms of latency to any sync on the hlog. This will trip the other threshold.
+      // Write some data. Should only take one sync.
+      checkSlowSync(log, table, 5000, 1, true);
+
+      // Set default log writer, no additional latency to any sync on the hlog.
+      checkSlowSync(log, table, -1, 10, false);
+    }
+  }
+
   /**
    * Tests that logs are rolled upon detecting datanode death Requires an HDFS jar with HDFS-826 &
    * syncFs() support (HDFS-200)
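
A note for readers skimming the patch: on the synchronous (FSHLog) side, slow syncs are now injected through a pluggable writer class rather than by wrapping the live writer, which is why the old getWriter()/setWriter() test hooks on FSHLog could be deleted. The sketch below condenses that pattern into a standalone class; the class body follows the patch, while the fixed latency field and the install() helper are illustrative stand-ins for the test's static syncLatencyMillis field and its setSlowLogWriter(Configuration) override.

import java.io.IOException;
import java.io.InterruptedIOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogWriter;
import org.apache.hadoop.hbase.wal.FSHLogProvider;

// Sketch of the patch's FSHLog-side test writer: each sync() sleeps before
// delegating, which drives the WAL's slow-sync accounting until a SLOW_SYNC
// roll is requested.
public class SlowSyncLogWriter extends ProtobufLogWriter {

  // The patch reads this from a static field on AbstractTestLogRolling;
  // a fixed value here keeps the sketch self-contained (illustrative only).
  static volatile int syncLatencyMillis = 200;

  @Override
  public void sync(boolean forceSync) throws IOException {
    try {
      Thread.sleep(syncLatencyMillis); // injected latency
    } catch (InterruptedException e) {
      InterruptedIOException ex = new InterruptedIOException();
      ex.initCause(e);
      throw ex;
    }
    super.sync(forceSync); // then the real sync
  }

  // Illustrative helper (not in the patch): the FSHLog provider instantiates
  // whatever writer class WRITER_IMPL names, so swapping it is one conf call.
  public static void install(Configuration conf) {
    conf.set(FSHLogProvider.WRITER_IMPL, SlowSyncLogWriter.class.getName());
  }
}

Because the provider only instantiates the configured class when it opens a new writer, the reconfiguration takes effect on the next roll; that is why checkSlowSync() in the patch calls log.rollWriter(true) right after switching WRITER_IMPL.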
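On the asynchronous (AsyncFSWAL) side a plain Thread.sleep() inside sync() would stall the Netty event loop, so the patch instead lets the real sync complete and then delivers the result late from the shared scheduled executor. The following self-contained sketch shows that delayed-completion idiom in isolation; the DelayedCompletion class, its delay() helper, and the executor are illustrative, not HBase API.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Generic form of the idiom used by the async SlowSyncLogWriter in the patch:
// the upstream future completes normally, but observers only see the result
// after an artificial delay, without blocking the completing thread.
final class DelayedCompletion {

  private static final ScheduledExecutorService EXECUTOR =
    Executors.newSingleThreadScheduledExecutor();

  // Returns a future that mirrors 'upstream' but completes delayMillis later.
  static <T> CompletableFuture<T> delay(CompletableFuture<T> upstream, long delayMillis) {
    CompletableFuture<T> delayed = new CompletableFuture<>();
    upstream.whenCompleteAsync((value, error) -> EXECUTOR.schedule(() -> {
      if (error != null) {
        delayed.completeExceptionally(error);
      } else {
        delayed.complete(value);
      }
    }, delayMillis, TimeUnit.MILLISECONDS));
    return delayed;
  }
}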