HBASE-15265 Implement an asynchronous FSHLog
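At a high level, this patch splits the old FSHLog into an abstract base class (AbstractFSWAL) plus two concrete WALs: the existing disruptor-based FSHLog and a new AsyncFSWAL that runs appends and syncs on a netty EventLoop and reports sync completion through a CompletionHandler callback instead of blocking a syncer thread. The following is a minimal toy sketch of that callback shape using only JDK types; the names here are illustrative, not from the patch itself.

import java.nio.channels.CompletionHandler;

// Toy model of the AsyncWriter.sync() contract AsyncFSWAL relies on below:
// the caller hands over a CompletionHandler and returns immediately; the
// writer later invokes completed() with the synced length, or failed().
public class CallbackSyncSketch {

  interface AsyncWriterLike {
    void sync(CompletionHandler<Long, Void> handler, Void attachment);
  }

  public static void main(String[] args) {
    // stand-in writer that "syncs" instantly
    AsyncWriterLike writer = (handler, att) -> handler.completed(42L, att);
    writer.sync(new CompletionHandler<Long, Void>() {
      @Override
      public void completed(Long length, Void attachment) {
        System.out.println("synced, file length=" + length);
      }

      @Override
      public void failed(Throwable exc, Void attachment) {
        System.err.println("sync failed: " + exc);
      }
    }, null);
  }
}

The real AsyncWriter in the diff below follows this shape in AsyncFSWAL#sync, which is what lets append and sync both run inside the EventLoop without a dedicated consumer thread.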
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/c96b642f Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/c96b642f Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/c96b642f Branch: refs/heads/master Commit: c96b642f15ddc24ad1f52616a4fb74feb51483c4 Parents: 1a9837a Author: zhangduo <zhang...@apache.org> Authored: Sat Mar 26 15:40:45 2016 +0800 Committer: zhangduo <zhang...@apache.org> Committed: Sat Mar 26 15:41:27 2016 +0800 ---------------------------------------------------------------------- .../hadoop/hbase/regionserver/LogRoller.java | 3 + .../hbase/regionserver/wal/AbstractFSWAL.java | 910 +++++++++++ .../wal/AbstractProtobufLogWriter.java | 170 ++ .../hbase/regionserver/wal/AsyncFSWAL.java | 732 +++++++++ .../wal/AsyncProtobufLogWriter.java | 203 +++ .../hadoop/hbase/regionserver/wal/FSHLog.java | 1518 ++++-------------- .../hbase/regionserver/wal/FSWALEntry.java | 24 +- .../regionserver/wal/ProtobufLogWriter.java | 148 +- .../hbase/regionserver/wal/SyncFuture.java | 137 +- .../hbase/regionserver/wal/WriterBase.java | 62 - .../FanOutOneBlockAsyncDFSOutputHelper.java | 40 +- .../hadoop/hbase/wal/AbstractFSWALProvider.java | 361 +++++ .../hadoop/hbase/wal/AsyncFSWALProvider.java | 73 + .../hadoop/hbase/wal/DefaultWALProvider.java | 348 +--- .../hadoop/hbase/wal/DisabledWALProvider.java | 4 + .../java/org/apache/hadoop/hbase/wal/WAL.java | 9 +- .../org/apache/hadoop/hbase/wal/WALFactory.java | 6 +- .../apache/hadoop/hbase/wal/WALProvider.java | 10 +- .../wal/AbstractTestLogRolling.java | 332 ++++ .../wal/AbstractTestProtobufLog.java | 209 +++ .../regionserver/wal/SequenceFileLogWriter.java | 22 +- .../wal/TestAsyncLogRollPeriod.java | 36 + .../regionserver/wal/TestAsyncLogRolling.java | 65 + .../regionserver/wal/TestAsyncProtobufLog.java | 82 + .../regionserver/wal/TestAsyncWALReplay.java | 36 + .../wal/TestAsyncWALReplayCompressed.java | 38 + .../hbase/regionserver/wal/TestDurability.java | 29 +- .../regionserver/wal/TestLogRollAbort.java | 6 +- .../regionserver/wal/TestLogRollPeriod.java | 12 +- .../hbase/regionserver/wal/TestLogRolling.java | 404 +---- .../hbase/regionserver/wal/TestProtobufLog.java | 187 +-- .../wal/TestWALReplayCompressed.java | 1 - 32 files changed, 3918 insertions(+), 2299 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java index 40edc05..fa217ee 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/LogRoller.java @@ -167,6 +167,9 @@ public class LogRoller extends HasThread { } } } + for (WAL wal : walNeedsRoll.keySet()) { + wal.logRollerExited(); + } LOG.info("LogRoller exiting."); } http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java new file mode 100644 index 0000000..f189ff1 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java @@ -0,0 +1,910 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.apache.hadoop.hbase.wal.AbstractFSWALProvider.WAL_FILE_NAME_DELIMITER; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.lang.management.ManagementFactory; +import java.lang.management.MemoryUsage; +import java.net.URLEncoder; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantLock; + +import com.google.common.annotations.VisibleForTesting; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.io.util.HeapMemorySizeUtil; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.DrainBarrier; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.wal.WAL; +import org.apache.hadoop.hbase.wal.WALFactory; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.util.StringUtils; +import org.apache.htrace.NullScope; +import org.apache.htrace.Span; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; + +/** + * Implementation of {@link WAL} to go against {@link FileSystem}; i.e. keep WALs in HDFS. Only one + * WAL is ever being written at a time. When a WAL hits a configured maximum size, it is rolled. + * This is done internal to the implementation. 
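For orientation, the write path this class abstracts looks roughly like the following from a caller's point of view. This is a sketch only, built from the types in this patch (append() is declared further down in this file, and sync(txid) comes from the WAL interface); it is not code from the commit.

// Assumes org.apache.hadoop.hbase.wal.WAL, WALKey, HRegionInfo and
// regionserver.wal.WALEdit from this code base.
// append() queues the edit and returns a transaction id; sync(txid) blocks
// until everything up to that txid is durable on HDFS.
long writeDurably(WAL wal, HRegionInfo info, WALKey key, WALEdit edit) throws IOException {
  long txid = wal.append(info, key, edit, true); // true: edit also goes to the memstore
  wal.sync(txid); // blocks on a SyncFuture until the WAL implementation acks the sync
  return txid;
}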
+ * <p> + * As data is flushed from the MemStore to other on-disk structures (files sorted by key, hfiles), a + * WAL becomes obsolete. We can let go of all the log edits/entries for a given HRegion-sequence id. + * A bunch of work in the below is done keeping account of these region sequence ids -- what is + * flushed out to hfiles, and what is yet in WAL and in memory only. + * <p> + * It is only practical to delete entire files. Thus, we delete an entire on-disk file + * <code>F</code> when all of the edits in <code>F</code> have a log-sequence-id that's older + * (smaller) than the most-recent flush. + * <p> + * To read a WAL, call + * {@link WALFactory#createReader(org.apache.hadoop.fs.FileSystem, org.apache.hadoop.fs.Path)}. + * <h2>Failure Semantics</h2> If an exception occurs on append or sync, we roll the WAL because the current WAL + * is now a lame duck; any further appends or syncs will also fail with the same original exception. If + * we have made successful appends to the WAL and we then are unable to sync them, our current + * semantic is to return an error to the client that the appends failed but also to abort the current + * context, usually the hosting server. We need to replay the WALs. <br> + * TODO: Change this semantic. A roll of WAL may be sufficient as long as we have flagged the client + * that the append failed. <br> + * TODO: replication may pick up these last edits though they have been marked as failed append + * (Need to keep our own file lengths, not rely on HDFS). + */ +@InterfaceAudience.Private +public abstract class AbstractFSWAL<W> implements WAL { + + private static final Log LOG = LogFactory.getLog(AbstractFSWAL.class); + + protected static final int DEFAULT_SLOW_SYNC_TIME_MS = 100; // in ms + + /** + * file system instance + */ + protected final FileSystem fs; + + /** + * WAL directory, where all WAL files would be placed. + */ + protected final Path walDir; + + /** + * dir path where old logs are kept. + */ + protected final Path walArchiveDir; + + /** + * Matches just those wal files that belong to this wal instance. + */ + protected final PathFilter ourFiles; + + /** + * Prefix of a WAL file, usually the region server name it is hosted on. + */ + protected final String walFilePrefix; + + /** + * Suffix included on generated wal file names + */ + protected final String walFileSuffix; + + /** + * Prefix used when checking for wal membership. + */ + protected final String prefixPathStr; + + protected final WALCoprocessorHost coprocessorHost; + + /** + * conf object + */ + protected final Configuration conf; + + /** Listeners that are called on WAL events. */ + protected final List<WALActionsListener> listeners = + new CopyOnWriteArrayList<WALActionsListener>(); + + /** + * Class that does accounting of sequenceids in WAL subsystem. Holds oldest outstanding sequence + * id as yet not flushed as well as the most recent edit sequence id appended to the WAL. Has + * facility for answering questions such as "Is it safe to GC a WAL?". + */ + protected final SequenceIdAccounting sequenceIdAccounting = new SequenceIdAccounting(); + + /** The barrier used to ensure that close() waits for all log rolls and flushes to finish. */ + protected final DrainBarrier closeBarrier = new DrainBarrier(); + + protected final int slowSyncNs; + + // If > than this size, roll the log. + protected final long logrollsize; + + /* + * If more than this many logs, force a flush of the oldest region so its oldest edit goes to disk. If too + * many and we crash, then replay will take forever.
Keep the number of logs tidy. */ + protected final int maxLogs; + + /** + * This lock makes sure only one log roll runs at a time. Should not be taken while any other lock + * is held. We don't just use synchronized because that results in bogus and tedious findbugs + * warnings when it thinks synchronized controls writer thread safety. It is held when we are + * actually rolling the log. It is checked when we are looking to see if we should roll the log or + * not. + */ + protected final ReentrantLock rollWriterLock = new ReentrantLock(true); + + // The timestamp (in ms) when the log file was created. + protected final AtomicLong filenum = new AtomicLong(-1); + + // Number of transactions in the current Wal. + protected final AtomicInteger numEntries = new AtomicInteger(0); + + /** + * The highest known outstanding unsync'd WALEdit transaction id. Usually, we use a queue to pass + * WALEdit to the background consumer thread, and the transaction id is the sequence number of the + * corresponding entry in the queue. + */ + protected volatile long highestUnsyncedTxid = -1; + + /** + * Updated to the transaction id of the last successful sync call. This can be less than + * {@link #highestUnsyncedTxid} for the case where we have an append for which a sync has not yet come in. + */ + protected final AtomicLong highestSyncedTxid = new AtomicLong(0); + + /** + * The total size of wal + */ + protected final AtomicLong totalLogSize = new AtomicLong(0); + /** + * Current log file. + */ + volatile W writer; + + protected volatile boolean closed = false; + + protected final AtomicBoolean shutdown = new AtomicBoolean(false); + /** + * WAL Comparator; it compares the timestamp (log filenum) present in the log file name. Throws + * an IllegalArgumentException if used to compare paths from different wals. + */ + final Comparator<Path> LOG_NAME_COMPARATOR = new Comparator<Path>() { + @Override + public int compare(Path o1, Path o2) { + return Long.compare(getFileNumFromFileName(o1), getFileNumFromFileName(o2)); + } + }; + + /** + * Map of WAL log file to the latest sequence ids of all regions it has entries of. The map is + * sorted by the log file creation timestamp (contained in the log file name). + */ + protected ConcurrentNavigableMap<Path, Map<byte[], Long>> byWalRegionSequenceIds = + new ConcurrentSkipListMap<Path, Map<byte[], Long>>(LOG_NAME_COMPARATOR); + + /** + * Map of {@link SyncFuture}s keyed by Handler objects. Used so we reuse SyncFutures. + * <p> + * TODO: Reuse FSWALEntrys rather than creating them anew each time as we do SyncFutures here. + * <p> + * TODO: Add a FSWALEntry and SyncFuture as thread locals on handlers rather than have them get + * them from this Map? + */ + private final ConcurrentMap<Thread, SyncFuture> syncFuturesByHandler; + + /** + * A log file has a creation timestamp (in ms) in its file name ({@link #filenum}). This helper + * method returns the creation timestamp from a given log file. It extracts the timestamp assuming + * the filename is created with the {@link #computeFilename(long filenum)} method. + * @return timestamp, as in the log file name. + */ + protected long getFileNumFromFileName(Path fileName) { + if (fileName == null) { + throw new IllegalArgumentException("file name can't be null"); + } + if (!ourFiles.accept(fileName)) { + throw new IllegalArgumentException("The log file " + fileName + + " doesn't belong to this WAL. 
(" + toString() + ")"); + } + final String fileNameString = fileName.toString(); + String chompedPath = + fileNameString.substring(prefixPathStr.length(), + (fileNameString.length() - walFileSuffix.length())); + return Long.parseLong(chompedPath); + } + + private int calculateMaxLogFiles(float memstoreSizeRatio, long logRollSize) { + MemoryUsage mu = ManagementFactory.getMemoryMXBean().getHeapMemoryUsage(); + return Math.round(mu.getMax() * memstoreSizeRatio * 2 / logRollSize); + } + + protected AbstractFSWAL(final FileSystem fs, final Path rootDir, final String logDir, + final String archiveDir, final Configuration conf, final List<WALActionsListener> listeners, + final boolean failIfWALExists, final String prefix, final String suffix) + throws FailedLogCloseException, IOException { + this.fs = fs; + this.walDir = new Path(rootDir, logDir); + this.walArchiveDir = new Path(rootDir, archiveDir); + this.conf = conf; + + if (!fs.exists(walDir) && !fs.mkdirs(walDir)) { + throw new IOException("Unable to mkdir " + walDir); + } + + if (!fs.exists(this.walArchiveDir)) { + if (!fs.mkdirs(this.walArchiveDir)) { + throw new IOException("Unable to mkdir " + this.walArchiveDir); + } + } + + // If prefix is null||empty then just name it wal + this.walFilePrefix = + prefix == null || prefix.isEmpty() ? "wal" : URLEncoder.encode(prefix, "UTF8"); + // we only correctly differentiate suffices when numeric ones start with '.' + if (suffix != null && !(suffix.isEmpty()) && !(suffix.startsWith(WAL_FILE_NAME_DELIMITER))) { + throw new IllegalArgumentException("WAL suffix must start with '" + WAL_FILE_NAME_DELIMITER + + "' but instead was '" + suffix + "'"); + } + // Now that it exists, set the storage policy for the entire directory of wal files related to + // this FSHLog instance + FSUtils.setStoragePolicy(fs, conf, this.walDir, HConstants.WAL_STORAGE_POLICY, + HConstants.DEFAULT_WAL_STORAGE_POLICY); + this.walFileSuffix = (suffix == null) ? "" : URLEncoder.encode(suffix, "UTF8"); + this.prefixPathStr = new Path(walDir, walFilePrefix + WAL_FILE_NAME_DELIMITER).toString(); + + this.ourFiles = new PathFilter() { + @Override + public boolean accept(final Path fileName) { + // The path should start with dir/<prefix> and end with our suffix + final String fileNameString = fileName.toString(); + if (!fileNameString.startsWith(prefixPathStr)) { + return false; + } + if (walFileSuffix.isEmpty()) { + // in the case of the null suffix, we need to ensure the filename ends with a timestamp. + return org.apache.commons.lang.StringUtils.isNumeric(fileNameString + .substring(prefixPathStr.length())); + } else if (!fileNameString.endsWith(walFileSuffix)) { + return false; + } + return true; + } + }; + + if (failIfWALExists) { + final FileStatus[] walFiles = FSUtils.listStatus(fs, walDir, ourFiles); + if (null != walFiles && 0 != walFiles.length) { + throw new IOException("Target WAL already exists within directory " + walDir); + } + } + + // Register listeners. TODO: Should this exist anymore? We have CPs? + if (listeners != null) { + for (WALActionsListener i : listeners) { + registerWALActionsListener(i); + } + } + this.coprocessorHost = new WALCoprocessorHost(this, conf); + + // Get size to roll log at. 
Roll at 95% of HDFS block size so we avoid crossing HDFS blocks + // (it costs a little x'ing blocks) + final long blocksize = + this.conf.getLong("hbase.regionserver.hlog.blocksize", + FSUtils.getDefaultBlockSize(this.fs, this.walDir)); + this.logrollsize = + (long) (blocksize * conf.getFloat("hbase.regionserver.logroll.multiplier", 0.95f)); + + float memstoreRatio = + conf.getFloat(HeapMemorySizeUtil.MEMSTORE_SIZE_KEY, conf.getFloat( + HeapMemorySizeUtil.MEMSTORE_SIZE_OLD_KEY, HeapMemorySizeUtil.DEFAULT_MEMSTORE_SIZE)); + boolean maxLogsDefined = conf.get("hbase.regionserver.maxlogs") != null; + if (maxLogsDefined) { + LOG.warn("'hbase.regionserver.maxlogs' was deprecated."); + } + this.maxLogs = + conf.getInt("hbase.regionserver.maxlogs", + Math.max(32, calculateMaxLogFiles(memstoreRatio, logrollsize))); + + LOG.info("WAL configuration: blocksize=" + StringUtils.byteDesc(blocksize) + ", rollsize=" + + StringUtils.byteDesc(this.logrollsize) + ", prefix=" + this.walFilePrefix + ", suffix=" + + walFileSuffix + ", logDir=" + this.walDir + ", archiveDir=" + this.walArchiveDir); + this.slowSyncNs = + 1000000 * conf.getInt("hbase.regionserver.hlog.slowsync.ms", DEFAULT_SLOW_SYNC_TIME_MS); + int maxHandlersCount = conf.getInt(HConstants.REGION_SERVER_HANDLER_COUNT, 200); + // Presize our map of SyncFutures by handler objects. + this.syncFuturesByHandler = new ConcurrentHashMap<Thread, SyncFuture>(maxHandlersCount); + } + + @Override + public void registerWALActionsListener(WALActionsListener listener) { + this.listeners.add(listener); + } + + @Override + public boolean unregisterWALActionsListener(WALActionsListener listener) { + return this.listeners.remove(listener); + } + + @Override + public WALCoprocessorHost getCoprocessorHost() { + return coprocessorHost; + } + + @Override + public Long startCacheFlush(byte[] encodedRegionName, Set<byte[]> families) { + if (!closeBarrier.beginOp()) { + LOG.info("Flush not started for " + Bytes.toString(encodedRegionName) + "; server closing."); + return null; + } + return this.sequenceIdAccounting.startCacheFlush(encodedRegionName, families); + } + + @Override + public void completeCacheFlush(byte[] encodedRegionName) { + this.sequenceIdAccounting.completeCacheFlush(encodedRegionName); + closeBarrier.endOp(); + } + + @Override + public void abortCacheFlush(byte[] encodedRegionName) { + this.sequenceIdAccounting.abortCacheFlush(encodedRegionName); + closeBarrier.endOp(); + } + + @Override + public long getEarliestMemstoreSeqNum(byte[] encodedRegionName) { + // Used by tests. Deprecated as too subtle for general usage. + return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName); + } + + @Override + public long getEarliestMemstoreSeqNum(byte[] encodedRegionName, byte[] familyName) { + // This method is used by tests and for figuring if we should flush or not because our + // sequenceids are too old. It is also used for reporting to the master our oldest sequenceid, for use + // in figuring what edits can be skipped during log recovery. getEarliestMemStoreSequenceId + // from this.sequenceIdAccounting is looking first in flushingOldestStoreSequenceIds, the + // currently flushing sequence ids, and if anything found there, it is returning these. This is + // the right thing to do for reporting oldest sequenceids to the master; we won't skip edits if + // we crash during the flush. For figuring what to flush, we might get requeued if our sequence + // id is old even though we are currently flushing. This may mean we do too much flushing.
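The sequence-id bookkeeping referenced in the comment above is what lets the WAL decide when a rolled file can be archived: a file is safe to drop once, for every region, its highest recorded sequence id is below that region's lowest unflushed sequence id. A self-contained sketch of that rule follows (deliberately simplified; the real logic lives in SequenceIdAccounting#areAllLower and uses byte[] region names):

import java.util.HashMap;
import java.util.Map;

// Simplified model of the archival check: a rolled WAL file is obsolete when
// every region's edits in it are older than what that region has flushed.
public class ArchivalCheckSketch {

  static boolean canArchive(Map<String, Long> highestSeqIdInFile,
      Map<String, Long> lowestUnflushedSeqId) {
    for (Map.Entry<String, Long> e : highestSeqIdInFile.entrySet()) {
      Long lowestUnflushed = lowestUnflushedSeqId.get(e.getKey());
      // if the region still has unflushed edits at or below what this file
      // holds, the file must be kept around for replay.
      if (lowestUnflushed != null && lowestUnflushed <= e.getValue()) {
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Map<String, Long> file = new HashMap<>();
    file.put("region-a", 10L); // file holds edits up to seqid 10
    Map<String, Long> unflushed = new HashMap<>();
    unflushed.put("region-a", 11L); // everything up to 10 is already flushed
    System.out.println(canArchive(file, unflushed)); // true
  }
}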
+ return this.sequenceIdAccounting.getLowestSequenceId(encodedRegionName, familyName); + } + + @Override + public byte[][] rollWriter() throws FailedLogCloseException, IOException { + return rollWriter(false); + } + + /** + * This is a convenience method that computes a new filename with a given file-number. + * @param filenum to use + * @return Path + */ + protected Path computeFilename(final long filenum) { + if (filenum < 0) { + throw new RuntimeException("WAL file number can't be < 0"); + } + String child = walFilePrefix + WAL_FILE_NAME_DELIMITER + filenum + walFileSuffix; + return new Path(walDir, child); + } + + /** + * This is a convenience method that computes a new filename using the current WAL + * file-number + * @return Path + */ + public Path getCurrentFileName() { + return computeFilename(this.filenum.get()); + } + + /** + * retrieve the next path to use for writing. Increments the internal filenum. + */ + private Path getNewPath() throws IOException { + this.filenum.set(System.currentTimeMillis()); + Path newPath = getCurrentFileName(); + while (fs.exists(newPath)) { + this.filenum.incrementAndGet(); + newPath = getCurrentFileName(); + } + return newPath; + } + + @VisibleForTesting + Path getOldPath() { + long currentFilenum = this.filenum.get(); + Path oldPath = null; + if (currentFilenum > 0) { + // ComputeFilename will take care of meta wal filename + oldPath = computeFilename(currentFilenum); + } // I presume if currentFilenum is <= 0, this is the first file and null for oldPath is fine? + return oldPath; + } + + /** + * Tell listeners about pre log roll. + */ + private void tellListenersAboutPreLogRoll(final Path oldPath, final Path newPath) + throws IOException { + if (!this.listeners.isEmpty()) { + for (WALActionsListener i : this.listeners) { + i.preLogRoll(oldPath, newPath); + } + } + } + + /** + * Tell listeners about post log roll. + */ + private void tellListenersAboutPostLogRoll(final Path oldPath, final Path newPath) + throws IOException { + if (!this.listeners.isEmpty()) { + for (WALActionsListener i : this.listeners) { + i.postLogRoll(oldPath, newPath); + } + } + } + + // public only until class moves to o.a.h.h.wal + /** @return the number of rolled log files */ + public int getNumRolledLogFiles() { + return byWalRegionSequenceIds.size(); + } + + // public only until class moves to o.a.h.h.wal + /** @return the number of log files in use */ + public int getNumLogFiles() { + // +1 for the current, in-use log + return getNumRolledLogFiles() + 1; + } + + /** + * If the number of un-archived WAL files is greater than the maximum allowed, check the first + * (oldest) WAL file, and return those regions which should be flushed so that it can be + * archived. + * @return regions (encodedRegionNames) to flush in order to archive the oldest WAL file. 
+ */ + byte[][] findRegionsToForceFlush() throws IOException { + byte[][] regions = null; + int logCount = getNumRolledLogFiles(); + if (logCount > this.maxLogs && logCount > 0) { + Map.Entry<Path, Map<byte[], Long>> firstWALEntry = this.byWalRegionSequenceIds.firstEntry(); + regions = this.sequenceIdAccounting.findLower(firstWALEntry.getValue()); + } + if (regions != null) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < regions.length; i++) { + if (i > 0) { + sb.append(", "); + } + sb.append(Bytes.toStringBinary(regions[i])); + } + LOG.info("Too many WALs; count=" + logCount + ", max=" + this.maxLogs + "; forcing flush of " + + regions.length + " regions(s): " + sb.toString()); + } + return regions; + } + + /** + * Archive old logs. A WAL is eligible for archiving if all its WALEdits have been flushed. + */ + private void cleanOldLogs() throws IOException { + List<Path> logsToArchive = null; + // For each log file, look at its Map of regions to highest sequence id; if all sequence ids + // are older than what is currently in memory, the WAL can be GC'd. + for (Map.Entry<Path, Map<byte[], Long>> e : this.byWalRegionSequenceIds.entrySet()) { + Path log = e.getKey(); + Map<byte[], Long> sequenceNums = e.getValue(); + if (this.sequenceIdAccounting.areAllLower(sequenceNums)) { + if (logsToArchive == null) { + logsToArchive = new ArrayList<Path>(); + } + logsToArchive.add(log); + if (LOG.isTraceEnabled()) { + LOG.trace("WAL file ready for archiving " + log); + } + } + } + if (logsToArchive != null) { + for (Path p : logsToArchive) { + this.totalLogSize.addAndGet(-this.fs.getFileStatus(p).getLen()); + archiveLogFile(p); + this.byWalRegionSequenceIds.remove(p); + } + } + } + + /* + * only public so WALSplitter can use. + * @return archived location of a WAL file with the given path p + */ + public static Path getWALArchivePath(Path archiveDir, Path p) { + return new Path(archiveDir, p.getName()); + } + + private void archiveLogFile(final Path p) throws IOException { + Path newPath = getWALArchivePath(this.walArchiveDir, p); + // Tell our listeners that a log is going to be archived. + if (!this.listeners.isEmpty()) { + for (WALActionsListener i : this.listeners) { + i.preLogArchive(p, newPath); + } + } + LOG.info("Archiving " + p + " to " + newPath); + if (!FSUtils.renameAndSetModifyTime(this.fs, p, newPath)) { + throw new IOException("Unable to rename " + p + " to " + newPath); + } + // Tell our listeners that a log has been archived. + if (!this.listeners.isEmpty()) { + for (WALActionsListener i : this.listeners) { + i.postLogArchive(p, newPath); + } + } + } + + /** + * Cleans up current writer closing it and then puts in place the passed in + * <code>nextWriter</code>. + * <p> + * <ul> + * <li>In the case of creating a new WAL, oldPath will be null.</li> + * <li>In the case of rolling over from one file to the next, none of the parameters will be null. 
+ * </li> + * <li>In the case of closing out this FSHLog with no further use newPath and nextWriter will be + * null.</li> + * </ul> + * @param oldPath may be null + * @param newPath may be null + * @param nextWriter may be null + * @return the passed in <code>newPath</code> + * @throws IOException if there is a problem flushing or closing the underlying FS + */ + Path replaceWriter(Path oldPath, Path newPath, W nextWriter) throws IOException { + TraceScope scope = Trace.startSpan("FSHFile.replaceWriter"); + try { + long oldFileLen = 0L; + doReplaceWriter(oldPath, newPath, nextWriter); + int oldNumEntries = this.numEntries.get(); + final String newPathString = (null == newPath ? null : FSUtils.getPath(newPath)); + if (oldPath != null) { + this.byWalRegionSequenceIds.put(oldPath, this.sequenceIdAccounting.resetHighest()); + this.totalLogSize.addAndGet(oldFileLen); + LOG.info("Rolled WAL " + FSUtils.getPath(oldPath) + " with entries=" + oldNumEntries + + ", filesize=" + StringUtils.byteDesc(oldFileLen) + "; new WAL " + newPathString); + } else { + LOG.info("New WAL " + newPathString); + } + return newPath; + } finally { + scope.close(); + } + } + + protected Span blockOnSync(final SyncFuture syncFuture) throws IOException { + // Now we have published the ringbuffer, halt the current thread until we get an answer back. + try { + syncFuture.get(); + return syncFuture.getSpan(); + } catch (InterruptedException ie) { + LOG.warn("Interrupted", ie); + throw convertInterruptedExceptionToIOException(ie); + } catch (ExecutionException e) { + throw ensureIOException(e.getCause()); + } + } + + private static IOException ensureIOException(final Throwable t) { + return (t instanceof IOException) ? (IOException) t : new IOException(t); + } + + private IOException convertInterruptedExceptionToIOException(final InterruptedException ie) { + Thread.currentThread().interrupt(); + IOException ioe = new InterruptedIOException(); + ioe.initCause(ie); + return ioe; + } + + @Override + public byte[][] rollWriter(boolean force) throws FailedLogCloseException, IOException { + rollWriterLock.lock(); + try { + // Return if nothing to flush. + if (!force && this.writer != null && this.numEntries.get() <= 0) { + return null; + } + byte[][] regionsToFlush = null; + if (this.closed) { + LOG.debug("WAL closed. Skipping rolling of writer"); + return regionsToFlush; + } + if (!closeBarrier.beginOp()) { + LOG.debug("WAL closing. Skipping rolling of writer"); + return regionsToFlush; + } + TraceScope scope = Trace.startSpan("FSHLog.rollWriter"); + try { + Path oldPath = getOldPath(); + Path newPath = getNewPath(); + // Any exception from here on is catastrophic, non-recoverable so we currently abort. + W nextWriter = this.createWriterInstance(newPath); + tellListenersAboutPreLogRoll(oldPath, newPath); + // NewPath could be equal to oldPath if replaceWriter fails. + newPath = replaceWriter(oldPath, newPath, nextWriter); + tellListenersAboutPostLogRoll(oldPath, newPath); + // Can we delete any of the old log files? 
+ if (getNumRolledLogFiles() > 0) { + cleanOldLogs(); + regionsToFlush = findRegionsToForceFlush(); + } + } finally { + closeBarrier.endOp(); + assert scope == NullScope.INSTANCE || !scope.isDetached(); + scope.close(); + } + return regionsToFlush; + } finally { + rollWriterLock.unlock(); + } + } + + // public only until class moves to o.a.h.h.wal + /** @return the size of log files in use */ + public long getLogFileSize() { + return this.totalLogSize.get(); + } + + // public only until class moves to o.a.h.h.wal + public void requestLogRoll() { + requestLogRoll(false); + } + + /** + * Get the backing files associated with this WAL. + * @return may be null if there are no files. + */ + protected FileStatus[] getFiles() throws IOException { + return FSUtils.listStatus(fs, walDir, ourFiles); + } + + @Override + public void shutdown() throws IOException { + if (!shutdown.compareAndSet(false, true)) { + return; + } + closed = true; + try { + // Prevent all further flushing and rolling. + closeBarrier.stopAndDrainOps(); + } catch (InterruptedException e) { + LOG.error("Exception while waiting for cache flushes and log rolls", e); + Thread.currentThread().interrupt(); + } + // Tell our listeners that the log is closing + if (!this.listeners.isEmpty()) { + for (WALActionsListener i : this.listeners) { + i.logCloseRequested(); + } + } + doShutdown(); + } + + @Override + public void close() throws IOException { + shutdown(); + final FileStatus[] files = getFiles(); + if (null != files && 0 != files.length) { + for (FileStatus file : files) { + Path p = getWALArchivePath(this.walArchiveDir, file.getPath()); + // Tell our listeners that a log is going to be archived. + if (!this.listeners.isEmpty()) { + for (WALActionsListener i : this.listeners) { + i.preLogArchive(file.getPath(), p); + } + } + + if (!FSUtils.renameAndSetModifyTime(fs, file.getPath(), p)) { + throw new IOException("Unable to rename " + file.getPath() + " to " + p); + } + // Tell our listeners that a log was archived. + if (!this.listeners.isEmpty()) { + for (WALActionsListener i : this.listeners) { + i.postLogArchive(file.getPath(), p); + } + } + } + LOG.debug("Moved " + files.length + " WAL file(s) to " + FSUtils.getPath(this.walArchiveDir)); + } + LOG.info("Closed WAL: " + toString()); + } + + protected SyncFuture getSyncFuture(final long sequence, Span span) { + SyncFuture syncFuture = this.syncFuturesByHandler.get(Thread.currentThread()); + if (syncFuture == null) { + syncFuture = new SyncFuture(); + this.syncFuturesByHandler.put(Thread.currentThread(), syncFuture); + } + return syncFuture.reset(sequence, span); + } + + protected void requestLogRoll(boolean tooFewReplicas) { + if (!this.listeners.isEmpty()) { + for (WALActionsListener i : this.listeners) { + i.logRollRequested(tooFewReplicas); + } + } + } + + long getUnflushedEntriesCount() { + long highestSynced = this.highestSyncedTxid.get(); + long highestUnsynced = this.highestUnsyncedTxid; + return highestSynced >= highestUnsynced ? 0 : highestUnsynced - highestSynced; + } + + boolean isUnflushedEntries() { + return getUnflushedEntriesCount() > 0; + } + + /** + * Exposed for testing only. Used for tricks like halting the ring buffer appending. + */ + @VisibleForTesting + void atHeadOfRingBufferEventHandlerAppend() { + // Noop + } + + protected boolean append(W writer, FSWALEntry entry) throws IOException { + // TODO: WORK ON MAKING THIS APPEND FASTER. DOING WAY TOO MUCH WORK WITH CPs, PBing, etc. 
+ atHeadOfRingBufferEventHandlerAppend(); + long start = EnvironmentEdgeManager.currentTime(); + byte[] encodedRegionName = entry.getKey().getEncodedRegionName(); + long regionSequenceId = WALKey.NO_SEQUENCE_ID; + // We are about to append this edit; update the region-scoped sequence number. Do it + // here inside this single appending/writing thread. Events are ordered on the ringbuffer + // so region sequenceids will also be in order. + regionSequenceId = entry.stampRegionSequenceId(); + // Edits are empty, there is nothing to append. Maybe empty when we are looking for a + // region sequence id only, a region edit/sequence id that is not associated with an actual + // edit. It has to go through all the rigmarole to be sure we have the right ordering. + if (entry.getEdit().isEmpty()) { + return false; + } + + // Coprocessor hook. + if (!coprocessorHost.preWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit())) { + if (entry.getEdit().isReplay()) { + // Set replication scope null so that this won't be replicated + entry.getKey().serializeReplicationScope(false); + } + } + if (!listeners.isEmpty()) { + for (WALActionsListener i : listeners) { + i.visitLogEntryBeforeWrite(entry.getKey(), entry.getEdit()); + } + } + doAppend(writer, entry); + assert highestUnsyncedTxid < entry.getTxid(); + highestUnsyncedTxid = entry.getTxid(); + sequenceIdAccounting.update(encodedRegionName, entry.getFamilyNames(), regionSequenceId, + entry.isInMemstore()); + coprocessorHost.postWALWrite(entry.getHRegionInfo(), entry.getKey(), entry.getEdit()); + // Update metrics. + postAppend(entry, EnvironmentEdgeManager.currentTime() - start); + numEntries.incrementAndGet(); + return true; + } + + private long postAppend(final Entry e, final long elapsedTime) { + long len = 0; + if (!listeners.isEmpty()) { + for (Cell cell : e.getEdit().getCells()) { + len += CellUtil.estimatedSerializedSizeOf(cell); + } + for (WALActionsListener listener : listeners) { + listener.postAppend(len, elapsedTime); + } + } + return len; + } + + protected void postSync(final long timeInNanos, final int handlerSyncs) { + if (timeInNanos > this.slowSyncNs) { + String msg = + new StringBuilder().append("Slow sync cost: ").append(timeInNanos / 1000000) + .append(" ms, current pipeline: ").append(Arrays.toString(getPipeline())).toString(); + Trace.addTimelineAnnotation(msg); + LOG.info(msg); + } + if (!listeners.isEmpty()) { + for (WALActionsListener listener : listeners) { + listener.postSync(timeInNanos, handlerSyncs); + } + } + } + + /** + * NOTE: This append, at a time that is usually after this call returns, starts an mvcc + * transaction by calling 'begin' wherein we assign this update a sequenceid. At assignment + * time, we stamp all the passed in Cells inside WALEdit with their sequenceId. You must + * 'complete' this mvcc transaction by calling + * MultiVersionConcurrencyControl#complete(...) or a variant; otherwise mvcc will get stuck. Do it + * in the finally of a try/finally block within which this append lives and any subsequent + * operations like sync or update of memstore, etc. Get the WriteEntry to pass mvcc out of the + * passed in WALKey <code>walKey</code> parameter. Be warned that the WriteEntry is not + * immediately available on return from this method. It WILL be available subsequent to a sync of + * this append; otherwise, you will just have to wait on the WriteEntry to get filled in. 
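The note above implies a calling pattern like the following. This is a hedged sketch, not code from this patch: it assumes a getWriteEntry() accessor on WALKey and MultiVersionConcurrencyControl#complete as described in the javadoc, and it elides error handling.

// Sketch: always complete the mvcc transaction in a finally block, otherwise
// a failed sync would leave the mvcc read point stuck behind this write.
long appendAndSync(WAL wal, HRegionInfo info, WALKey key, WALEdit edits,
    MultiVersionConcurrencyControl mvcc) throws IOException {
  long txid = wal.append(info, key, edits, true);
  try {
    wal.sync(txid);
    return txid;
  } finally {
    // assumed accessor; the javadoc says the WriteEntry travels out via the WALKey
    MultiVersionConcurrencyControl.WriteEntry we = key.getWriteEntry();
    if (we != null) {
      mvcc.complete(we); // must happen even if sync threw
    }
  }
}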
+ */ + @Override + public abstract long append(HRegionInfo info, WALKey key, WALEdit edits, boolean inMemstore) + throws IOException; + + protected abstract void doAppend(W writer, FSWALEntry entry) throws IOException; + + protected abstract W createWriterInstance(Path path) throws IOException; + + /** + * @return old wal file size + */ + protected abstract long doReplaceWriter(Path oldPath, Path newPath, W nextWriter) + throws IOException; + + protected abstract void doShutdown() throws IOException; + + /** + * This method gets the pipeline for the current WAL. + */ + @VisibleForTesting + abstract DatanodeInfo[] getPipeline(); + + /** + * This method gets the datanode replication count for the current WAL. + */ + @VisibleForTesting + abstract int getLogReplication(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java new file mode 100644 index 0000000..66f1f54 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractProtobufLogWriter.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.DEFAULT_WAL_TRAILER_WARN_SIZE; +import static org.apache.hadoop.hbase.regionserver.wal.ProtobufLogReader.WAL_TRAILER_WARN_SIZE; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.codec.Codec; +import org.apache.hadoop.hbase.io.util.LRUDictionary; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader; +import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer; +import org.apache.hadoop.hbase.util.FSUtils; + +/** + * Base class for Protobuf log writer. 
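Both concrete writers that extend this base class share the file format it pins down: a magic preamble plus a protobuf WALHeader up front, the edits in the middle, and an optional protobuf WALTrailer followed by a closing magic (see the abstract writeMagicAndWALHeader and writeWALTrailerAndMagic methods below). A rough sketch of checking for that trailing magic, assuming this layout and nothing more:

import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;

// Rough sketch: a cleanly closed protobuf WAL ends with the "complete" magic
// written by writeWALTrailerAndMagic(); a file missing it was likely not
// closed cleanly and has no trailer to trust.
public class TrailerMagicSketch {
  static boolean endsWithMagic(String file, byte[] completeMagic) throws IOException {
    try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
      if (raf.length() < completeMagic.length) {
        return false;
      }
      byte[] tail = new byte[completeMagic.length];
      raf.seek(raf.length() - tail.length);
      raf.readFully(tail);
      return Arrays.equals(tail, completeMagic);
    }
  }
}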
+ */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public abstract class AbstractProtobufLogWriter { + + private static final Log LOG = LogFactory.getLog(AbstractProtobufLogWriter.class); + + protected CompressionContext compressionContext; + protected Configuration conf; + protected Codec.Encoder cellEncoder; + protected WALCellCodec.ByteStringCompressor compressor; + protected boolean trailerWritten; + protected WALTrailer trailer; + // maximum size of the wal Trailer in bytes. If a user writes/reads a trailer with size larger + // than this size, it is written/read respectively, with a WARN message in the log. + protected int trailerWarnSize; + + protected AtomicLong length = new AtomicLong(); + + private WALCellCodec getCodec(Configuration conf, CompressionContext compressionContext) + throws IOException { + return WALCellCodec.create(conf, null, compressionContext); + } + + protected WALHeader buildWALHeader(Configuration conf, WALHeader.Builder builder) + throws IOException { + if (!builder.hasWriterClsName()) { + builder.setWriterClsName(ProtobufLogWriter.class.getSimpleName()); + } + if (!builder.hasCellCodecClsName()) { + builder.setCellCodecClsName(WALCellCodec.getWALCellCodecClass(conf)); + } + return builder.build(); + } + + private boolean initializeCompressionContext(Configuration conf, Path path) throws IOException { + boolean doCompress = conf.getBoolean(HConstants.ENABLE_WAL_COMPRESSION, false); + if (doCompress) { + try { + this.compressionContext = new CompressionContext(LRUDictionary.class, + FSUtils.isRecoveredEdits(path), + conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true)); + } catch (Exception e) { + throw new IOException("Failed to initiate CompressionContext", e); + } + } + return doCompress; + } + + public void init(FileSystem fs, Path path, Configuration conf, boolean overwritable) + throws IOException { + this.conf = conf; + boolean doCompress = initializeCompressionContext(conf, path); + this.trailerWarnSize = conf.getInt(WAL_TRAILER_WARN_SIZE, DEFAULT_WAL_TRAILER_WARN_SIZE); + int bufferSize = FSUtils.getDefaultBufferSize(fs); + short replication = (short) conf.getInt("hbase.regionserver.hlog.replication", + FSUtils.getDefaultReplication(fs, path)); + long blockSize = conf.getLong("hbase.regionserver.hlog.blocksize", + FSUtils.getDefaultBlockSize(fs, path)); + + initOutput(fs, path, overwritable, bufferSize, replication, blockSize); + + boolean doTagCompress = doCompress + && conf.getBoolean(CompressionContext.ENABLE_WAL_TAGS_COMPRESSION, true); + length.set(writeMagicAndWALHeader(ProtobufLogReader.PB_WAL_MAGIC, buildWALHeader(conf, + WALHeader.newBuilder().setHasCompression(doCompress).setHasTagCompression(doTagCompress)))); + + initAfterHeader(doCompress); + + // instantiate trailer to default value. 
+ trailer = WALTrailer.newBuilder().build(); + if (LOG.isTraceEnabled()) { + LOG.trace("Initialized protobuf WAL=" + path + ", compression=" + doCompress); + } + } + + protected void initAfterHeader(boolean doCompress) throws IOException { + WALCellCodec codec = getCodec(conf, this.compressionContext); + this.cellEncoder = codec.getEncoder(getOutputStreamForCellEncoder()); + if (doCompress) { + this.compressor = codec.getByteStringCompressor(); + } + } + + void setWALTrailer(WALTrailer walTrailer) { + this.trailer = walTrailer; + } + + public long getLength() { + return length.get(); + } + + private WALTrailer buildWALTrailer(WALTrailer.Builder builder) { + return builder.build(); + } + + protected void writeWALTrailer() { + try { + int trailerSize = 0; + if (this.trailer == null) { + // use default trailer. + LOG.warn("WALTrailer is null. Continuing with default."); + this.trailer = buildWALTrailer(WALTrailer.newBuilder()); + trailerSize = this.trailer.getSerializedSize(); + } else if ((trailerSize = this.trailer.getSerializedSize()) > this.trailerWarnSize) { + // continue writing after warning the user. + LOG.warn("Please investigate WALTrailer usage. Trailer size > maximum size : " + trailerSize + + " > " + this.trailerWarnSize); + } + length.set(writeWALTrailerAndMagic(trailer, ProtobufLogReader.PB_WAL_COMPLETE_MAGIC)); + this.trailerWritten = true; + } catch (IOException ioe) { + LOG.warn("Failed to write trailer, non-fatal, continuing...", ioe); + } + } + + protected abstract void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize, + short replication, long blockSize) throws IOException; + + /** + * return the file length after written. + */ + protected abstract long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException; + + protected abstract long writeWALTrailerAndMagic(WALTrailer trailer, byte[] magic) + throws IOException; + + protected abstract OutputStream getOutputStreamForCellEncoder(); +} http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java ---------------------------------------------------------------------- diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java new file mode 100644 index 0000000..b80f2c9 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java @@ -0,0 +1,732 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.wal; + +import static org.apache.hadoop.hbase.HConstants.REGION_SERVER_HANDLER_COUNT; +import static org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.shouldRetryCreate; +import io.netty.channel.EventLoop; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.Promise; +import io.netty.util.concurrent.ScheduledFuture; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.nio.channels.CompletionHandler; +import java.util.ArrayDeque; +import java.util.Comparator; +import java.util.Deque; +import java.util.Iterator; +import java.util.List; +import java.util.PriorityQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.client.ConnectionUtils; +import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutput; +import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper.NameNodeException; +import org.apache.hadoop.hbase.wal.AsyncFSWALProvider; +import org.apache.hadoop.hbase.wal.WALKey; +import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.htrace.NullScope; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; + +/** + * An asynchronous implementation of FSWAL. + * <p> + * Here 'waitingConsumePayloads' acts as the RingBuffer in FSHLog. We do not use RingBuffer here + * because the RingBuffer needs an exclusive thread to consume the entries in it, and here we want to run + * the append and sync operations inside the EventLoop. We cannot use the EventLoop as the RingBuffer's + * executor, otherwise the EventLoop could not process any other events such as socket reads and writes. + * <p> + * For append, we process it as follows: + * <ol> + * <li>In the caller thread (typically, in the rpc handler thread): + * <ol> + * <li>Lock 'waitingConsumePayloads', bump nextTxid, and insert the entry into + * 'waitingConsumePayloads'.</li> + * <li>Schedule the consumer task if needed. See {@link #shouldScheduleConsumer()} for more details. 
+ * </li> + * </ol> + * </li> + * <li>In the consumer task (in the EventLoop thread) + * <ol> + * <li>Poll the entry from 'waitingConsumePayloads' and insert it into 'waitingAppendEntries'</li> + * <li>Poll the entry from 'waitingAppendEntries', append it to the AsyncWriter, and insert it into + * 'unackedEntries'</li> + * <li>If the buffered size reaches {@link #batchSize}, or there is a sync request, then we call + * sync on the AsyncWriter.</li> + * <li>In the callback methods (CompletionHandler): + * <ul> + * <li>If succeeded, poll the entry from 'unackedEntries' and drop it.</li> + * <li>If failed, add all the entries in 'unackedEntries' back to 'waitingAppendEntries' and wait + * for writing them again.</li> + * </ul> + * </li> + * </ol> + * </li> + * </ol> + * For sync, the processing stages are almost the same, except that if it is not assigned a new + * 'txid', we just assign the previous 'txid' to it without bumping the 'nextTxid'. And, differently + * from FSHLog, we will open a new writer, rewrite the unacked entries to the new writer, and sync + * again if we hit a sync error. + * <p> + * Here we only describe the logic of doReplaceWriter. The main logic of rollWriter is the same as in + * FSHLog.<br> + * For a normal roll request (for example, we have reached the log roll size): + * <ol> + * <li>In the log roller thread, we add a roll payload to 'waitingConsumePayloads', and then wait on + * the rollPromise (see {@link #waitForSafePoint()}).</li> + * <li>In the consumer thread, we will stop polling entries from 'waitingConsumePayloads' if we hit + * a Payload which contains a roll request.</li> + * <li>Append all entries to the current writer, and issue a sync request if possible.</li> + * <li>If the sync succeeded, check if we can finish a roll request. There are 3 conditions: + * <ul> + * <li>'rollPromise' is not null, which means we have a pending roll request.</li> + * <li>'waitingAppendEntries' is empty.</li> + * <li>'unackedEntries' is empty.</li> + * </ul> + * </li> + * <li>Back in the log roller thread, now we can confirm that there are no outgoing entries, i.e., + * we have reached a safe point. So it is safe to replace the old writer with the new writer now.</li> + * <li>Acquire the 'waitingConsumePayloads' lock, set 'writerBroken' and 'waitingRoll' to false, and cancel + * the log roller exit checker if any (see the comments in the 'failed' method of the sync + * CompletionHandler to see why we need a checker here).</li> + * <li>Schedule the consumer task if needed.</li> + * <li>Schedule a background task to close the old writer.</li> + * </ol> + * For a broken writer roll request, the only difference is that we can bypass the wait-for-safe-point + * stage. See the comments in the 'failed' method of the sync CompletionHandler for more + * details. 
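The "schedule the consumer task if needed" step in the lists above is the standard publish-then-maybe-schedule idiom. A self-contained toy version, using only JDK types (names are mine, not from the patch):

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;

// Toy version of the idiom: producers enqueue under a lock and only submit
// the consumer to the event loop when it is not already scheduled; the
// consumer clears the flag, under the same lock, once the queue drains.
class ConsumerScheduling<T> {
  private final Deque<T> queue = new ArrayDeque<>();
  private boolean consumerScheduled;
  private final Executor eventLoop;

  ConsumerScheduling(Executor eventLoop) {
    this.eventLoop = eventLoop;
  }

  void publish(T payload) {
    boolean schedule = false;
    synchronized (queue) {
      queue.addLast(payload);
      if (!consumerScheduled) {
        consumerScheduled = true;
        schedule = true; // we are the one that must wake the consumer
      }
    }
    if (schedule) {
      eventLoop.execute(this::consume);
    }
  }

  private void consume() {
    for (;;) {
      T payload;
      synchronized (queue) {
        payload = queue.pollFirst();
        if (payload == null) {
          consumerScheduled = false; // nothing left; the next publish reschedules
          return;
        }
      }
      handle(payload);
    }
  }

  protected void handle(T payload) {
    // append or sync would happen here
  }
}

Because the enqueue and the flag check happen under one lock, a payload can never be stranded: either the running consumer sees it, or the publisher observes consumerScheduled == false and submits a fresh consumer run.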
+ */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.CONFIG) +public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> { + + private static final Log LOG = LogFactory.getLog(AsyncFSWAL.class); + + public static final String WAL_BATCH_SIZE = "hbase.wal.batch.size"; + public static final long DEFAULT_WAL_BATCH_SIZE = 64L * 1024; + + public static final String ASYNC_WAL_CREATE_MAX_RETRIES = "hbase.wal.async.create.retries"; + public static final int DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES = 10; + + public static final String ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS = + "hbase.wal.async.logroller.exited.check.interval.ms"; + public static final long DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS = 1000; + + /** + * Carries things that we want to pass to the consume task in the event loop. Only one field can be + * non-null. + * <p> + * TODO: need to unify this and {@link RingBufferTruck}. They are mostly the same thing. + */ + private static final class Payload { + + // a wal entry which needs to be appended + public final FSWALEntry entry; + + // indicates that we need to sync our wal writer. + public final SyncFuture sync; + + // indicates that we want to roll the writer. + public final Promise<Void> roll; + + public Payload(FSWALEntry entry) { + this.entry = entry; + this.sync = null; + this.roll = null; + } + + public Payload(SyncFuture sync) { + this.entry = null; + this.sync = sync; + this.roll = null; + } + + public Payload(Promise<Void> roll) { + this.entry = null; + this.sync = null; + this.roll = roll; + } + + @Override + public String toString() { + return "Payload [entry=" + entry + ", sync=" + sync + ", roll=" + roll + "]"; + } + } + + private final EventLoop eventLoop; + + private final Deque<Payload> waitingConsumePayloads; + + // like the ringbuffer sequence. Every FSWALEntry and SyncFuture will be assigned a txid and + // then added to waitingConsumePayloads. + private long nextTxid = 1L; + + private boolean consumerScheduled; + + // a new writer has been created and we are waiting for the old writer to be closed. + private boolean waitingRoll; + + // the writer is broken and rollWriter is needed. + private boolean writerBroken; + + private final long batchSize; + + private final int createMaxRetries; + + private final long logRollerExitedCheckIntervalMs; + + private final ExecutorService closeExecutor = Executors + .newCachedThreadPool(new ThreadFactoryBuilder().setDaemon(true) + .setNameFormat("Close-WAL-Writer-%d").build()); + + private volatile FanOutOneBlockAsyncDFSOutput hdfsOut; + + private final Deque<FSWALEntry> waitingAppendEntries = new ArrayDeque<FSWALEntry>(); + + private final Deque<FSWALEntry> unackedEntries = new ArrayDeque<FSWALEntry>(); + + private final PriorityQueue<SyncFuture> syncFutures = new PriorityQueue<SyncFuture>(11, + SEQ_COMPARATOR); + + private Promise<Void> rollPromise; + + // the highest txid of WAL entries being processed + private long highestProcessedTxid; + + // file length when we issued the last sync request on the writer + private long fileLengthAtLastSync; + + private volatile boolean logRollerExited; + + private final class LogRollerExitedChecker implements Runnable { + + private boolean cancelled; + + private ScheduledFuture<?> future; + + public synchronized void setFuture(ScheduledFuture<?> future) { + this.future = future; + } + + @Override + public void run() { + if (!logRollerExited) { + return; + } + // rollWriter is called in the log roller thread, and logRollerExited will be set just before + // the log roller exits. 
So here we can confirm that no one could cancel us if the 'cancelled' + // check passed. So it is safe to release the lock after checking the 'cancelled' flag. + synchronized (this) { + if (cancelled) { + return; + } + } + unackedEntries.clear(); + waitingAppendEntries.clear(); + IOException error = new IOException("sync failed but log roller exited"); + for (SyncFuture future; (future = syncFutures.peek()) != null;) { + future.done(highestProcessedTxid, error); + syncFutures.remove(); + } + synchronized (waitingConsumePayloads) { + for (Payload p : waitingConsumePayloads) { + if (p.entry != null) { + try { + p.entry.stampRegionSequenceId(); + } catch (IOException e) { + throw new AssertionError("should not happen", e); + } + } else if (p.sync != null) { + p.sync.done(nextTxid, error); + } + } + waitingConsumePayloads.clear(); + } + } + + public synchronized void cancel() { + future.cancel(false); + cancelled = true; + } + } + + private LogRollerExitedChecker logRollerExitedChecker; + + public AsyncFSWAL(FileSystem fs, Path rootDir, String logDir, String archiveDir, + Configuration conf, List<WALActionsListener> listeners, boolean failIfWALExists, + String prefix, String suffix, EventLoop eventLoop) throws FailedLogCloseException, + IOException { + super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix); + this.eventLoop = eventLoop; + int maxHandlersCount = conf.getInt(REGION_SERVER_HANDLER_COUNT, 200); + waitingConsumePayloads = new ArrayDeque<Payload>(maxHandlersCount * 3); + batchSize = conf.getLong(WAL_BATCH_SIZE, DEFAULT_WAL_BATCH_SIZE); + createMaxRetries = + conf.getInt(ASYNC_WAL_CREATE_MAX_RETRIES, DEFAULT_ASYNC_WAL_CREATE_MAX_RETRIES); + logRollerExitedCheckIntervalMs = + conf.getLong(ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS, + DEFAULT_ASYNC_WAL_LOG_ROLLER_EXITED_CHECK_INTERVAL_MS); + rollWriter(); + } + + private void tryFinishRoll() { + // We can finish a roll only when: + // 1. a roll is requested + // 2. we have written out all entries before the roll point. + // 3. all entries have been acked. + if (rollPromise != null && waitingAppendEntries.isEmpty() && unackedEntries.isEmpty()) { + rollPromise.trySuccess(null); + rollPromise = null; + } + } + + private void sync(final AsyncWriter writer, final long processedTxid) { + fileLengthAtLastSync = writer.getLength(); + final long startTimeNs = System.nanoTime(); + writer.sync(new CompletionHandler<Long, Void>() { + + @Override + public void completed(Long result, Void attachment) { + highestSyncedTxid.set(processedTxid); + int syncCount = finishSync(true); + for (Iterator<FSWALEntry> iter = unackedEntries.iterator(); iter.hasNext();) { + if (iter.next().getTxid() <= processedTxid) { + iter.remove(); + } else { + break; + } + } + postSync(System.nanoTime() - startTimeNs, syncCount); + tryFinishRoll(); + if (!rollWriterLock.tryLock()) { + return; + } + try { + if (writer.getLength() >= logrollsize) { + requestLogRoll(); + } + } finally { + rollWriterLock.unlock(); + } + } + + @Override + public void failed(Throwable exc, Void attachment) { + LOG.warn("sync failed", exc); + // Here we depend on the implementation of FanOutOneBlockAsyncDFSOutput and netty. + // When an error occurs, FanOutOneBlockAsyncDFSOutput will fail all pending flush requests. This + // is executed inside the EventLoop. And in DefaultPromise in netty, it will notifyListener + // directly if it is already in the EventLoop thread. And in the listener method, it will + // call us. 
+        // So we know that all failed flush requests will call us back one after another, and
+        // before the last one finishes, no other task can be executed in the EventLoop. It is
+        // therefore safe to use writerBroken as a guard here.
+        // Do not forget to revisit this if we change the implementation of
+        // FanOutOneBlockAsyncDFSOutput!
+        synchronized (waitingConsumePayloads) {
+          if (writerBroken) {
+            return;
+          }
+          // schedule a periodic task to check whether the log roller has exited. Otherwise the
+          // sync request may be blocked forever since we are still waiting for a new writer to
+          // write out the pending data and sync it...
+          logRollerExitedChecker = new LogRollerExitedChecker();
+          // we are currently in the EventLoop thread, so it is safe to set the future after
+          // scheduling it, since the task cannot run before we release the thread.
+          logRollerExitedChecker.setFuture(eventLoop.scheduleAtFixedRate(logRollerExitedChecker,
+            logRollerExitedCheckIntervalMs, logRollerExitedCheckIntervalMs,
+            TimeUnit.MILLISECONDS));
+          writerBroken = true;
+        }
+        for (Iterator<FSWALEntry> iter = unackedEntries.descendingIterator(); iter.hasNext();) {
+          waitingAppendEntries.addFirst(iter.next());
+        }
+        highestUnsyncedTxid = highestSyncedTxid.get();
+        if (rollPromise != null) {
+          rollPromise.trySuccess(null);
+          rollPromise = null;
+          return;
+        }
+        // request a roll.
+        if (!rollWriterLock.tryLock()) {
+          return;
+        }
+        try {
+          requestLogRoll();
+        } finally {
+          rollWriterLock.unlock();
+        }
+      }
+    }, null);
+  }
+
+  private void addTimeAnnotation(SyncFuture future, String annotation) {
+    TraceScope scope = Trace.continueSpan(future.getSpan());
+    Trace.addTimelineAnnotation(annotation);
+    future.setSpan(scope.detach());
+  }
+
+  private int finishSync(boolean addSyncTrace) {
+    long doneTxid = highestSyncedTxid.get();
+    int finished = 0;
+    for (SyncFuture future; (future = syncFutures.peek()) != null;) {
+      if (future.getTxid() <= doneTxid) {
+        future.done(doneTxid, null);
+        syncFutures.remove();
+        finished++;
+        // only add the trace annotation when the caller asks for it; otherwise the
+        // addSyncTrace parameter would be unused.
+        if (addSyncTrace) {
+          addTimeAnnotation(future, "writer synced");
+        }
+      } else {
+        break;
+      }
+    }
+    return finished;
+  }
+
+  private void consume() {
+    final AsyncWriter writer = this.writer;
+    // maybe a sync request is not queued when we issue a sync, so check here to see if we can
+    // finish some sync futures.
+    finishSync(false);
+    long newHighestProcessedTxid = -1L;
+    for (Iterator<FSWALEntry> iter = waitingAppendEntries.iterator(); iter.hasNext();) {
+      FSWALEntry entry = iter.next();
+      boolean appended;
+      try {
+        appended = append(writer, entry);
+      } catch (IOException e) {
+        throw new AssertionError("should not happen", e);
+      }
+      newHighestProcessedTxid = entry.getTxid();
+      iter.remove();
+      if (appended) {
+        unackedEntries.addLast(entry);
+        if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
+          break;
+        }
+      }
+    }
+    // if we have a newer transaction id, update it.
+    // otherwise, use the previous transaction id.
+    if (newHighestProcessedTxid > 0) {
+      highestProcessedTxid = newHighestProcessedTxid;
+    } else {
+      newHighestProcessedTxid = highestProcessedTxid;
+    }
+    if (writer.getLength() - fileLengthAtLastSync >= batchSize) {
+      // sync because we have reached the buffer size limit.
+      sync(writer, newHighestProcessedTxid);
+    } else if ((!syncFutures.isEmpty() || rollPromise != null)
+        && writer.getLength() > fileLengthAtLastSync) {
+      // first, we should have at least one sync request or a roll request;
+      // second, we should have some unsynced data.
+      sync(writer, newHighestProcessedTxid);
+    } else if (writer.getLength() == fileLengthAtLastSync) {
+      // we haven't written anything out, just advance highestSyncedTxid since we may have only
+      // stamped some region sequence ids.
+      highestSyncedTxid.set(newHighestProcessedTxid);
+      finishSync(false);
+      tryFinishRoll();
+    }
+  }
+
+  private static final Comparator<SyncFuture> SEQ_COMPARATOR = new Comparator<SyncFuture>() {
+
+    @Override
+    public int compare(SyncFuture o1, SyncFuture o2) {
+      int c = Long.compare(o1.getTxid(), o2.getTxid());
+      return c != 0 ? c : Integer.compare(System.identityHashCode(o1),
+          System.identityHashCode(o2));
+    }
+  };
+
+  private final Runnable consumer = new Runnable() {
+
+    @Override
+    public void run() {
+      synchronized (waitingConsumePayloads) {
+        assert consumerScheduled;
+        if (writerBroken) {
+          // waiting for reschedule after rollWriter.
+          consumerScheduled = false;
+          return;
+        }
+        if (waitingRoll) {
+          // we may still have entries in waitingAppendEntries if the consume method does not
+          // write all pending entries out; this usually happens when there are too many entries
+          // to write and we exceed the batchSize limit.
+          if (waitingAppendEntries.isEmpty()) {
+            consumerScheduled = false;
+            return;
+          }
+        } else {
+          for (Payload p; (p = waitingConsumePayloads.pollFirst()) != null;) {
+            if (p.entry != null) {
+              waitingAppendEntries.addLast(p.entry);
+            } else if (p.sync != null) {
+              syncFutures.add(p.sync);
+            } else {
+              rollPromise = p.roll;
+              waitingRoll = true;
+              break;
+            }
+          }
+        }
+      }
+      consume();
+      synchronized (waitingConsumePayloads) {
+        if (waitingRoll) {
+          if (waitingAppendEntries.isEmpty()) {
+            consumerScheduled = false;
+            return;
+          }
+        } else {
+          if (waitingConsumePayloads.isEmpty() && waitingAppendEntries.isEmpty()) {
+            consumerScheduled = false;
+            return;
+          }
+        }
+      }
+      // reschedule if we still have something to write.
+      eventLoop.execute(this);
+    }
+  };
+
+  private boolean shouldScheduleConsumer() {
+    if (writerBroken || waitingRoll) {
+      return false;
+    }
+    if (consumerScheduled) {
+      return false;
+    }
+    consumerScheduled = true;
+    return true;
+  }
+
+  @Override
+  public long append(HRegionInfo hri, WALKey key, WALEdit edits, boolean inMemstore)
+      throws IOException {
+    boolean scheduleTask;
+    long txid;
+    synchronized (waitingConsumePayloads) {
+      if (this.closed) {
+        throw new IOException("Cannot append; log is closed");
+      }
+      txid = nextTxid++;
+      FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore);
+      scheduleTask = shouldScheduleConsumer();
+      waitingConsumePayloads.add(new Payload(entry));
+    }
+    if (scheduleTask) {
+      eventLoop.execute(consumer);
+    }
+    return txid;
+  }
+
+  @Override
+  public void sync() throws IOException {
+    TraceScope scope = Trace.startSpan("AsyncFSWAL.sync");
+    try {
+      SyncFuture future;
+      boolean scheduleTask;
+      synchronized (waitingConsumePayloads) {
+        scheduleTask = shouldScheduleConsumer();
+        future = getSyncFuture(nextTxid - 1, scope.detach());
+        waitingConsumePayloads.addLast(new Payload(future));
+      }
+      if (scheduleTask) {
+        eventLoop.execute(consumer);
+      }
+      scope = Trace.continueSpan(blockOnSync(future));
+    } finally {
+      assert scope == NullScope.INSTANCE || !scope.isDetached();
+      scope.close();
+    }
+  }
+
+  @Override
+  public void sync(long txid) throws IOException {
+    if (highestSyncedTxid.get() >= txid) {
+      return;
+    }
+    TraceScope scope = Trace.startSpan("AsyncFSWAL.sync");
+    try {
+      SyncFuture future = getSyncFuture(txid, scope.detach());
+      boolean scheduleTask;
+      synchronized (waitingConsumePayloads) {
+        scheduleTask = shouldScheduleConsumer();
+        waitingConsumePayloads.addLast(new Payload(future));
+      }
+      if (scheduleTask) {
+        eventLoop.execute(consumer);
+      }
+      scope = Trace.continueSpan(blockOnSync(future));
+    } finally {
+      assert scope == NullScope.INSTANCE || !scope.isDetached();
+      scope.close();
+    }
+  }
+
+  @Override
+  public void logRollerExited() {
+    logRollerExited = true;
+  }
+
+  @Override
+  protected AsyncWriter createWriterInstance(Path path) throws IOException {
+    boolean overwrite = false;
+    for (int retry = 0;; retry++) {
+      try {
+        return AsyncFSWALProvider.createAsyncWriter(conf, fs, path, overwrite, eventLoop);
+      } catch (RemoteException e) {
+        LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e);
+        if (shouldRetryCreate(e)) {
+          if (retry >= createMaxRetries) {
+            break;
+          }
+        } else {
+          throw e.unwrapRemoteException();
+        }
+      } catch (NameNodeException e) {
+        throw e;
+      } catch (IOException e) {
+        LOG.warn("create wal log writer " + path + " failed, retry = " + retry, e);
+        if (retry >= createMaxRetries) {
+          break;
+        }
+        // overwrite the old broken file.
+        overwrite = true;
+        try {
+          Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
+        } catch (InterruptedException ie) {
+          throw new InterruptedIOException();
+        }
+      }
+    }
+    throw new IOException("Failed to create wal log writer " + path + " after retrying "
+        + createMaxRetries + " time(s)");
+  }
+
+  private void waitForSafePoint() {
+    Future<Void> roll;
+    boolean scheduleTask;
+    synchronized (waitingConsumePayloads) {
+      if (!writerBroken && this.writer != null) {
+        Promise<Void> promise = eventLoop.newPromise();
+        if (consumerScheduled) {
+          scheduleTask = false;
+        } else {
+          scheduleTask = consumerScheduled = true;
+        }
+        waitingConsumePayloads.addLast(new Payload(promise));
+        roll = promise;
+      } else {
+        roll = eventLoop.newSucceededFuture(null);
+        scheduleTask = false;
+      }
+    }
+    if (scheduleTask) {
+      eventLoop.execute(consumer);
+    }
+    roll.awaitUninterruptibly();
+  }
+
+  @Override
+  protected long doReplaceWriter(Path oldPath, Path newPath, AsyncWriter nextWriter)
+      throws IOException {
+    waitForSafePoint();
+    final AsyncWriter oldWriter = this.writer;
+    this.writer = nextWriter;
+    // instanceof is false for null, so no explicit null check is needed.
+    if (nextWriter instanceof AsyncProtobufLogWriter) {
+      this.hdfsOut = ((AsyncProtobufLogWriter) nextWriter).getOutput();
+    }
+    this.fileLengthAtLastSync = 0L;
+    boolean scheduleTask;
+    synchronized (waitingConsumePayloads) {
+      writerBroken = waitingRoll = false;
+      if (logRollerExitedChecker != null) {
+        logRollerExitedChecker.cancel();
+        logRollerExitedChecker = null;
+      }
+      if (consumerScheduled) {
+        scheduleTask = false;
+      } else {
+        if (waitingConsumePayloads.isEmpty() && waitingAppendEntries.isEmpty()) {
+          scheduleTask = false;
+        } else {
+          scheduleTask = consumerScheduled = true;
+        }
+      }
+    }
+    if (scheduleTask) {
+      eventLoop.execute(consumer);
+    }
+    long oldFileLen;
+    if (oldWriter != null) {
+      oldFileLen = oldWriter.getLength();
+      closeExecutor.execute(new Runnable() {
+
+        @Override
+        public void run() {
+          try {
+            oldWriter.close();
+          } catch (IOException e) {
+            LOG.warn("close old writer failed", e);
+          }
+        }
+      });
+    } else {
+      oldFileLen = 0L;
+    }
+    return oldFileLen;
+  }
+
+  @Override
+  protected void doShutdown() throws IOException {
+    waitForSafePoint();
+    this.writer.close();
+    this.writer = null;
+    closeExecutor.shutdown();
+  }
+
+  @Override
+  protected void doAppend(AsyncWriter writer, FSWALEntry entry) {
+    writer.append(entry);
+  }
+
+  @Override
+  DatanodeInfo[] getPipeline() {
+    FanOutOneBlockAsyncDFSOutput output = this.hdfsOut;
+    return output != null ? output.getPipeline() : new DatanodeInfo[0];
+  }
+
+  @Override
+  int getLogReplication() {
+    return getPipeline().length;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hbase/blob/c96b642f/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
new file mode 100644
index 0000000..894f3dd
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncProtobufLogWriter.java
@@ -0,0 +1,203 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.io.OutputStream;
+import java.nio.channels.CompletionHandler;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.io.ByteArrayOutputStream;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALHeader;
+import org.apache.hadoop.hbase.protobuf.generated.WALProtos.WALTrailer;
+import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutput;
+import org.apache.hadoop.hbase.util.FanOutOneBlockAsyncDFSOutputHelper;
+import org.apache.hadoop.hbase.wal.AsyncFSWALProvider;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+
+import com.google.common.base.Throwables;
+import com.google.common.primitives.Ints;
+
+import io.netty.channel.EventLoop;
+
+/**
+ * AsyncWriter for protobuf-based WAL.
+ */
+@InterfaceAudience.Private
+public class AsyncProtobufLogWriter extends AbstractProtobufLogWriter implements
+    AsyncFSWALProvider.AsyncWriter {
+
+  private static final Log LOG = LogFactory.getLog(AsyncProtobufLogWriter.class);
+
+  private static final class BlockingCompletionHandler implements CompletionHandler<Long, Void> {
+
+    private long size;
+
+    private Throwable error;
+
+    private boolean finished;
+
+    @Override
+    public void completed(Long result, Void attachment) {
+      synchronized (this) {
+        size = result.longValue();
+        finished = true;
+        notifyAll();
+      }
+    }
+
+    @Override
+    public void failed(Throwable exc, Void attachment) {
+      synchronized (this) {
+        error = exc;
+        finished = true;
+        notifyAll();
+      }
+    }
+
+    public long get() throws IOException {
+      synchronized (this) {
+        while (!finished) {
+          try {
+            wait();
+          } catch (InterruptedException e) {
+            throw new InterruptedIOException();
+          }
+        }
+        if (error != null) {
+          Throwables.propagateIfPossible(error, IOException.class);
+          throw new RuntimeException(error);
+        }
+        return size;
+      }
+    }
+  }
+
+  private final EventLoop eventLoop;
+
+  private FanOutOneBlockAsyncDFSOutput output;
+
+  private ByteArrayOutputStream buf;
+
+  public AsyncProtobufLogWriter(EventLoop eventLoop) {
+    this.eventLoop = eventLoop;
+  }
+
+  @Override
+  public void append(Entry entry) {
+    buf.reset();
+    entry.setCompressionContext(compressionContext);
+    try {
+      entry.getKey().getBuilder(compressor).setFollowingKvCount(entry.getEdit().size()).build()
+          .writeDelimitedTo(buf);
+    } catch (IOException e) {
+      throw new AssertionError("should not happen", e);
+    }
+    length.addAndGet(buf.size());
+    output.write(buf.getBuffer(), 0, buf.size());
+    try {
+      for (Cell cell : entry.getEdit().getCells()) {
+        buf.reset();
+        cellEncoder.write(cell);
+        length.addAndGet(buf.size());
+        output.write(buf.getBuffer(), 0, buf.size());
+      }
+    } catch (IOException e) {
+      throw new AssertionError("should not happen", e);
+    }
+  }
+
+  @Override
+  public <A> void sync(CompletionHandler<Long, A> handler, A attachment) {
+    output.flush(attachment, handler, false);
+  }
+
+  @Override
+  public synchronized void close() throws IOException {
+    if (this.output == null) {
+      return;
+    }
+    try {
+      writeWALTrailer();
+      output.close();
+    } catch (Exception e) {
+      LOG.warn("normal close failed, try recover", e);
+      output.recoverAndClose(null);
+    }
+    this.output = null;
+  }
+
+  public FanOutOneBlockAsyncDFSOutput getOutput() {
+    return this.output;
+  }
+
+  @Override
+  protected void initOutput(FileSystem fs, Path path, boolean overwritable, int bufferSize,
+      short replication, long blockSize) throws IOException {
+    this.output = FanOutOneBlockAsyncDFSOutputHelper.createOutput((DistributedFileSystem) fs,
+        path, overwritable, false, replication, blockSize, eventLoop);
+    this.buf = new ByteArrayOutputStream();
+  }
+
+  @Override
+  protected long writeMagicAndWALHeader(byte[] magic, WALHeader header) throws IOException {
+    buf.reset();
+    header.writeDelimitedTo(buf);
+    final BlockingCompletionHandler handler = new BlockingCompletionHandler();
+    eventLoop.execute(new Runnable() {
+
+      @Override
+      public void run() {
+        output.write(ProtobufLogReader.PB_WAL_MAGIC);
+        output.write(buf.getBuffer(), 0, buf.size());
+        output.flush(null, handler, false);
+      }
+    });
+    return handler.get();
+  }
+
+  @Override
+  protected long writeWALTrailerAndMagic(WALTrailer trailer, final byte[] magic)
+      throws IOException {
+    buf.reset();
+    trailer.writeTo(buf);
+    final BlockingCompletionHandler handler = new BlockingCompletionHandler();
+    eventLoop.execute(new Runnable() {
+
+      @Override
+      public void run() {
+        output.write(buf.getBuffer(), 0, buf.size());
+        output.write(Ints.toByteArray(buf.size()));
+        output.write(magic);
+        output.flush(null, handler, false);
+      }
+    });
+    return handler.get();
+  }
+
+  @Override
+  protected OutputStream getOutputStreamForCellEncoder() {
+    return buf;
+  }
+}
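----------------------------------------------------------------------

Usage note: the knobs hbase.wal.batch.size, hbase.wal.async.create.retries
and hbase.wal.async.logroller.exited.check.interval.ms introduced by
AsyncFSWAL above are ordinary entries in the Hadoop Configuration. A minimal
sketch of tuning them programmatically follows; the class name and the values
shown are illustrative examples, not recommended settings. The same keys can
of course also be set in hbase-site.xml.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class AsyncWalConfigSketch {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Buffer up to 128 KB of appends before forcing a sync (default is 64 KB).
    conf.setLong("hbase.wal.batch.size", 128L * 1024);
    // Give up after 5 attempts to create a new WAL writer (default is 10).
    conf.setInt("hbase.wal.async.create.retries", 5);
    // While a sync failure is pending, poll every 2000 ms to see whether the
    // log roller has exited (default is 1000 ms).
    conf.setLong("hbase.wal.async.logroller.exited.check.interval.ms", 2000);
  }
}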
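Usage note: the sync contract implemented by AsyncProtobufLogWriter.sync
above is callback based. The CompletionHandler's completed() receives the
file length after the flush, failed() receives the error, and both are
invoked on the netty EventLoop thread, which is why AsyncFSWAL can use
writerBroken as a guard without extra locking. A minimal caller-side sketch
under those assumptions (syncAndLog is a hypothetical helper; the writer is
assumed to be obtained elsewhere, e.g. via AsyncFSWALProvider.createAsyncWriter):

import java.nio.channels.CompletionHandler;

import org.apache.hadoop.hbase.wal.AsyncFSWALProvider.AsyncWriter;

public class SyncCallbackSketch {

  // Issue an asynchronous sync and react to the outcome in the callback.
  static void syncAndLog(AsyncWriter writer) {
    writer.sync(new CompletionHandler<Long, Void>() {

      @Override
      public void completed(Long length, Void attachment) {
        // Invoked on the EventLoop thread once all pending data is flushed.
        System.out.println("synced, file length is now " + length);
      }

      @Override
      public void failed(Throwable exc, Void attachment) {
        // Invoked on the EventLoop thread if the flush fails.
        System.err.println("sync failed: " + exc);
      }
    }, null);
  }
}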