HDFS-4176. EditLogTailer should call rollEdits with a timeout. (Lei Xu)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/67406460 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/67406460 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/67406460 Branch: refs/heads/HADOOP-12756 Commit: 67406460f0b6c05edde1d1185aeb42b6324df202 Parents: 328c855 Author: Lei Xu <l...@apache.org> Authored: Thu Jul 28 13:32:00 2016 -0700 Committer: Lei Xu <l...@apache.org> Committed: Thu Jul 28 13:32:00 2016 -0700 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 3 + .../hdfs/server/namenode/ha/EditLogTailer.java | 76 +++++++++++++++----- .../src/main/resources/hdfs-default.xml | 7 ++ .../server/namenode/ha/TestEditLogTailer.java | 46 ++++++++++++ 4 files changed, 116 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/67406460/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 231dea7..3385751 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -731,6 +731,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_HA_TAILEDITS_INPROGRESS_KEY = "dfs.ha.tail-edits.in-progress"; public static final boolean DFS_HA_TAILEDITS_INPROGRESS_DEFAULT = false; + public static final String DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY = + "dfs.ha.tail-edits.rolledits.timeout"; + public static final int DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_DEFAULT = 60; // 1m public static final String DFS_HA_LOGROLL_RPC_TIMEOUT_KEY = "dfs.ha.log-roll.rpc.timeout"; public static final int DFS_HA_LOGROLL_RPC_TIMEOUT_DEFAULT = 20000; // 20s public static final String DFS_HA_FENCE_METHODS_KEY = "dfs.ha.fencing.methods"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/67406460/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java index 1447375..95c3d58 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/EditLogTailer.java @@ -27,16 +27,21 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import com.google.common.collect.Iterators; -import com.google.common.collect.Lists; +import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB; import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolTranslatorPB; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; @@ -102,6 +107,17 @@ public class EditLogTailer { private final long logRollPeriodMs; /** + * The timeout in milliseconds of calling rollEdits RPC to Active NN. + * @see HDFS-4176. + */ + private final long rollEditsTimeoutMs; + + /** + * The executor to run roll edit RPC call in a daemon thread. + */ + private final ExecutorService rollEditsRpcExecutor; + + /** * How often the Standby should check if there are new finalized segment(s) * available to be read from. */ @@ -159,6 +175,13 @@ public class EditLogTailer { sleepTimeMs = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_DEFAULT) * 1000; + rollEditsTimeoutMs = conf.getInt( + DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY, + DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_DEFAULT) * 1000; + + rollEditsRpcExecutor = Executors.newSingleThreadExecutor( + new ThreadFactoryBuilder().setDaemon(true).build()); + maxRetries = conf.getInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY, DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_DEFAULT); if (maxRetries <= 0) { @@ -186,6 +209,7 @@ public class EditLogTailer { } public void stop() throws IOException { + rollEditsRpcExecutor.shutdown(); tailerThread.setShouldRun(false); tailerThread.interrupt(); try { @@ -205,7 +229,7 @@ public class EditLogTailer { public void setEditLog(FSEditLog editLog) { this.editLog = editLog; } - + public void catchupDuringFailover() throws IOException { Preconditions.checkState(tailerThread == null || !tailerThread.isAlive(), @@ -300,30 +324,50 @@ public class EditLogTailer { } /** + * NameNodeProxy factory method. + * @return a Callable to roll logs on remote NameNode. + */ + @VisibleForTesting + Callable<Void> getNameNodeProxy() { + return new MultipleNameNodeProxy<Void>() { + @Override + protected Void doWork() throws IOException { + cachedActiveProxy.rollEditLog(); + return null; + } + }; + } + + /** * Trigger the active node to roll its logs. */ - private void triggerActiveLogRoll() { + @VisibleForTesting + void triggerActiveLogRoll() { LOG.info("Triggering log roll on remote NameNode"); + Future<Void> future = null; try { - new MultipleNameNodeProxy<Void>() { - @Override - protected Void doWork() throws IOException { - cachedActiveProxy.rollEditLog(); - return null; - } - }.call(); + future = rollEditsRpcExecutor.submit(getNameNodeProxy()); + future.get(rollEditsTimeoutMs, TimeUnit.MILLISECONDS); lastRollTriggerTxId = lastLoadedTxnId; - } catch (IOException ioe) { - if (ioe instanceof RemoteException) { - ioe = ((RemoteException)ioe).unwrapRemoteException(); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof RemoteException) { + IOException ioe = ((RemoteException) cause).unwrapRemoteException(); if (ioe instanceof StandbyException) { LOG.info("Skipping log roll. Remote node is not in Active state: " + ioe.getMessage().split("\n")[0]); return; } } - - LOG.warn("Unable to trigger a roll of the active NN", ioe); + LOG.warn("Unable to trigger a roll of the active NN", e); + } catch (TimeoutException e) { + if (future != null) { + future.cancel(true); + } + LOG.warn(String.format( + "Unable to finish rolling edits in %d ms", rollEditsTimeoutMs)); + } catch (InterruptedException e) { + LOG.warn("Unable to trigger a roll of the active NN", e); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/67406460/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index b82fa31..e6fde8c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1496,6 +1496,13 @@ </property> <property> + <name>dfs.ha.tail-edits.rolledits.timeout</name> + <value>60</value> + <description>The timeout in seconds of calling rollEdits RPC on Active NN. + </description> +</property> + +<property> <name>dfs.ha.automatic-failover.enabled</name> <value>false</value> <description> http://git-wip-us.apache.org/repos/asf/hadoop/blob/67406460/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java index 3af201d..0d0873d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestEditLogTailer.java @@ -17,15 +17,19 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; import java.net.URI; import java.util.ArrayList; import java.util.Collection; +import java.util.concurrent.Callable; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.permission.FsPermission; @@ -50,6 +54,7 @@ import org.junit.runners.Parameterized; import org.junit.runners.Parameterized.Parameters; import com.google.common.base.Supplier; +import org.mockito.Mockito; @RunWith(Parameterized.class) public class TestEditLogTailer { @@ -249,4 +254,45 @@ public class TestEditLogTailer { } }, 100, 10000); } + + @Test(timeout=20000) + public void testRollEditTimeoutForActiveNN() throws IOException { + Configuration conf = getConf(); + conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ROLLEDITS_TIMEOUT_KEY, 5); // 5s + conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1); + conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_ALL_NAMESNODES_RETRY_KEY, 100); + + HAUtil.setAllowStandbyReads(conf, true); + + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(0) + .build(); + cluster.waitActive(); + + cluster.transitionToActive(0); + + try { + EditLogTailer tailer = Mockito.spy( + cluster.getNamesystem(1).getEditLogTailer()); + AtomicInteger flag = new AtomicInteger(0); + + // Return a slow roll edit process. + when(tailer.getNameNodeProxy()).thenReturn( + new Callable<Void>() { + @Override + public Void call() throws Exception { + Thread.sleep(30000); // sleep for 30 seconds. + assertTrue(Thread.currentThread().isInterrupted()); + flag.addAndGet(1); + return null; + } + } + ); + tailer.triggerActiveLogRoll(); + assertEquals(0, flag.get()); + } finally { + cluster.shutdown(); + } + } } --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org