Repository: hadoop
Updated Branches:
  refs/heads/trunk b3ae11d59 -> dd50f5399


HDFS-13062. Provide support for JN to use separate journal disk per namespace. 
Contributed by Bharat Viswanadham.
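
In effect, a JournalNode can now resolve a separate edits directory per
nameservice via "dfs.journalnode.edits.dir.<nameserviceId>", falling back to
the common "dfs.journalnode.edits.dir" key (and its built-in default) when no
per-nameservice key is set. A minimal configuration sketch, assuming a
federated setup with nameservices "ns1" and "ns2" (the class name and paths
below are illustrative, not part of this patch):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class PerNsJournalDirExample {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        conf.set("dfs.nameservices", "ns1,ns2");
        // New with this change: one journal disk per nameservice.
        conf.set("dfs.journalnode.edits.dir.ns1", "/disk1/journal");
        conf.set("dfs.journalnode.edits.dir.ns2", "/disk2/journal");
        // Common fallback for any nameservice without its own key.
        conf.set("dfs.journalnode.edits.dir", "/data/journal");
      }
    }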


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/dd50f539
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/dd50f539
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/dd50f539

Branch: refs/heads/trunk
Commit: dd50f53997239bf9078481cf46592ca3e41520b5
Parents: b3ae11d
Author: Hanisha Koneru <hanishakon...@apache.org>
Authored: Wed Jan 31 16:34:48 2018 -0800
Committer: Hanisha Koneru <hanishakon...@apache.org>
Committed: Thu Feb 1 12:28:17 2018 -0800

----------------------------------------------------------------------
 .../hdfs/qjournal/server/JournalNode.java       | 129 ++++++++++------
 .../hdfs/qjournal/server/TestJournalNode.java   | 148 ++++++++++++++++---
 2 files changed, 211 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd50f539/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
index 0954eaf..c772dfc 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java
@@ -17,18 +17,10 @@
  */
 package org.apache.hadoop.hdfs.qjournal.server;
 
-import static org.apache.hadoop.util.ExitUtil.terminate;
-
-import java.io.File;
-import java.io.FileFilter;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.management.ObjectName;
-
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -47,14 +39,22 @@ import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.tracing.TraceUtils;
 import org.apache.hadoop.util.DiskChecker;
+import static org.apache.hadoop.util.ExitUtil.terminate;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Tool;
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.htrace.core.Tracer;
 import org.eclipse.jetty.util.ajax.JSON;
 
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Maps;
+import javax.management.ObjectName;
+import java.io.File;
+import java.io.FileFilter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 
 /**
  * The JournalNode is a daemon which allows namenodes using
@@ -74,7 +74,7 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
       .newHashMap();
   private ObjectName journalNodeInfoBeanName;
   private String httpServerURI;
-  private File localDir;
+  private final ArrayList<File> localDir = Lists.newArrayList();
   Tracer tracer;
 
   static {
@@ -94,11 +94,10 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
     
     Journal journal = journalsById.get(jid);
     if (journal == null) {
-      File logDir = getLogDir(jid);
-      LOG.info("Initializing journal in directory " + logDir);      
+      File logDir = getLogDir(jid, nameServiceId);
+      LOG.info("Initializing journal in directory " + logDir);
       journal = new Journal(conf, logDir, jid, startOpt, new ErrorReporter());
       journalsById.put(jid, journal);
-
      // Start SyncJournal thread, if JournalNode Sync is enabled
       if (conf.getBoolean(
           DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY,
@@ -148,9 +147,34 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
   @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
-    this.localDir = new File(
-        conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
-        DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT).trim());
+
+    String journalNodeDir = null;
+    Collection<String> nameserviceIds;
+
+    nameserviceIds = conf.getTrimmedStringCollection(
+        DFSConfigKeys.DFS_INTERNAL_NAMESERVICES_KEY);
+
+    if (nameserviceIds.size() == 0) {
+      nameserviceIds = conf.getTrimmedStringCollection(
+          DFSConfigKeys.DFS_NAMESERVICES);
+    }
+
+    // If nameserviceIds has fewer than two entries, this is not a
+    // federated setup.
+    if (nameserviceIds.size() < 2) {
+      // Check in HA, if journal edit dir is set by appending with
+      // nameserviceId
+      for (String nameService : nameserviceIds) {
+        journalNodeDir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY +
+        "." + nameService);
+      }
+      if (journalNodeDir == null) {
+        journalNodeDir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
+            DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT);
+      }
+      localDir.add(new File(journalNodeDir.trim()));
+    }
+
     if (this.tracer == null) {
       this.tracer = new Tracer.Builder("JournalNode").
           conf(TraceUtils.wrapHadoopConf("journalnode.htrace", conf)).
@@ -158,12 +182,13 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
     }
   }
 
-  private static void validateAndCreateJournalDir(File dir) throws IOException {
+  private static void validateAndCreateJournalDir(File dir)
+      throws IOException {
+
     if (!dir.isAbsolute()) {
       throw new IllegalArgumentException(
           "Journal dir '" + dir + "' should be an absolute path");
     }
-
     DiskChecker.checkDir(dir);
   }
 
@@ -186,8 +211,9 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
 
     try {
 
-      validateAndCreateJournalDir(localDir);
-
+      for (File journalDir : localDir) {
+        validateAndCreateJournalDir(journalDir);
+      }
       DefaultMetricsSystem.initialize("JournalNode");
       JvmMetrics.create("JournalNode",
           conf.get(DFSConfigKeys.DFS_METRICS_SESSION_ID_KEY),
@@ -297,16 +323,33 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
    * @param jid the journal identifier
    * @return the file, which may or may not exist yet
    */
-  private File getLogDir(String jid) {
-    String dir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
-        DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT);
+  private File getLogDir(String jid, String nameServiceId) throws IOException {
+    String dir = null;
+    if (nameServiceId != null) {
+      dir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY + "." +
+          nameServiceId);
+    }
+    if (dir == null) {
+      dir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
+          DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT);
+    }
+
+    File journalDir = new File(dir.trim());
+    if (!localDir.contains(journalDir)) {
+      // It is a federated setup; we need to validate journalDir
+      validateAndCreateJournalDir(journalDir);
+      localDir.add(journalDir);
+    }
+
     Preconditions.checkArgument(jid != null &&
         !jid.isEmpty(),
         "bad journal identifier: %s", jid);
     assert jid != null;
-    return new File(new File(dir), jid);
+    return new File(journalDir, jid);
   }
 
+
+
   @Override // JournalNodeMXBean
   public String getJournalsStatus() {
     // jid:{Formatted:True/False}
@@ -328,20 +371,22 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
     // Also note that we do not need to check localDir here since
     // validateAndCreateJournalDir has been called before we register the
     // MXBean.
-    File[] journalDirs = localDir.listFiles(new FileFilter() {
-      @Override
-      public boolean accept(File file) {
-        return file.isDirectory();
-      }
-    });
-
-    if (journalDirs != null) {
-      for (File journalDir : journalDirs) {
-        String jid = journalDir.getName();
-        if (!status.containsKey(jid)) {
-          Map<String, String> jMap = new HashMap<String, String>();
-          jMap.put("Formatted", "true");
-          status.put(jid, jMap);
+    for (File jDir : localDir) {
+      File[] journalDirs = jDir.listFiles(new FileFilter() {
+        @Override
+        public boolean accept(File file) {
+          return file.isDirectory();
+        }
+      });
+
+      if (journalDirs != null) {
+        for (File journalDir : journalDirs) {
+          String jid = journalDir.getName();
+          if (!status.containsKey(jid)) {
+            Map<String, String> jMap = new HashMap<String, String>();
+            jMap.put("Formatted", "true");
+            status.put(jid, jMap);
+          }
         }
       }
     }
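
Taken together, the lookup order implemented above is: the per-nameservice key
"dfs.journalnode.edits.dir.<nameserviceId>" if present, else the common
"dfs.journalnode.edits.dir" key, else its built-in default. The resolved
directory is validated once, remembered in localDir, and each journal lives in
a subdirectory named after its journal id. A self-contained sketch of that
precedence, mirroring getLogDir() above (the class and method names here are
illustrative):

    import java.io.File;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    class JournalDirResolverSketch {
      /** Mirrors the precedence implemented in JournalNode#getLogDir. */
      static File resolveLogDir(Configuration conf, String jid,
          String nameServiceId) {
        String dir = null;
        if (nameServiceId != null) {
          // 1. Per-nameservice override, e.g. dfs.journalnode.edits.dir.ns1
          dir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY
              + "." + nameServiceId);
        }
        if (dir == null) {
          // 2. Common key; 3. built-in default.
          dir = conf.get(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
              DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT);
        }
        // Each journal gets a subdirectory named after its journal id.
        return new File(new File(dir.trim()), jid);
      }
    }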

http://git-wip-us.apache.org/repos/asf/hadoop/blob/dd50f539/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
index 9bd686f..581218d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java
@@ -17,23 +17,14 @@
  */
 package org.apache.hadoop.hdfs.qjournal.server;
 
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-
+import com.google.common.base.Charsets;
+import com.google.common.primitives.Bytes;
+import com.google.common.primitives.Ints;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
 import org.apache.hadoop.hdfs.qjournal.client.IPCLoggerChannel;
@@ -52,16 +43,21 @@ import org.apache.hadoop.util.Shell;
 import org.apache.hadoop.util.StopWatch;
 import org.junit.After;
 import org.junit.Assert;
+import static org.junit.Assert.*;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
-
-import com.google.common.base.Charsets;
-import com.google.common.primitives.Bytes;
-import com.google.common.primitives.Ints;
 import org.junit.rules.TestName;
 import org.mockito.Mockito;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.Collection;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
 
 public class TestJournalNode {
   private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
@@ -87,9 +83,29 @@ public class TestJournalNode {
     File editsDir = new File(MiniDFSCluster.getBaseDirectory() +
         File.separator + "TestJournalNode");
     FileUtil.fullyDelete(editsDir);
-    
-    conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
-        editsDir.getAbsolutePath());
+
+    if (testName.getMethodName().equals("testJournalDirPerNameSpace")) {
+      setFederationConf();
+      conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY + ".ns1",
+          editsDir + File.separator + "ns1");
+      conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY + ".ns2",
+          editsDir + File.separator + "ns2");
+    } else if (testName.getMethodName().equals(
+        "testJournalCommonDirAcrossNameSpace")){
+      setFederationConf();
+      conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
+          editsDir.getAbsolutePath());
+    } else if (testName.getMethodName().equals(
+        "testJournalDefaultDirForOneNameSpace")) {
+      FileUtil.fullyDelete(new File(DFSConfigKeys
+          .DFS_JOURNALNODE_EDITS_DIR_DEFAULT));
+      setFederationConf();
+      conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY + ".ns1",
+          editsDir + File.separator + "ns1");
+    } else {
+      conf.set(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_KEY,
+          editsDir.getAbsolutePath());
+    }
     conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
         "0.0.0.0:0");
     if (testName.getMethodName().equals(
@@ -128,18 +144,102 @@ public class TestJournalNode {
     jn = new JournalNode();
     jn.setConf(conf);
     jn.start();
-    journalId = "test-journalid-" + GenericTestUtils.uniqueSequenceId();
-    journal = jn.getOrCreateJournal(journalId);
-    journal.format(FAKE_NSINFO);
+
+
+    if (testName.getMethodName().equals("testJournalDirPerNameSpace") ||
+        testName.getMethodName().equals(
+            "testJournalCommonDirAcrossNameSpace") ||
+        testName.getMethodName().equals(
+            "testJournalDefaultDirForOneNameSpace")) {
+      Collection<String> nameServiceIds = DFSUtilClient.getNameServiceIds(conf);
+      for(String nsId: nameServiceIds) {
+        journalId = "test-journalid-" + nsId;
+        journal = jn.getOrCreateJournal(journalId, nsId,
+            HdfsServerConstants.StartupOption.REGULAR);
+        NamespaceInfo fakeNameSpaceInfo = new NamespaceInfo(
+            12345, "mycluster", "my-bp"+nsId, 0L);
+        journal.format(fakeNameSpaceInfo);
+      }
+    } else {
+      journalId = "test-journalid-" + GenericTestUtils.uniqueSequenceId();
+      journal = jn.getOrCreateJournal(journalId);
+      journal.format(FAKE_NSINFO);
+    }
+
     
     ch = new IPCLoggerChannel(conf, FAKE_NSINFO, journalId, jn.getBoundIpcAddress());
   }
+
+  private void setFederationConf() {
+    conf.set(DFSConfigKeys.DFS_NAMESERVICES, "ns1, ns2");
+
+    // ns1
+    conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + ".ns1", "nn1,nn2");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY + ".ns1" + ".nn1",
+        "qjournal://journalnode0:9900;journalnode1:9901/ns1");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY + ".ns1" + ".nn2",
+        "qjournal://journalnode0:9900;journalnode1:9901/ns2");
+
+    // ns2
+    conf.set(DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + ".ns2", "nn3,nn4");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY + ".ns2" + ".nn3",
+        "qjournal://journalnode0:9900;journalnode1:9901/ns2");
+    conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY + ".ns2" + ".nn4",
+        "qjournal://journalnode0:9900;journalnode1:9901/ns2");
+  }
   
   @After
   public void teardown() throws Exception {
     jn.stop(0);
   }
-  
+
+  @Test(timeout=100000)
+  public void testJournalDirPerNameSpace() {
+    Collection<String> nameServiceIds = DFSUtilClient.getNameServiceIds(conf);
+    setupStaticHostResolution(2, "journalnode");
+    for (String nsId : nameServiceIds) {
+      String jid = "test-journalid-" + nsId;
+      Journal nsJournal = jn.getJournal(jid);
+      JNStorage journalStorage = nsJournal.getStorage();
+      File editsDir = new File(MiniDFSCluster.getBaseDirectory() +
+          File.separator + "TestJournalNode" + File.separator
+          + nsId + File.separator + jid);
+      assertEquals(editsDir.toString(), journalStorage.getRoot().toString());
+    }
+  }
+
+  @Test(timeout=100000)
+  public void testJournalCommonDirAcrossNameSpace() {
+    Collection<String> nameServiceIds = DFSUtilClient.getNameServiceIds(conf);
+    setupStaticHostResolution(2, "journalnode");
+    for (String nsId : nameServiceIds) {
+      String jid = "test-journalid-" + nsId;
+      Journal nsJournal = jn.getJournal(jid);
+      JNStorage journalStorage = nsJournal.getStorage();
+      File editsDir = new File(MiniDFSCluster.getBaseDirectory() +
+          File.separator + "TestJournalNode" + File.separator + jid);
+      assertEquals(editsDir.toString(), journalStorage.getRoot().toString());
+    }
+  }
+
+  @Test(timeout=100000)
+  public void testJournalDefaultDirForOneNameSpace() {
+    Collection<String> nameServiceIds = DFSUtilClient.getNameServiceIds(conf);
+    setupStaticHostResolution(2, "journalnode");
+    String jid = "test-journalid-ns1";
+    Journal nsJournal = jn.getJournal(jid);
+    JNStorage journalStorage = nsJournal.getStorage();
+    File editsDir = new File(MiniDFSCluster.getBaseDirectory() +
+        File.separator + "TestJournalNode" + File.separator + "ns1" + File
+        .separator + jid);
+    assertEquals(editsDir.toString(), journalStorage.getRoot().toString());
+    jid = "test-journalid-ns2";
+    nsJournal = jn.getJournal(jid);
+    journalStorage = nsJournal.getStorage();
+    editsDir = new File(DFSConfigKeys.DFS_JOURNALNODE_EDITS_DIR_DEFAULT +
+        File.separator + jid);
+    assertEquals(editsDir.toString(), journalStorage.getRoot().toString());
+  }
   @Test(timeout=100000)
   public void testJournal() throws Exception {
     MetricsRecordBuilder metrics = MetricsAsserts.getMetrics(

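The three new tests pin down the resulting on-disk layout: a journal is always
stored at <resolved edits dir>/<journal id>. Under the configurations built in
setup() above, that works out to roughly the following (the last path depends
on the value of DFS_JOURNALNODE_EDITS_DIR_DEFAULT):

    <base>/TestJournalNode/ns1/test-journalid-ns1   per-nameservice key for ns1
    <base>/TestJournalNode/ns2/test-journalid-ns2   per-nameservice key for ns2
    <base>/TestJournalNode/test-journalid-<nsId>    common key shared by both
    <default edits dir>/test-journalid-ns2          no key set for ns2 at all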
