This is an automated email from the ASF dual-hosted git repository. ramkrishna pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push: new accd975 HBASE-25065 WAL archival to be done by a separate thread (#2501) accd975 is described below commit accd9750aa217c40e9db641c53905b8f4bb7e66d Author: ramkrish86 <ram_krish...@hotmail.com> AuthorDate: Sun Oct 11 10:46:06 2020 +0530 HBASE-25065 WAL archival to be done by a separate thread (#2501) * HBASE-25065 WAL archival can be batched/throttled and also done by a separate thread * Fix checkstyle issues * Address review comments * checkstyle comments * Addressing final review comments Signed-off-by: Michael Stack <st...@apache.org> --- .../hadoop/hbase/master/region/MasterRegion.java | 2 +- .../hadoop/hbase/regionserver/HRegionServer.java | 2 +- .../hbase/regionserver/wal/AbstractFSWAL.java | 69 ++++++++++++++++++++-- .../hadoop/hbase/regionserver/wal/AsyncFSWAL.java | 15 ++++- .../hadoop/hbase/regionserver/wal/FSHLog.java | 30 ++++++++-- .../hadoop/hbase/wal/AbstractFSWALProvider.java | 8 ++- .../hadoop/hbase/wal/AsyncFSWALProvider.java | 8 +-- .../hadoop/hbase/wal/DisabledWALProvider.java | 5 +- .../apache/hadoop/hbase/wal/FSHLogProvider.java | 6 +- .../hadoop/hbase/wal/RegionGroupingProvider.java | 9 ++- .../hbase/wal/SyncReplicationWALProvider.java | 7 ++- .../org/apache/hadoop/hbase/wal/WALFactory.java | 19 +++--- .../org/apache/hadoop/hbase/wal/WALProvider.java | 5 +- .../regionserver/TestFailedAppendAndSync.java | 44 +++++++++++++- .../regionserver/wal/AbstractTestLogRolling.java | 7 ++- .../apache/hadoop/hbase/wal/IOTestProvider.java | 4 +- .../apache/hadoop/hbase/wal/TestWALFactory.java | 2 +- 17 files changed, 199 insertions(+), 43 deletions(-) diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java index 81da59d..688a549 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/region/MasterRegion.java @@ -301,7 +301,7 @@ public final class MasterRegion { params.archivedWalSuffix(), params.rollPeriodMs(), params.flushSize()); walRoller.start(); - WALFactory walFactory = new WALFactory(conf, server.getServerName().toString(), false); + WALFactory walFactory = new WALFactory(conf, server.getServerName().toString(), server, false); Path tableDir = CommonFSUtils.getTableDir(rootDir, td.getTableName()); HRegion region; if (fs.exists(tableDir)) { diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index d6eb45f..d51eab4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -1906,7 +1906,7 @@ public class HRegionServer extends Thread implements boolean isMasterNoTableOrSystemTableOnly = this instanceof HMaster && !LoadBalancer.isMasterCanHostUserRegions(conf); WALFactory factory = - new WALFactory(conf, serverName.toString(), !isMasterNoTableOrSystemTableOnly); + new WALFactory(conf, serverName.toString(), this, !isMasterNoTableOrSystemTableOnly); if (!isMasterNoTableOrSystemTableOnly) { // TODO Replication make assumptions here based on the default filesystem impl Path oldLogDir = new Path(walRootDir, HConstants.HREGION_OLDLOGDIR_NAME); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java index d2c624a..ac99ea6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -41,6 +41,8 @@ import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -53,6 +55,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; @@ -84,8 +87,12 @@ import org.apache.htrace.core.TraceScope; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + + + + /** * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one @@ -185,6 +192,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL { */ protected final Configuration conf; + protected final Abortable abortable; + /** Listeners that are called on WAL events. */ protected final List<WALActionsListener> listeners = new CopyOnWriteArrayList<>(); @@ -329,6 +338,11 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL { protected final AtomicBoolean rollRequested = new AtomicBoolean(false); + private final ExecutorService logArchiveExecutor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("WAL-Archiver-%d").build()); + + private final int archiveRetries; + public long getFilenum() { return this.filenum.get(); } @@ -380,10 +394,19 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL { final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix, final String suffix) throws FailedLogCloseException, IOException { + this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); + } + + protected AbstractFSWAL(final FileSystem fs, final Abortable abortable, final Path rootDir, + final String logDir, final String archiveDir, final Configuration conf, + final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix, + final String suffix) + throws FailedLogCloseException, IOException { this.fs = fs; this.walDir = new Path(rootDir, logDir); this.walArchiveDir = new Path(rootDir, archiveDir); this.conf = conf; + this.abortable = abortable; if (!fs.exists(walDir) && !fs.mkdirs(walDir)) { throw new IOException("Unable to mkdir " + walDir); @@ -482,6 +505,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL { this.walTooOldNs = TimeUnit.SECONDS.toNanos(conf.getInt( SURVIVED_TOO_LONG_SEC_KEY, SURVIVED_TOO_LONG_SEC_DEFAULT)); this.useHsync = conf.getBoolean(HRegion.WAL_HSYNC_CONF_KEY, HRegion.DEFAULT_WAL_HSYNC); + archiveRetries = this.conf.getInt("hbase.regionserver.walroll.archive.retries", 0); + } /** @@ -715,11 +740,39 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL { regionsBlockingThisWal.clear(); } } + if (logsToArchive != null) { - for (Pair<Path, Long> logAndSize : logsToArchive) { - this.totalLogSize.addAndGet(-logAndSize.getSecond()); - archiveLogFile(logAndSize.getFirst()); - this.walFile2Props.remove(logAndSize.getFirst()); + final List<Pair<Path, Long>> localLogsToArchive = logsToArchive; + // make it async + for (Pair<Path, Long> log : localLogsToArchive) { + logArchiveExecutor.execute(() -> { + archive(log); + }); + this.walFile2Props.remove(log.getFirst()); + } + } + } + + protected void archive(final Pair<Path, Long> log) { + int retry = 1; + while (true) { + try { + archiveLogFile(log.getFirst()); + totalLogSize.addAndGet(-log.getSecond()); + // successful + break; + } catch (Throwable e) { + if (retry > archiveRetries) { + LOG.error("Failed log archiving for the log {},", log.getFirst(), e); + if (this.abortable != null) { + this.abortable.abort("Failed log archiving", e); + break; + } + } else { + LOG.error("Log archiving failed for the log {} - attempt {}", log.getFirst(), retry, + e); + } + retry++; } } } @@ -732,7 +785,8 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL { return new Path(archiveDir, p.getName()); } - private void archiveLogFile(final Path p) throws IOException { + @VisibleForTesting + protected void archiveLogFile(final Path p) throws IOException { Path newPath = getWALArchivePath(this.walArchiveDir, p); // Tell our listeners that a log is going to be archived. if (!this.listeners.isEmpty()) { @@ -907,6 +961,9 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL { rollWriterLock.lock(); try { doShutdown(); + if (logArchiveExecutor != null) { + logArchiveExecutor.shutdownNow(); + } } finally { rollWriterLock.unlock(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java index a40e503..3424460 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -44,9 +44,11 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.io.asyncfs.AsyncFSOutput; @@ -60,7 +62,6 @@ import org.apache.htrace.core.TraceScope; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hbase.thirdparty.io.netty.channel.Channel; @@ -68,6 +69,7 @@ import org.apache.hbase.thirdparty.io.netty.channel.EventLoop; import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup; import org.apache.hbase.thirdparty.io.netty.util.concurrent.SingleThreadEventExecutor; + /** * An asynchronous implementation of FSWAL. * <p> @@ -206,7 +208,16 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException { - super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); + this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix, + eventLoopGroup, channelClass); + } + + public AsyncFSWAL(FileSystem fs, Abortable abortable, Path rootDir, String logDir, + String archiveDir, Configuration conf, List<WALActionsListener> listeners, + boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup, + Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException { + super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, + suffix); this.eventLoopGroup = eventLoopGroup; this.channelClass = channelClass; Supplier<Boolean> hasConsumerTask; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java index 001be00..fe910aa 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/FSHLog.java @@ -40,10 +40,12 @@ import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.trace.TraceUtil; @@ -62,10 +64,10 @@ import org.apache.htrace.core.TraceScope; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; + /** * The default implementation of FSWAL. */ @@ -168,7 +170,7 @@ public class FSHLog extends AbstractFSWAL<Writer> { private final int waitOnShutdownInSeconds; private final ExecutorService closeExecutor = Executors.newCachedThreadPool( - new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build()); + new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Close-WAL-Writer-%d").build()); /** * Exception handler to pass the disruptor ringbuffer. Same as native implementation only it logs @@ -208,11 +210,25 @@ public class FSHLog extends AbstractFSWAL<Writer> { this(fs, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, null); } + @VisibleForTesting + public FSHLog(final FileSystem fs, Abortable abortable, final Path root, final String logDir, + final Configuration conf) throws IOException { + this(fs, abortable, root, logDir, HConstants.HREGION_OLDLOGDIR_NAME, conf, null, true, null, + null); + } + + public FSHLog(final FileSystem fs, final Path rootDir, final String logDir, + final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners, + final boolean failIfWALExists, final String prefix, final String suffix) throws IOException { + this(fs, null, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); + } + /** * Create an edit log at the given <code>dir</code> location. You should never have to load an * existing log. If there is a log at startup, it should have already been processed and deleted * by the time the WAL object is started up. * @param fs filesystem handle + * @param abortable Abortable - the server here * @param rootDir path to where logs and oldlogs * @param logDir dir where wals are stored * @param archiveDir dir where wals are archived @@ -226,10 +242,12 @@ public class FSHLog extends AbstractFSWAL<Writer> { * @param suffix will be url encoded. null is treated as empty. non-empty must start with * {@link org.apache.hadoop.hbase.wal.AbstractFSWALProvider#WAL_FILE_NAME_DELIMITER} */ - public FSHLog(final FileSystem fs, final Path rootDir, final String logDir, - final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners, - final boolean failIfWALExists, final String prefix, final String suffix) throws IOException { - super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); + public FSHLog(final FileSystem fs, final Abortable abortable, final Path rootDir, + final String logDir, final String archiveDir, final Configuration conf, + final List<WALActionsListener> listeners, final boolean failIfWALExists, final String prefix, + final String suffix) throws IOException { + super(fs, abortable, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, + suffix); this.minTolerableReplication = conf.getInt(TOLERABLE_LOW_REPLICATION, CommonFSUtils.getDefaultReplication(fs, this.walDir)); this.lowReplicationRollLimit = conf.getInt(LOW_REPLICATION_ROLL_LIMIT, DEFAULT_LOW_REPLICATION_ROLL_LIMIT); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java index e7bdb0b..84c94e6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractFSWALProvider.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.wal; + import java.io.FileNotFoundException; import java.io.IOException; import java.util.ArrayList; @@ -29,10 +30,12 @@ import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.regex.Matcher; import java.util.regex.Pattern; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.client.RegionInfo; @@ -88,6 +91,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen protected AtomicBoolean initialized = new AtomicBoolean(false); // for default wal provider, logPrefix won't change protected String logPrefix; + protected Abortable abortable; /** * We use walCreateLock to prevent wal recreation in different threads, and also prevent getWALs @@ -102,7 +106,8 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen * null */ @Override - public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { + public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable) + throws IOException { if (!initialized.compareAndSet(false, true)) { throw new IllegalStateException("WALProvider.init should only be called once."); } @@ -119,6 +124,7 @@ public abstract class AbstractFSWALProvider<T extends AbstractFSWAL<?>> implemen } } logPrefix = sb.toString(); + this.abortable = abortable; doInit(conf); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java index 062b368..3a2ffa7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AsyncFSWALProvider.java @@ -65,11 +65,11 @@ public class AsyncFSWALProvider extends AbstractFSWALProvider<AsyncFSWAL> { @Override protected AsyncFSWAL createWAL() throws IOException { - return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), - getWALDirectoryName(factory.factoryId), + return new AsyncFSWAL(CommonFSUtils.getWALFileSystem(conf), this.abortable, + CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix, - META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, - eventLoopGroup, channelClass); + META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null, eventLoopGroup, + channelClass); } @Override diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java index 0ff2195..6e5a053 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/DisabledWALProvider.java @@ -25,8 +25,10 @@ import java.util.OptionalLong; import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.PrivateCellUtil; @@ -55,7 +57,8 @@ class DisabledWALProvider implements WALProvider { WAL disabled; @Override - public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { + public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable) + throws IOException { if (null != disabled) { throw new IllegalStateException("WALProvider.init should only be called once."); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java index 3b91c24..e64d70f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/FSHLogProvider.java @@ -67,7 +67,7 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> { * Public because of FSHLog. Should be package-private */ public static Writer createWriter(final Configuration conf, final FileSystem fs, final Path path, - final boolean overwritable, long blocksize) throws IOException { + final boolean overwritable, long blocksize) throws IOException { // Configuration already does caching for the Class lookup. Class<? extends Writer> logWriterClass = conf.getClass("hbase.regionserver.hlog.writer.impl", ProtobufLogWriter.class, @@ -101,8 +101,8 @@ public class FSHLogProvider extends AbstractFSWALProvider<FSHLog> { @Override protected FSHLog createWAL() throws IOException { - return new FSHLog(CommonFSUtils.getWALFileSystem(conf), CommonFSUtils.getWALRootDir(conf), - getWALDirectoryName(factory.factoryId), + return new FSHLog(CommonFSUtils.getWALFileSystem(conf), abortable, + CommonFSUtils.getWALRootDir(conf), getWALDirectoryName(factory.factoryId), getWALArchiveDirectoryName(conf, factory.factoryId), conf, listeners, true, logPrefix, META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : null); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java index 764d3d5..20d043b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RegionGroupingProvider.java @@ -28,7 +28,9 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; import java.util.stream.Collectors; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; @@ -137,14 +139,17 @@ public class RegionGroupingProvider implements WALProvider { private List<WALActionsListener> listeners = new ArrayList<>(); private String providerId; private Class<? extends WALProvider> providerClass; + private Abortable abortable; @Override - public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { + public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable) + throws IOException { if (null != strategy) { throw new IllegalStateException("WALProvider.init should only be called once."); } this.conf = conf; this.factory = factory; + this.abortable = abortable; if (META_WAL_PROVIDER_ID.equals(providerId)) { // do not change the provider id if it is for meta @@ -171,7 +176,7 @@ public class RegionGroupingProvider implements WALProvider { private WALProvider createProvider(String group) throws IOException { WALProvider provider = WALFactory.createProvider(providerClass); provider.init(factory, conf, - META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : group); + META_WAL_PROVIDER_ID.equals(providerId) ? META_WAL_PROVIDER_ID : group, this.abortable); provider.addWALActionsListener(new MetricsWAL()); return provider; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java index 9859c20..001e1a8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java @@ -35,7 +35,9 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; import java.util.stream.Stream; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.DualAsyncFSWAL; @@ -108,11 +110,12 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen } @Override - public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { + public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable) + throws IOException { if (!initialized.compareAndSet(false, true)) { throw new IllegalStateException("WALProvider.init should only be called once."); } - provider.init(factory, conf, providerId); + provider.init(factory, conf, providerId, abortable); this.conf = conf; this.factory = factory; Pair<EventLoopGroup, Class<? extends Channel>> eventLoopGroupAndChannelClass = diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java index 26b8727..6a5feb0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java @@ -21,9 +21,11 @@ import java.io.IOException; import java.io.InterruptedIOException; import java.util.List; import java.util.concurrent.atomic.AtomicReference; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL; import org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader; @@ -35,7 +37,6 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; /** @@ -86,6 +87,7 @@ public class WALFactory { public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled"; final String factoryId; + final Abortable abortable; private final WALProvider provider; // The meta updates are written to a different wal. If this // regionserver holds meta regions, then this ref will be non-null. @@ -119,6 +121,7 @@ public class WALFactory { // this instance can't create wals, just reader/writers. provider = null; factoryId = SINGLETON_ID; + this.abortable = null; } @VisibleForTesting @@ -175,7 +178,7 @@ public class WALFactory { public WALFactory(Configuration conf, String factoryId) throws IOException { // default enableSyncReplicationWALProvider is true, only disable SyncReplicationWALProvider // for HMaster or HRegionServer which take system table only. See HBASE-19999 - this(conf, factoryId, true); + this(conf, factoryId, null, true); } /** @@ -183,11 +186,12 @@ public class WALFactory { * instances. * @param factoryId a unique identifier for this factory. used i.e. by filesystem implementations * to make a directory + * @param abortable the server associated with this WAL file * @param enableSyncReplicationWALProvider whether wrap the wal provider to a * {@link SyncReplicationWALProvider} */ - public WALFactory(Configuration conf, String factoryId, boolean enableSyncReplicationWALProvider) - throws IOException { + public WALFactory(Configuration conf, String factoryId, Abortable abortable, + boolean enableSyncReplicationWALProvider) throws IOException { // until we've moved reader/writer construction down into providers, this initialization must // happen prior to provider initialization, in case they need to instantiate a reader/writer. timeoutMillis = conf.getInt("hbase.hlog.open.timeout", 300000); @@ -196,20 +200,21 @@ public class WALFactory { AbstractFSWALProvider.Reader.class); this.conf = conf; this.factoryId = factoryId; + this.abortable = abortable; // end required early initialization if (conf.getBoolean(WAL_ENABLED, true)) { WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); if (enableSyncReplicationWALProvider) { provider = new SyncReplicationWALProvider(provider); } - provider.init(this, conf, null); + provider.init(this, conf, null, this.abortable); provider.addWALActionsListener(new MetricsWAL()); this.provider = provider; } else { // special handling of existing configuration behavior. LOG.warn("Running with WAL disabled."); provider = new DisabledWALProvider(); - provider.init(this, conf, factoryId); + provider.init(this, conf, factoryId, null); } } @@ -274,7 +279,7 @@ public class WALFactory { clz = getProviderClass(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER)); } provider = createProvider(clz); - provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID); + provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID, this.abortable); provider.addWALActionsListener(new MetricsWAL()); if (metaProvider.compareAndSet(null, provider)) { return provider; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java index c3bd149..01c1d11 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java @@ -23,7 +23,9 @@ import java.io.IOException; import java.util.List; import java.util.OptionalLong; import java.util.concurrent.CompletableFuture; + import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.regionserver.wal.AsyncFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener; @@ -46,7 +48,8 @@ public interface WALProvider { * @param conf may not be null * @param providerId differentiate between providers from one factory. may be null */ - void init(WALFactory factory, Configuration conf, String providerId) throws IOException; + void init(WALFactory factory, Configuration conf, String providerId, Abortable server) + throws IOException; /** * @param region the region which we want to get a WAL for it. Could be null. diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java index a9ce548..fdf96da 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestFailedAppendAndSync.java @@ -26,6 +26,7 @@ import java.io.IOException; import java.util.List; import java.util.Map; import java.util.concurrent.atomic.AtomicLong; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -33,6 +34,7 @@ import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HBaseClassTestRule; import org.apache.hadoop.hbase.HBaseTestingUtility; import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Put; @@ -41,6 +43,7 @@ import org.apache.hadoop.hbase.regionserver.wal.FailedLogCloseException; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManagerTestHelper; +import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALProvider.Writer; @@ -107,11 +110,13 @@ public class TestFailedAppendAndSync { class DodgyFSLog extends FSHLog { volatile boolean throwSyncException = false; volatile boolean throwAppendException = false; + volatile boolean throwArchiveException = false; + final AtomicLong rolls = new AtomicLong(0); - public DodgyFSLog(FileSystem fs, Path root, String logDir, Configuration conf) + public DodgyFSLog(FileSystem fs, Server server, Path root, String logDir, Configuration conf) throws IOException { - super(fs, root, logDir, conf); + super(fs, server, root, logDir, conf); } @Override @@ -123,6 +128,18 @@ public class TestFailedAppendAndSync { } @Override + protected void archiveLogFile(Path p) throws IOException { + if (throwArchiveException) { + throw new IOException("throw archival exception"); + } + } + + @Override + protected void archive(Pair<Path, Long> localLogsToArchive) { + super.archive(localLogsToArchive); + } + + @Override protected Writer createWriterInstance(Path path) throws IOException { final Writer w = super.createWriterInstance(path); return new Writer() { @@ -176,7 +193,7 @@ public class TestFailedAppendAndSync { // the test. FileSystem fs = FileSystem.get(CONF); Path rootDir = new Path(dir + getName()); - DodgyFSLog dodgyWAL = new DodgyFSLog(fs, rootDir, getName(), CONF); + DodgyFSLog dodgyWAL = new DodgyFSLog(fs, (Server)services, rootDir, getName(), CONF); dodgyWAL.init(); LogRoller logRoller = new LogRoller(services); logRoller.addWAL(dodgyWAL); @@ -256,6 +273,27 @@ public class TestFailedAppendAndSync { Threads.sleep(1); } } + + try { + dodgyWAL.throwAppendException = false; + dodgyWAL.throwSyncException = false; + dodgyWAL.throwArchiveException = true; + Pair<Path, Long> pair = new Pair<Path, Long>(); + pair.setFirst(new Path("/a/b/")); + pair.setSecond(100L); + dodgyWAL.archive(pair); + } catch (Throwable ioe) { + } + while (true) { + try { + // one more abort needs to be called + Mockito.verify(services, Mockito.atLeast(2)).abort(Mockito.anyString(), + (Throwable) Mockito.anyObject()); + break; + } catch (WantedButNotInvoked t) { + Threads.sleep(1); + } + } } finally { // To stop logRoller, its server has to say it is stopped. Mockito.when(services.isStopped()).thenReturn(true); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java index 4c19aa0..6e2059d 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestLogRolling.java @@ -175,10 +175,15 @@ public abstract class AbstractTestLogRolling { } } - private void assertLogFileSize(WAL log) { + private void assertLogFileSize(WAL log) throws InterruptedException { if (AbstractFSWALProvider.getNumRolledLogFiles(log) > 0) { assertTrue(AbstractFSWALProvider.getLogFileSize(log) > 0); } else { + for (int i = 0; i < 10; i++) { + if (AbstractFSWALProvider.getLogFileSize(log) != 0) { + Thread.sleep(10); + } + } assertEquals(0, AbstractFSWALProvider.getLogFileSize(log)); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java index d062c77..ecbd043 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/IOTestProvider.java @@ -32,6 +32,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.RegionInfo; // imports for things that haven't moved from regionserver.wal yet. @@ -99,7 +100,8 @@ public class IOTestProvider implements WALProvider { * null */ @Override - public void init(WALFactory factory, Configuration conf, String providerId) throws IOException { + public void init(WALFactory factory, Configuration conf, String providerId, Abortable abortable) + throws IOException { if (!initialized.compareAndSet(false, true)) { throw new IllegalStateException("WALProvider.init should only be called once."); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java index a899bdc..656932b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java @@ -687,7 +687,7 @@ public class TestWALFactory { assertEquals(wrappedWALProvider.getClass(), walFactory.getMetaProvider().getClass()); // if providers are not set and do not enable SyncReplicationWALProvider - walFactory = new WALFactory(conf, this.currentServername.toString(), false); + walFactory = new WALFactory(conf, this.currentServername.toString(), null, false); assertEquals(walFactory.getWALProvider().getClass(), walFactory.getMetaProvider().getClass()); }