HDFS-4025. QJM: Synchronize past log segments to JNs that missed them. Contributed by Hanisha Koneru.
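The new JournalNodeSyncer (full diff below) periodically asks one peer JournalNode for its edit log manifest, compares it against the local manifest, and downloads any finalized segments it is missing. The heart of that comparison is a merge-style scan over the two manifests, which are sorted by start transaction id. The following condensed, self-contained sketch shows the same logic; it is a simplification for illustration only, with a bare Segment class standing in for Hadoop's RemoteEditLog:

----------------------------------------------------------------------
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for RemoteEditLog: a finalized segment's txid range. */
class Segment {
  final long startTxId;
  final long endTxId;
  Segment(long startTxId, long endTxId) {
    this.startTxId = startTxId;
    this.endTxId = endTxId;
  }
}

public class ManifestDiff {
  /**
   * Returns the segments present in the remote manifest but absent from the
   * local one. Both lists are assumed sorted by startTxId, as edit log
   * manifests are. Mirrors JournalNodeSyncer#getMissingLogList below.
   */
  static List<Segment> missingSegments(List<Segment> local,
      List<Segment> remote) {
    if (local.isEmpty()) {
      return remote;                   // everything the peer has is missing here
    }
    List<Segment> missing = new ArrayList<>();
    int i = 0, j = 0;
    while (i < local.size() && j < remote.size()) {
      long localStart = local.get(i).startTxId;
      long remoteStart = remote.get(j).startTxId;
      if (localStart == remoteStart) {
        i++;                           // segment present on both sides
        j++;
      } else if (localStart > remoteStart) {
        missing.add(remote.get(j));    // peer has a segment this node lacks
        j++;
      } else {
        i++;                           // this node has a segment the peer lacks
      }
    }
    while (j < remote.size()) {
      missing.add(remote.get(j++));    // trailing remote segments are all missing
    }
    return missing;
  }
}
----------------------------------------------------------------------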
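The feature is toggled per JournalNode with the new dfs.journalnode.* and dfs.edit.log.transfer.* keys that the diff adds to DFSConfigKeys and hdfs-default.xml. A rough usage sketch follows; the values are illustrative (they echo the shipped defaults plus an example throttle), and note that the syncer additionally reads dfs.namenode.shared.edits.dir to discover its peer JournalNodes, as the new MiniQJMHACluster change below arranges for tests:

----------------------------------------------------------------------
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class SyncerConfigExample {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // Turn the syncer on for this JournalNode.
    conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, true);
    // Gossip with one peer JN every 2 minutes (the shipped default).
    conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY,
        2 * 60 * 1000L);
    // Socket timeout for each segment download, in milliseconds.
    conf.setInt(DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY, 30 * 1000);
    // Throttle segment transfers to 1 MB/s; 0 (the default) disables throttling.
    conf.setLong(DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_RATE_KEY, 1024 * 1024L);
  }
}
----------------------------------------------------------------------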
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/13d4bcfe Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/13d4bcfe Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/13d4bcfe Branch: refs/heads/HADOOP-13345 Commit: 13d4bcfe3535a2df79c2a56e7578716d15497ff4 Parents: b10e962 Author: Jing Zhao <ji...@apache.org> Authored: Wed Feb 22 16:33:38 2017 -0800 Committer: Jing Zhao <ji...@apache.org> Committed: Wed Feb 22 16:33:38 2017 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 16 + .../qjournal/client/QuorumJournalManager.java | 38 +- .../hadoop/hdfs/qjournal/server/JNStorage.java | 9 +- .../hadoop/hdfs/qjournal/server/Journal.java | 19 + .../hdfs/qjournal/server/JournalNode.java | 23 +- .../hdfs/qjournal/server/JournalNodeSyncer.java | 413 +++++++++++++++++++ .../hadoop/hdfs/server/common/Storage.java | 9 + .../apache/hadoop/hdfs/server/common/Util.java | 46 ++- .../hadoop/hdfs/server/namenode/NNStorage.java | 5 +- .../hdfs/server/namenode/TransferFsImage.java | 3 +- .../src/main/resources/hdfs-default.xml | 41 ++ .../hdfs/qjournal/MiniJournalCluster.java | 8 + .../hadoop/hdfs/qjournal/MiniQJMHACluster.java | 1 + .../hdfs/qjournal/TestJournalNodeSync.java | 264 ++++++++++++ 14 files changed, 853 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index cf1d21a..cfd16aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -707,6 +707,16 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_IMAGE_TRANSFER_CHUNKSIZE_KEY = "dfs.image.transfer.chunksize"; public static final int DFS_IMAGE_TRANSFER_CHUNKSIZE_DEFAULT = 64 * 1024; + // Edit Log segment transfer timeout + public static final String DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY = + "dfs.edit.log.transfer.timeout"; + public static final int DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT = 30 * 1000; + + // Throttling Edit Log Segment transfer for Journal Sync + public static final String DFS_EDIT_LOG_TRANSFER_RATE_KEY = + "dfs.edit.log.transfer.bandwidthPerSec"; + public static final long DFS_EDIT_LOG_TRANSFER_RATE_DEFAULT = 0; //no throttling + // Datanode File IO Stats public static final String DFS_DATANODE_ENABLE_FILEIO_PROFILING_KEY = "dfs.datanode.enable.fileio.profiling"; @@ -891,6 +901,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_JOURNALNODE_KEYTAB_FILE_KEY = "dfs.journalnode.keytab.file"; public static final String DFS_JOURNALNODE_KERBEROS_PRINCIPAL_KEY = "dfs.journalnode.kerberos.principal"; public static final String DFS_JOURNALNODE_KERBEROS_INTERNAL_SPNEGO_PRINCIPAL_KEY = "dfs.journalnode.kerberos.internal.spnego.principal"; + public static final String DFS_JOURNALNODE_ENABLE_SYNC_KEY = + "dfs.journalnode.enable.sync"; + public static final boolean DFS_JOURNALNODE_ENABLE_SYNC_DEFAULT = false; + public 
static final String DFS_JOURNALNODE_SYNC_INTERVAL_KEY = + "dfs.journalnode.sync.interval"; + public static final long DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT = 2*60*1000L; // Journal-node related configs for the client side. public static final String DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY = "dfs.qjournal.queued-edits.limit.mb"; http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java index ae3358b..97c0050 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java @@ -21,7 +21,6 @@ import java.io.IOException; import java.net.InetSocketAddress; import java.net.URI; import java.net.URL; -import java.net.UnknownHostException; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -42,6 +41,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRe import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.StorageInfo; +import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.namenode.EditLogFileInputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; @@ -51,8 +51,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.web.URLConnectionFactory; -import org.apache.hadoop.net.NetUtils; -import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; @@ -362,41 +360,17 @@ public class QuorumJournalManager implements JournalManager { URI uri, NamespaceInfo nsInfo, AsyncLogger.Factory factory) throws IOException { List<AsyncLogger> ret = Lists.newArrayList(); - List<InetSocketAddress> addrs = getLoggerAddresses(uri); + List<InetSocketAddress> addrs = Util.getAddressesList(uri); + if (addrs.size() % 2 == 0) { + LOG.warn("Quorum journal URI '" + uri + "' has an even number " + + "of Journal Nodes specified. This is not recommended!"); + } String jid = parseJournalId(uri); for (InetSocketAddress addr : addrs) { ret.add(factory.createLogger(conf, nsInfo, jid, addr)); } return ret; } - - private static List<InetSocketAddress> getLoggerAddresses(URI uri) - throws IOException { - String authority = uri.getAuthority(); - Preconditions.checkArgument(authority != null && !authority.isEmpty(), - "URI has no authority: " + uri); - - String[] parts = StringUtils.split(authority, ';'); - for (int i = 0; i < parts.length; i++) { - parts[i] = parts[i].trim(); - } - - if (parts.length % 2 == 0) { - LOG.warn("Quorum journal URI '" + uri + "' has an even number " + - "of Journal Nodes specified. 
This is not recommended!"); - } - - List<InetSocketAddress> addrs = Lists.newArrayList(); - for (String addr : parts) { - InetSocketAddress isa = NetUtils.createSocketAddr( - addr, DFSConfigKeys.DFS_JOURNALNODE_RPC_PORT_DEFAULT); - if (isa.isUnresolved()) { - throw new UnknownHostException(addr); - } - addrs.add(isa); - } - return addrs; - } @Override public EditLogOutputStream startLogSegment(long txId, int layoutVersion) http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java index 07c9286..8f40f6b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java @@ -49,7 +49,6 @@ class JNStorage extends Storage { private final FileJournalManager fjm; private final StorageDirectory sd; private StorageState state; - private static final List<Pattern> CURRENT_DIR_PURGE_REGEXES = ImmutableList.of( @@ -121,6 +120,14 @@ class JNStorage extends Storage { return new File(sd.getCurrentDir(), name); } + File getTemporaryEditsFile(long startTxId, long endTxId, long timestamp) { + return NNStorage.getTemporaryEditsFile(sd, startTxId, endTxId, timestamp); + } + + File getFinalizedEditsFile(long startTxId, long endTxId) { + return NNStorage.getFinalizedEditsFile(sd, startTxId, endTxId); + } + /** * @return the path for the file which contains persisted data for the * paxos-like recovery process for the given log segment. http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java index 3760641..ca21373 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java @@ -1092,6 +1092,25 @@ public class Journal implements Closeable { committedTxnId.set(startTxId - 1); } + synchronized boolean renameTmpSegment(File tmpFile, File finalFile, + long endTxId) throws IOException { + final boolean success; + if (endTxId <= committedTxnId.get()) { + success = tmpFile.renameTo(finalFile); + if (!success) { + LOG.warn("Unable to rename edits file from " + tmpFile + " to " + + finalFile); + } + } else { + success = false; + LOG.error("The endTxId of the temporary file is not less than the " + + "last committed transaction id. 
Aborting renaming to final file" + + finalFile); + } + + return success; + } + public Long getJournalCTime() throws IOException { return storage.getJournalManager().getJournalCTime(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java index cde0112..42e9be7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java @@ -68,6 +68,8 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean { private JournalNodeRpcServer rpcServer; private JournalNodeHttpServer httpServer; private final Map<String, Journal> journalsById = Maps.newHashMap(); + private final Map<String, JournalNodeSyncer> journalSyncersById = Maps + .newHashMap(); private ObjectName journalNodeInfoBeanName; private String httpServerURI; private File localDir; @@ -92,11 +94,24 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean { LOG.info("Initializing journal in directory " + logDir); journal = new Journal(conf, logDir, jid, startOpt, new ErrorReporter()); journalsById.put(jid, journal); + + // Start SyncJouranl thread, if JournalNode Sync is enabled + if (conf.getBoolean( + DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, + DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_DEFAULT)) { + startSyncer(journal, jid); + } } - + return journal; } + private void startSyncer(Journal journal, String jid) { + JournalNodeSyncer jSyncer = new JournalNodeSyncer(this, journal, jid, conf); + journalSyncersById.put(jid, jSyncer); + jSyncer.start(); + } + @VisibleForTesting public Journal getOrCreateJournal(String jid) throws IOException { return getOrCreateJournal(jid, StartupOption.REGULAR); @@ -190,7 +205,11 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean { */ public void stop(int rc) { this.resultCode = rc; - + + for (JournalNodeSyncer jSyncer : journalSyncersById.values()) { + jSyncer.stopSync(); + } + if (rpcServer != null) { rpcServer.stop(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java new file mode 100644 index 0000000..f195c00 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java @@ -0,0 +1,413 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.qjournal.server; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.collect.Sets; +import com.google.protobuf.ServiceException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.hdfs.DFSConfigKeys; + +import org.apache.hadoop.hdfs.protocolPB.PBHelper; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos + .JournalIdProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos + .GetEditLogManifestRequestProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos + .GetEditLogManifestResponseProto; +import org.apache.hadoop.hdfs.qjournal.protocolPB.QJournalProtocolPB; +import org.apache.hadoop.hdfs.server.common.Util; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; +import org.apache.hadoop.hdfs.util.DataTransferThrottler; +import org.apache.hadoop.ipc.RPC; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; +import java.util.List; + +/** + * A Journal Sync thread runs through the lifetime of the JN. 
It periodically + * gossips with other journal nodes to compare edit log manifests and if it + * detects any missing log segment, it downloads it from the other journal node + */ +@InterfaceAudience.Private +public class JournalNodeSyncer { + public static final Logger LOG = LoggerFactory.getLogger( + JournalNodeSyncer.class); + private final JournalNode jn; + private final Journal journal; + private final String jid; + private final JournalIdProto jidProto; + private final JNStorage jnStorage; + private final Configuration conf; + private volatile Daemon syncJournalDaemon; + private volatile boolean shouldSync = true; + + private List<JournalNodeProxy> otherJNProxies = Lists.newArrayList(); + private int numOtherJNs; + private int journalNodeIndexForSync = 0; + private final long journalSyncInterval; + private final int logSegmentTransferTimeout; + private final DataTransferThrottler throttler; + + JournalNodeSyncer(JournalNode jouranlNode, Journal journal, String jid, + Configuration conf) { + this.jn = jouranlNode; + this.journal = journal; + this.jid = jid; + this.jidProto = convertJournalId(this.jid); + this.jnStorage = journal.getStorage(); + this.conf = conf; + journalSyncInterval = conf.getLong( + DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, + DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_DEFAULT); + logSegmentTransferTimeout = conf.getInt( + DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_KEY, + DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT); + throttler = getThrottler(conf); + } + + void stopSync() { + shouldSync = false; + if (syncJournalDaemon != null) { + syncJournalDaemon.interrupt(); + } + } + + public void start() { + LOG.info("Starting SyncJournal daemon for journal " + jid); + if (getOtherJournalNodeProxies()) { + startSyncJournalsDaemon(); + } else { + LOG.warn("Failed to start SyncJournal daemon for journal " + jid); + } + } + + private boolean getOtherJournalNodeProxies() { + List<InetSocketAddress> otherJournalNodes = getOtherJournalNodeAddrs(); + if (otherJournalNodes == null || otherJournalNodes.isEmpty()) { + LOG.warn("Other JournalNode addresses not available. Journal Syncing " + + "cannot be done"); + return false; + } + for (InetSocketAddress addr : otherJournalNodes) { + try { + otherJNProxies.add(new JournalNodeProxy(addr)); + } catch (IOException e) { + LOG.warn("Could not add proxy for Journal at addresss " + addr, e); + } + } + if (otherJNProxies.isEmpty()) { + LOG.error("Cannot sync as there is no other JN available for sync."); + return false; + } + numOtherJNs = otherJNProxies.size(); + return true; + } + + private void startSyncJournalsDaemon() { + syncJournalDaemon = new Daemon(new Runnable() { + @Override + public void run() { + while(shouldSync) { + try { + if (!journal.isFormatted()) { + LOG.warn("Journal not formatted. Cannot sync."); + } else { + syncJournals(); + } + Thread.sleep(journalSyncInterval); + } catch (Throwable t) { + if (!shouldSync) { + if (t instanceof InterruptedException) { + LOG.info("Stopping JournalNode Sync."); + } else { + LOG.warn("JournalNodeSyncer received an exception while " + + "shutting down.", t); + } + break; + } else { + if (t instanceof InterruptedException) { + LOG.warn("JournalNodeSyncer interrupted", t); + break; + } + } + LOG.error( + "JournalNodeSyncer daemon received Runtime exception. 
", t); + } + } + } + }); + syncJournalDaemon.start(); + } + + private void syncJournals() { + syncWithJournalAtIndex(journalNodeIndexForSync); + journalNodeIndexForSync = (journalNodeIndexForSync + 1) % numOtherJNs; + } + + private void syncWithJournalAtIndex(int index) { + LOG.info("Syncing Journal " + jn.getBoundIpcAddress().getAddress() + ":" + + jn.getBoundIpcAddress().getPort() + " with " + + otherJNProxies.get(index) + ", journal id: " + jid); + final QJournalProtocolPB jnProxy = otherJNProxies.get(index).jnProxy; + if (jnProxy == null) { + LOG.error("JournalNode Proxy not found."); + return; + } + + List<RemoteEditLog> thisJournalEditLogs; + try { + thisJournalEditLogs = journal.getEditLogManifest(0, false).getLogs(); + } catch (IOException e) { + LOG.error("Exception in getting local edit log manifest", e); + return; + } + + GetEditLogManifestResponseProto editLogManifest; + try { + editLogManifest = jnProxy.getEditLogManifest(null, + GetEditLogManifestRequestProto.newBuilder().setJid(jidProto) + .setSinceTxId(0) + .setInProgressOk(false).build()); + } catch (ServiceException e) { + LOG.error("Could not sync with Journal at " + + otherJNProxies.get(journalNodeIndexForSync), e); + return; + } + + getMissingLogSegments(thisJournalEditLogs, editLogManifest, + otherJNProxies.get(index)); + } + + private List<InetSocketAddress> getOtherJournalNodeAddrs() { + URI uri = null; + try { + String uriStr = conf.get(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY); + if (uriStr == null || uriStr.isEmpty()) { + LOG.warn("Could not construct Shared Edits Uri"); + return null; + } + uri = new URI(uriStr); + return Util.getLoggerAddresses(uri, + Sets.newHashSet(jn.getBoundIpcAddress())); + } catch (URISyntaxException e) { + LOG.error("The conf property " + DFSConfigKeys + .DFS_NAMENODE_SHARED_EDITS_DIR_KEY + " not set properly."); + } catch (IOException e) { + LOG.error("Could not parse JournalNode addresses: " + uri); + } + return null; + } + + private JournalIdProto convertJournalId(String journalId) { + return QJournalProtocolProtos.JournalIdProto.newBuilder() + .setIdentifier(journalId) + .build(); + } + + private void getMissingLogSegments(List<RemoteEditLog> thisJournalEditLogs, + GetEditLogManifestResponseProto response, + JournalNodeProxy remoteJNproxy) { + + List<RemoteEditLog> otherJournalEditLogs = PBHelper.convert( + response.getManifest()).getLogs(); + if (otherJournalEditLogs == null || otherJournalEditLogs.isEmpty()) { + LOG.warn("Journal at " + remoteJNproxy.jnAddr + " has no edit logs"); + return; + } + List<RemoteEditLog> missingLogs = getMissingLogList(thisJournalEditLogs, + otherJournalEditLogs); + + if (!missingLogs.isEmpty()) { + NamespaceInfo nsInfo = jnStorage.getNamespaceInfo(); + + for (RemoteEditLog missingLog : missingLogs) { + URL url = null; + boolean success = false; + try { + if (remoteJNproxy.httpServerUrl == null) { + if (response.hasFromURL()) { + URI uri = URI.create(response.getFromURL()); + remoteJNproxy.httpServerUrl = getHttpServerURI(uri.getScheme(), + uri.getHost(), uri.getPort()); + } else { + remoteJNproxy.httpServerUrl = getHttpServerURI("http", + remoteJNproxy.jnAddr.getHostName(), response.getHttpPort()); + } + } + + String urlPath = GetJournalEditServlet.buildPath(jid, missingLog + .getStartTxId(), nsInfo); + url = new URL(remoteJNproxy.httpServerUrl, urlPath); + success = downloadMissingLogSegment(url, missingLog); + } catch (MalformedURLException e) { + LOG.error("MalformedURL when download missing log segment", e); + } catch (Exception e) { + 
LOG.error("Exception in downloading missing log segment from url " + + url, e); + } + if (!success) { + LOG.error("Aborting current sync attempt."); + break; + } + } + } + } + + /** + * Returns the logs present in otherJournalEditLogs and missing from + * thisJournalEditLogs. + */ + private List<RemoteEditLog> getMissingLogList( + List<RemoteEditLog> thisJournalEditLogs, + List<RemoteEditLog> otherJournalEditLogs) { + if (thisJournalEditLogs.isEmpty()) { + return otherJournalEditLogs; + } + + List<RemoteEditLog> missingEditLogs = Lists.newArrayList(); + + int thisJnIndex = 0, otherJnIndex = 0; + int thisJnNumLogs = thisJournalEditLogs.size(); + int otherJnNumLogs = otherJournalEditLogs.size(); + + while (thisJnIndex < thisJnNumLogs && otherJnIndex < otherJnNumLogs) { + long localJNstartTxId = thisJournalEditLogs.get(thisJnIndex) + .getStartTxId(); + long remoteJNstartTxId = otherJournalEditLogs.get(otherJnIndex) + .getStartTxId(); + + if (localJNstartTxId == remoteJNstartTxId) { + thisJnIndex++; + otherJnIndex++; + } else if (localJNstartTxId > remoteJNstartTxId) { + missingEditLogs.add(otherJournalEditLogs.get(otherJnIndex)); + otherJnIndex++; + } else { + thisJnIndex++; + } + } + + if (otherJnIndex < otherJnNumLogs) { + for (; otherJnIndex < otherJnNumLogs; otherJnIndex++) { + missingEditLogs.add(otherJournalEditLogs.get(otherJnIndex)); + } + } + + return missingEditLogs; + } + + private URL getHttpServerURI(String scheme, String hostname, int port) + throws MalformedURLException { + return new URL(scheme, hostname, port, ""); + } + + /** + * Transfer an edit log from one journal node to another for sync-up. + */ + private boolean downloadMissingLogSegment(URL url, RemoteEditLog log) throws + IOException { + LOG.info("Downloading missing Edit Log from " + url + " to " + jnStorage + .getRoot()); + + assert log.getStartTxId() > 0 && log.getEndTxId() > 0 : "bad log: " + log; + File finalEditsFile = jnStorage.getFinalizedEditsFile(log.getStartTxId(), + log.getEndTxId()); + + if (finalEditsFile.exists() && FileUtil.canRead(finalEditsFile)) { + LOG.info("Skipping download of remote edit log " + log + " since it's" + + " already stored locally at " + finalEditsFile); + return true; + } + + final long milliTime = Time.monotonicNow(); + File tmpEditsFile = jnStorage.getTemporaryEditsFile(log.getStartTxId(), log + .getEndTxId(), milliTime); + try { + Util.doGetUrl(url, ImmutableList.of(tmpEditsFile), jnStorage, false, + logSegmentTransferTimeout, throttler); + } catch (IOException e) { + LOG.error("Download of Edit Log file for Syncing failed. Deleting temp " + + "file: " + tmpEditsFile); + if (!tmpEditsFile.delete()) { + LOG.warn("Deleting " + tmpEditsFile + " has failed"); + } + return false; + } + LOG.info("Downloaded file " + tmpEditsFile.getName() + " of size " + + tmpEditsFile.length() + " bytes."); + + LOG.debug("Renaming " + tmpEditsFile.getName() + " to " + + finalEditsFile.getName()); + boolean renameSuccess = journal.renameTmpSegment(tmpEditsFile, + finalEditsFile, log.getEndTxId()); + if (!renameSuccess) { + //If rename is not successful, delete the tmpFile + LOG.debug("Renaming unsuccessful. 
Deleting temporary file: " + + tmpEditsFile); + if (!tmpEditsFile.delete()) { + LOG.warn("Deleting " + tmpEditsFile + " has failed"); + } + return false; + } + return true; + } + + private static DataTransferThrottler getThrottler(Configuration conf) { + long transferBandwidth = + conf.getLong(DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_RATE_KEY, + DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_RATE_DEFAULT); + DataTransferThrottler throttler = null; + if (transferBandwidth > 0) { + throttler = new DataTransferThrottler(transferBandwidth); + } + return throttler; + } + + private class JournalNodeProxy { + private final InetSocketAddress jnAddr; + private final QJournalProtocolPB jnProxy; + private URL httpServerUrl; + + JournalNodeProxy(InetSocketAddress jnAddr) throws IOException { + this.jnAddr = jnAddr; + this.jnProxy = RPC.getProxy(QJournalProtocolPB.class, + RPC.getProtocolVersion(QJournalProtocolPB.class), jnAddr, conf); + } + + @Override + public String toString() { + return jnAddr.toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java index 1af7877..4493772 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java @@ -43,6 +43,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.datanode.StorageLocation; +import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.io.nativeio.NativeIO; import org.apache.hadoop.io.nativeio.NativeIOException; import org.apache.hadoop.util.ToolRunner; @@ -1010,6 +1011,14 @@ public abstract class Storage extends StorageInfo { return false; } + public NamespaceInfo getNamespaceInfo() { + return new NamespaceInfo( + getNamespaceID(), + getClusterID(), + null, + getCTime()); + } + /** * Return true if the layout of the given storage directory is from a version * of Hadoop prior to the introduction of the "current" and "previous" http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java index f08c3fa..9c67f0a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Util.java @@ -22,9 +22,11 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.InputStream; import java.net.HttpURLConnection; +import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; import java.net.URL; +import java.net.UnknownHostException; import java.security.DigestInputStream; import 
java.security.MessageDigest; import java.util.ArrayList; @@ -32,18 +34,23 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import com.google.common.base.Preconditions; import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtilClient; import org.apache.hadoop.hdfs.server.namenode.ImageServlet; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.MD5Hash; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authentication.client.AuthenticationException; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.hdfs.web.URLConnectionFactory; @@ -143,7 +150,8 @@ public final class Util { * storage. */ public static MD5Hash doGetUrl(URL url, List<File> localPaths, - Storage dstStorage, boolean getChecksum, int timeout) throws IOException { + Storage dstStorage, boolean getChecksum, int timeout, + DataTransferThrottler throttler) throws IOException { HttpURLConnection connection; try { connection = (HttpURLConnection) @@ -176,7 +184,7 @@ public final class Util { return receiveFile(url.toExternalForm(), localPaths, dstStorage, getChecksum, advertisedSize, advertisedDigest, fsImageName, stream, - null); + throttler); } /** @@ -268,7 +276,7 @@ public final class Util { long xferKb = received / 1024; xferCombined += xferSec; xferStats.append( - String.format(" The fsimage download took %.2fs at %.2f KB/s.", + String.format(" The file download took %.2fs at %.2f KB/s.", xferSec, xferKb / xferSec)); } finally { stream.close(); @@ -301,7 +309,7 @@ public final class Util { advertisedSize); } } - xferStats.insert(0, String.format("Combined time for fsimage download and" + + xferStats.insert(0, String.format("Combined time for file download and" + " fsync to all disks took %.2fs.", xferCombined)); LOG.info(xferStats.toString()); @@ -350,4 +358,34 @@ public final class Util { String header = connection.getHeaderField(MD5_HEADER); return (header != null) ? 
new MD5Hash(header) : null; } + + public static List<InetSocketAddress> getAddressesList(URI uri) + throws IOException{ + String authority = uri.getAuthority(); + Preconditions.checkArgument(authority != null && !authority.isEmpty(), + "URI has no authority: " + uri); + + String[] parts = StringUtils.split(authority, ';'); + for (int i = 0; i < parts.length; i++) { + parts[i] = parts[i].trim(); + } + + List<InetSocketAddress> addrs = Lists.newArrayList(); + for (String addr : parts) { + InetSocketAddress isa = NetUtils.createSocketAddr( + addr, DFSConfigKeys.DFS_JOURNALNODE_RPC_PORT_DEFAULT); + if (isa.isUnresolved()) { + throw new UnknownHostException(addr); + } + addrs.add(isa); + } + return addrs; + } + + public static List<InetSocketAddress> getLoggerAddresses(URI uri, + Set<InetSocketAddress> addrsToExclude) throws IOException { + List<InetSocketAddress> addrsList = getAddressesList(uri); + addrsList.removeAll(addrsToExclude); + return addrsList; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java index c79ba4a..63d1a28 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java @@ -763,13 +763,13 @@ public class NNStorage extends Storage implements Closeable, return new File(sd.getCurrentDir(), getInProgressEditsFileName(startTxId)); } - static File getFinalizedEditsFile(StorageDirectory sd, + public static File getFinalizedEditsFile(StorageDirectory sd, long startTxId, long endTxId) { return new File(sd.getCurrentDir(), getFinalizedEditsFileName(startTxId, endTxId)); } - static File getTemporaryEditsFile(StorageDirectory sd, + public static File getTemporaryEditsFile(StorageDirectory sd, long startTxId, long endTxId, long timestamp) { return new File(sd.getCurrentDir(), getTemporaryEditsFileName(startTxId, endTxId, timestamp)); @@ -1106,6 +1106,7 @@ public class NNStorage extends Storage implements Closeable, return inspector; } + @Override public NamespaceInfo getNamespaceInfo() { return new NamespaceInfo( getNamespaceID(), http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java index 5821353..7316414 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java @@ -401,7 +401,8 @@ public class TransferFsImage { public static MD5Hash doGetUrl(URL url, List<File> localPaths, Storage dstStorage, boolean getChecksum) throws IOException { - return Util.doGetUrl(url, localPaths, dstStorage, getChecksum, timeout); + return Util.doGetUrl(url, localPaths, dstStorage, 
getChecksum, timeout, + null); } private static MD5Hash parseMD5Header(HttpServletRequest request) { http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 03f1a08..652b216 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1279,6 +1279,26 @@ </property> <property> + <name>dfs.edit.log.transfer.timeout</name> + <value>30000</value> + <description> + Socket timeout for edit log transfer in milliseconds. This timeout + should be configured such that normal edit log transfer for journal + node syncing can complete successfully. + </description> +</property> + +<property> + <name>dfs.edit.log.transfer.bandwidthPerSec</name> + <value>0</value> + <description> + Maximum bandwidth used for transferring edit log to between journal nodes + for syncing, in bytes per second. + A default value of 0 indicates that throttling is disabled. + </description> +</property> + +<property> <name>dfs.namenode.support.allow.format</name> <value>true</value> <description>Does HDFS namenode allow itself to be formatted? @@ -3785,6 +3805,27 @@ </property> <property> + <name>dfs.journalnode.enable.sync</name> + <value>true</value> + <description> + If true, the journal nodes wil sync with each other. The journal nodes + will periodically gossip with other journal nodes to compare edit log + manifests and if they detect any missing log segment, they will download + it from the other journal nodes. + </description> +</property> + +<property> + <name>dfs.journalnode.sync.interval</name> + <value>120000</value> + <description> + Time interval, in milliseconds, between two Journal Node syncs. + This configuration takes effect only if the journalnode sync is enabled + by setting the configuration parameter dfs.journalnode.enable.sync to true. 
+ </description> +</property> + +<property> <name>dfs.journalnode.kerberos.internal.spnego.principal</name> <value></value> <description> http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java index 7b974c3..2314e22 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniJournalCluster.java @@ -255,4 +255,12 @@ public class MiniJournalCluster { } } } + + public void setNamenodeSharedEditsConf(String jid) { + URI quorumJournalURI = getQuorumJournalURI(jid); + for (int i = 0; i < nodes.length; i++) { + nodes[i].node.getConf().set(DFSConfigKeys + .DFS_NAMENODE_SHARED_EDITS_DIR_KEY, quorumJournalURI.toString()); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java index 0764f12..c163894 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/MiniQJMHACluster.java @@ -101,6 +101,7 @@ public class MiniQJMHACluster { journalCluster = new MiniJournalCluster.Builder(conf).format(true) .build(); journalCluster.waitActive(); + journalCluster.setNamenodeSharedEditsConf(NAMESERVICE); URI journalURI = journalCluster.getQuorumJournalURI(NAMESERVICE); // start cluster with specified NameNodes http://git-wip-us.apache.org/repos/asf/hadoop/blob/13d4bcfe/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java new file mode 100644 index 0000000..5375b02 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestJournalNodeSync.java @@ -0,0 +1,264 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.qjournal; + +import com.google.common.base.Supplier; +import com.google.common.collect.Lists; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory; +import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; +import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; +import static org.apache.hadoop.hdfs.server.namenode.FileJournalManager + .getLogFile; + +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Random; + +/** + * Unit test for Journal Node formatting upon re-installation and syncing. + */ +public class TestJournalNodeSync { + private MiniQJMHACluster qjmhaCluster; + private MiniDFSCluster dfsCluster; + private MiniJournalCluster jCluster; + private FileSystem fs; + private FSNamesystem namesystem; + private int editsPerformed = 0; + private final String jid = "ns1"; + + @Before + public void setUpMiniCluster() throws IOException { + final Configuration conf = new HdfsConfiguration(); + conf.setLong(DFSConfigKeys.DFS_JOURNALNODE_SYNC_INTERVAL_KEY, 1000L); + qjmhaCluster = new MiniQJMHACluster.Builder(conf).setNumNameNodes(2) + .build(); + dfsCluster = qjmhaCluster.getDfsCluster(); + jCluster = qjmhaCluster.getJournalCluster(); + + dfsCluster.transitionToActive(0); + fs = dfsCluster.getFileSystem(0); + namesystem = dfsCluster.getNamesystem(0); + } + + @After + public void shutDownMiniCluster() throws IOException { + if (qjmhaCluster != null) { + qjmhaCluster.shutdown(); + } + } + + @Test(timeout=30000) + public void testJournalNodeSync() throws Exception { + File firstJournalDir = jCluster.getJournalDir(0, jid); + File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) + .getCurrentDir(); + + // Generate some edit logs and delete one. + long firstTxId = generateEditLog(); + generateEditLog(); + + File missingLog = deleteEditLog(firstJournalCurrentDir, firstTxId); + + GenericTestUtils.waitFor(editLogExists(Lists.newArrayList(missingLog)), + 500, 10000); + } + + @Test(timeout=30000) + public void testSyncForMultipleMissingLogs() throws Exception { + File firstJournalDir = jCluster.getJournalDir(0, jid); + File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) + .getCurrentDir(); + + // Generate some edit logs and delete two. + long firstTxId = generateEditLog(); + long nextTxId = generateEditLog(); + + List<File> missingLogs = Lists.newArrayList(); + missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId)); + missingLogs.add(deleteEditLog(firstJournalCurrentDir, nextTxId)); + + GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000); + } + + @Test(timeout=30000) + public void testSyncForDiscontinuousMissingLogs() throws Exception { + File firstJournalDir = jCluster.getJournalDir(0, jid); + File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) + .getCurrentDir(); + + // Generate some edit logs and delete two discontinuous logs. 
+ long firstTxId = generateEditLog(); + generateEditLog(); + long nextTxId = generateEditLog(); + + List<File> missingLogs = Lists.newArrayList(); + missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId)); + missingLogs.add(deleteEditLog(firstJournalCurrentDir, nextTxId)); + + GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000); + } + + @Test(timeout=30000) + public void testMultipleJournalsMissingLogs() throws Exception { + File firstJournalDir = jCluster.getJournalDir(0, jid); + File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) + .getCurrentDir(); + + File secondJournalDir = jCluster.getJournalDir(1, jid); + StorageDirectory sd = new StorageDirectory(secondJournalDir); + File secondJournalCurrentDir = sd.getCurrentDir(); + + // Generate some edit logs and delete one log from two journals. + long firstTxId = generateEditLog(); + generateEditLog(); + + List<File> missingLogs = Lists.newArrayList(); + missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId)); + missingLogs.add(deleteEditLog(secondJournalCurrentDir, firstTxId)); + + GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 10000); + } + + @Test(timeout=60000) + public void testMultipleJournalsMultipleMissingLogs() throws Exception { + File firstJournalDir = jCluster.getJournalDir(0, jid); + File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) + .getCurrentDir(); + + File secondJournalDir = jCluster.getJournalDir(1, jid); + File secondJournalCurrentDir = new StorageDirectory(secondJournalDir) + .getCurrentDir(); + + File thirdJournalDir = jCluster.getJournalDir(2, jid); + File thirdJournalCurrentDir = new StorageDirectory(thirdJournalDir) + .getCurrentDir(); + + // Generate some edit logs and delete multiple logs in multiple journals. + long firstTxId = generateEditLog(); + long secondTxId = generateEditLog(); + long thirdTxId = generateEditLog(); + + List<File> missingLogs = Lists.newArrayList(); + missingLogs.add(deleteEditLog(firstJournalCurrentDir, firstTxId)); + missingLogs.add(deleteEditLog(secondJournalCurrentDir, firstTxId)); + missingLogs.add(deleteEditLog(secondJournalCurrentDir, secondTxId)); + missingLogs.add(deleteEditLog(thirdJournalCurrentDir, thirdTxId)); + + GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); + } + + // Test JournalNode Sync by randomly deleting edit logs from one or two of + // the journals. 
+ @Test(timeout=60000) + public void testRandomJournalMissingLogs() throws Exception { + Random randomJournal = new Random(); + + List<File> journalCurrentDirs = Lists.newArrayList(); + + for (int i = 0; i < 3; i++) { + journalCurrentDirs.add(new StorageDirectory(jCluster.getJournalDir(i, + jid)).getCurrentDir()); + } + + int count = 0; + long lastStartTxId; + int journalIndex; + List<File> missingLogs = Lists.newArrayList(); + while (count < 5) { + lastStartTxId = generateEditLog(); + + // Delete the last edit log segment from randomly selected journal node + journalIndex = randomJournal.nextInt(3); + missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex), + lastStartTxId)); + + // Delete the last edit log segment from two journals for some logs + if (count % 2 == 0) { + journalIndex = (journalIndex + 1) % 3; + missingLogs.add(deleteEditLog(journalCurrentDirs.get(journalIndex), + lastStartTxId)); + } + + count++; + } + + GenericTestUtils.waitFor(editLogExists(missingLogs), 500, 30000); + } + + private File deleteEditLog(File currentDir, long startTxId) + throws IOException { + EditLogFile logFile = getLogFile(currentDir, startTxId); + while (logFile.isInProgress()) { + dfsCluster.getNameNode(0).getRpcServer().rollEditLog(); + logFile = getLogFile(currentDir, startTxId); + } + File deleteFile = logFile.getFile(); + Assert.assertTrue("Couldn't delete edit log file", deleteFile.delete()); + + return deleteFile; + } + + /** + * Do a mutative metadata operation on the file system. + * + * @return true if the operation was successful, false otherwise. + */ + private boolean doAnEdit() throws IOException { + return fs.mkdirs(new Path("/tmp", Integer.toString(editsPerformed++))); + } + + /** + * Does an edit and rolls the Edit Log. + * + * @return the startTxId of next segment after rolling edits. + */ + private long generateEditLog() throws IOException { + long startTxId = namesystem.getFSImage().getEditLog().getLastWrittenTxId(); + Assert.assertTrue("Failed to do an edit", doAnEdit()); + dfsCluster.getNameNode(0).getRpcServer().rollEditLog(); + return startTxId; + } + + private Supplier<Boolean> editLogExists(List<File> editLogs) { + Supplier<Boolean> supplier = new Supplier<Boolean>() { + @Override + public Boolean get() { + for (File editLog : editLogs) { + if (!editLog.exists()) { + return false; + } + } + return true; + } + }; + return supplier; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org