HDFS-7964. Add support for async edit logging. Contributed by Daryn Sharp.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/21517168 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/21517168 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/21517168 Branch: refs/heads/trunk Commit: 2151716832ad14932dd65b1a4e47e64d8d6cd767 Parents: 0fa54d4 Author: Jing Zhao <ji...@apache.org> Authored: Mon Feb 29 15:34:43 2016 -0800 Committer: Jing Zhao <ji...@apache.org> Committed: Mon Feb 29 15:34:43 2016 -0800 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../bkjournal/TestBookKeeperAsHASharedDir.java | 46 ++- .../src/test/resources/log4j.properties | 2 +- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 6 +- .../hadoop/hdfs/server/namenode/BackupNode.java | 4 + .../hadoop/hdfs/server/namenode/FSEditLog.java | 109 ++++--- .../hdfs/server/namenode/FSEditLogAsync.java | 322 +++++++++++++++++++ .../hdfs/server/namenode/FSEditLogOp.java | 213 ++++++------ .../hdfs/server/namenode/FSEditLogOpCodes.java | 108 ++++--- .../hadoop/hdfs/server/namenode/FSImage.java | 2 +- .../hadoop/hdfs/server/namenode/NameNode.java | 1 - .../namenode/metrics/NameNodeMetrics.java | 4 +- .../org/apache/hadoop/hdfs/DFSTestUtil.java | 3 + .../hdfs/server/namenode/TestAuditLogs.java | 15 +- .../hdfs/server/namenode/TestEditLog.java | 59 +++- .../server/namenode/TestEditLogAutoroll.java | 26 ++ .../namenode/TestEditLogJournalFailures.java | 35 +- .../hdfs/server/namenode/TestEditLogRace.java | 144 +++++---- .../server/namenode/TestFSEditLogLoader.java | 37 ++- .../server/namenode/TestNameNodeRecovery.java | 31 +- .../server/namenode/ha/TestEditLogTailer.java | 39 ++- 21 files changed, 904 insertions(+), 304 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 3d57efa..c3ea5ce 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -1037,6 +1037,8 @@ Release 2.9.0 - UNRELEASED HDFS-9754. Avoid unnecessary getBlockCollection calls in BlockManager. (jing9) + HDFS-7964. Add support for async edit logging. (Daryn Sharp) + OPTIMIZATIONS BUG FIXES http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java index 5611bb8..ff8c00d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/java/org/apache/hadoop/contrib/bkjournal/TestBookKeeperAsHASharedDir.java @@ -24,6 +24,9 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.AfterClass; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.ServiceFailedException; import org.apache.hadoop.ha.HAServiceProtocol.RequestSource; @@ -56,11 +59,14 @@ import org.apache.commons.logging.LogFactory; import java.io.File; import java.io.IOException; import java.net.URISyntaxException; +import java.util.ArrayList; +import java.util.Collection; /** * Integration test to ensure that the BookKeeper JournalManager * works for HDFS Namenode HA */ +@RunWith(Parameterized.class) public class TestBookKeeperAsHASharedDir { static final Log LOG = LogFactory.getLog(TestBookKeeperAsHASharedDir.class); @@ -69,6 +75,27 @@ public class TestBookKeeperAsHASharedDir { private static final String TEST_FILE_DATA = "HA BookKeeperJournalManager"; + @Parameters + public static Collection<Object[]> data() { + Collection<Object[]> params = new ArrayList<Object[]>(); + params.add(new Object[]{ Boolean.FALSE }); + params.add(new Object[]{ Boolean.TRUE }); + return params; + } + + private static boolean useAsyncEditLog; + public TestBookKeeperAsHASharedDir(Boolean async) { + useAsyncEditLog = async; + } + + private static Configuration getConf() { + Configuration conf = new Configuration(); + conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, + useAsyncEditLog); + return conf; + } + @BeforeClass public static void setupBookkeeper() throws Exception { bkutil = new BKJMUtil(numBookies); @@ -92,8 +119,7 @@ public class TestBookKeeperAsHASharedDir { public void testFailoverWithBK() throws Exception { MiniDFSCluster cluster = null; try { - Configuration conf = new Configuration(); - conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); + Configuration conf = getConf(); conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil.createJournalURI("/hotfailover").toString()); BKJMUtil.addJournalManagerDefinition(conf); @@ -144,8 +170,7 @@ public class TestBookKeeperAsHASharedDir { MiniDFSCluster cluster = null; try { - Configuration conf = new Configuration(); - conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); + Configuration conf = getConf(); conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil.createJournalURI("/hotfailoverWithFail").toString()); conf.setInt(BookKeeperJournalManager.BKJM_BOOKKEEPER_ENSEMBLE_SIZE, @@ -221,8 +246,7 @@ public class TestBookKeeperAsHASharedDir { MiniDFSCluster cluster = null; try { - Configuration conf = new Configuration(); - conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); + Configuration conf = getConf(); conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil.createJournalURI("/hotfailoverMultiple").toString()); BKJMUtil.addJournalManagerDefinition(conf); @@ -245,7 +269,9 @@ public class TestBookKeeperAsHASharedDir { fs = cluster.getFileSystem(0); // get the older active server. try { - fs.delete(p1, true); + System.out.println("DMS: > *************"); + boolean foo = fs.delete(p1, true); + System.out.println("DMS: < ************* "+foo); fail("Log update on older active should cause it to exit"); } catch (RemoteException re) { assertTrue(re.getClassName().contains("ExitException")); @@ -267,9 +293,8 @@ public class TestBookKeeperAsHASharedDir { public void testInitializeBKSharedEdits() throws Exception { MiniDFSCluster cluster = null; try { - Configuration conf = new Configuration(); + Configuration conf = getConf(); HAUtil.setAllowStandbyReads(conf, true); - conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); MiniDFSNNTopology topology = MiniDFSNNTopology.simpleHATopology(); cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology) @@ -358,8 +383,7 @@ public class TestBookKeeperAsHASharedDir { public void testNameNodeMultipleSwitchesUsingBKJM() throws Exception { MiniDFSCluster cluster = null; try { - Configuration conf = new Configuration(); - conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); + Configuration conf = getConf(); conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil .createJournalURI("/correctEditLogSelection").toString()); BKJMUtil.addJournalManagerDefinition(conf); http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties index 93c22f7..52aac43 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/test/resources/log4j.properties @@ -26,7 +26,7 @@ # Format is "<default threshold> (, <appender>)+ # DEFAULT: console appender only -log4j.rootLogger=OFF, CONSOLE +log4j.rootLogger=DEBUG, CONSOLE # Example with rolling log file #log4j.rootLogger=DEBUG, CONSOLE, ROLLINGFILE http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index bfb6203..9c06e29 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -291,7 +291,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH = "dfs.namenode.edits.noeditlogchannelflush"; public static final boolean DFS_NAMENODE_EDITS_NOEDITLOGCHANNELFLUSH_DEFAULT = false; - + + public static final String DFS_NAMENODE_EDITS_ASYNC_LOGGING = + "dfs.namenode.edits.asynclogging"; + public static final boolean DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT = false; + public static final String DFS_LIST_LIMIT = "dfs.ls.limit"; public static final int DFS_LIST_LIMIT_DEFAULT = 1000; public static final String DFS_CONTENT_SUMMARY_LIMIT_KEY = "dfs.content-summary.limit"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java index 36053f7..d36e0b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java @@ -142,6 +142,10 @@ public class BackupNode extends NameNode { @Override // NameNode protected void initialize(Configuration conf) throws IOException { + // async edit logs are incompatible with backup node due to race + // conditions resulting from laxer synchronization + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, false); + // Trash is disabled in BackupNameNode, // but should be turned back on if it ever becomes active. conf.setLong(CommonConfigurationKeys.FS_TRASH_INTERVAL_KEY, http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index c8986dc..809d9e6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -78,7 +78,9 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeFinalizeOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeOp; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RollingUpgradeStartOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetAclOp; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op; @@ -115,7 +117,7 @@ import com.google.common.collect.Lists; @InterfaceStability.Evolving public class FSEditLog implements LogsPurgeable { - static final Log LOG = LogFactory.getLog(FSEditLog.class); + public static final Log LOG = LogFactory.getLog(FSEditLog.class); /** * State machine for edit log. @@ -178,17 +180,11 @@ public class FSEditLog implements LogsPurgeable { private final NNStorage storage; private final Configuration conf; - + private final List<URI> editsDirs; - private final ThreadLocal<OpInstanceCache> cache = - new ThreadLocal<OpInstanceCache>() { - @Override - protected OpInstanceCache initialValue() { - return new OpInstanceCache(); - } - }; - + protected final OpInstanceCache cache = new OpInstanceCache(); + /** * The edit directories that are shared between primary and secondary. */ @@ -217,6 +213,17 @@ public class FSEditLog implements LogsPurgeable { } }; + static FSEditLog newInstance(Configuration conf, NNStorage storage, + List<URI> editsDirs) { + boolean asyncEditLogging = conf.getBoolean( + DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, + DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING_DEFAULT); + LOG.info("Edit logging is async:" + asyncEditLogging); + return asyncEditLogging + ? new FSEditLogAsync(conf, storage, editsDirs) + : new FSEditLog(conf, storage, editsDirs); + } + /** * Constructor for FSEditLog. Underlying journals are constructed, but * no streams are opened until open() is called. @@ -423,33 +430,35 @@ public class FSEditLog implements LogsPurgeable { // wait if an automatic sync is scheduled waitIfAutoSyncScheduled(); - - long start = beginTransaction(); - op.setTransactionId(txid); - - try { - editLogStream.write(op); - } catch (IOException ex) { - // All journals failed, it is handled in logSync. - } finally { - op.reset(); - } - endTransaction(start); - // check if it is time to schedule an automatic sync - needsSync = shouldForceSync(); + needsSync = doEditTransaction(op); if (needsSync) { isAutoSyncScheduled = true; } } - + // Sync the log if an automatic sync is required. if (needsSync) { logSync(); } } + synchronized boolean doEditTransaction(final FSEditLogOp op) { + long start = beginTransaction(); + op.setTransactionId(txid); + + try { + editLogStream.write(op); + } catch (IOException ex) { + // All journals failed, it is handled in logSync. + } finally { + op.reset(); + } + endTransaction(start); + return shouldForceSync(); + } + /** * Wait if an automatic sync is scheduled */ @@ -544,15 +553,10 @@ public class FSEditLog implements LogsPurgeable { * else more operations can start writing while this is in progress. */ void logSyncAll() { - // Record the most recent transaction ID as our own id - synchronized (this) { - TransactionId id = myTransactionId.get(); - id.txid = txid; - } - // Then make sure we're synced up to this point - logSync(); + // Make sure we're synced up to the most recent transaction ID. + logSync(getLastWrittenTxId()); } - + /** * Sync all modifications done by this thread. * @@ -582,12 +586,14 @@ public class FSEditLog implements LogsPurgeable { * waitForSyncToFinish() before assuming they are running alone. */ public void logSync() { - long syncStart = 0; + // Fetch the transactionId of this thread. + logSync(myTransactionId.get().txid); + } - // Fetch the transactionId of this thread. - long mytxid = myTransactionId.get().txid; - + protected void logSync(long mytxid) { + long syncStart = 0; boolean sync = false; + long editsBatchedInSync = 0; try { EditLogOutputStream logStream = null; synchronized (this) { @@ -606,19 +612,17 @@ public class FSEditLog implements LogsPurgeable { // If this transaction was already flushed, then nothing to do // if (mytxid <= synctxid) { - numTransactionsBatchedInSync++; - if (metrics != null) { - // Metrics is non-null only when used inside name node - metrics.incrTransactionsBatchedInSync(); - } return; } - - // now, this thread will do the sync + + // now, this thread will do the sync. track if other edits were + // included in the sync - ie. batched. if this is the only edit + // synced then the batched count is 0 + editsBatchedInSync = txid - synctxid - 1; syncStart = txid; isSyncRunning = true; sync = true; - + // swap buffers try { if (journalSet.isEmpty()) { @@ -667,6 +671,8 @@ public class FSEditLog implements LogsPurgeable { if (metrics != null) { // Metrics non-null only when used inside name node metrics.addSync(elapsed); + metrics.incrTransactionsBatchedInSync(editsBatchedInSync); + numTransactionsBatchedInSync += editsBatchedInSync; } } finally { @@ -1138,13 +1144,13 @@ public class FSEditLog implements LogsPurgeable { } void logStartRollingUpgrade(long startTime) { - RollingUpgradeOp op = RollingUpgradeOp.getStartInstance(cache.get()); + RollingUpgradeStartOp op = RollingUpgradeStartOp.getInstance(cache.get()); op.setTime(startTime); logEdit(op); } void logFinalizeRollingUpgrade(long finalizeTime) { - RollingUpgradeOp op = RollingUpgradeOp.getFinalizeInstance(cache.get()); + RollingUpgradeOp op = RollingUpgradeFinalizeOp.getInstance(cache.get()); op.setTime(finalizeTime); logEdit(op); } @@ -1313,8 +1319,9 @@ public class FSEditLog implements LogsPurgeable { if (writeEndTxn) { logEdit(LogSegmentOp.getInstance(cache.get(), FSEditLogOpCodes.OP_END_LOG_SEGMENT)); - logSync(); } + // always sync to ensure all edits are flushed. + logSyncAll(); printStatistics(true); @@ -1701,6 +1708,12 @@ public class FSEditLog implements LogsPurgeable { } } + @VisibleForTesting + // needed by async impl to restart thread when edit log is replaced by a + // spy because a spy is a shallow copy + public void restart() { + } + /** * Return total number of syncs happened on this edit log. * @return long - count http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java new file mode 100644 index 0000000..c14a310 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogAsync.java @@ -0,0 +1,322 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.IOException; +import java.net.URI; +import java.util.ArrayDeque; +import java.util.Deque; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.util.ExitUtil; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +class FSEditLogAsync extends FSEditLog implements Runnable { + static final Log LOG = LogFactory.getLog(FSEditLog.class); + + // use separate mutex to avoid possible deadlock when stopping the thread. + private final Object syncThreadLock = new Object(); + private Thread syncThread; + private static ThreadLocal<Edit> threadEdit = new ThreadLocal<Edit>(); + + // requires concurrent access from caller threads and syncing thread. + private final BlockingQueue<Edit> editPendingQ = + new ArrayBlockingQueue<Edit>(4096); + + // only accessed by syncing thread so no synchronization required. + // queue is unbounded because it's effectively limited by the size + // of the edit log buffer - ie. a sync will eventually be forced. + private final Deque<Edit> syncWaitQ = new ArrayDeque<Edit>(); + + FSEditLogAsync(Configuration conf, NNStorage storage, List<URI> editsDirs) { + super(conf, storage, editsDirs); + // op instances cannot be shared due to queuing for background thread. + cache.disableCache(); + } + + private boolean isSyncThreadAlive() { + synchronized(syncThreadLock) { + return syncThread != null && syncThread.isAlive(); + } + } + + private void startSyncThread() { + synchronized(syncThreadLock) { + if (!isSyncThreadAlive()) { + syncThread = new Thread(this, this.getClass().getSimpleName()); + syncThread.start(); + } + } + } + + private void stopSyncThread() { + synchronized(syncThreadLock) { + if (syncThread != null) { + try { + syncThread.interrupt(); + syncThread.join(); + } catch (InterruptedException e) { + // we're quitting anyway. + } finally { + syncThread = null; + } + } + } + } + + @VisibleForTesting + @Override + public void restart() { + stopSyncThread(); + startSyncThread(); + } + + @Override + void openForWrite(int layoutVersion) throws IOException { + try { + startSyncThread(); + super.openForWrite(layoutVersion); + } catch (IOException ioe) { + stopSyncThread(); + throw ioe; + } + } + + @Override + public void close() { + super.close(); + stopSyncThread(); + } + + @Override + void logEdit(final FSEditLogOp op) { + Edit edit = getEditInstance(op); + threadEdit.set(edit); + enqueueEdit(edit); + } + + @Override + public void logSync() { + Edit edit = threadEdit.get(); + if (edit != null) { + // do NOT remove to avoid expunge & rehash penalties. + threadEdit.set(null); + if (LOG.isDebugEnabled()) { + LOG.debug("logSync " + edit); + } + edit.logSyncWait(); + } + } + + @Override + public void logSyncAll() { + // doesn't actually log anything, just ensures that the queues are + // drained when it returns. + Edit edit = new SyncEdit(this, null){ + @Override + public boolean logEdit() { + return true; + } + }; + enqueueEdit(edit); + edit.logSyncWait(); + } + + private void enqueueEdit(Edit edit) { + if (LOG.isDebugEnabled()) { + LOG.debug("logEdit " + edit); + } + try { + if (!editPendingQ.offer(edit, 1, TimeUnit.SECONDS)) { + Preconditions.checkState( + isSyncThreadAlive(), "sync thread is not alive"); + editPendingQ.put(edit); + } + } catch (Throwable t) { + // should never happen! failure to enqueue an edit is fatal + terminate(t); + } + } + + private Edit dequeueEdit() throws InterruptedException { + // only block for next edit if no pending syncs. + return syncWaitQ.isEmpty() ? editPendingQ.take() : editPendingQ.poll(); + } + + @Override + public void run() { + try { + while (true) { + boolean doSync; + Edit edit = dequeueEdit(); + if (edit != null) { + // sync if requested by edit log. + doSync = edit.logEdit(); + syncWaitQ.add(edit); + } else { + // sync when editq runs dry, but have edits pending a sync. + doSync = !syncWaitQ.isEmpty(); + } + if (doSync) { + // normally edit log exceptions cause the NN to terminate, but tests + // relying on ExitUtil.terminate need to see the exception. + RuntimeException syncEx = null; + try { + logSync(getLastWrittenTxId()); + } catch (RuntimeException ex) { + syncEx = ex; + } + while ((edit = syncWaitQ.poll()) != null) { + edit.logSyncNotify(syncEx); + } + } + } + } catch (InterruptedException ie) { + LOG.info(Thread.currentThread().getName() + " was interrupted, exiting"); + } catch (Throwable t) { + terminate(t); + } + } + + private void terminate(Throwable t) { + String message = "Exception while edit logging: "+t.getMessage(); + LOG.fatal(message, t); + ExitUtil.terminate(1, message); + } + + private Edit getEditInstance(FSEditLogOp op) { + final Edit edit; + final Server.Call rpcCall = Server.getCurCall().get(); + // only rpc calls not explicitly sync'ed on the log will be async. + if (rpcCall != null && !Thread.holdsLock(this)) { + edit = new RpcEdit(this, op, rpcCall); + } else { + edit = new SyncEdit(this, op); + } + return edit; + } + + private abstract static class Edit { + final FSEditLog log; + final FSEditLogOp op; + + Edit(FSEditLog log, FSEditLogOp op) { + this.log = log; + this.op = op; + } + + // return whether edit log wants to sync. + boolean logEdit() { + return log.doEditTransaction(op); + } + + // wait for background thread to finish syncing. + abstract void logSyncWait(); + // wake up the thread in logSyncWait. + abstract void logSyncNotify(RuntimeException ex); + } + + // the calling thread is synchronously waiting for the edit to complete. + private static class SyncEdit extends Edit { + private final Object lock; + private boolean done = false; + private RuntimeException syncEx; + + SyncEdit(FSEditLog log, FSEditLogOp op) { + super(log, op); + // if the log is already sync'ed (ex. log rolling), must wait on it to + // avoid deadlock with sync thread. the fsn lock protects against + // logging during a roll. else lock on this object to avoid sync + // contention on edit log. + lock = Thread.holdsLock(log) ? log : this; + } + + @Override + public void logSyncWait() { + synchronized(lock) { + while (!done) { + try { + lock.wait(10); + } catch (InterruptedException e) {} + } + // only needed by tests that rely on ExitUtil.terminate() since + // normally exceptions terminate the NN. + if (syncEx != null) { + syncEx.fillInStackTrace(); + throw syncEx; + } + } + } + + @Override + public void logSyncNotify(RuntimeException ex) { + synchronized(lock) { + done = true; + syncEx = ex; + lock.notifyAll(); + } + } + + @Override + public String toString() { + return "["+getClass().getSimpleName()+" op:"+op+"]"; + } + } + + // the calling rpc thread will return immediately from logSync but the + // rpc response will not be sent until the edit is durable. + private static class RpcEdit extends Edit { + private final Server.Call call; + + RpcEdit(FSEditLog log, FSEditLogOp op, Server.Call call) { + super(log, op); + this.call = call; + call.postponeResponse(); + } + + @Override + public void logSyncWait() { + // logSync is a no-op to immediately free up rpc handlers. the + // response is sent when the sync thread calls syncNotify. + } + + @Override + public void logSyncNotify(RuntimeException syncEx) { + try { + if (syncEx == null) { + call.sendResponse(); + } else { + call.abortResponse(syncEx); + } + } catch (Exception e) {} // don't care if not sent. + } + + @Override + public String toString() { + return "["+getClass().getSimpleName()+" op:"+op+" call:"+call+"]"; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java index a8389f0..c4e1a78 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java @@ -147,6 +147,55 @@ public abstract class FSEditLogOp { byte[] rpcClientId; int rpcCallId; + public static class OpInstanceCache { + private static ThreadLocal<OpInstanceCacheMap> cache = + new ThreadLocal<OpInstanceCacheMap>() { + @Override + protected OpInstanceCacheMap initialValue() { + return new OpInstanceCacheMap(); + } + }; + + @SuppressWarnings("serial") + static final class OpInstanceCacheMap extends + EnumMap<FSEditLogOpCodes, FSEditLogOp> { + OpInstanceCacheMap() { + super(FSEditLogOpCodes.class); + for (FSEditLogOpCodes opCode : FSEditLogOpCodes.values()) { + put(opCode, newInstance(opCode)); + } + } + } + + private boolean useCache = true; + + void disableCache() { + useCache = false; + } + + public OpInstanceCache get() { + return this; + } + + @SuppressWarnings("unchecked") + public <T extends FSEditLogOp> T get(FSEditLogOpCodes opCode) { + return useCache ? (T)cache.get().get(opCode) : (T)newInstance(opCode); + } + + private static FSEditLogOp newInstance(FSEditLogOpCodes opCode) { + FSEditLogOp instance = null; + Class<? extends FSEditLogOp> clazz = opCode.getOpClass(); + if (clazz != null) { + try { + instance = clazz.newInstance(); + } catch (Exception ex) { + throw new RuntimeException("Failed to instantiate "+opCode, ex); + } + } + return instance; + } + } + final void reset() { txid = HdfsServerConstants.INVALID_TXID; rpcClientId = RpcConstants.DUMMY_CLIENT_ID; @@ -156,70 +205,6 @@ public abstract class FSEditLogOp { abstract void resetSubFields(); - final public static class OpInstanceCache { - private final EnumMap<FSEditLogOpCodes, FSEditLogOp> inst = - new EnumMap<FSEditLogOpCodes, FSEditLogOp>(FSEditLogOpCodes.class); - - public OpInstanceCache() { - inst.put(OP_ADD, new AddOp()); - inst.put(OP_CLOSE, new CloseOp()); - inst.put(OP_SET_REPLICATION, new SetReplicationOp()); - inst.put(OP_CONCAT_DELETE, new ConcatDeleteOp()); - inst.put(OP_RENAME_OLD, new RenameOldOp()); - inst.put(OP_DELETE, new DeleteOp()); - inst.put(OP_MKDIR, new MkdirOp()); - inst.put(OP_SET_GENSTAMP_V1, new SetGenstampV1Op()); - inst.put(OP_SET_PERMISSIONS, new SetPermissionsOp()); - inst.put(OP_SET_OWNER, new SetOwnerOp()); - inst.put(OP_SET_NS_QUOTA, new SetNSQuotaOp()); - inst.put(OP_CLEAR_NS_QUOTA, new ClearNSQuotaOp()); - inst.put(OP_SET_QUOTA, new SetQuotaOp()); - inst.put(OP_TIMES, new TimesOp()); - inst.put(OP_SYMLINK, new SymlinkOp()); - inst.put(OP_RENAME, new RenameOp()); - inst.put(OP_REASSIGN_LEASE, new ReassignLeaseOp()); - inst.put(OP_GET_DELEGATION_TOKEN, new GetDelegationTokenOp()); - inst.put(OP_RENEW_DELEGATION_TOKEN, new RenewDelegationTokenOp()); - inst.put(OP_CANCEL_DELEGATION_TOKEN, new CancelDelegationTokenOp()); - inst.put(OP_UPDATE_MASTER_KEY, new UpdateMasterKeyOp()); - inst.put(OP_START_LOG_SEGMENT, new LogSegmentOp(OP_START_LOG_SEGMENT)); - inst.put(OP_END_LOG_SEGMENT, new LogSegmentOp(OP_END_LOG_SEGMENT)); - inst.put(OP_UPDATE_BLOCKS, new UpdateBlocksOp()); - inst.put(OP_TRUNCATE, new TruncateOp()); - - inst.put(OP_ALLOW_SNAPSHOT, new AllowSnapshotOp()); - inst.put(OP_DISALLOW_SNAPSHOT, new DisallowSnapshotOp()); - inst.put(OP_CREATE_SNAPSHOT, new CreateSnapshotOp()); - inst.put(OP_DELETE_SNAPSHOT, new DeleteSnapshotOp()); - inst.put(OP_RENAME_SNAPSHOT, new RenameSnapshotOp()); - inst.put(OP_SET_GENSTAMP_V2, new SetGenstampV2Op()); - inst.put(OP_ALLOCATE_BLOCK_ID, new AllocateBlockIdOp()); - - inst.put(OP_ADD_CACHE_DIRECTIVE, new AddCacheDirectiveInfoOp()); - inst.put(OP_MODIFY_CACHE_DIRECTIVE, new ModifyCacheDirectiveInfoOp()); - inst.put(OP_REMOVE_CACHE_DIRECTIVE, new RemoveCacheDirectiveInfoOp()); - inst.put(OP_ADD_CACHE_POOL, new AddCachePoolOp()); - inst.put(OP_MODIFY_CACHE_POOL, new ModifyCachePoolOp()); - inst.put(OP_REMOVE_CACHE_POOL, new RemoveCachePoolOp()); - - inst.put(OP_ADD_BLOCK, new AddBlockOp()); - inst.put(OP_SET_ACL, new SetAclOp()); - inst.put(OP_ROLLING_UPGRADE_START, new RollingUpgradeOp( - OP_ROLLING_UPGRADE_START, "start")); - inst.put(OP_ROLLING_UPGRADE_FINALIZE, new RollingUpgradeOp( - OP_ROLLING_UPGRADE_FINALIZE, "finalize")); - inst.put(OP_SET_XATTR, new SetXAttrOp()); - inst.put(OP_REMOVE_XATTR, new RemoveXAttrOp()); - inst.put(OP_SET_STORAGE_POLICY, new SetStoragePolicyOp()); - inst.put(OP_APPEND, new AppendOp()); - inst.put(OP_SET_QUOTA_BY_STORAGETYPE, new SetQuotaByStorageTypeOp()); - } - - public FSEditLogOp get(FSEditLogOpCodes opcode) { - return inst.get(opcode); - } - } - private static ImmutableMap<String, FsAction> fsActionMap() { ImmutableMap.Builder<String, FsAction> b = ImmutableMap.builder(); for (FsAction v : FsAction.values()) @@ -774,7 +759,7 @@ public abstract class FSEditLogOp { * {@link ClientProtocol#append} */ static class AddOp extends AddCloseOp { - private AddOp() { + AddOp() { super(OP_ADD); } @@ -802,7 +787,7 @@ public abstract class FSEditLogOp { * finally log an AddOp. */ static class CloseOp extends AddCloseOp { - private CloseOp() { + CloseOp() { super(OP_CLOSE); } @@ -830,7 +815,7 @@ public abstract class FSEditLogOp { String clientMachine; boolean newBlock; - private AppendOp() { + AppendOp() { super(OP_APPEND); } @@ -920,7 +905,7 @@ public abstract class FSEditLogOp { private Block penultimateBlock; private Block lastBlock; - private AddBlockOp() { + AddBlockOp() { super(OP_ADD_BLOCK); } @@ -1032,7 +1017,7 @@ public abstract class FSEditLogOp { String path; Block[] blocks; - private UpdateBlocksOp() { + UpdateBlocksOp() { super(OP_UPDATE_BLOCKS); } @@ -1125,7 +1110,7 @@ public abstract class FSEditLogOp { String path; short replication; - private SetReplicationOp() { + SetReplicationOp() { super(OP_SET_REPLICATION); } @@ -1204,7 +1189,7 @@ public abstract class FSEditLogOp { long timestamp; final static public int MAX_CONCAT_SRC = 1024 * 1024; - private ConcatDeleteOp() { + ConcatDeleteOp() { super(OP_CONCAT_DELETE); } @@ -1362,7 +1347,7 @@ public abstract class FSEditLogOp { String dst; long timestamp; - private RenameOldOp() { + RenameOldOp() { super(OP_RENAME_OLD); } @@ -1474,7 +1459,7 @@ public abstract class FSEditLogOp { String path; long timestamp; - private DeleteOp() { + DeleteOp() { super(OP_DELETE); } @@ -1575,7 +1560,7 @@ public abstract class FSEditLogOp { List<AclEntry> aclEntries; List<XAttr> xAttrs; - private MkdirOp() { + MkdirOp() { super(OP_MKDIR); } @@ -1748,7 +1733,7 @@ public abstract class FSEditLogOp { static class SetGenstampV1Op extends FSEditLogOp { long genStampV1; - private SetGenstampV1Op() { + SetGenstampV1Op() { super(OP_SET_GENSTAMP_V1); } @@ -1806,7 +1791,7 @@ public abstract class FSEditLogOp { static class SetGenstampV2Op extends FSEditLogOp { long genStampV2; - private SetGenstampV2Op() { + SetGenstampV2Op() { super(OP_SET_GENSTAMP_V2); } @@ -1864,7 +1849,7 @@ public abstract class FSEditLogOp { static class AllocateBlockIdOp extends FSEditLogOp { long blockId; - private AllocateBlockIdOp() { + AllocateBlockIdOp() { super(OP_ALLOCATE_BLOCK_ID); } @@ -1923,7 +1908,7 @@ public abstract class FSEditLogOp { String src; FsPermission permissions; - private SetPermissionsOp() { + SetPermissionsOp() { super(OP_SET_PERMISSIONS); } @@ -1996,7 +1981,7 @@ public abstract class FSEditLogOp { String username; String groupname; - private SetOwnerOp() { + SetOwnerOp() { super(OP_SET_OWNER); } @@ -2083,7 +2068,7 @@ public abstract class FSEditLogOp { String src; long nsQuota; - private SetNSQuotaOp() { + SetNSQuotaOp() { super(OP_SET_NS_QUOTA); } @@ -2141,7 +2126,7 @@ public abstract class FSEditLogOp { static class ClearNSQuotaOp extends FSEditLogOp { String src; - private ClearNSQuotaOp() { + ClearNSQuotaOp() { super(OP_CLEAR_NS_QUOTA); } @@ -2195,7 +2180,7 @@ public abstract class FSEditLogOp { long nsQuota; long dsQuota; - private SetQuotaOp() { + SetQuotaOp() { super(OP_SET_QUOTA); } @@ -2280,7 +2265,7 @@ public abstract class FSEditLogOp { long dsQuota; StorageType type; - private SetQuotaByStorageTypeOp() { + SetQuotaByStorageTypeOp() { super(OP_SET_QUOTA_BY_STORAGETYPE); } @@ -2363,7 +2348,7 @@ public abstract class FSEditLogOp { long mtime; long atime; - private TimesOp() { + TimesOp() { super(OP_TIMES); } @@ -2472,7 +2457,7 @@ public abstract class FSEditLogOp { long atime; PermissionStatus permissionStatus; - private SymlinkOp() { + SymlinkOp() { super(OP_SYMLINK); } @@ -2631,7 +2616,7 @@ public abstract class FSEditLogOp { long timestamp; Rename[] options; - private RenameOp() { + RenameOp() { super(OP_RENAME); } @@ -2796,7 +2781,7 @@ public abstract class FSEditLogOp { long timestamp; Block truncateBlock; - private TruncateOp() { + TruncateOp() { super(OP_TRUNCATE); } @@ -2929,7 +2914,7 @@ public abstract class FSEditLogOp { String path; String newHolder; - private ReassignLeaseOp() { + ReassignLeaseOp() { super(OP_REASSIGN_LEASE); } @@ -3011,7 +2996,7 @@ public abstract class FSEditLogOp { DelegationTokenIdentifier token; long expiryTime; - private GetDelegationTokenOp() { + GetDelegationTokenOp() { super(OP_GET_DELEGATION_TOKEN); } @@ -3090,7 +3075,7 @@ public abstract class FSEditLogOp { DelegationTokenIdentifier token; long expiryTime; - private RenewDelegationTokenOp() { + RenewDelegationTokenOp() { super(OP_RENEW_DELEGATION_TOKEN); } @@ -3168,7 +3153,7 @@ public abstract class FSEditLogOp { static class CancelDelegationTokenOp extends FSEditLogOp { DelegationTokenIdentifier token; - private CancelDelegationTokenOp() { + CancelDelegationTokenOp() { super(OP_CANCEL_DELEGATION_TOKEN); } @@ -3227,7 +3212,7 @@ public abstract class FSEditLogOp { static class UpdateMasterKeyOp extends FSEditLogOp { DelegationKey key; - private UpdateMasterKeyOp() { + UpdateMasterKeyOp() { super(OP_UPDATE_MASTER_KEY); } @@ -3332,8 +3317,20 @@ public abstract class FSEditLogOp { } } + static class StartLogSegmentOp extends LogSegmentOp { + StartLogSegmentOp() { + super(OP_START_LOG_SEGMENT); + } + } + + static class EndLogSegmentOp extends LogSegmentOp { + EndLogSegmentOp() { + super(OP_END_LOG_SEGMENT); + } + } + static class InvalidOp extends FSEditLogOp { - private InvalidOp() { + InvalidOp() { super(OP_INVALID); } @@ -4144,7 +4141,7 @@ public abstract class FSEditLogOp { List<XAttr> xAttrs; String src; - private RemoveXAttrOp() { + RemoveXAttrOp() { super(OP_REMOVE_XATTR); } @@ -4197,7 +4194,7 @@ public abstract class FSEditLogOp { List<XAttr> xAttrs; String src; - private SetXAttrOp() { + SetXAttrOp() { super(OP_SET_XATTR); } @@ -4250,7 +4247,7 @@ public abstract class FSEditLogOp { List<AclEntry> aclEntries = Lists.newArrayList(); String src; - private SetAclOp() { + SetAclOp() { super(OP_SET_ACL); } @@ -4347,7 +4344,7 @@ public abstract class FSEditLogOp { /** * Operation corresponding to upgrade */ - static class RollingUpgradeOp extends FSEditLogOp { // @Idempotent + abstract static class RollingUpgradeOp extends FSEditLogOp { // @Idempotent private final String name; private long time; @@ -4414,7 +4411,7 @@ public abstract class FSEditLogOp { String path; byte policyId; - private SetStoragePolicyOp() { + SetStoragePolicyOp() { super(OP_SET_STORAGE_POLICY); } @@ -4480,6 +4477,26 @@ public abstract class FSEditLogOp { } } + static class RollingUpgradeStartOp extends RollingUpgradeOp { + RollingUpgradeStartOp() { + super(OP_ROLLING_UPGRADE_START, "start"); + } + + static RollingUpgradeStartOp getInstance(OpInstanceCache cache) { + return (RollingUpgradeStartOp) cache.get(OP_ROLLING_UPGRADE_START); + } + } + + static class RollingUpgradeFinalizeOp extends RollingUpgradeOp { + RollingUpgradeFinalizeOp() { + super(OP_ROLLING_UPGRADE_FINALIZE, "finalize"); + } + + static RollingUpgradeFinalizeOp getInstance(OpInstanceCache cache) { + return (RollingUpgradeFinalizeOp) cache.get(OP_ROLLING_UPGRADE_FINALIZE); + } + } + /** * Class for writing editlog ops */ http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java index 1a0a296..3f8feba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.*; /** * Op codes for edits file @@ -27,60 +28,64 @@ import org.apache.hadoop.classification.InterfaceStability; @InterfaceStability.Unstable public enum FSEditLogOpCodes { // last op code in file - OP_ADD ((byte) 0), - OP_RENAME_OLD ((byte) 1), // deprecated operation - OP_DELETE ((byte) 2), - OP_MKDIR ((byte) 3), - OP_SET_REPLICATION ((byte) 4), + OP_ADD ((byte) 0, AddOp.class), + // deprecated operation + OP_RENAME_OLD ((byte) 1, RenameOldOp.class), + OP_DELETE ((byte) 2, DeleteOp.class), + OP_MKDIR ((byte) 3, MkdirOp.class), + OP_SET_REPLICATION ((byte) 4, SetReplicationOp.class), @Deprecated OP_DATANODE_ADD ((byte) 5), // obsolete @Deprecated OP_DATANODE_REMOVE((byte) 6), // obsolete - OP_SET_PERMISSIONS ((byte) 7), - OP_SET_OWNER ((byte) 8), - OP_CLOSE ((byte) 9), - OP_SET_GENSTAMP_V1 ((byte) 10), - OP_SET_NS_QUOTA ((byte) 11), // obsolete - OP_CLEAR_NS_QUOTA ((byte) 12), // obsolete - OP_TIMES ((byte) 13), // set atime, mtime - OP_SET_QUOTA ((byte) 14), - OP_RENAME ((byte) 15), // filecontext rename - OP_CONCAT_DELETE ((byte) 16), // concat files - OP_SYMLINK ((byte) 17), - OP_GET_DELEGATION_TOKEN ((byte) 18), - OP_RENEW_DELEGATION_TOKEN ((byte) 19), - OP_CANCEL_DELEGATION_TOKEN ((byte) 20), - OP_UPDATE_MASTER_KEY ((byte) 21), - OP_REASSIGN_LEASE ((byte) 22), - OP_END_LOG_SEGMENT ((byte) 23), - OP_START_LOG_SEGMENT ((byte) 24), - OP_UPDATE_BLOCKS ((byte) 25), - OP_CREATE_SNAPSHOT ((byte) 26), - OP_DELETE_SNAPSHOT ((byte) 27), - OP_RENAME_SNAPSHOT ((byte) 28), - OP_ALLOW_SNAPSHOT ((byte) 29), - OP_DISALLOW_SNAPSHOT ((byte) 30), - OP_SET_GENSTAMP_V2 ((byte) 31), - OP_ALLOCATE_BLOCK_ID ((byte) 32), - OP_ADD_BLOCK ((byte) 33), - OP_ADD_CACHE_DIRECTIVE ((byte) 34), - OP_REMOVE_CACHE_DIRECTIVE ((byte) 35), - OP_ADD_CACHE_POOL ((byte) 36), - OP_MODIFY_CACHE_POOL ((byte) 37), - OP_REMOVE_CACHE_POOL ((byte) 38), - OP_MODIFY_CACHE_DIRECTIVE ((byte) 39), - OP_SET_ACL ((byte) 40), - OP_ROLLING_UPGRADE_START ((byte) 41), - OP_ROLLING_UPGRADE_FINALIZE ((byte) 42), - OP_SET_XATTR ((byte) 43), - OP_REMOVE_XATTR ((byte) 44), - OP_SET_STORAGE_POLICY ((byte) 45), - OP_TRUNCATE ((byte) 46), - OP_APPEND ((byte) 47), - OP_SET_QUOTA_BY_STORAGETYPE ((byte) 48), + OP_SET_PERMISSIONS ((byte) 7, SetPermissionsOp.class), + OP_SET_OWNER ((byte) 8, SetOwnerOp.class), + OP_CLOSE ((byte) 9, CloseOp.class), + OP_SET_GENSTAMP_V1 ((byte) 10, SetGenstampV1Op.class), + OP_SET_NS_QUOTA ((byte) 11, SetNSQuotaOp.class), // obsolete + OP_CLEAR_NS_QUOTA ((byte) 12, ClearNSQuotaOp.class), // obsolete + OP_TIMES ((byte) 13, TimesOp.class), // set atime, mtime + OP_SET_QUOTA ((byte) 14, SetQuotaOp.class), + // filecontext rename + OP_RENAME ((byte) 15, RenameOp.class), + // concat files + OP_CONCAT_DELETE ((byte) 16, ConcatDeleteOp.class), + OP_SYMLINK ((byte) 17, SymlinkOp.class), + OP_GET_DELEGATION_TOKEN ((byte) 18, GetDelegationTokenOp.class), + OP_RENEW_DELEGATION_TOKEN ((byte) 19, RenewDelegationTokenOp.class), + OP_CANCEL_DELEGATION_TOKEN ((byte) 20, CancelDelegationTokenOp.class), + OP_UPDATE_MASTER_KEY ((byte) 21, UpdateMasterKeyOp.class), + OP_REASSIGN_LEASE ((byte) 22, ReassignLeaseOp.class), + OP_END_LOG_SEGMENT ((byte) 23, EndLogSegmentOp.class), + OP_START_LOG_SEGMENT ((byte) 24, StartLogSegmentOp.class), + OP_UPDATE_BLOCKS ((byte) 25, UpdateBlocksOp.class), + OP_CREATE_SNAPSHOT ((byte) 26, CreateSnapshotOp.class), + OP_DELETE_SNAPSHOT ((byte) 27, DeleteSnapshotOp.class), + OP_RENAME_SNAPSHOT ((byte) 28, RenameSnapshotOp.class), + OP_ALLOW_SNAPSHOT ((byte) 29, AllowSnapshotOp.class), + OP_DISALLOW_SNAPSHOT ((byte) 30, DisallowSnapshotOp.class), + OP_SET_GENSTAMP_V2 ((byte) 31, SetGenstampV2Op.class), + OP_ALLOCATE_BLOCK_ID ((byte) 32, AllocateBlockIdOp.class), + OP_ADD_BLOCK ((byte) 33, AddBlockOp.class), + OP_ADD_CACHE_DIRECTIVE ((byte) 34, AddCacheDirectiveInfoOp.class), + OP_REMOVE_CACHE_DIRECTIVE ((byte) 35, RemoveCacheDirectiveInfoOp.class), + OP_ADD_CACHE_POOL ((byte) 36, AddCachePoolOp.class), + OP_MODIFY_CACHE_POOL ((byte) 37, ModifyCachePoolOp.class), + OP_REMOVE_CACHE_POOL ((byte) 38, RemoveCachePoolOp.class), + OP_MODIFY_CACHE_DIRECTIVE ((byte) 39, ModifyCacheDirectiveInfoOp.class), + OP_SET_ACL ((byte) 40, SetAclOp.class), + OP_ROLLING_UPGRADE_START ((byte) 41, RollingUpgradeStartOp.class), + OP_ROLLING_UPGRADE_FINALIZE ((byte) 42, RollingUpgradeFinalizeOp.class), + OP_SET_XATTR ((byte) 43, SetXAttrOp.class), + OP_REMOVE_XATTR ((byte) 44, RemoveXAttrOp.class), + OP_SET_STORAGE_POLICY ((byte) 45, SetStoragePolicyOp.class), + OP_TRUNCATE ((byte) 46, TruncateOp.class), + OP_APPEND ((byte) 47, AppendOp.class), + OP_SET_QUOTA_BY_STORAGETYPE ((byte) 48, SetQuotaByStorageTypeOp.class), // Note that the current range of the valid OP code is 0~127 OP_INVALID ((byte) -1); private final byte opCode; + private final Class<? extends FSEditLogOp> opClass; /** * Constructor @@ -88,7 +93,12 @@ public enum FSEditLogOpCodes { * @param opCode byte value of constructed enum */ FSEditLogOpCodes(byte opCode) { + this(opCode, null); + } + + FSEditLogOpCodes(byte opCode, Class<? extends FSEditLogOp> opClass) { this.opCode = opCode; + this.opClass = opClass; } /** @@ -100,6 +110,10 @@ public enum FSEditLogOpCodes { return opCode; } + public Class<? extends FSEditLogOp> getOpClass() { + return opClass; + } + private static final FSEditLogOpCodes[] VALUES; static { http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java index 084f82a..b637105 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java @@ -140,7 +140,7 @@ public class FSImage implements Closeable { storage.setRestoreFailedStorage(true); } - this.editLog = new FSEditLog(conf, storage, editsDirs); + this.editLog = FSEditLog.newInstance(conf, storage, editsDirs); archivalManager = new NNStorageRetentionManager(conf, storage, editLog); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index f376901..e8900ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -1263,7 +1263,6 @@ public class NameNode implements NameNodeStatusMXBean { newSharedEditLog.logEdit(op); if (op.opCode == FSEditLogOpCodes.OP_END_LOG_SEGMENT) { - newSharedEditLog.logSync(); newSharedEditLog.endCurrentLogSegment(false); LOG.debug("ending log segment because of END_LOG_SEGMENT op in " + stream); http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java index 54b5c6e..ce0b050 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java @@ -283,8 +283,8 @@ public class NameNodeMetrics { transactions.add(latency); } - public void incrTransactionsBatchedInSync() { - transactionsBatchedInSync.incr(); + public void incrTransactionsBatchedInSync(long count) { + transactionsBatchedInSync.incr(count); } public void addSync(long elapsed) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index dc1853a..0b08996 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -267,6 +267,9 @@ public class DFSTestUtil { } public static void setEditLogForTesting(FSNamesystem fsn, FSEditLog newLog) { + // spies are shallow copies, must allow async log to restart its thread + // so it has the new copy + newLog.restart(); Whitebox.setInternalState(fsn.getFSImage(), "editLog", newLog); Whitebox.setInternalState(fsn.getFSDirectory(), "editLog", newLog); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java index c79e0c2..9b42cac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java @@ -69,17 +69,21 @@ import org.junit.runners.Parameterized.Parameters; public class TestAuditLogs { static final String auditLogFile = PathUtils.getTestDirName(TestAuditLogs.class) + "/TestAuditLogs-audit.log"; final boolean useAsyncLog; - + final boolean useAsyncEdits; + @Parameters public static Collection<Object[]> data() { Collection<Object[]> params = new ArrayList<Object[]>(); - params.add(new Object[]{new Boolean(false)}); - params.add(new Object[]{new Boolean(true)}); + params.add(new Object[]{Boolean.FALSE, Boolean.FALSE}); + params.add(new Object[]{Boolean.TRUE, Boolean.FALSE}); + params.add(new Object[]{Boolean.FALSE, Boolean.TRUE}); + params.add(new Object[]{Boolean.TRUE, Boolean.TRUE}); return params; } - - public TestAuditLogs(boolean useAsyncLog) { + + public TestAuditLogs(boolean useAsyncLog, boolean useAsyncEdits) { this.useAsyncLog = useAsyncLog; + this.useAsyncEdits = useAsyncEdits; } // Pattern for: @@ -116,6 +120,7 @@ public class TestAuditLogs { conf.setLong(DFSConfigKeys.DFS_NAMENODE_ACCESSTIME_PRECISION_KEY, precision); conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 10000L); conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_AUDIT_LOG_ASYNC_KEY, useAsyncLog); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, useAsyncEdits); util = new DFSTestUtil.Builder().setName("TestAuditAllowed"). setNumFiles(20).build(); cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4).build(); http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java index 14240e0..1eb377a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLog.java @@ -88,6 +88,9 @@ import org.apache.log4j.AppenderSkeleton; import org.apache.log4j.LogManager; import org.apache.log4j.spi.LoggingEvent; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.mockito.Mockito; import org.xml.sax.ContentHandler; import org.xml.sax.SAXException; @@ -98,12 +101,33 @@ import com.google.common.collect.Lists; /** * This class tests the creation and validation of a checkpoint. */ +@RunWith(Parameterized.class) public class TestEditLog { - + static { GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL); } + @Parameters + public static Collection<Object[]> data() { + Collection<Object[]> params = new ArrayList<Object[]>(); + params.add(new Object[]{ Boolean.FALSE }); + params.add(new Object[]{ Boolean.TRUE }); + return params; + } + + private static boolean useAsyncEditLog; + public TestEditLog(Boolean async) { + useAsyncEditLog = async; + } + + public static Configuration getConf() { + Configuration conf = new HdfsConfiguration(); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, + useAsyncEditLog); + return conf; + } + /** * A garbage mkdir op which is used for testing * {@link EditLogFileInputStream#scanEditLog(File)} @@ -225,11 +249,12 @@ public class TestEditLog { * @param storage Storage object used by namenode */ private static FSEditLog getFSEditLog(NNStorage storage) throws IOException { - Configuration conf = new Configuration(); + Configuration conf = getConf(); // Make sure the edits dirs are set in the provided configuration object. conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, StringUtils.join(",", storage.getEditsDirectories())); - FSEditLog log = new FSEditLog(conf, storage, FSNamesystem.getNamespaceEditsDirs(conf)); + FSEditLog log = FSEditLog.newInstance( + conf, storage, FSNamesystem.getNamespaceEditsDirs(conf)); return log; } @@ -252,7 +277,7 @@ public class TestEditLog { */ @Test public void testPreTxidEditLogWithEdits() throws Exception { - Configuration conf = new HdfsConfiguration(); + Configuration conf = getConf(); MiniDFSCluster cluster = null; try { @@ -282,7 +307,7 @@ public class TestEditLog { @Test public void testSimpleEditLog() throws IOException { // start a cluster - Configuration conf = new HdfsConfiguration(); + Configuration conf = getConf(); MiniDFSCluster cluster = null; FileSystem fileSys = null; try { @@ -351,7 +376,7 @@ public class TestEditLog { private void testEditLog(int initialSize) throws IOException { // start a cluster - Configuration conf = new HdfsConfiguration(); + Configuration conf = getConf(); MiniDFSCluster cluster = null; FileSystem fileSys = null; @@ -482,8 +507,12 @@ public class TestEditLog { @Test public void testSyncBatching() throws Exception { - // start a cluster - Configuration conf = new HdfsConfiguration(); + if (useAsyncEditLog) { + // semantics are completely differently since edits will be auto-synced + return; + } + // start a cluster + Configuration conf = getConf(); MiniDFSCluster cluster = null; FileSystem fileSys = null; ExecutorService threadA = Executors.newSingleThreadExecutor(); @@ -546,7 +575,7 @@ public class TestEditLog { @Test public void testBatchedSyncWithClosedLogs() throws Exception { // start a cluster - Configuration conf = new HdfsConfiguration(); + Configuration conf = getConf(); MiniDFSCluster cluster = null; FileSystem fileSys = null; ExecutorService threadA = Executors.newSingleThreadExecutor(); @@ -586,7 +615,7 @@ public class TestEditLog { @Test public void testEditChecksum() throws Exception { // start a cluster - Configuration conf = new HdfsConfiguration(); + Configuration conf = getConf(); MiniDFSCluster cluster = null; FileSystem fileSys = null; cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build(); @@ -658,7 +687,7 @@ public class TestEditLog { */ private void testCrashRecovery(int numTransactions) throws Exception { MiniDFSCluster cluster = null; - Configuration conf = new HdfsConfiguration(); + Configuration conf = getConf(); conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, CHECKPOINT_ON_STARTUP_MIN_TXNS); @@ -803,7 +832,7 @@ public class TestEditLog { boolean updateTransactionIdFile, boolean shouldSucceed) throws Exception { // start a cluster - Configuration conf = new HdfsConfiguration(); + Configuration conf = getConf(); MiniDFSCluster cluster = null; cluster = new MiniDFSCluster.Builder(conf) .numDataNodes(NUM_DATA_NODES).build(); @@ -1134,7 +1163,7 @@ public class TestEditLog { public static NNStorage setupEdits(List<URI> editUris, int numrolls, boolean closeOnFinish, AbortSpec... abortAtRolls) throws IOException { List<AbortSpec> aborts = new ArrayList<AbortSpec>(Arrays.asList(abortAtRolls)); - NNStorage storage = new NNStorage(new Configuration(), + NNStorage storage = new NNStorage(getConf(), Collections.<URI>emptyList(), editUris); storage.format(new NamespaceInfo()); @@ -1296,7 +1325,7 @@ public class TestEditLog { EditLogFileOutputStream elfos = null; EditLogFileInputStream elfis = null; try { - elfos = new EditLogFileOutputStream(new Configuration(), TEST_LOG_NAME, 0); + elfos = new EditLogFileOutputStream(getConf(), TEST_LOG_NAME, 0); elfos.create(NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); elfos.writeRaw(garbage, 0, garbage.length); elfos.setReadyToFlush(); @@ -1474,7 +1503,7 @@ public class TestEditLog { public void testManyEditLogSegments() throws IOException { final int NUM_EDIT_LOG_ROLLS = 1000; // start a cluster - Configuration conf = new HdfsConfiguration(); + Configuration conf = getConf(); MiniDFSCluster cluster = null; FileSystem fileSys = null; try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogAutoroll.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogAutoroll.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogAutoroll.java index f22ee2f..c60d79f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogAutoroll.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogAutoroll.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hdfs.server.namenode; import java.net.BindException; +import java.util.ArrayList; +import java.util.Collection; import java.util.Random; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_PERIOD_KEY; @@ -30,18 +32,40 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem.NameNodeEditLogRoller; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import com.google.common.base.Supplier; +@RunWith(Parameterized.class) public class TestEditLogAutoroll { + static { + GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL); + } + + @Parameters + public static Collection<Object[]> data() { + Collection<Object[]> params = new ArrayList<Object[]>(); + params.add(new Object[]{ Boolean.FALSE }); + params.add(new Object[]{ Boolean.TRUE }); + return params; + } + + private static boolean useAsyncEditLog; + public TestEditLogAutoroll(Boolean async) { + useAsyncEditLog = async; + } private Configuration conf; private MiniDFSCluster cluster; @@ -61,6 +85,8 @@ public class TestEditLogAutoroll { // Make it autoroll after 10 edits conf.setFloat(DFS_NAMENODE_EDIT_LOG_AUTOROLL_MULTIPLIER_THRESHOLD, 0.5f); conf.setInt(DFS_NAMENODE_EDIT_LOG_AUTOROLL_CHECK_INTERVAL_MS, 100); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, + useAsyncEditLog); int retryCount = 0; while (true) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java index 51dfc3e..28169bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogJournalFailures.java @@ -21,12 +21,13 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.spy; import java.io.File; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; @@ -43,13 +44,37 @@ import org.apache.hadoop.util.ExitUtil.ExitException; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.mockito.Mockito; +@RunWith(Parameterized.class) public class TestEditLogJournalFailures { private int editsPerformed = 0; private MiniDFSCluster cluster; private FileSystem fs; + private boolean useAsyncEdits; + + @Parameters + public static Collection<Object[]> data() { + Collection<Object[]> params = new ArrayList<Object[]>(); + params.add(new Object[]{Boolean.FALSE}); + params.add(new Object[]{Boolean.TRUE}); + return params; + } + + public TestEditLogJournalFailures(boolean useAsyncEdits) { + this.useAsyncEdits = useAsyncEdits; + } + + private Configuration getConf() { + Configuration conf = new HdfsConfiguration(); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, + useAsyncEdits); + return conf; + } /** * Create the mini cluster for testing and sub in a custom runtime so that @@ -57,9 +82,9 @@ public class TestEditLogJournalFailures { */ @Before public void setUpMiniCluster() throws IOException { - setUpMiniCluster(new HdfsConfiguration(), true); + setUpMiniCluster(getConf(), true); } - + public void setUpMiniCluster(Configuration conf, boolean manageNameDfsDirs) throws IOException { cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0) @@ -153,7 +178,7 @@ public class TestEditLogJournalFailures { String[] editsDirs = cluster.getConfiguration(0).getTrimmedStrings( DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY); shutDownMiniCluster(); - Configuration conf = new HdfsConfiguration(); + Configuration conf = getConf(); conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY, editsDirs[0]); conf.setInt(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY, 0); conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY, 0); @@ -193,7 +218,7 @@ public class TestEditLogJournalFailures { throws IOException { // Set up 4 name/edits dirs. shutDownMiniCluster(); - Configuration conf = new HdfsConfiguration(); + Configuration conf = getConf(); String[] nameDirs = new String[4]; for (int i = 0; i < nameDirs.length; i++) { File nameDir = new File(PathUtils.getTestDir(getClass()), "name-dir" + i); http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java index fcffbc3..195ce5e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestEditLogRace.java @@ -26,14 +26,17 @@ import static org.mockito.Mockito.spy; import java.io.File; import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.hdfs.DFSConfigKeys; @@ -46,10 +49,14 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream; import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; import org.apache.log4j.Level; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -57,15 +64,27 @@ import org.mockito.stubbing.Answer; * This class tests various synchronization bugs in FSEditLog rolling * and namespace saving. */ +@RunWith(Parameterized.class) public class TestEditLogRace { static { GenericTestUtils.setLogLevel(FSEditLog.LOG, Level.ALL); } - private static final Log LOG = LogFactory.getLog(TestEditLogRace.class); + @Parameters + public static Collection<Object[]> data() { + Collection<Object[]> params = new ArrayList<Object[]>(); + params.add(new Object[]{ false }); + params.add(new Object[]{ true }); + return params; + } + + private static boolean useAsyncEditLog; - private static final String NAME_DIR = - MiniDFSCluster.getBaseDirectory() + "name1"; + public TestEditLogRace(boolean useAsyncEditLog) { + TestEditLogRace.useAsyncEditLog = useAsyncEditLog; + } + + private static final Log LOG = LogFactory.getLog(TestEditLogRace.class); // This test creates NUM_THREADS threads and each thread continuously writes // transactions @@ -94,21 +113,29 @@ public class TestEditLogRace { * This value needs to be significantly longer than the average * time for an fsync() or enterSafeMode(). */ - private static final int BLOCK_TIME = 10; - + private static final int BLOCK_TIME = 4; // 4 sec pretty generous + // // an object that does a bunch of transactions // static class Transactions implements Runnable { final NamenodeProtocols nn; + final MiniDFSCluster cluster; + FileSystem fs; short replication = 3; long blockSize = 64; volatile boolean stopped = false; volatile Thread thr; final AtomicReference<Throwable> caught; - Transactions(NamenodeProtocols ns, AtomicReference<Throwable> caught) { - nn = ns; + Transactions(MiniDFSCluster cluster, AtomicReference<Throwable> caught) { + this.cluster = cluster; + this.nn = cluster.getNameNodeRpc(); + try { + this.fs = cluster.getFileSystem(); + } catch (IOException e) { + caught.set(e); + } this.caught = caught; } @@ -122,11 +149,23 @@ public class TestEditLogRace { while (!stopped) { try { String dirname = "/thr-" + thr.getId() + "-dir-" + i; - nn.mkdirs(dirname, p, true); - nn.delete(dirname, true); + if (i % 2 == 0) { + Path dirnamePath = new Path(dirname); + fs.mkdirs(dirnamePath); + fs.delete(dirnamePath, true); + } else { + nn.mkdirs(dirname, p, true); + nn.delete(dirname, true); + } } catch (SafeModeException sme) { // This is OK - the tests will bring NN in and out of safemode } catch (Throwable e) { + // This is OK - the tests will bring NN in and out of safemode + if (e instanceof RemoteException && + ((RemoteException)e).getClassName() + .contains("SafeModeException")) { + return; + } LOG.warn("Got error in transaction thread", e); caught.compareAndSet(null, e); break; @@ -144,11 +183,11 @@ public class TestEditLogRace { } } - private void startTransactionWorkers(NamenodeProtocols namesystem, + private void startTransactionWorkers(MiniDFSCluster cluster, AtomicReference<Throwable> caughtErr) { // Create threads and make them run transactions concurrently. for (int i = 0; i < NUM_THREADS; i++) { - Transactions trans = new Transactions(namesystem, caughtErr); + Transactions trans = new Transactions(cluster, caughtErr); new Thread(trans, "TransactionThread-" + i).start(); workers.add(trans); } @@ -174,21 +213,21 @@ public class TestEditLogRace { @Test public void testEditLogRolling() throws Exception { // start a cluster - Configuration conf = new HdfsConfiguration(); - MiniDFSCluster cluster = null; + Configuration conf = getConf(); + final MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build(); FileSystem fileSys = null; AtomicReference<Throwable> caughtErr = new AtomicReference<Throwable>(); try { - cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES).build(); cluster.waitActive(); fileSys = cluster.getFileSystem(); final NamenodeProtocols nn = cluster.getNameNode().getRpcServer(); FSImage fsimage = cluster.getNamesystem().getFSImage(); StorageDirectory sd = fsimage.getStorage().getStorageDir(0); - startTransactionWorkers(nn, caughtErr); + startTransactionWorkers(cluster, caughtErr); long previousLogTxId = 1; @@ -256,7 +295,7 @@ public class TestEditLogRace { @Test public void testSaveNamespace() throws Exception { // start a cluster - Configuration conf = new HdfsConfiguration(); + Configuration conf = getConf(); MiniDFSCluster cluster = null; FileSystem fileSys = null; @@ -266,12 +305,11 @@ public class TestEditLogRace { cluster.waitActive(); fileSys = cluster.getFileSystem(); final FSNamesystem namesystem = cluster.getNamesystem(); - final NamenodeProtocols nn = cluster.getNameNodeRpc(); FSImage fsimage = namesystem.getFSImage(); FSEditLog editLog = fsimage.getEditLog(); - startTransactionWorkers(nn, caughtErr); + startTransactionWorkers(cluster, caughtErr); for (int i = 0; i < NUM_SAVE_IMAGE && caughtErr.get() == null; i++) { try { @@ -321,11 +359,13 @@ public class TestEditLogRace { private Configuration getConf() { Configuration conf = new HdfsConfiguration(); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, + useAsyncEditLog); FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); conf.set(DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY, "0.0.0.0:0"); - conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, NAME_DIR); - conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, NAME_DIR); - conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false); + //conf.set(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY, NAME_DIR); + //conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY, NAME_DIR); + conf.setBoolean(DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, false); return conf; } @@ -389,7 +429,7 @@ public class TestEditLogRace { @Override public Void answer(InvocationOnMock invocation) throws Throwable { LOG.info("Flush called"); - if (Thread.currentThread() == doAnEditThread) { + if (useAsyncEditLog || Thread.currentThread() == doAnEditThread) { LOG.info("edit thread: Telling main thread we made it to flush section..."); // Signal to main thread that the edit thread is in the racy section waitToEnterFlush.countDown(); @@ -457,62 +497,52 @@ public class TestEditLogRace { try { FSImage fsimage = namesystem.getFSImage(); - FSEditLog editLog = spy(fsimage.getEditLog()); - DFSTestUtil.setEditLogForTesting(namesystem, editLog); + final FSEditLog editLog = fsimage.getEditLog(); final AtomicReference<Throwable> deferredException = new AtomicReference<Throwable>(); - final CountDownLatch waitToEnterSync = new CountDownLatch(1); - + final CountDownLatch sleepingBeforeSync = new CountDownLatch(1); + final Thread doAnEditThread = new Thread() { @Override public void run() { try { - LOG.info("Starting mkdirs"); - namesystem.mkdirs("/test", - new PermissionStatus("test","test", new FsPermission((short)00755)), - true); - LOG.info("mkdirs complete"); + LOG.info("Starting setOwner"); + namesystem.writeLock(); + try { + editLog.logSetOwner("/","test","test"); + } finally { + namesystem.writeUnlock(); + } + sleepingBeforeSync.countDown(); + LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs"); + Thread.sleep(BLOCK_TIME*1000); + editLog.logSync(); + LOG.info("edit thread: logSync complete"); } catch (Throwable ioe) { LOG.fatal("Got exception", ioe); deferredException.set(ioe); - waitToEnterSync.countDown(); - } - } - }; - - Answer<Void> blockingSync = new Answer<Void>() { - @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - LOG.info("logSync called"); - if (Thread.currentThread() == doAnEditThread) { - LOG.info("edit thread: Telling main thread we made it just before logSync..."); - waitToEnterSync.countDown(); - LOG.info("edit thread: sleeping for " + BLOCK_TIME + "secs"); - Thread.sleep(BLOCK_TIME*1000); - LOG.info("Going through to logSync. This will allow the main thread to continue."); + sleepingBeforeSync.countDown(); } - invocation.callRealMethod(); - LOG.info("logSync complete"); - return null; } }; - doAnswer(blockingSync).when(editLog).logSync(); - + doAnEditThread.setDaemon(true); doAnEditThread.start(); LOG.info("Main thread: waiting to just before logSync..."); - waitToEnterSync.await(); + sleepingBeforeSync.await(200, TimeUnit.MILLISECONDS); assertNull(deferredException.get()); LOG.info("Main thread: detected that logSync about to be called."); LOG.info("Trying to enter safe mode."); - LOG.info("This should block for " + BLOCK_TIME + "sec, since we have pending edits"); - + long st = Time.now(); namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER); long et = Time.now(); - LOG.info("Entered safe mode"); - // Make sure we really waited for the flush to complete! - assertTrue(et - st > (BLOCK_TIME - 1)*1000); + LOG.info("Entered safe mode after "+(et-st)+"ms"); + + // Make sure we didn't wait for the thread that did a logEdit but + // not logSync. Going into safemode does a logSyncAll that will flush + // its edit. + assertTrue(et - st < (BLOCK_TIME/2)*1000); // Once we're in safe mode, save namespace. namesystem.saveNamespace(0, 0); http://git-wip-us.apache.org/repos/asf/hadoop/blob/21517168/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java index b0e5704..fe29e1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSEditLogLoader.java @@ -31,6 +31,8 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; import java.nio.channels.FileChannel; +import java.util.ArrayList; +import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.SortedMap; @@ -59,28 +61,51 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.PathUtils; import org.apache.log4j.Level; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; import com.google.common.collect.Maps; import com.google.common.io.Files; +@RunWith(Parameterized.class) public class TestFSEditLogLoader { - + @Parameters + public static Collection<Object[]> data() { + Collection<Object[]> params = new ArrayList<Object[]>(); + params.add(new Object[]{ Boolean.FALSE }); + params.add(new Object[]{ Boolean.TRUE }); + return params; + } + + private static boolean useAsyncEditLog; + public TestFSEditLogLoader(Boolean async) { + useAsyncEditLog = async; + } + + private static Configuration getConf() { + Configuration conf = new HdfsConfiguration(); + conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_EDITS_ASYNC_LOGGING, + useAsyncEditLog); + return conf; + } + static { GenericTestUtils.setLogLevel(FSImage.LOG, Level.ALL); GenericTestUtils.setLogLevel(FSEditLogLoader.LOG, Level.ALL); } - + private static final File TEST_DIR = PathUtils.getTestDir(TestFSEditLogLoader.class); private static final int NUM_DATA_NODES = 0; private static final ErasureCodingPolicy testECPolicy = StripedFileTestUtil.TEST_EC_POLICY; - + @Test public void testDisplayRecentEditLogOpCodes() throws IOException { - // start a cluster - Configuration conf = new HdfsConfiguration(); + // start a cluster + Configuration conf = getConf(); MiniDFSCluster cluster = null; FileSystem fileSys = null; cluster = new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATA_NODES) @@ -130,7 +155,7 @@ public class TestFSEditLogLoader { @Test public void testReplicationAdjusted() throws Exception { // start a cluster - Configuration conf = new HdfsConfiguration(); + Configuration conf = getConf(); // Replicate and heartbeat fast to shave a few seconds off test conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1); conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);