HDFS-6634. inotify in HDFS. Contributed by James Thomas.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/faa4455b Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/faa4455b Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/faa4455b Branch: refs/heads/HDFS-6584 Commit: faa4455be512e070fa420084be8d1be5c72f3b08 Parents: 6595e92 Author: Andrew Wang <w...@apache.org> Authored: Tue Sep 2 14:02:29 2014 -0700 Committer: Andrew Wang <w...@apache.org> Committed: Tue Sep 2 14:02:29 2014 -0700 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../dev-support/findbugsExcludeFile.xml | 9 + hadoop-hdfs-project/hadoop-hdfs/pom.xml | 1 + .../bkjournal/BookKeeperEditLogInputStream.java | 5 + .../java/org/apache/hadoop/hdfs/DFSClient.java | 9 + .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 6 + .../hadoop/hdfs/DFSInotifyEventInputStream.java | 220 +++++++++ .../hadoop/hdfs/DistributedFileSystem.java | 9 + .../apache/hadoop/hdfs/client/HdfsAdmin.java | 50 ++ .../org/apache/hadoop/hdfs/inotify/Event.java | 452 +++++++++++++++++++ .../apache/hadoop/hdfs/inotify/EventsList.java | 63 +++ .../hdfs/inotify/MissingEventsException.java | 54 +++ .../hadoop/hdfs/protocol/ClientProtocol.java | 18 + ...tNamenodeProtocolServerSideTranslatorPB.java | 25 + .../ClientNamenodeProtocolTranslatorPB.java | 25 + .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 245 ++++++++++ .../hdfs/qjournal/client/IPCLoggerChannel.java | 78 +++- .../hadoop/hdfs/qjournal/server/Journal.java | 3 +- .../namenode/EditLogBackupInputStream.java | 5 + .../server/namenode/EditLogFileInputStream.java | 5 + .../server/namenode/EditLogInputStream.java | 6 + .../hadoop/hdfs/server/namenode/FSEditLog.java | 67 ++- .../server/namenode/FileJournalManager.java | 16 +- .../namenode/InotifyFSEditLogOpTranslator.java | 146 ++++++ .../hadoop/hdfs/server/namenode/JournalSet.java | 38 +- .../hdfs/server/namenode/NameNodeRpcServer.java | 114 
+++++ .../namenode/RedundantEditLogInputStream.java | 5 + .../hdfs/server/namenode/TransferFsImage.java | 5 +- .../src/main/proto/ClientNamenodeProtocol.proto | 20 + .../hadoop-hdfs/src/main/proto/inotify.proto | 117 +++++ .../src/main/resources/hdfs-default.xml | 10 + .../hdfs/TestDFSInotifyEventInputStream.java | 430 ++++++++++++++++++ .../hadoop/hdfs/qjournal/MiniQJMHACluster.java | 6 +- .../hdfs/qjournal/client/TestQJMWithFaults.java | 2 +- .../client/TestQuorumJournalManager.java | 2 +- .../hdfs/server/namenode/TestEditLog.java | 4 + 36 files changed, 2214 insertions(+), 58 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 3184e68..ecf6782 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -430,6 +430,8 @@ Release 2.6.0 - UNRELEASED HDFS-6774. Make FsDataset and DataStore support removing volumes. (Lei Xu via atm) + HDFS-6634. inotify in HDFS. (James Thomas via wang) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. 
(wang) http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml index 29702d4..bbfb9e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml @@ -106,6 +106,15 @@ <Field name="metrics" /> <Bug pattern="IS2_INCONSISTENT_SYNC" /> </Match> + <!-- + We use a separate lock to protect modifications to journalSet so that + FSEditLog#selectInputStreams does not need to be a synchronized method. + --> + <Match> + <Class name="org.apache.hadoop.hdfs.server.namenode.FSEditLog" /> + <Field name="journalSet" /> + <Bug pattern="IS2_INCONSISTENT_SYNC" /> + </Match> <!-- This method isn't performance-critical and is much clearer to write as it's written. 
--> http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/pom.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/pom.xml b/hadoop-hdfs-project/hadoop-hdfs/pom.xml index 81eae0a..2c4ddf6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/pom.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/pom.xml @@ -309,6 +309,7 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> <include>fsimage.proto</include> <include>hdfs.proto</include> <include>encryption.proto</include> + <include>inotify.proto</include> </includes> </source> <output>${project.build.directory}/generated-sources/java</output> http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java index bd3ccd4..e2098dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/contrib/bkjournal/src/main/java/org/apache/hadoop/contrib/bkjournal/BookKeeperEditLogInputStream.java @@ -168,6 +168,11 @@ class BookKeeperEditLogInputStream extends EditLogInputStream { reader.setMaxOpSize(maxOpSize); } + @Override + public boolean isLocalLog() { + return false; + } + /** * Input stream implementation which can be used by * FSEditLogOp.Reader http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index c49d210..ce0f133 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -2990,6 +2990,15 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory, } } + public DFSInotifyEventInputStream getInotifyEventStream() throws IOException { + return new DFSInotifyEventInputStream(namenode); + } + + public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid) + throws IOException { + return new DFSInotifyEventInputStream(namenode, lastReadTxid); + } + @Override // RemotePeerFactory public Peer newConnectedPeer(InetSocketAddress addr, Token<BlockTokenIdentifier> blockToken, DatanodeID datanodeId) http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 71a530b..7f96cf0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -673,4 +673,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY = "dfs.datanode.block.id.layout.upgrade.threads"; public static final int DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS = 12; + + public static final String 
DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_KEY = + "dfs.namenode.inotify.max.events.per.rpc"; + public static final int DFS_NAMENODE_INOTIFY_MAX_EVENTS_PER_RPC_DEFAULT = + 1000; + } http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java new file mode 100644 index 0000000..73c5f55 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs; + +import com.google.common.collect.Iterators; +import com.google.common.util.concurrent.UncheckedExecutionException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.hdfs.inotify.Event; +import org.apache.hadoop.hdfs.inotify.EventsList; +import org.apache.hadoop.hdfs.inotify.MissingEventsException; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Iterator; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + * Stream for reading inotify events. DFSInotifyEventInputStreams should not + * be shared among multiple threads. + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public class DFSInotifyEventInputStream { + public static Logger LOG = LoggerFactory.getLogger(DFSInotifyEventInputStream + .class); + + private final ClientProtocol namenode; + private Iterator<Event> it; + private long lastReadTxid; + /** + * The most recent txid the NameNode told us it has sync'ed -- helps us + * determine how far behind we are in the edit stream. + */ + private long syncTxid; + /** + * Used to generate wait times in {@link DFSInotifyEventInputStream#take()}. 
+ */ + private Random rng = new Random(); + + private static final int INITIAL_WAIT_MS = 10; + + DFSInotifyEventInputStream(ClientProtocol namenode) throws IOException { + this(namenode, namenode.getCurrentEditLogTxid()); // only consider new txn's + } + + DFSInotifyEventInputStream(ClientProtocol namenode, long lastReadTxid) + throws IOException { + this.namenode = namenode; + this.it = Iterators.emptyIterator(); + this.lastReadTxid = lastReadTxid; + } + + /** + * Returns the next event in the stream or null if no new events are currently + * available. + * + * @throws IOException because of network error or edit log + * corruption. Also possible if JournalNodes are unresponsive in the + * QJM setting (even one unresponsive JournalNode is enough in rare cases), + * so catching this exception and retrying at least a few times is + * recommended. + * @throws MissingEventsException if we cannot return the next event in the + * stream because the data for the event (and possibly some subsequent events) + * has been deleted (generally because this stream is a very large number of + * events behind the current state of the NameNode). It is safe to continue + * reading from the stream after this exception is thrown -- the next + * available event will be returned. 
+ */ + public Event poll() throws IOException, MissingEventsException { + // need to keep retrying until the NN sends us the latest committed txid + if (lastReadTxid == -1) { + LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN"); + lastReadTxid = namenode.getCurrentEditLogTxid(); + return null; + } + if (!it.hasNext()) { + EventsList el = namenode.getEditsFromTxid(lastReadTxid + 1); + if (el.getLastTxid() != -1) { + // we only want to set syncTxid when we were actually able to read some + // edits on the NN -- otherwise it will seem like edits are being + // generated faster than we can read them when the problem is really + // that we are temporarily unable to read edits + syncTxid = el.getSyncTxid(); + it = el.getEvents().iterator(); + long formerLastReadTxid = lastReadTxid; + lastReadTxid = el.getLastTxid(); + if (el.getFirstTxid() != formerLastReadTxid + 1) { + throw new MissingEventsException(formerLastReadTxid + 1, + el.getFirstTxid()); + } + } else { + LOG.debug("poll(): read no edits from the NN when requesting edits " + + "after txid {}", lastReadTxid); + return null; + } + } + + if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the + // newly seen edit log ops actually got converted to events + return it.next(); + } else { + return null; + } + } + + /** + * Return an estimate of how many events behind the NameNode's current state + * this stream is. Clients should periodically call this method and check if + * its result is steadily increasing, which indicates that they are falling + * behind (i.e. events are being generated faster than the client is reading + * them). If a client falls too far behind events may be deleted before the + * client can read them. + * <p/> + * A return value of -1 indicates that an estimate could not be produced, and + * should be ignored. The value returned by this method is really only useful + * when compared to previous or subsequent returned values. 
+ */ + public long getEventsBehindEstimate() { + if (syncTxid == 0) { + return -1; + } else { + assert syncTxid >= lastReadTxid; + // this gives the difference between the last txid we have fetched to the + // client and syncTxid at the time we last fetched events from the + // NameNode + return syncTxid - lastReadTxid; + } + } + + /** + * Returns the next event in the stream, waiting up to the specified amount of + * time for a new event. Returns null if a new event is not available at the + * end of the specified amount of time. The time before the method returns may + * exceed the specified amount of time by up to the time required for an RPC + * to the NameNode. + * + * @param time number of units of the given TimeUnit to wait + * @param tu the desired TimeUnit + * @throws IOException see {@link DFSInotifyEventInputStream#poll()} + * @throws MissingEventsException + * see {@link DFSInotifyEventInputStream#poll()} + * @throws InterruptedException if the calling thread is interrupted + */ + public Event poll(long time, TimeUnit tu) throws IOException, + InterruptedException, MissingEventsException { + long initialTime = Time.monotonicNow(); + long totalWait = TimeUnit.MILLISECONDS.convert(time, tu); + long nextWait = INITIAL_WAIT_MS; + Event next = null; + while ((next = poll()) == null) { + long timeLeft = totalWait - (Time.monotonicNow() - initialTime); + if (timeLeft <= 0) { + LOG.debug("timed poll(): timed out"); + break; + } else if (timeLeft < nextWait * 2) { + nextWait = timeLeft; + } else { + nextWait *= 2; + } + LOG.debug("timed poll(): poll() returned null, sleeping for {} ms", + nextWait); + Thread.sleep(nextWait); + } + + return next; + } + + /** + * Returns the next event in the stream, waiting indefinitely if a new event + * is not immediately available. 
+ * + * @throws IOException see {@link DFSInotifyEventInputStream#poll()} + * @throws MissingEventsException see + * {@link DFSInotifyEventInputStream#poll()} + * @throws InterruptedException if the calling thread is interrupted + */ + public Event take() throws IOException, InterruptedException, + MissingEventsException { + Event next = null; + int nextWaitMin = INITIAL_WAIT_MS; + while ((next = poll()) == null) { + // sleep for a random period between nextWaitMin and nextWaitMin * 2 + // to avoid stampedes at the NN if there are multiple clients + int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin); + LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime); + Thread.sleep(sleepTime); + // the maximum sleep is 2 minutes + nextWaitMin = Math.min(60000, nextWaitMin * 2); + } + + return next; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java index 354640b..fc4bd84 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java @@ -1940,4 +1940,13 @@ public class DistributedFileSystem extends FileSystem { } }.resolve(this, absF); } + + public DFSInotifyEventInputStream getInotifyEventStream() throws IOException { + return dfs.getInotifyEventStream(); + } + + public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid) + throws IOException { + return dfs.getInotifyEventStream(lastReadTxid); + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java index 1adfc1b..fdc466a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/HdfsAdmin.java @@ -29,6 +29,7 @@ import org.apache.hadoop.fs.CacheFlag; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.hdfs.DFSInotifyEventInputStream; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; @@ -275,4 +276,53 @@ public class HdfsAdmin { throws IOException { return dfs.listEncryptionZones(); } + + /** + * Exposes a stream of namesystem events. Only events occurring after the + * stream is created are available. + * See {@link org.apache.hadoop.hdfs.DFSInotifyEventInputStream} + * for information on stream usage. + * See {@link org.apache.hadoop.hdfs.inotify.Event} + * for information on the available events. + * <p/> + * Inotify users may want to tune the following HDFS parameters to + * ensure that enough extra HDFS edits are saved to support inotify clients + * that fall behind the current state of the namespace while reading events. + * The default parameter values should generally be reasonable. 
If edits are + * deleted before their corresponding events can be read, clients will see a + * {@link org.apache.hadoop.hdfs.inotify.MissingEventsException} on + * {@link org.apache.hadoop.hdfs.DFSInotifyEventInputStream} method calls. + * + * It should generally be sufficient to tune these parameters: + * dfs.namenode.num.extra.edits.retained + * dfs.namenode.max.extra.edits.segments.retained + * + * Parameters that affect the number of created segments and the number of + * edits that are considered necessary (i.e. do not count towards the + * dfs.namenode.num.extra.edits.retained quota): + * dfs.namenode.checkpoint.period + * dfs.namenode.checkpoint.txns + * dfs.namenode.num.checkpoints.retained + * dfs.ha.log-roll.period + * <p/> + * It is recommended that local journaling be configured + * (dfs.namenode.edits.dir) for inotify (in addition to a shared journal) + * so that edit transfers from the shared journal can be avoided. + * + * @throws IOException If there was an error obtaining the stream. + */ + public DFSInotifyEventInputStream getInotifyEventStream() throws IOException { + return dfs.getInotifyEventStream(); + } + + /** + * A version of {@link HdfsAdmin#getInotifyEventStream()} meant for advanced + * users who are aware of HDFS edits up to lastReadTxid (e.g. because they + * have access to an FSImage inclusive of lastReadTxid) and only want to read + * events after this point. 
+ */ + public DFSInotifyEventInputStream getInotifyEventStream(long lastReadTxid) + throws IOException { + return dfs.getInotifyEventStream(lastReadTxid); + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java new file mode 100644 index 0000000..c7129ca --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/Event.java @@ -0,0 +1,452 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.inotify; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.XAttr; +import org.apache.hadoop.fs.permission.AclEntry; +import org.apache.hadoop.fs.permission.FsPermission; + +import java.util.List; + +/** + * Events sent by the inotify system. 
Note that no events are necessarily sent + * when a file is opened for read (although a MetadataUpdateEvent will be sent + * if the atime is updated). + */ +@InterfaceAudience.Public +@InterfaceStability.Unstable +public abstract class Event { + public static enum EventType { + CREATE, CLOSE, APPEND, RENAME, METADATA, UNLINK + } + + private EventType eventType; + + public EventType getEventType() { + return eventType; + } + + public Event(EventType eventType) { + this.eventType = eventType; + } + + /** + * Sent when a file is closed after append or create. + */ + public static class CloseEvent extends Event { + private String path; + private long fileSize; + private long timestamp; + + public CloseEvent(String path, long fileSize, long timestamp) { + super(EventType.CLOSE); + this.path = path; + this.fileSize = fileSize; + this.timestamp = timestamp; + } + + public String getPath() { + return path; + } + + /** + * The size of the closed file in bytes. May be -1 if the size is not + * available (e.g. in the case of a close generated by a concat operation). + */ + public long getFileSize() { + return fileSize; + } + + /** + * The time when this event occurred, in milliseconds since the epoch. + */ + public long getTimestamp() { + return timestamp; + } + } + + /** + * Sent when a new file is created (including overwrite). 
+ */ + public static class CreateEvent extends Event { + + public static enum INodeType { + FILE, DIRECTORY, SYMLINK; + } + + private INodeType iNodeType; + private String path; + private long ctime; + private int replication; + private String ownerName; + private String groupName; + private FsPermission perms; + private String symlinkTarget; + + public static class Builder { + private INodeType iNodeType; + private String path; + private long ctime; + private int replication; + private String ownerName; + private String groupName; + private FsPermission perms; + private String symlinkTarget; + + public Builder iNodeType(INodeType type) { + this.iNodeType = type; + return this; + } + + public Builder path(String path) { + this.path = path; + return this; + } + + public Builder ctime(long ctime) { + this.ctime = ctime; + return this; + } + + public Builder replication(int replication) { + this.replication = replication; + return this; + } + + public Builder ownerName(String ownerName) { + this.ownerName = ownerName; + return this; + } + + public Builder groupName(String groupName) { + this.groupName = groupName; + return this; + } + + public Builder perms(FsPermission perms) { + this.perms = perms; + return this; + } + + public Builder symlinkTarget(String symlinkTarget) { + this.symlinkTarget = symlinkTarget; + return this; + } + + public CreateEvent build() { + return new CreateEvent(this); + } + } + + private CreateEvent(Builder b) { + super(EventType.CREATE); + this.iNodeType = b.iNodeType; + this.path = b.path; + this.ctime = b.ctime; + this.replication = b.replication; + this.ownerName = b.ownerName; + this.groupName = b.groupName; + this.perms = b.perms; + this.symlinkTarget = b.symlinkTarget; + } + + public INodeType getiNodeType() { + return iNodeType; + } + + public String getPath() { + return path; + } + + /** + * Creation time of the file, directory, or symlink. 
+ */ + public long getCtime() { + return ctime; + } + + /** + * Replication is zero if the CreateEvent iNodeType is directory or symlink. + */ + public int getReplication() { + return replication; + } + + public String getOwnerName() { + return ownerName; + } + + public String getGroupName() { + return groupName; + } + + public FsPermission getPerms() { + return perms; + } + + /** + * Symlink target is null if the CreateEvent iNodeType is not symlink. + */ + public String getSymlinkTarget() { + return symlinkTarget; + } + } + + /** + * Sent when there is an update to directory or file (none of the metadata + * tracked here applies to symlinks) that is not associated with another + * inotify event. The tracked metadata includes atime/mtime, replication, + * owner/group, permissions, ACLs, and XAttributes. Fields not relevant to the + * metadataType of the MetadataUpdateEvent will be null or will have their default + * values. + */ + public static class MetadataUpdateEvent extends Event { + + public static enum MetadataType { + TIMES, REPLICATION, OWNER, PERMS, ACLS, XATTRS; + } + + private String path; + private MetadataType metadataType; + private long mtime; + private long atime; + private int replication; + private String ownerName; + private String groupName; + private FsPermission perms; + private List<AclEntry> acls; + private List<XAttr> xAttrs; + private boolean xAttrsRemoved; + + public static class Builder { + private String path; + private MetadataType metadataType; + private long mtime; + private long atime; + private int replication; + private String ownerName; + private String groupName; + private FsPermission perms; + private List<AclEntry> acls; + private List<XAttr> xAttrs; + private boolean xAttrsRemoved; + + public Builder path(String path) { + this.path = path; + return this; + } + + public Builder metadataType(MetadataType type) { + this.metadataType = type; + return this; + } + + public Builder mtime(long mtime) { + this.mtime = mtime; + return 
this; + } + + public Builder atime(long atime) { + this.atime = atime; + return this; + } + + public Builder replication(int replication) { + this.replication = replication; + return this; + } + + public Builder ownerName(String ownerName) { + this.ownerName = ownerName; + return this; + } + + public Builder groupName(String groupName) { + this.groupName = groupName; + return this; + } + + public Builder perms(FsPermission perms) { + this.perms = perms; + return this; + } + + public Builder acls(List<AclEntry> acls) { + this.acls = acls; + return this; + } + + public Builder xAttrs(List<XAttr> xAttrs) { + this.xAttrs = xAttrs; + return this; + } + + public Builder xAttrsRemoved(boolean xAttrsRemoved) { + this.xAttrsRemoved = xAttrsRemoved; + return this; + } + + public MetadataUpdateEvent build() { + return new MetadataUpdateEvent(this); + } + } + + private MetadataUpdateEvent(Builder b) { + super(EventType.METADATA); + this.path = b.path; + this.metadataType = b.metadataType; + this.mtime = b.mtime; + this.atime = b.atime; + this.replication = b.replication; + this.ownerName = b.ownerName; + this.groupName = b.groupName; + this.perms = b.perms; + this.acls = b.acls; + this.xAttrs = b.xAttrs; + this.xAttrsRemoved = b.xAttrsRemoved; + } + + public String getPath() { + return path; + } + + public MetadataType getMetadataType() { + return metadataType; + } + + public long getMtime() { + return mtime; + } + + public long getAtime() { + return atime; + } + + public int getReplication() { + return replication; + } + + public String getOwnerName() { + return ownerName; + } + + public String getGroupName() { + return groupName; + } + + public FsPermission getPerms() { + return perms; + } + + /** + * The full set of ACLs currently associated with this file or directory. + * May be null if all ACLs were removed. 
+ */ + public List<AclEntry> getAcls() { + return acls; + } + + public List<XAttr> getxAttrs() { + return xAttrs; + } + + /** + * Whether the xAttrs returned by getxAttrs() were removed (as opposed to + * added). + */ + public boolean isxAttrsRemoved() { + return xAttrsRemoved; + } + + } + + /** + * Sent when a file, directory, or symlink is renamed. + */ + public static class RenameEvent extends Event { + private String srcPath; + private String dstPath; + private long timestamp; + + public RenameEvent(String srcPath, String dstPath, long timestamp) { + super(EventType.RENAME); + this.srcPath = srcPath; + this.dstPath = dstPath; + this.timestamp = timestamp; + } + + public String getSrcPath() { + return srcPath; + } + + public String getDstPath() { + return dstPath; + } + + /** + * The time when this event occurred, in milliseconds since the epoch. + */ + public long getTimestamp() { + return timestamp; + } + } + + /** + * Sent when an existing file is opened for append. + */ + public static class AppendEvent extends Event { + private String path; + + public AppendEvent(String path) { + super(EventType.APPEND); + this.path = path; + } + + public String getPath() { + return path; + } + } + + /** + * Sent when a file, directory, or symlink is deleted. + */ + public static class UnlinkEvent extends Event { + private String path; + private long timestamp; + + public UnlinkEvent(String path, long timestamp) { + super(EventType.UNLINK); + this.path = path; + this.timestamp = timestamp; + } + + public String getPath() { + return path; + } + + /** + * The time when this event occurred, in milliseconds since the epoch. 
+ */ + public long getTimestamp() { + return timestamp; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java new file mode 100644 index 0000000..6d02d3c --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.inotify; + +import org.apache.hadoop.classification.InterfaceAudience; + +import java.util.List; + +/** + * Contains a set of events, the transaction ID in the edit log up to which we + * read to produce these events, and the first txid we observed when producing + * these events (the last of which is for the purpose of determining whether we + * have missed events due to edit deletion). 
Also contains the most recent txid + * that the NameNode has sync'ed, so the client can determine how far behind in + * the edit log it is. + */ +@InterfaceAudience.Private +public class EventsList { + private List<Event> events; + private long firstTxid; + private long lastTxid; + private long syncTxid; + + public EventsList(List<Event> events, long firstTxid, long lastTxid, + long syncTxid) { + this.events = events; + this.firstTxid = firstTxid; + this.lastTxid = lastTxid; + this.syncTxid = syncTxid; + } + + public List<Event> getEvents() { + return events; + } + + public long getFirstTxid() { + return firstTxid; + } + + public long getLastTxid() { + return lastTxid; + } + + public long getSyncTxid() { + return syncTxid; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/MissingEventsException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/MissingEventsException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/MissingEventsException.java new file mode 100644 index 0000000..e4b51c5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/MissingEventsException.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.inotify; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class MissingEventsException extends Exception { + private static final long serialVersionUID = 1L; + + private long expectedTxid; + private long actualTxid; + + public MissingEventsException() {} + + public MissingEventsException(long expectedTxid, long actualTxid) { + this.expectedTxid = expectedTxid; + this.actualTxid = actualTxid; + } + + public long getExpectedTxid() { + return expectedTxid; + } + + public long getActualTxid() { + return actualTxid; + } + + @Override + public String toString() { + return "We expected the next batch of events to start with transaction ID " + + expectedTxid + ", but it instead started with transaction ID " + + actualTxid + ". 
Most likely the intervening transactions were cleaned " + + "up as part of checkpointing."; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index ef0ac55..093afcf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -43,10 +43,13 @@ import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.inotify.Event; +import org.apache.hadoop.hdfs.inotify.EventsList; import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; @@ -1372,4 +1375,19 @@ public interface ClientProtocol { */ @Idempotent public void checkAccess(String path, FsAction mode) throws IOException; + + /** + * Get the highest txid the NameNode knows has been written to the edit + * log, or -1 if the NameNode's edit log is not yet open for write. 
Used as + * the starting point for the inotify event stream. + */ + @Idempotent + public long getCurrentEditLogTxid() throws IOException; + + /** + * Get an ordered list of events corresponding to the edit log transactions + * from txid onwards. + */ + @Idempotent + public EventsList getEditsFromTxid(long txid) throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java index 40dd8f0..a162ec5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolServerSideTranslatorPB.java @@ -91,12 +91,16 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlo import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetCurrentEditLogTxidRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetCurrentEditLogTxidResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto; import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportResponseProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto; @@ -1408,4 +1412,25 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements } return VOID_CHECKACCESS_RESPONSE; } + + public GetCurrentEditLogTxidResponseProto getCurrentEditLogTxid(RpcController controller, + GetCurrentEditLogTxidRequestProto req) throws ServiceException { + try { + return GetCurrentEditLogTxidResponseProto.newBuilder().setTxid( + server.getCurrentEditLogTxid()).build(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + @Override + public GetEditsFromTxidResponseProto getEditsFromTxid(RpcController controller, + GetEditsFromTxidRequestProto req) throws ServiceException { + try { + return PBHelper.convertEditsResponse(server.getEditsFromTxid( + req.getTxid())); + } catch (IOException e) { + throw new ServiceException(e); + } + } } 
http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 210828d..79c4fcf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -43,6 +43,7 @@ import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; +import org.apache.hadoop.hdfs.inotify.EventsList; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; @@ -95,10 +96,12 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdd import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetCurrentEditLogTxidRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDataEncryptionKeyResponseProto; import 
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeStorageReportRequestProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto; @@ -159,6 +162,7 @@ import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.RemoveXAttrRequestProto import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.SetXAttrRequestProto; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; +import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp; import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException; import org.apache.hadoop.hdfs.server.namenode.SafeModeException; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; @@ -1430,4 +1434,25 @@ public class ClientNamenodeProtocolTranslatorPB implements throw ProtobufHelper.getRemoteException(e); } } + + public long getCurrentEditLogTxid() throws IOException { + GetCurrentEditLogTxidRequestProto req = GetCurrentEditLogTxidRequestProto + .getDefaultInstance(); + try { + return rpcProxy.getCurrentEditLogTxid(null, req).getTxid(); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + @Override + public EventsList getEditsFromTxid(long txid) throws IOException { + GetEditsFromTxidRequestProto req = GetEditsFromTxidRequestProto.newBuilder() + .setTxid(txid).build(); + try { + return PBHelper.convert(rpcProxy.getEditsFromTxid(null, req)); + } catch (ServiceException e) { + throw 
ProtobufHelper.getRemoteException(e); + } + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index 4dcac39..38ba7db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -46,6 +46,8 @@ import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.StorageType; +import org.apache.hadoop.hdfs.inotify.Event; +import org.apache.hadoop.hdfs.inotify.EventsList; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; @@ -96,6 +98,7 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheP import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeStorageReportProto; +import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetEditsFromTxidResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeActionProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RollingUpgradeInfoProto; @@ -158,6 
+161,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageReportProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypesProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto; +import org.apache.hadoop.hdfs.protocol.proto.InotifyProtos; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.GetXAttrsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.XAttrProtos.ListXAttrsResponseProto; @@ -2335,6 +2339,247 @@ public class PBHelper { return new ShmId(shmId.getHi(), shmId.getLo()); } + private static Event.CreateEvent.INodeType createTypeConvert(InotifyProtos.INodeType + type) { + switch (type) { + case I_TYPE_DIRECTORY: + return Event.CreateEvent.INodeType.DIRECTORY; + case I_TYPE_FILE: + return Event.CreateEvent.INodeType.FILE; + case I_TYPE_SYMLINK: + return Event.CreateEvent.INodeType.SYMLINK; + default: + return null; + } + } + + private static InotifyProtos.MetadataUpdateType metadataUpdateTypeConvert( + Event.MetadataUpdateEvent.MetadataType type) { + switch (type) { + case TIMES: + return InotifyProtos.MetadataUpdateType.META_TYPE_TIMES; + case REPLICATION: + return InotifyProtos.MetadataUpdateType.META_TYPE_REPLICATION; + case OWNER: + return InotifyProtos.MetadataUpdateType.META_TYPE_OWNER; + case PERMS: + return InotifyProtos.MetadataUpdateType.META_TYPE_PERMS; + case ACLS: + return InotifyProtos.MetadataUpdateType.META_TYPE_ACLS; + case XATTRS: + return InotifyProtos.MetadataUpdateType.META_TYPE_XATTRS; + default: + return null; + } + } + + private static Event.MetadataUpdateEvent.MetadataType metadataUpdateTypeConvert( + InotifyProtos.MetadataUpdateType type) { + switch (type) { + case META_TYPE_TIMES: + return Event.MetadataUpdateEvent.MetadataType.TIMES; + case META_TYPE_REPLICATION: + return 
Event.MetadataUpdateEvent.MetadataType.REPLICATION; + case META_TYPE_OWNER: + return Event.MetadataUpdateEvent.MetadataType.OWNER; + case META_TYPE_PERMS: + return Event.MetadataUpdateEvent.MetadataType.PERMS; + case META_TYPE_ACLS: + return Event.MetadataUpdateEvent.MetadataType.ACLS; + case META_TYPE_XATTRS: + return Event.MetadataUpdateEvent.MetadataType.XATTRS; + default: + return null; + } + } + + private static InotifyProtos.INodeType createTypeConvert(Event.CreateEvent.INodeType + type) { + switch (type) { + case DIRECTORY: + return InotifyProtos.INodeType.I_TYPE_DIRECTORY; + case FILE: + return InotifyProtos.INodeType.I_TYPE_FILE; + case SYMLINK: + return InotifyProtos.INodeType.I_TYPE_SYMLINK; + default: + return null; + } + } + + public static EventsList convert(GetEditsFromTxidResponseProto resp) throws + IOException { + List<Event> events = Lists.newArrayList(); + for (InotifyProtos.EventProto p : resp.getEventsList().getEventsList()) { + switch(p.getType()) { + case EVENT_CLOSE: + InotifyProtos.CloseEventProto close = + InotifyProtos.CloseEventProto.parseFrom(p.getContents()); + events.add(new Event.CloseEvent(close.getPath(), close.getFileSize(), + close.getTimestamp())); + break; + case EVENT_CREATE: + InotifyProtos.CreateEventProto create = + InotifyProtos.CreateEventProto.parseFrom(p.getContents()); + events.add(new Event.CreateEvent.Builder() + .iNodeType(createTypeConvert(create.getType())) + .path(create.getPath()) + .ctime(create.getCtime()) + .ownerName(create.getOwnerName()) + .groupName(create.getGroupName()) + .perms(convert(create.getPerms())) + .replication(create.getReplication()) + .symlinkTarget(create.getSymlinkTarget().isEmpty() ? 
null : + create.getSymlinkTarget()).build()); + break; + case EVENT_METADATA: + InotifyProtos.MetadataUpdateEventProto meta = + InotifyProtos.MetadataUpdateEventProto.parseFrom(p.getContents()); + events.add(new Event.MetadataUpdateEvent.Builder() + .path(meta.getPath()) + .metadataType(metadataUpdateTypeConvert(meta.getType())) + .mtime(meta.getMtime()) + .atime(meta.getAtime()) + .replication(meta.getReplication()) + .ownerName( + meta.getOwnerName().isEmpty() ? null : meta.getOwnerName()) + .groupName( + meta.getGroupName().isEmpty() ? null : meta.getGroupName()) + .perms(meta.hasPerms() ? convert(meta.getPerms()) : null) + .acls(meta.getAclsList().isEmpty() ? null : convertAclEntry( + meta.getAclsList())) + .xAttrs(meta.getXAttrsList().isEmpty() ? null : convertXAttrs( + meta.getXAttrsList())) + .xAttrsRemoved(meta.getXAttrsRemoved()) + .build()); + break; + case EVENT_RENAME: + InotifyProtos.RenameEventProto rename = + InotifyProtos.RenameEventProto.parseFrom(p.getContents()); + events.add(new Event.RenameEvent(rename.getSrcPath(), rename.getDestPath(), + rename.getTimestamp())); + break; + case EVENT_APPEND: + InotifyProtos.AppendEventProto reopen = + InotifyProtos.AppendEventProto.parseFrom(p.getContents()); + events.add(new Event.AppendEvent(reopen.getPath())); + break; + case EVENT_UNLINK: + InotifyProtos.UnlinkEventProto unlink = + InotifyProtos.UnlinkEventProto.parseFrom(p.getContents()); + events.add(new Event.UnlinkEvent(unlink.getPath(), unlink.getTimestamp())); + break; + default: + throw new RuntimeException("Unexpected inotify event type: " + + p.getType()); + } + } + return new EventsList(events, resp.getEventsList().getFirstTxid(), + resp.getEventsList().getLastTxid(), resp.getEventsList().getSyncTxid()); + } + + public static GetEditsFromTxidResponseProto convertEditsResponse(EventsList el) { + InotifyProtos.EventsListProto.Builder builder = + InotifyProtos.EventsListProto.newBuilder(); + for (Event e : el.getEvents()) { + 
switch(e.getEventType()) { + case CLOSE: + Event.CloseEvent ce = (Event.CloseEvent) e; + builder.addEvents(InotifyProtos.EventProto.newBuilder() + .setType(InotifyProtos.EventType.EVENT_CLOSE) + .setContents( + InotifyProtos.CloseEventProto.newBuilder() + .setPath(ce.getPath()) + .setFileSize(ce.getFileSize()) + .setTimestamp(ce.getTimestamp()).build().toByteString() + ).build()); + break; + case CREATE: + Event.CreateEvent ce2 = (Event.CreateEvent) e; + builder.addEvents(InotifyProtos.EventProto.newBuilder() + .setType(InotifyProtos.EventType.EVENT_CREATE) + .setContents( + InotifyProtos.CreateEventProto.newBuilder() + .setType(createTypeConvert(ce2.getiNodeType())) + .setPath(ce2.getPath()) + .setCtime(ce2.getCtime()) + .setOwnerName(ce2.getOwnerName()) + .setGroupName(ce2.getGroupName()) + .setPerms(convert(ce2.getPerms())) + .setReplication(ce2.getReplication()) + .setSymlinkTarget(ce2.getSymlinkTarget() == null ? + "" : ce2.getSymlinkTarget()).build().toByteString() + ).build()); + break; + case METADATA: + Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e; + InotifyProtos.MetadataUpdateEventProto.Builder metaB = + InotifyProtos.MetadataUpdateEventProto.newBuilder() + .setPath(me.getPath()) + .setType(metadataUpdateTypeConvert(me.getMetadataType())) + .setMtime(me.getMtime()) + .setAtime(me.getAtime()) + .setReplication(me.getReplication()) + .setOwnerName(me.getOwnerName() == null ? "" : + me.getOwnerName()) + .setGroupName(me.getGroupName() == null ? "" : + me.getGroupName()) + .addAllAcls(me.getAcls() == null ? + Lists.<AclEntryProto>newArrayList() : + convertAclEntryProto(me.getAcls())) + .addAllXAttrs(me.getxAttrs() == null ? 
+ Lists.<XAttrProto>newArrayList() : + convertXAttrProto(me.getxAttrs())) + .setXAttrsRemoved(me.isxAttrsRemoved()); + if (me.getPerms() != null) { + metaB.setPerms(convert(me.getPerms())); + } + builder.addEvents(InotifyProtos.EventProto.newBuilder() + .setType(InotifyProtos.EventType.EVENT_METADATA) + .setContents(metaB.build().toByteString()) + .build()); + break; + case RENAME: + Event.RenameEvent re = (Event.RenameEvent) e; + builder.addEvents(InotifyProtos.EventProto.newBuilder() + .setType(InotifyProtos.EventType.EVENT_RENAME) + .setContents( + InotifyProtos.RenameEventProto.newBuilder() + .setSrcPath(re.getSrcPath()) + .setDestPath(re.getDstPath()) + .setTimestamp(re.getTimestamp()).build().toByteString() + ).build()); + break; + case APPEND: + Event.AppendEvent re2 = (Event.AppendEvent) e; + builder.addEvents(InotifyProtos.EventProto.newBuilder() + .setType(InotifyProtos.EventType.EVENT_APPEND) + .setContents( + InotifyProtos.AppendEventProto.newBuilder() + .setPath(re2.getPath()).build().toByteString() + ).build()); + break; + case UNLINK: + Event.UnlinkEvent ue = (Event.UnlinkEvent) e; + builder.addEvents(InotifyProtos.EventProto.newBuilder() + .setType(InotifyProtos.EventType.EVENT_UNLINK) + .setContents( + InotifyProtos.UnlinkEventProto.newBuilder() + .setPath(ue.getPath()) + .setTimestamp(ue.getTimestamp()).build().toByteString() + ).build()); + break; + default: + throw new RuntimeException("Unexpected inotify event: " + e); + } + } + builder.setFirstTxid(el.getFirstTxid()); + builder.setLastTxid(el.getLastTxid()); + builder.setSyncTxid(el.getSyncTxid()); + return GetEditsFromTxidResponseProto.newBuilder().setEventsList( + builder.build()).build(); + } + public static HdfsProtos.CipherSuite convert(CipherSuite suite) { switch (suite) { case UNKNOWN: http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java 
---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java index 0196c5b..e37869c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java @@ -79,7 +79,17 @@ public class IPCLoggerChannel implements AsyncLogger { protected final InetSocketAddress addr; private QJournalProtocol proxy; - private final ListeningExecutorService executor; + /** + * Executes tasks submitted to it serially, on a single thread, in FIFO order + * (generally used for write tasks that should not be reordered). + */ + private final ListeningExecutorService singleThreadExecutor; + /** + * Executes tasks submitted to it in parallel with each other and with those + * submitted to singleThreadExecutor (generally used for read tasks that can + * be safely reordered and interleaved with writes). + */ + private final ListeningExecutorService parallelExecutor; private long ipcSerial = 0; private long epoch = -1; private long committedTxId = HdfsConstants.INVALID_TXID; @@ -160,8 +170,10 @@ public class IPCLoggerChannel implements AsyncLogger { DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY, DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT); - executor = MoreExecutors.listeningDecorator( - createExecutor()); + singleThreadExecutor = MoreExecutors.listeningDecorator( + createSingleThreadExecutor()); + parallelExecutor = MoreExecutors.listeningDecorator( + createParallelExecutor()); metrics = IPCLoggerChannelMetrics.create(this); } @@ -183,7 +195,8 @@ public class IPCLoggerChannel implements AsyncLogger { @Override public void close() { // No more tasks may be submitted after this point. 
- executor.shutdown(); + singleThreadExecutor.shutdown(); + parallelExecutor.shutdown(); if (proxy != null) { // TODO: this can hang for quite some time if the client // is currently in the middle of a call to a downed JN. @@ -230,15 +243,30 @@ public class IPCLoggerChannel implements AsyncLogger { * Separated out for easy overriding in tests. */ @VisibleForTesting - protected ExecutorService createExecutor() { + protected ExecutorService createSingleThreadExecutor() { return Executors.newSingleThreadExecutor( new ThreadFactoryBuilder() .setDaemon(true) - .setNameFormat("Logger channel to " + addr) + .setNameFormat("Logger channel (from single-thread executor) to " + + addr) .setUncaughtExceptionHandler( UncaughtExceptionHandlers.systemExit()) .build()); } + + /** + * Separated out for easy overriding in tests. + */ + @VisibleForTesting + protected ExecutorService createParallelExecutor() { + return Executors.newCachedThreadPool( + new ThreadFactoryBuilder() + .setDaemon(true) + .setNameFormat("Logger channel (from parallel executor) to " + addr) + .setUncaughtExceptionHandler( + UncaughtExceptionHandlers.systemExit()) + .build()); + } @Override public URL buildURLToFetchLogs(long segmentTxId) { @@ -286,7 +314,7 @@ public class IPCLoggerChannel implements AsyncLogger { @VisibleForTesting void waitForAllPendingCalls() throws InterruptedException { try { - executor.submit(new Runnable() { + singleThreadExecutor.submit(new Runnable() { @Override public void run() { } @@ -299,7 +327,7 @@ public class IPCLoggerChannel implements AsyncLogger { @Override public ListenableFuture<Boolean> isFormatted() { - return executor.submit(new Callable<Boolean>() { + return singleThreadExecutor.submit(new Callable<Boolean>() { @Override public Boolean call() throws IOException { return getProxy().isFormatted(journalId); @@ -309,7 +337,7 @@ public class IPCLoggerChannel implements AsyncLogger { @Override public ListenableFuture<GetJournalStateResponseProto> getJournalState() { - return 
executor.submit(new Callable<GetJournalStateResponseProto>() { + return singleThreadExecutor.submit(new Callable<GetJournalStateResponseProto>() { @Override public GetJournalStateResponseProto call() throws IOException { GetJournalStateResponseProto ret = @@ -323,7 +351,7 @@ public class IPCLoggerChannel implements AsyncLogger { @Override public ListenableFuture<NewEpochResponseProto> newEpoch( final long epoch) { - return executor.submit(new Callable<NewEpochResponseProto>() { + return singleThreadExecutor.submit(new Callable<NewEpochResponseProto>() { @Override public NewEpochResponseProto call() throws IOException { return getProxy().newEpoch(journalId, nsInfo, epoch); @@ -347,7 +375,7 @@ public class IPCLoggerChannel implements AsyncLogger { ListenableFuture<Void> ret = null; try { - ret = executor.submit(new Callable<Void>() { + ret = singleThreadExecutor.submit(new Callable<Void>() { @Override public Void call() throws IOException { throwIfOutOfSync(); @@ -464,7 +492,7 @@ public class IPCLoggerChannel implements AsyncLogger { @Override public ListenableFuture<Void> format(final NamespaceInfo nsInfo) { - return executor.submit(new Callable<Void>() { + return singleThreadExecutor.submit(new Callable<Void>() { @Override public Void call() throws Exception { getProxy().format(journalId, nsInfo); @@ -476,7 +504,7 @@ public class IPCLoggerChannel implements AsyncLogger { @Override public ListenableFuture<Void> startLogSegment(final long txid, final int layoutVersion) { - return executor.submit(new Callable<Void>() { + return singleThreadExecutor.submit(new Callable<Void>() { @Override public Void call() throws IOException { getProxy().startLogSegment(createReqInfo(), txid, layoutVersion); @@ -497,7 +525,7 @@ public class IPCLoggerChannel implements AsyncLogger { @Override public ListenableFuture<Void> finalizeLogSegment( final long startTxId, final long endTxId) { - return executor.submit(new Callable<Void>() { + return singleThreadExecutor.submit(new 
Callable<Void>() { @Override public Void call() throws IOException { throwIfOutOfSync(); @@ -510,7 +538,7 @@ public class IPCLoggerChannel implements AsyncLogger { @Override public ListenableFuture<Void> purgeLogsOlderThan(final long minTxIdToKeep) { - return executor.submit(new Callable<Void>() { + return singleThreadExecutor.submit(new Callable<Void>() { @Override public Void call() throws Exception { getProxy().purgeLogsOlderThan(createReqInfo(), minTxIdToKeep); @@ -522,7 +550,7 @@ public class IPCLoggerChannel implements AsyncLogger { @Override public ListenableFuture<RemoteEditLogManifest> getEditLogManifest( final long fromTxnId, final boolean inProgressOk) { - return executor.submit(new Callable<RemoteEditLogManifest>() { + return parallelExecutor.submit(new Callable<RemoteEditLogManifest>() { @Override public RemoteEditLogManifest call() throws IOException { GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest( @@ -538,7 +566,7 @@ public class IPCLoggerChannel implements AsyncLogger { @Override public ListenableFuture<PrepareRecoveryResponseProto> prepareRecovery( final long segmentTxId) { - return executor.submit(new Callable<PrepareRecoveryResponseProto>() { + return singleThreadExecutor.submit(new Callable<PrepareRecoveryResponseProto>() { @Override public PrepareRecoveryResponseProto call() throws IOException { if (!hasHttpServerEndPoint()) { @@ -556,7 +584,7 @@ public class IPCLoggerChannel implements AsyncLogger { @Override public ListenableFuture<Void> acceptRecovery( final SegmentStateProto log, final URL url) { - return executor.submit(new Callable<Void>() { + return singleThreadExecutor.submit(new Callable<Void>() { @Override public Void call() throws IOException { getProxy().acceptRecovery(createReqInfo(), log, url); @@ -567,7 +595,7 @@ public class IPCLoggerChannel implements AsyncLogger { @Override public ListenableFuture<Void> doPreUpgrade() { - return executor.submit(new Callable<Void>() { + return 
singleThreadExecutor.submit(new Callable<Void>() { @Override public Void call() throws IOException { getProxy().doPreUpgrade(journalId); @@ -578,7 +606,7 @@ public class IPCLoggerChannel implements AsyncLogger { @Override public ListenableFuture<Void> doUpgrade(final StorageInfo sInfo) { - return executor.submit(new Callable<Void>() { + return singleThreadExecutor.submit(new Callable<Void>() { @Override public Void call() throws IOException { getProxy().doUpgrade(journalId, sInfo); @@ -589,7 +617,7 @@ public class IPCLoggerChannel implements AsyncLogger { @Override public ListenableFuture<Void> doFinalize() { - return executor.submit(new Callable<Void>() { + return singleThreadExecutor.submit(new Callable<Void>() { @Override public Void call() throws IOException { getProxy().doFinalize(journalId); @@ -601,7 +629,7 @@ public class IPCLoggerChannel implements AsyncLogger { @Override public ListenableFuture<Boolean> canRollBack(final StorageInfo storage, final StorageInfo prevStorage, final int targetLayoutVersion) { - return executor.submit(new Callable<Boolean>() { + return singleThreadExecutor.submit(new Callable<Boolean>() { @Override public Boolean call() throws IOException { return getProxy().canRollBack(journalId, storage, prevStorage, @@ -612,7 +640,7 @@ public class IPCLoggerChannel implements AsyncLogger { @Override public ListenableFuture<Void> doRollback() { - return executor.submit(new Callable<Void>() { + return singleThreadExecutor.submit(new Callable<Void>() { @Override public Void call() throws IOException { getProxy().doRollback(journalId); @@ -623,7 +651,7 @@ public class IPCLoggerChannel implements AsyncLogger { @Override public ListenableFuture<Void> discardSegments(final long startTxId) { - return executor.submit(new Callable<Void>() { + return singleThreadExecutor.submit(new Callable<Void>() { @Override public Void call() throws IOException { getProxy().discardSegments(journalId, startTxId); @@ -634,7 +662,7 @@ public class IPCLoggerChannel 
implements AsyncLogger { @Override public ListenableFuture<Long> getJournalCTime() { - return executor.submit(new Callable<Long>() { + return singleThreadExecutor.submit(new Callable<Long>() { @Override public Long call() throws IOException { return getProxy().getJournalCTime(journalId); http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java index 1ffe6f7..b36e547 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java @@ -651,7 +651,8 @@ public class Journal implements Closeable { } } if (log != null && log.isInProgress()) { - logs.add(new RemoteEditLog(log.getStartTxId(), getHighestWrittenTxId())); + logs.add(new RemoteEditLog(log.getStartTxId(), getHighestWrittenTxId(), + true)); } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java index 0f63966..3649437 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupInputStream.java @@ -147,4 +147,9 @@ class EditLogBackupInputStream extends EditLogInputStream { public void setMaxOpSize(int maxOpSize) { reader.setMaxOpSize(maxOpSize); } + + @Override + public boolean isLocalLog() { + return true; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java index fa25604..974860c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java @@ -506,4 +506,9 @@ public class EditLogFileInputStream extends EditLogInputStream { reader.setMaxOpSize(maxOpSize); } } + + @Override + public boolean isLocalLog() { + return log instanceof FileLog; + } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java index 969668d..ac58616 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogInputStream.java @@ -203,4 +203,10 @@ public abstract class EditLogInputStream implements Closeable { * Set the maximum opcode size in bytes. */ public abstract void setMaxOpSize(int maxOpSize); + + /** + * Returns true if we are currently reading the log from a local disk or an + * even faster data source (e.g. a byte buffer). + */ + public abstract boolean isLocalLog(); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index b2adcd4..9d3538d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -188,6 +188,13 @@ public class FSEditLog implements LogsPurgeable { */ private final List<URI> sharedEditsDirs; + /** + * Take this lock when adding journals to or closing the JournalSet. Allows + * us to ensure that the JournalSet isn't closed or updated underneath us + * in selectInputStreams(). 
+ */ + private final Object journalSetLock = new Object(); + private static class TransactionId { public long txid; @@ -252,20 +259,22 @@ public class FSEditLog implements LogsPurgeable { DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY, DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT); - journalSet = new JournalSet(minimumRedundantJournals); - - for (URI u : dirs) { - boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf) - .contains(u); - if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) { - StorageDirectory sd = storage.getStorageDirectory(u); - if (sd != null) { - journalSet.add(new FileJournalManager(conf, sd, storage), - required, sharedEditsDirs.contains(u)); + synchronized(journalSetLock) { + journalSet = new JournalSet(minimumRedundantJournals); + + for (URI u : dirs) { + boolean required = FSNamesystem.getRequiredNamespaceEditsDirs(conf) + .contains(u); + if (u.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) { + StorageDirectory sd = storage.getStorageDirectory(u); + if (sd != null) { + journalSet.add(new FileJournalManager(conf, sd, storage), + required, sharedEditsDirs.contains(u)); + } + } else { + journalSet.add(createJournal(u), required, + sharedEditsDirs.contains(u)); } - } else { - journalSet.add(createJournal(u), required, - sharedEditsDirs.contains(u)); } } @@ -349,7 +358,9 @@ public class FSEditLog implements LogsPurgeable { } finally { if (journalSet != null && !journalSet.isEmpty()) { try { - journalSet.close(); + synchronized(journalSetLock) { + journalSet.close(); + } } catch (IOException ioe) { LOG.warn("Error closing journalSet", ioe); } @@ -606,7 +617,9 @@ public class FSEditLog implements LogsPurgeable { "due to " + e.getMessage() + ". 
" + "Unsynced transactions: " + (txid - synctxid); LOG.fatal(msg, new Exception()); - IOUtils.cleanup(LOG, journalSet); + synchronized(journalSetLock) { + IOUtils.cleanup(LOG, journalSet); + } terminate(1, msg); } } finally { @@ -630,7 +643,9 @@ public class FSEditLog implements LogsPurgeable { "Could not sync enough journals to persistent storage. " + "Unsynced transactions: " + (txid - synctxid); LOG.fatal(msg, new Exception()); - IOUtils.cleanup(LOG, journalSet); + synchronized(journalSetLock) { + IOUtils.cleanup(LOG, journalSet); + } terminate(1, msg); } } @@ -1301,9 +1316,8 @@ public class FSEditLog implements LogsPurgeable { /** * Return the txid of the last synced transaction. - * For test use only */ - synchronized long getSyncTxId() { + public synchronized long getSyncTxId() { return synctxid; } @@ -1340,7 +1354,9 @@ public class FSEditLog implements LogsPurgeable { LOG.info("Registering new backup node: " + bnReg); BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg); - journalSet.add(bjm, false); + synchronized(journalSetLock) { + journalSet.add(bjm, false); + } } synchronized void releaseBackupStream(NamenodeRegistration registration) @@ -1348,7 +1364,9 @@ public class FSEditLog implements LogsPurgeable { BackupJournalManager bjm = this.findBackupJournal(registration); if (bjm != null) { LOG.info("Removing backup journal " + bjm); - journalSet.remove(bjm); + synchronized(journalSetLock) { + journalSet.remove(bjm); + } } } @@ -1487,11 +1505,16 @@ public class FSEditLog implements LogsPurgeable { * @param recovery recovery context * @param inProgressOk set to true if in-progress streams are OK */ - public synchronized Collection<EditLogInputStream> selectInputStreams( + public Collection<EditLogInputStream> selectInputStreams( long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery, boolean inProgressOk) throws IOException { + List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>(); - selectInputStreams(streams, 
fromTxId, inProgressOk); + synchronized(journalSetLock) { + Preconditions.checkState(journalSet.isOpen(), "Cannot call " + + "selectInputStreams() on closed FSEditLog"); + selectInputStreams(streams, fromTxId, inProgressOk); + } try { checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk); http://git-wip-us.apache.org/repos/asf/hadoop/blob/faa4455b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java index 362c316..6001db5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java @@ -187,17 +187,27 @@ public class FileJournalManager implements JournalManager { List<EditLogFile> allLogFiles = matchEditLogs(currentDir); List<RemoteEditLog> ret = Lists.newArrayListWithCapacity( allLogFiles.size()); - for (EditLogFile elf : allLogFiles) { if (elf.hasCorruptHeader() || (!inProgressOk && elf.isInProgress())) { continue; } + if (elf.isInProgress()) { + try { + elf.validateLog(); + } catch (IOException e) { + LOG.error("got IOException while trying to validate header of " + + elf + ". Skipping.", e); + continue; + } + } if (elf.getFirstTxId() >= firstTxId) { - ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId)); + ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId, + elf.isInProgress())); } else if (elf.getFirstTxId() < firstTxId && firstTxId <= elf.getLastTxId()) { // If the firstTxId is in the middle of an edit log segment. Return this // anyway and let the caller figure out whether it wants to use it. 
- ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId)); + ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId, + elf.isInProgress())); } }