HDFS-7446. HDFS inotify should have the ability to determine what txid it has read up to (cmccabe) (cherry picked from commit 75a326aaff8c92349701d9b3473c3070b8c2be44)
(cherry picked from commit 06552a15d5172a2b0ad3d61aa7f9a849857385aa) Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/43631451 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/43631451 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/43631451 Branch: refs/heads/branch-2.6 Commit: 4363145128f91b2fb1f1c0254ee5e8621a1ac383 Parents: 8ed162b Author: Colin Patrick Mccabe <cmcc...@cloudera.com> Authored: Tue Nov 25 17:44:34 2014 -0800 Committer: Vinod Kumar Vavilapalli <vino...@apache.org> Committed: Tue Sep 8 15:32:17 2015 -0700 ---------------------------------------------------------------------- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../hadoop/hdfs/DFSInotifyEventInputStream.java | 65 ++-- .../apache/hadoop/hdfs/inotify/EventBatch.java | 41 +++ .../hadoop/hdfs/inotify/EventBatchList.java | 63 ++++ .../apache/hadoop/hdfs/inotify/EventsList.java | 63 ---- .../hadoop/hdfs/protocol/ClientProtocol.java | 8 +- .../ClientNamenodeProtocolTranslatorPB.java | 4 +- .../apache/hadoop/hdfs/protocolPB/PBHelper.java | 341 ++++++++++--------- .../namenode/InotifyFSEditLogOpTranslator.java | 74 ++-- .../hdfs/server/namenode/NameNodeRpcServer.java | 45 +-- .../hadoop-hdfs/src/main/proto/inotify.proto | 10 +- .../hdfs/TestDFSInotifyEventInputStream.java | 209 +++++++----- 12 files changed, 526 insertions(+), 400 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index d2f07c2..1f6ce36 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -156,6 +156,9 @@ Release 2.6.1 - UNRELEASED HDFS-7980. Incremental BlockReport will dramatically slow down namenode startup. (Walter Su via szetszwo) + HDFS-7446. HDFS inotify should have the ability to determine what txid it + has read up to (cmccabe) + Release 2.6.0 - 2014-11-18 INCOMPATIBLE CHANGES http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java index 73c5f55..83b92b9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInotifyEventInputStream.java @@ -19,11 +19,10 @@ package org.apache.hadoop.hdfs; import com.google.common.collect.Iterators; -import com.google.common.util.concurrent.UncheckedExecutionException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.inotify.Event; -import org.apache.hadoop.hdfs.inotify.EventsList; +import org.apache.hadoop.hdfs.inotify.EventBatch; +import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.inotify.MissingEventsException; import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.util.Time; @@ -33,13 +32,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Iterator; import java.util.Random; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; /** * Stream for reading inotify events. DFSInotifyEventInputStreams should not @@ -52,7 +45,7 @@ public class DFSInotifyEventInputStream { .class); private final ClientProtocol namenode; - private Iterator<Event> it; + private Iterator<EventBatch> it; private long lastReadTxid; /** * The most recent txid the NameNode told us it has sync'ed -- helps us @@ -78,22 +71,22 @@ public class DFSInotifyEventInputStream { } /** - * Returns the next event in the stream or null if no new events are currently - * available. + * Returns the next batch of events in the stream or null if no new + * batches are currently available. * * @throws IOException because of network error or edit log * corruption. Also possible if JournalNodes are unresponsive in the * QJM setting (even one unresponsive JournalNode is enough in rare cases), * so catching this exception and retrying at least a few times is * recommended. - * @throws MissingEventsException if we cannot return the next event in the - * stream because the data for the event (and possibly some subsequent events) - * has been deleted (generally because this stream is a very large number of - * events behind the current state of the NameNode). It is safe to continue - * reading from the stream after this exception is thrown -- the next - * available event will be returned. + * @throws MissingEventsException if we cannot return the next batch in the + * stream because the data for the events (and possibly some subsequent + * events) has been deleted (generally because this stream is a very large + * number of transactions behind the current state of the NameNode). It is + * safe to continue reading from the stream after this exception is thrown + * The next available batch of events will be returned. */ - public Event poll() throws IOException, MissingEventsException { + public EventBatch poll() throws IOException, MissingEventsException { // need to keep retrying until the NN sends us the latest committed txid if (lastReadTxid == -1) { LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN"); @@ -101,14 +94,14 @@ public class DFSInotifyEventInputStream { return null; } if (!it.hasNext()) { - EventsList el = namenode.getEditsFromTxid(lastReadTxid + 1); + EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1); if (el.getLastTxid() != -1) { // we only want to set syncTxid when we were actually able to read some // edits on the NN -- otherwise it will seem like edits are being // generated faster than we can read them when the problem is really // that we are temporarily unable to read edits syncTxid = el.getSyncTxid(); - it = el.getEvents().iterator(); + it = el.getBatches().iterator(); long formerLastReadTxid = lastReadTxid; lastReadTxid = el.getLastTxid(); if (el.getFirstTxid() != formerLastReadTxid + 1) { @@ -131,18 +124,18 @@ public class DFSInotifyEventInputStream { } /** - * Return a estimate of how many events behind the NameNode's current state - * this stream is. Clients should periodically call this method and check if - * its result is steadily increasing, which indicates that they are falling - * behind (i.e. events are being generated faster than the client is reading - * them). If a client falls too far behind events may be deleted before the - * client can read them. + * Return a estimate of how many transaction IDs behind the NameNode's + * current state this stream is. Clients should periodically call this method + * and check if its result is steadily increasing, which indicates that they + * are falling behind (i.e. transaction are being generated faster than the + * client is reading them). If a client falls too far behind events may be + * deleted before the client can read them. * <p/> * A return value of -1 indicates that an estimate could not be produced, and * should be ignored. The value returned by this method is really only useful * when compared to previous or subsequent returned values. */ - public long getEventsBehindEstimate() { + public long getTxidsBehindEstimate() { if (syncTxid == 0) { return -1; } else { @@ -155,8 +148,8 @@ public class DFSInotifyEventInputStream { } /** - * Returns the next event in the stream, waiting up to the specified amount of - * time for a new event. Returns null if a new event is not available at the + * Returns the next event batch in the stream, waiting up to the specified + * amount of time for a new batch. Returns null if one is not available at the * end of the specified amount of time. The time before the method returns may * exceed the specified amount of time by up to the time required for an RPC * to the NameNode. @@ -168,12 +161,12 @@ public class DFSInotifyEventInputStream { * see {@link DFSInotifyEventInputStream#poll()} * @throws InterruptedException if the calling thread is interrupted */ - public Event poll(long time, TimeUnit tu) throws IOException, + public EventBatch poll(long time, TimeUnit tu) throws IOException, InterruptedException, MissingEventsException { long initialTime = Time.monotonicNow(); long totalWait = TimeUnit.MILLISECONDS.convert(time, tu); long nextWait = INITIAL_WAIT_MS; - Event next = null; + EventBatch next = null; while ((next = poll()) == null) { long timeLeft = totalWait - (Time.monotonicNow() - initialTime); if (timeLeft <= 0) { @@ -193,17 +186,17 @@ public class DFSInotifyEventInputStream { } /** - * Returns the next event in the stream, waiting indefinitely if a new event - * is not immediately available. + * Returns the next batch of events in the stream, waiting indefinitely if + * a new batch is not immediately available. * * @throws IOException see {@link DFSInotifyEventInputStream#poll()} * @throws MissingEventsException see * {@link DFSInotifyEventInputStream#poll()} * @throws InterruptedException if the calling thread is interrupted */ - public Event take() throws IOException, InterruptedException, + public EventBatch take() throws IOException, InterruptedException, MissingEventsException { - Event next = null; + EventBatch next = null; int nextWaitMin = INITIAL_WAIT_MS; while ((next = poll()) == null) { // sleep for a random period between nextWaitMin and nextWaitMin * 2 http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatch.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatch.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatch.java new file mode 100644 index 0000000..0ad1070 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatch.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.inotify; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * A batch of events that all happened on the same transaction ID. + */ +@InterfaceAudience.Public +public class EventBatch { + private final long txid; + private final Event[] events; + + public EventBatch(long txid, Event[] events) { + this.txid = txid; + this.events = events; + } + + public long getTxid() { + return txid; + } + + public Event[] getEvents() { return events; } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatchList.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatchList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatchList.java new file mode 100644 index 0000000..9c97038 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventBatchList.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.inotify; + +import org.apache.hadoop.classification.InterfaceAudience; + +import java.util.List; + +/** + * Contains a list of event batches, the transaction ID in the edit log up to + * which we read to produce these events, and the first txid we observed when + * producing these events (the last of which is for the purpose of determining + * whether we have missed events due to edit deletion). Also contains the most + * recent txid that the NameNode has sync'ed, so the client can determine how + * far behind in the edit log it is. + */ +@InterfaceAudience.Private +public class EventBatchList { + private List<EventBatch> batches; + private long firstTxid; + private long lastTxid; + private long syncTxid; + + public EventBatchList(List<EventBatch> batches, long firstTxid, + long lastTxid, long syncTxid) { + this.batches = batches; + this.firstTxid = firstTxid; + this.lastTxid = lastTxid; + this.syncTxid = syncTxid; + } + + public List<EventBatch> getBatches() { + return batches; + } + + public long getFirstTxid() { + return firstTxid; + } + + public long getLastTxid() { + return lastTxid; + } + + public long getSyncTxid() { + return syncTxid; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java deleted file mode 100644 index 6d02d3c..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/inotify/EventsList.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdfs.inotify; - -import org.apache.hadoop.classification.InterfaceAudience; - -import java.util.List; - -/** - * Contains a set of events, the transaction ID in the edit log up to which we - * read to produce these events, and the first txid we observed when producing - * these events (the last of which is for the purpose of determining whether we - * have missed events due to edit deletion). Also contains the most recent txid - * that the NameNode has sync'ed, so the client can determine how far behind in - * the edit log it is. - */ -@InterfaceAudience.Private -public class EventsList { - private List<Event> events; - private long firstTxid; - private long lastTxid; - private long syncTxid; - - public EventsList(List<Event> events, long firstTxid, long lastTxid, - long syncTxid) { - this.events = events; - this.firstTxid = firstTxid; - this.lastTxid = lastTxid; - this.syncTxid = syncTxid; - } - - public List<Event> getEvents() { - return events; - } - - public long getFirstTxid() { - return firstTxid; - } - - public long getLastTxid() { - return lastTxid; - } - - public long getSyncTxid() { - return syncTxid; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index d29d2eb..f3a390a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -43,7 +43,7 @@ import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DFSConfigKeys; -import org.apache.hadoop.hdfs.inotify.EventsList; +import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.protocol.HdfsConstants.RollingUpgradeAction; import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; @@ -1405,9 +1405,9 @@ public interface ClientProtocol { public long getCurrentEditLogTxid() throws IOException; /** - * Get an ordered list of events corresponding to the edit log transactions - * from txid onwards. + * Get an ordered list of batches of events corresponding to the edit log + * transactions for txids equal to or greater than txid. */ @Idempotent - public EventsList getEditsFromTxid(long txid) throws IOException; + public EventBatchList getEditsFromTxid(long txid) throws IOException; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java index 077a3e9..7a2dd15 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/ClientNamenodeProtocolTranslatorPB.java @@ -44,7 +44,7 @@ import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; -import org.apache.hadoop.hdfs.inotify.EventsList; +import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; @@ -1480,7 +1480,7 @@ public class ClientNamenodeProtocolTranslatorPB implements } @Override - public EventsList getEditsFromTxid(long txid) throws IOException { + public EventBatchList getEditsFromTxid(long txid) throws IOException { GetEditsFromTxidRequestProto req = GetEditsFromTxidRequestProto.newBuilder() .setTxid(txid).build(); try { http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index ae9cb3e..c52588f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -46,11 +46,12 @@ import org.apache.hadoop.fs.permission.FsAction; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; +import org.apache.hadoop.hdfs.inotify.EventBatch; import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.inotify.Event; -import org.apache.hadoop.hdfs.inotify.EventsList; +import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry; import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo; @@ -2516,173 +2517,197 @@ public class PBHelper { } } - public static EventsList convert(GetEditsFromTxidResponseProto resp) throws + public static EventBatchList convert(GetEditsFromTxidResponseProto resp) throws IOException { - List<Event> events = Lists.newArrayList(); - for (InotifyProtos.EventProto p : resp.getEventsList().getEventsList()) { - switch(p.getType()) { - case EVENT_CLOSE: - InotifyProtos.CloseEventProto close = - InotifyProtos.CloseEventProto.parseFrom(p.getContents()); - events.add(new Event.CloseEvent(close.getPath(), close.getFileSize(), - close.getTimestamp())); - break; - case EVENT_CREATE: - InotifyProtos.CreateEventProto create = - InotifyProtos.CreateEventProto.parseFrom(p.getContents()); - events.add(new Event.CreateEvent.Builder() - .iNodeType(createTypeConvert(create.getType())) - .path(create.getPath()) - .ctime(create.getCtime()) - .ownerName(create.getOwnerName()) - .groupName(create.getGroupName()) - .perms(convert(create.getPerms())) - .replication(create.getReplication()) - .symlinkTarget(create.getSymlinkTarget().isEmpty() ? null : - create.getSymlinkTarget()) - .overwrite(create.getOverwrite()).build()); - break; - case EVENT_METADATA: - InotifyProtos.MetadataUpdateEventProto meta = - InotifyProtos.MetadataUpdateEventProto.parseFrom(p.getContents()); - events.add(new Event.MetadataUpdateEvent.Builder() - .path(meta.getPath()) - .metadataType(metadataUpdateTypeConvert(meta.getType())) - .mtime(meta.getMtime()) - .atime(meta.getAtime()) - .replication(meta.getReplication()) - .ownerName( - meta.getOwnerName().isEmpty() ? null : meta.getOwnerName()) - .groupName( - meta.getGroupName().isEmpty() ? null : meta.getGroupName()) - .perms(meta.hasPerms() ? convert(meta.getPerms()) : null) - .acls(meta.getAclsList().isEmpty() ? null : convertAclEntry( - meta.getAclsList())) - .xAttrs(meta.getXAttrsList().isEmpty() ? null : convertXAttrs( - meta.getXAttrsList())) - .xAttrsRemoved(meta.getXAttrsRemoved()) - .build()); - break; - case EVENT_RENAME: - InotifyProtos.RenameEventProto rename = - InotifyProtos.RenameEventProto.parseFrom(p.getContents()); - events.add(new Event.RenameEvent(rename.getSrcPath(), rename.getDestPath(), - rename.getTimestamp())); - break; - case EVENT_APPEND: - InotifyProtos.AppendEventProto reopen = - InotifyProtos.AppendEventProto.parseFrom(p.getContents()); - events.add(new Event.AppendEvent(reopen.getPath())); - break; - case EVENT_UNLINK: - InotifyProtos.UnlinkEventProto unlink = - InotifyProtos.UnlinkEventProto.parseFrom(p.getContents()); - events.add(new Event.UnlinkEvent(unlink.getPath(), unlink.getTimestamp())); - break; - default: - throw new RuntimeException("Unexpected inotify event type: " + - p.getType()); + final InotifyProtos.EventsListProto list = resp.getEventsList(); + final long firstTxid = list.getFirstTxid(); + final long lastTxid = list.getLastTxid(); + + List<EventBatch> batches = Lists.newArrayList(); + if (list.getEventsList().size() > 0) { + throw new IOException("Can't handle old inotify server response."); + } + for (InotifyProtos.EventBatchProto bp : list.getBatchList()) { + long txid = bp.getTxid(); + if ((txid != -1) && ((txid < firstTxid) || (txid > lastTxid))) { + throw new IOException("Error converting TxidResponseProto: got a " + + "transaction id " + txid + " that was outside the range of [" + + firstTxid + ", " + lastTxid + "]."); + } + List<Event> events = Lists.newArrayList(); + for (InotifyProtos.EventProto p : bp.getEventsList()) { + switch (p.getType()) { + case EVENT_CLOSE: + InotifyProtos.CloseEventProto close = + InotifyProtos.CloseEventProto.parseFrom(p.getContents()); + events.add(new Event.CloseEvent(close.getPath(), + close.getFileSize(), close.getTimestamp())); + break; + case EVENT_CREATE: + InotifyProtos.CreateEventProto create = + InotifyProtos.CreateEventProto.parseFrom(p.getContents()); + events.add(new Event.CreateEvent.Builder() + .iNodeType(createTypeConvert(create.getType())) + .path(create.getPath()) + .ctime(create.getCtime()) + .ownerName(create.getOwnerName()) + .groupName(create.getGroupName()) + .perms(convert(create.getPerms())) + .replication(create.getReplication()) + .symlinkTarget(create.getSymlinkTarget().isEmpty() ? null : + create.getSymlinkTarget()) + .overwrite(create.getOverwrite()).build()); + break; + case EVENT_METADATA: + InotifyProtos.MetadataUpdateEventProto meta = + InotifyProtos.MetadataUpdateEventProto.parseFrom(p.getContents()); + events.add(new Event.MetadataUpdateEvent.Builder() + .path(meta.getPath()) + .metadataType(metadataUpdateTypeConvert(meta.getType())) + .mtime(meta.getMtime()) + .atime(meta.getAtime()) + .replication(meta.getReplication()) + .ownerName( + meta.getOwnerName().isEmpty() ? null : meta.getOwnerName()) + .groupName( + meta.getGroupName().isEmpty() ? null : meta.getGroupName()) + .perms(meta.hasPerms() ? convert(meta.getPerms()) : null) + .acls(meta.getAclsList().isEmpty() ? null : convertAclEntry( + meta.getAclsList())) + .xAttrs(meta.getXAttrsList().isEmpty() ? null : convertXAttrs( + meta.getXAttrsList())) + .xAttrsRemoved(meta.getXAttrsRemoved()) + .build()); + break; + case EVENT_RENAME: + InotifyProtos.RenameEventProto rename = + InotifyProtos.RenameEventProto.parseFrom(p.getContents()); + events.add(new Event.RenameEvent(rename.getSrcPath(), + rename.getDestPath(), rename.getTimestamp())); + break; + case EVENT_APPEND: + InotifyProtos.AppendEventProto reopen = + InotifyProtos.AppendEventProto.parseFrom(p.getContents()); + events.add(new Event.AppendEvent(reopen.getPath())); + break; + case EVENT_UNLINK: + InotifyProtos.UnlinkEventProto unlink = + InotifyProtos.UnlinkEventProto.parseFrom(p.getContents()); + events.add(new Event.UnlinkEvent(unlink.getPath(), + unlink.getTimestamp())); + break; + default: + throw new RuntimeException("Unexpected inotify event type: " + + p.getType()); + } } + batches.add(new EventBatch(txid, events.toArray(new Event[0]))); } - return new EventsList(events, resp.getEventsList().getFirstTxid(), + return new EventBatchList(batches, resp.getEventsList().getFirstTxid(), resp.getEventsList().getLastTxid(), resp.getEventsList().getSyncTxid()); } - public static GetEditsFromTxidResponseProto convertEditsResponse(EventsList el) { + public static GetEditsFromTxidResponseProto convertEditsResponse(EventBatchList el) { InotifyProtos.EventsListProto.Builder builder = InotifyProtos.EventsListProto.newBuilder(); - for (Event e : el.getEvents()) { - switch(e.getEventType()) { - case CLOSE: - Event.CloseEvent ce = (Event.CloseEvent) e; - builder.addEvents(InotifyProtos.EventProto.newBuilder() - .setType(InotifyProtos.EventType.EVENT_CLOSE) - .setContents( - InotifyProtos.CloseEventProto.newBuilder() - .setPath(ce.getPath()) - .setFileSize(ce.getFileSize()) - .setTimestamp(ce.getTimestamp()).build().toByteString() - ).build()); - break; - case CREATE: - Event.CreateEvent ce2 = (Event.CreateEvent) e; - builder.addEvents(InotifyProtos.EventProto.newBuilder() - .setType(InotifyProtos.EventType.EVENT_CREATE) - .setContents( - InotifyProtos.CreateEventProto.newBuilder() - .setType(createTypeConvert(ce2.getiNodeType())) - .setPath(ce2.getPath()) - .setCtime(ce2.getCtime()) - .setOwnerName(ce2.getOwnerName()) - .setGroupName(ce2.getGroupName()) - .setPerms(convert(ce2.getPerms())) - .setReplication(ce2.getReplication()) - .setSymlinkTarget(ce2.getSymlinkTarget() == null ? - "" : ce2.getSymlinkTarget()) - .setOverwrite(ce2.getOverwrite()).build().toByteString() - ).build()); - break; - case METADATA: - Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e; - InotifyProtos.MetadataUpdateEventProto.Builder metaB = - InotifyProtos.MetadataUpdateEventProto.newBuilder() - .setPath(me.getPath()) - .setType(metadataUpdateTypeConvert(me.getMetadataType())) - .setMtime(me.getMtime()) - .setAtime(me.getAtime()) - .setReplication(me.getReplication()) - .setOwnerName(me.getOwnerName() == null ? "" : - me.getOwnerName()) - .setGroupName(me.getGroupName() == null ? "" : - me.getGroupName()) - .addAllAcls(me.getAcls() == null ? - Lists.<AclEntryProto>newArrayList() : - convertAclEntryProto(me.getAcls())) - .addAllXAttrs(me.getxAttrs() == null ? - Lists.<XAttrProto>newArrayList() : - convertXAttrProto(me.getxAttrs())) - .setXAttrsRemoved(me.isxAttrsRemoved()); - if (me.getPerms() != null) { - metaB.setPerms(convert(me.getPerms())); + for (EventBatch b : el.getBatches()) { + List<InotifyProtos.EventProto> events = Lists.newArrayList(); + for (Event e : b.getEvents()) { + switch (e.getEventType()) { + case CLOSE: + Event.CloseEvent ce = (Event.CloseEvent) e; + events.add(InotifyProtos.EventProto.newBuilder() + .setType(InotifyProtos.EventType.EVENT_CLOSE) + .setContents( + InotifyProtos.CloseEventProto.newBuilder() + .setPath(ce.getPath()) + .setFileSize(ce.getFileSize()) + .setTimestamp(ce.getTimestamp()).build().toByteString() + ).build()); + break; + case CREATE: + Event.CreateEvent ce2 = (Event.CreateEvent) e; + events.add(InotifyProtos.EventProto.newBuilder() + .setType(InotifyProtos.EventType.EVENT_CREATE) + .setContents( + InotifyProtos.CreateEventProto.newBuilder() + .setType(createTypeConvert(ce2.getiNodeType())) + .setPath(ce2.getPath()) + .setCtime(ce2.getCtime()) + .setOwnerName(ce2.getOwnerName()) + .setGroupName(ce2.getGroupName()) + .setPerms(convert(ce2.getPerms())) + .setReplication(ce2.getReplication()) + .setSymlinkTarget(ce2.getSymlinkTarget() == null ? + "" : ce2.getSymlinkTarget()) + .setOverwrite(ce2.getOverwrite()).build().toByteString() + ).build()); + break; + case METADATA: + Event.MetadataUpdateEvent me = (Event.MetadataUpdateEvent) e; + InotifyProtos.MetadataUpdateEventProto.Builder metaB = + InotifyProtos.MetadataUpdateEventProto.newBuilder() + .setPath(me.getPath()) + .setType(metadataUpdateTypeConvert(me.getMetadataType())) + .setMtime(me.getMtime()) + .setAtime(me.getAtime()) + .setReplication(me.getReplication()) + .setOwnerName(me.getOwnerName() == null ? "" : + me.getOwnerName()) + .setGroupName(me.getGroupName() == null ? "" : + me.getGroupName()) + .addAllAcls(me.getAcls() == null ? + Lists.<AclEntryProto>newArrayList() : + convertAclEntryProto(me.getAcls())) + .addAllXAttrs(me.getxAttrs() == null ? + Lists.<XAttrProto>newArrayList() : + convertXAttrProto(me.getxAttrs())) + .setXAttrsRemoved(me.isxAttrsRemoved()); + if (me.getPerms() != null) { + metaB.setPerms(convert(me.getPerms())); + } + events.add(InotifyProtos.EventProto.newBuilder() + .setType(InotifyProtos.EventType.EVENT_METADATA) + .setContents(metaB.build().toByteString()) + .build()); + break; + case RENAME: + Event.RenameEvent re = (Event.RenameEvent) e; + events.add(InotifyProtos.EventProto.newBuilder() + .setType(InotifyProtos.EventType.EVENT_RENAME) + .setContents( + InotifyProtos.RenameEventProto.newBuilder() + .setSrcPath(re.getSrcPath()) + .setDestPath(re.getDstPath()) + .setTimestamp(re.getTimestamp()).build().toByteString() + ).build()); + break; + case APPEND: + Event.AppendEvent re2 = (Event.AppendEvent) e; + events.add(InotifyProtos.EventProto.newBuilder() + .setType(InotifyProtos.EventType.EVENT_APPEND) + .setContents( + InotifyProtos.AppendEventProto.newBuilder() + .setPath(re2.getPath()).build().toByteString() + ).build()); + break; + case UNLINK: + Event.UnlinkEvent ue = (Event.UnlinkEvent) e; + events.add(InotifyProtos.EventProto.newBuilder() + .setType(InotifyProtos.EventType.EVENT_UNLINK) + .setContents( + InotifyProtos.UnlinkEventProto.newBuilder() + .setPath(ue.getPath()) + .setTimestamp(ue.getTimestamp()).build().toByteString() + ).build()); + break; + default: + throw new RuntimeException("Unexpected inotify event: " + e); } - builder.addEvents(InotifyProtos.EventProto.newBuilder() - .setType(InotifyProtos.EventType.EVENT_METADATA) - .setContents(metaB.build().toByteString()) - .build()); - break; - case RENAME: - Event.RenameEvent re = (Event.RenameEvent) e; - builder.addEvents(InotifyProtos.EventProto.newBuilder() - .setType(InotifyProtos.EventType.EVENT_RENAME) - .setContents( - InotifyProtos.RenameEventProto.newBuilder() - .setSrcPath(re.getSrcPath()) - .setDestPath(re.getDstPath()) - .setTimestamp(re.getTimestamp()).build().toByteString() - ).build()); - break; - case APPEND: - Event.AppendEvent re2 = (Event.AppendEvent) e; - builder.addEvents(InotifyProtos.EventProto.newBuilder() - .setType(InotifyProtos.EventType.EVENT_APPEND) - .setContents( - InotifyProtos.AppendEventProto.newBuilder() - .setPath(re2.getPath()).build().toByteString() - ).build()); - break; - case UNLINK: - Event.UnlinkEvent ue = (Event.UnlinkEvent) e; - builder.addEvents(InotifyProtos.EventProto.newBuilder() - .setType(InotifyProtos.EventType.EVENT_UNLINK) - .setContents( - InotifyProtos.UnlinkEventProto.newBuilder() - .setPath(ue.getPath()) - .setTimestamp(ue.getTimestamp()).build().toByteString() - ).build()); - break; - default: - throw new RuntimeException("Unexpected inotify event: " + e); } + builder.addBatch(InotifyProtos.EventBatchProto.newBuilder(). + setTxid(b.getTxid()). + addAllEvents(events)); } builder.setFirstTxid(el.getFirstTxid()); builder.setLastTxid(el.getLastTxid()); http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java index 00a5f25..cd3fc23 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/InotifyFSEditLogOpTranslator.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.namenode; import com.google.common.collect.Lists; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdfs.inotify.Event; +import org.apache.hadoop.hdfs.inotify.EventBatch; import org.apache.hadoop.hdfs.protocol.Block; import java.util.List; @@ -39,32 +40,35 @@ public class InotifyFSEditLogOpTranslator { return size; } - public static Event[] translate(FSEditLogOp op) { + public static EventBatch translate(FSEditLogOp op) { switch(op.opCode) { case OP_ADD: FSEditLogOp.AddOp addOp = (FSEditLogOp.AddOp) op; if (addOp.blocks.length == 0) { // create - return new Event[] { new Event.CreateEvent.Builder().path(addOp.path) + return new EventBatch(op.txid, + new Event[] { new Event.CreateEvent.Builder().path(addOp.path) .ctime(addOp.atime) .replication(addOp.replication) .ownerName(addOp.permissions.getUserName()) .groupName(addOp.permissions.getGroupName()) .perms(addOp.permissions.getPermission()) .overwrite(addOp.overwrite) - .iNodeType(Event.CreateEvent.INodeType.FILE).build() }; + .iNodeType(Event.CreateEvent.INodeType.FILE).build() }); } else { - return new Event[] { new Event.AppendEvent(addOp.path) }; + return new EventBatch(op.txid, + new Event[] { new Event.AppendEvent(addOp.path) }); } case OP_CLOSE: FSEditLogOp.CloseOp cOp = (FSEditLogOp.CloseOp) op; - return new Event[] { - new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) }; + return new EventBatch(op.txid, new Event[] { + new Event.CloseEvent(cOp.path, getSize(cOp), cOp.mtime) }); case OP_SET_REPLICATION: FSEditLogOp.SetReplicationOp setRepOp = (FSEditLogOp.SetReplicationOp) op; - return new Event[] { new Event.MetadataUpdateEvent.Builder() + return new EventBatch(op.txid, + new Event[] { new Event.MetadataUpdateEvent.Builder() .metadataType(Event.MetadataUpdateEvent.MetadataType.REPLICATION) .path(setRepOp.path) - .replication(setRepOp.replication).build() }; + .replication(setRepOp.replication).build() }); case OP_CONCAT_DELETE: FSEditLogOp.ConcatDeleteOp cdOp = (FSEditLogOp.ConcatDeleteOp) op; List<Event> events = Lists.newArrayList(); @@ -73,73 +77,83 @@ public class InotifyFSEditLogOpTranslator { events.add(new Event.UnlinkEvent(src, cdOp.timestamp)); } events.add(new Event.CloseEvent(cdOp.trg, -1, cdOp.timestamp)); - return events.toArray(new Event[0]); + return new EventBatch(op.txid, events.toArray(new Event[0])); case OP_RENAME_OLD: FSEditLogOp.RenameOldOp rnOpOld = (FSEditLogOp.RenameOldOp) op; - return new Event[] { - new Event.RenameEvent(rnOpOld.src, rnOpOld.dst, rnOpOld.timestamp) }; + return new EventBatch(op.txid, new Event[] { + new Event.RenameEvent(rnOpOld.src, + rnOpOld.dst, rnOpOld.timestamp) }); case OP_RENAME: FSEditLogOp.RenameOp rnOp = (FSEditLogOp.RenameOp) op; - return new Event[] { - new Event.RenameEvent(rnOp.src, rnOp.dst, rnOp.timestamp) }; + return new EventBatch(op.txid, new Event[] { + new Event.RenameEvent(rnOp.src, rnOp.dst, rnOp.timestamp) }); case OP_DELETE: FSEditLogOp.DeleteOp delOp = (FSEditLogOp.DeleteOp) op; - return new Event[] { new Event.UnlinkEvent(delOp.path, delOp.timestamp) }; + return new EventBatch(op.txid, new Event[] { + new Event.UnlinkEvent(delOp.path, delOp.timestamp) }); case OP_MKDIR: FSEditLogOp.MkdirOp mkOp = (FSEditLogOp.MkdirOp) op; - return new Event[] { new Event.CreateEvent.Builder().path(mkOp.path) + return new EventBatch(op.txid, + new Event[] { new Event.CreateEvent.Builder().path(mkOp.path) .ctime(mkOp.timestamp) .ownerName(mkOp.permissions.getUserName()) .groupName(mkOp.permissions.getGroupName()) .perms(mkOp.permissions.getPermission()) - .iNodeType(Event.CreateEvent.INodeType.DIRECTORY).build() }; + .iNodeType(Event.CreateEvent.INodeType.DIRECTORY).build() }); case OP_SET_PERMISSIONS: FSEditLogOp.SetPermissionsOp permOp = (FSEditLogOp.SetPermissionsOp) op; - return new Event[] { new Event.MetadataUpdateEvent.Builder() + return new EventBatch(op.txid, + new Event[] { new Event.MetadataUpdateEvent.Builder() .metadataType(Event.MetadataUpdateEvent.MetadataType.PERMS) .path(permOp.src) - .perms(permOp.permissions).build() }; + .perms(permOp.permissions).build() }); case OP_SET_OWNER: FSEditLogOp.SetOwnerOp ownOp = (FSEditLogOp.SetOwnerOp) op; - return new Event[] { new Event.MetadataUpdateEvent.Builder() + return new EventBatch(op.txid, + new Event[] { new Event.MetadataUpdateEvent.Builder() .metadataType(Event.MetadataUpdateEvent.MetadataType.OWNER) .path(ownOp.src) - .ownerName(ownOp.username).groupName(ownOp.groupname).build() }; + .ownerName(ownOp.username).groupName(ownOp.groupname).build() }); case OP_TIMES: FSEditLogOp.TimesOp timesOp = (FSEditLogOp.TimesOp) op; - return new Event[] { new Event.MetadataUpdateEvent.Builder() + return new EventBatch(op.txid, + new Event[] { new Event.MetadataUpdateEvent.Builder() .metadataType(Event.MetadataUpdateEvent.MetadataType.TIMES) .path(timesOp.path) - .atime(timesOp.atime).mtime(timesOp.mtime).build() }; + .atime(timesOp.atime).mtime(timesOp.mtime).build() }); case OP_SYMLINK: FSEditLogOp.SymlinkOp symOp = (FSEditLogOp.SymlinkOp) op; - return new Event[] { new Event.CreateEvent.Builder().path(symOp.path) + return new EventBatch(op.txid, + new Event[] { new Event.CreateEvent.Builder().path(symOp.path) .ctime(symOp.atime) .ownerName(symOp.permissionStatus.getUserName()) .groupName(symOp.permissionStatus.getGroupName()) .perms(symOp.permissionStatus.getPermission()) .symlinkTarget(symOp.value) - .iNodeType(Event.CreateEvent.INodeType.SYMLINK).build() }; + .iNodeType(Event.CreateEvent.INodeType.SYMLINK).build() }); case OP_REMOVE_XATTR: FSEditLogOp.RemoveXAttrOp rxOp = (FSEditLogOp.RemoveXAttrOp) op; - return new Event[] { new Event.MetadataUpdateEvent.Builder() + return new EventBatch(op.txid, + new Event[] { new Event.MetadataUpdateEvent.Builder() .metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS) .path(rxOp.src) .xAttrs(rxOp.xAttrs) - .xAttrsRemoved(true).build() }; + .xAttrsRemoved(true).build() }); case OP_SET_XATTR: FSEditLogOp.SetXAttrOp sxOp = (FSEditLogOp.SetXAttrOp) op; - return new Event[] { new Event.MetadataUpdateEvent.Builder() + return new EventBatch(op.txid, + new Event[] { new Event.MetadataUpdateEvent.Builder() .metadataType(Event.MetadataUpdateEvent.MetadataType.XATTRS) .path(sxOp.src) .xAttrs(sxOp.xAttrs) - .xAttrsRemoved(false).build() }; + .xAttrsRemoved(false).build() }); case OP_SET_ACL: FSEditLogOp.SetAclOp saOp = (FSEditLogOp.SetAclOp) op; - return new Event[] { new Event.MetadataUpdateEvent.Builder() + return new EventBatch(op.txid, + new Event[] { new Event.MetadataUpdateEvent.Builder() .metadataType(Event.MetadataUpdateEvent.MetadataType.ACLS) .path(saOp.src) - .acls(saOp.aclEntries).build() }; + .acls(saOp.aclEntries).build() }); default: return null; } http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 690d7e1..16cec5c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -34,7 +34,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import com.google.common.collect.Lists; import org.apache.commons.logging.Log; import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; @@ -55,8 +54,8 @@ import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclStatus; -import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsAction; +import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.ha.HAServiceStatus; import org.apache.hadoop.ha.HealthCheckFailedException; @@ -67,8 +66,8 @@ import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HDFSPolicyProvider; -import org.apache.hadoop.hdfs.inotify.Event; -import org.apache.hadoop.hdfs.inotify.EventsList; +import org.apache.hadoop.hdfs.inotify.EventBatch; +import org.apache.hadoop.hdfs.inotify.EventBatchList; import org.apache.hadoop.hdfs.protocol.AclException; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; @@ -139,10 +138,16 @@ import org.apache.hadoop.io.EnumSetWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.ipc.ProtobufRpcEngine; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.ipc.Server; -import org.apache.hadoop.ipc.WritableRpcEngine; import org.apache.hadoop.ipc.RefreshRegistry; import org.apache.hadoop.ipc.RefreshResponse; +import org.apache.hadoop.ipc.Server; +import org.apache.hadoop.ipc.WritableRpcEngine; +import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService; +import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService; +import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB; +import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB; +import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB; +import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB; import org.apache.hadoop.net.Node; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.Groups; @@ -155,19 +160,12 @@ import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolP import org.apache.hadoop.security.protocolPB.RefreshAuthorizationPolicyProtocolServerSideTranslatorPB; import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolPB; import org.apache.hadoop.security.protocolPB.RefreshUserMappingsProtocolServerSideTranslatorPB; -import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolPB; -import org.apache.hadoop.ipc.protocolPB.RefreshCallQueueProtocolServerSideTranslatorPB; -import org.apache.hadoop.ipc.proto.RefreshCallQueueProtocolProtos.RefreshCallQueueProtocolService; -import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolPB; -import org.apache.hadoop.ipc.protocolPB.GenericRefreshProtocolServerSideTranslatorPB; -import org.apache.hadoop.ipc.proto.GenericRefreshProtocolProtos.GenericRefreshProtocolService; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.tools.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService; import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolPB; import org.apache.hadoop.tools.protocolPB.GetUserMappingsProtocolServerSideTranslatorPB; import org.apache.hadoop.tracing.SpanReceiverInfo; -import org.apache.hadoop.tracing.TraceAdminPB; import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService; import org.apache.hadoop.tracing.TraceAdminProtocolPB; import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB; @@ -175,6 +173,7 @@ import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionUtil; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Lists; import com.google.protobuf.BlockingService; /** @@ -1670,7 +1669,7 @@ class NameNodeRpcServer implements NamenodeProtocols { } @Override // ClientProtocol - public EventsList getEditsFromTxid(long txid) throws IOException { + public EventBatchList getEditsFromTxid(long txid) throws IOException { checkNNStartup(); namesystem.checkOperation(OperationCategory.READ); // only active namesystem.checkSuperuserPrivilege(); @@ -1689,13 +1688,14 @@ class NameNodeRpcServer implements NamenodeProtocols { // guaranteed to have been written by this NameNode.) boolean readInProgress = syncTxid > 0; - List<Event> events = Lists.newArrayList(); + List<EventBatch> batches = Lists.newArrayList(); + int totalEvents = 0; long maxSeenTxid = -1; long firstSeenTxid = -1; if (syncTxid > 0 && txid > syncTxid) { // we can't read past syncTxid, so there's no point in going any further - return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid); + return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid); } Collection<EditLogInputStream> streams = null; @@ -1707,7 +1707,7 @@ class NameNodeRpcServer implements NamenodeProtocols { // will result LOG.info("NN is transitioning from active to standby and FSEditLog " + "is closed -- could not read edits"); - return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid); + return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid); } boolean breakOuter = false; @@ -1725,9 +1725,10 @@ class NameNodeRpcServer implements NamenodeProtocols { break; } - Event[] eventsFromOp = InotifyFSEditLogOpTranslator.translate(op); - if (eventsFromOp != null) { - events.addAll(Arrays.asList(eventsFromOp)); + EventBatch eventBatch = InotifyFSEditLogOpTranslator.translate(op); + if (eventBatch != null) { + batches.add(eventBatch); + totalEvents += eventBatch.getEvents().length; } if (op.getTransactionId() > maxSeenTxid) { maxSeenTxid = op.getTransactionId(); @@ -1735,7 +1736,7 @@ class NameNodeRpcServer implements NamenodeProtocols { if (firstSeenTxid == -1) { firstSeenTxid = op.getTransactionId(); } - if (events.size() >= maxEventsPerRPC || (syncTxid > 0 && + if (totalEvents >= maxEventsPerRPC || (syncTxid > 0 && op.getTransactionId() == syncTxid)) { // we're done breakOuter = true; @@ -1750,7 +1751,7 @@ class NameNodeRpcServer implements NamenodeProtocols { } } - return new EventsList(events, firstSeenTxid, maxSeenTxid, syncTxid); + return new EventBatchList(batches, firstSeenTxid, maxSeenTxid, syncTxid); } @Override // TraceAdminProtocol http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto index a1d4d92..e51c02c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/inotify.proto @@ -48,6 +48,11 @@ message EventProto { required bytes contents = 2; } +message EventBatchProto { + required int64 txid = 1; + repeated EventProto events = 2; +} + enum INodeType { I_TYPE_FILE = 0x0; I_TYPE_DIRECTORY = 0x1; @@ -111,8 +116,9 @@ message UnlinkEventProto { } message EventsListProto { - repeated EventProto events = 1; + repeated EventProto events = 1; // deprecated required int64 firstTxid = 2; required int64 lastTxid = 3; required int64 syncTxid = 4; -} \ No newline at end of file + repeated EventBatchProto batch = 5; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/43631451/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java index a608ba8..82db110 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSInotifyEventInputStream.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.XAttrSetFlag; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.inotify.Event; +import org.apache.hadoop.hdfs.inotify.EventBatch; import org.apache.hadoop.hdfs.inotify.MissingEventsException; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes; @@ -49,11 +50,17 @@ public class TestDFSInotifyEventInputStream { private static final Log LOG = LogFactory.getLog( TestDFSInotifyEventInputStream.class); - private static Event waitForNextEvent(DFSInotifyEventInputStream eis) + private static EventBatch waitForNextEvents(DFSInotifyEventInputStream eis) throws IOException, MissingEventsException { - Event next = null; - while ((next = eis.poll()) == null); - return next; + EventBatch batch = null; + while ((batch = eis.poll()) == null); + return batch; + } + + private static long checkTxid(EventBatch batch, long prevTxid){ + Assert.assertTrue("Previous txid " + prevTxid + " was not less than " + + "new txid " + batch.getTxid(), prevTxid < batch.getTxid()); + return batch.getTxid(); } /** @@ -64,7 +71,7 @@ public class TestDFSInotifyEventInputStream { */ @Test public void testOpcodeCount() { - Assert.assertTrue(FSEditLogOpCodes.values().length == 47); + Assert.assertEquals(47, FSEditLogOpCodes.values().length); } @@ -127,30 +134,36 @@ public class TestDFSInotifyEventInputStream { "user::rwx,user:foo:rw-,group::r--,other::---", true)); client.removeAcl("/file5"); // SetAclOp -> MetadataUpdateEvent - Event next = null; + EventBatch batch = null; // RenameOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.RENAME); - Event.RenameEvent re = (Event.RenameEvent) next; - Assert.assertTrue(re.getDstPath().equals("/file4")); - Assert.assertTrue(re.getSrcPath().equals("/file")); + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + long txid = batch.getTxid(); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.RENAME); + Event.RenameEvent re = (Event.RenameEvent) batch.getEvents()[0]; + Assert.assertEquals("/file4", re.getDstPath()); + Assert.assertEquals("/file", re.getSrcPath()); Assert.assertTrue(re.getTimestamp() > 0); - long eventsBehind = eis.getEventsBehindEstimate(); + long eventsBehind = eis.getTxidsBehindEstimate(); // RenameOldOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.RENAME); - Event.RenameEvent re2 = (Event.RenameEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.RENAME); + Event.RenameEvent re2 = (Event.RenameEvent) batch.getEvents()[0]; Assert.assertTrue(re2.getDstPath().equals("/file2")); Assert.assertTrue(re2.getSrcPath().equals("/file4")); Assert.assertTrue(re.getTimestamp() > 0); // AddOp with overwrite - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); - Event.CreateEvent ce = (Event.CreateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE); + Event.CreateEvent ce = (Event.CreateEvent) batch.getEvents()[0]; Assert.assertTrue(ce.getiNodeType() == Event.CreateEvent.INodeType.FILE); Assert.assertTrue(ce.getPath().equals("/file2")); Assert.assertTrue(ce.getCtime() > 0); @@ -159,66 +172,80 @@ public class TestDFSInotifyEventInputStream { Assert.assertTrue(ce.getOverwrite()); // CloseOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE); - Event.CloseEvent ce2 = (Event.CloseEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CLOSE); + Event.CloseEvent ce2 = (Event.CloseEvent) batch.getEvents()[0]; Assert.assertTrue(ce2.getPath().equals("/file2")); Assert.assertTrue(ce2.getFileSize() > 0); Assert.assertTrue(ce2.getTimestamp() > 0); // AddOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.APPEND); - Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2")); + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND); + Assert.assertTrue(((Event.AppendEvent) batch.getEvents()[0]).getPath().equals("/file2")); // CloseOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE); - Assert.assertTrue(((Event.CloseEvent) next).getPath().equals("/file2")); + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CLOSE); + Assert.assertTrue(((Event.CloseEvent) batch.getEvents()[0]).getPath().equals("/file2")); // TimesOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); - Event.MetadataUpdateEvent mue = (Event.MetadataUpdateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue = (Event.MetadataUpdateEvent) batch.getEvents()[0]; Assert.assertTrue(mue.getPath().equals("/file2")); Assert.assertTrue(mue.getMetadataType() == Event.MetadataUpdateEvent.MetadataType.TIMES); // SetReplicationOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); - Event.MetadataUpdateEvent mue2 = (Event.MetadataUpdateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue2 = (Event.MetadataUpdateEvent) batch.getEvents()[0]; Assert.assertTrue(mue2.getPath().equals("/file2")); Assert.assertTrue(mue2.getMetadataType() == Event.MetadataUpdateEvent.MetadataType.REPLICATION); Assert.assertTrue(mue2.getReplication() == 1); // ConcatDeleteOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.APPEND); - Assert.assertTrue(((Event.AppendEvent) next).getPath().equals("/file2")); - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK); - Event.UnlinkEvent ue2 = (Event.UnlinkEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(3, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.APPEND); + Assert.assertTrue(((Event.AppendEvent) batch.getEvents()[0]).getPath().equals("/file2")); + Assert.assertTrue(batch.getEvents()[1].getEventType() == Event.EventType.UNLINK); + Event.UnlinkEvent ue2 = (Event.UnlinkEvent) batch.getEvents()[1]; Assert.assertTrue(ue2.getPath().equals("/file3")); Assert.assertTrue(ue2.getTimestamp() > 0); - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.CLOSE); - Event.CloseEvent ce3 = (Event.CloseEvent) next; + Assert.assertTrue(batch.getEvents()[2].getEventType() == Event.EventType.CLOSE); + Event.CloseEvent ce3 = (Event.CloseEvent) batch.getEvents()[2]; Assert.assertTrue(ce3.getPath().equals("/file2")); Assert.assertTrue(ce3.getTimestamp() > 0); // DeleteOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.UNLINK); - Event.UnlinkEvent ue = (Event.UnlinkEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.UNLINK); + Event.UnlinkEvent ue = (Event.UnlinkEvent) batch.getEvents()[0]; Assert.assertTrue(ue.getPath().equals("/file2")); Assert.assertTrue(ue.getTimestamp() > 0); // MkdirOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); - Event.CreateEvent ce4 = (Event.CreateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE); + Event.CreateEvent ce4 = (Event.CreateEvent) batch.getEvents()[0]; Assert.assertTrue(ce4.getiNodeType() == Event.CreateEvent.INodeType.DIRECTORY); Assert.assertTrue(ce4.getPath().equals("/dir")); @@ -227,18 +254,22 @@ public class TestDFSInotifyEventInputStream { Assert.assertTrue(ce4.getSymlinkTarget() == null); // SetPermissionsOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); - Event.MetadataUpdateEvent mue3 = (Event.MetadataUpdateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue3 = (Event.MetadataUpdateEvent) batch.getEvents()[0]; Assert.assertTrue(mue3.getPath().equals("/dir")); Assert.assertTrue(mue3.getMetadataType() == Event.MetadataUpdateEvent.MetadataType.PERMS); Assert.assertTrue(mue3.getPerms().toString().contains("rw-rw-rw-")); // SetOwnerOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); - Event.MetadataUpdateEvent mue4 = (Event.MetadataUpdateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue4 = (Event.MetadataUpdateEvent) batch.getEvents()[0]; Assert.assertTrue(mue4.getPath().equals("/dir")); Assert.assertTrue(mue4.getMetadataType() == Event.MetadataUpdateEvent.MetadataType.OWNER); @@ -246,9 +277,11 @@ public class TestDFSInotifyEventInputStream { Assert.assertTrue(mue4.getGroupName().equals("groupname")); // SymlinkOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); - Event.CreateEvent ce5 = (Event.CreateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE); + Event.CreateEvent ce5 = (Event.CreateEvent) batch.getEvents()[0]; Assert.assertTrue(ce5.getiNodeType() == Event.CreateEvent.INodeType.SYMLINK); Assert.assertTrue(ce5.getPath().equals("/dir2")); @@ -257,9 +290,11 @@ public class TestDFSInotifyEventInputStream { Assert.assertTrue(ce5.getSymlinkTarget().equals("/dir")); // SetXAttrOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); - Event.MetadataUpdateEvent mue5 = (Event.MetadataUpdateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue5 = (Event.MetadataUpdateEvent) batch.getEvents()[0]; Assert.assertTrue(mue5.getPath().equals("/file5")); Assert.assertTrue(mue5.getMetadataType() == Event.MetadataUpdateEvent.MetadataType.XATTRS); @@ -268,9 +303,11 @@ public class TestDFSInotifyEventInputStream { Assert.assertTrue(!mue5.isxAttrsRemoved()); // RemoveXAttrOp - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); - Event.MetadataUpdateEvent mue6 = (Event.MetadataUpdateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue6 = (Event.MetadataUpdateEvent) batch.getEvents()[0]; Assert.assertTrue(mue6.getPath().equals("/file5")); Assert.assertTrue(mue6.getMetadataType() == Event.MetadataUpdateEvent.MetadataType.XATTRS); @@ -279,9 +316,11 @@ public class TestDFSInotifyEventInputStream { Assert.assertTrue(mue6.isxAttrsRemoved()); // SetAclOp (1) - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); - Event.MetadataUpdateEvent mue7 = (Event.MetadataUpdateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue7 = (Event.MetadataUpdateEvent) batch.getEvents()[0]; Assert.assertTrue(mue7.getPath().equals("/file5")); Assert.assertTrue(mue7.getMetadataType() == Event.MetadataUpdateEvent.MetadataType.ACLS); @@ -289,9 +328,11 @@ public class TestDFSInotifyEventInputStream { AclEntry.parseAclEntry("user::rwx", true))); // SetAclOp (2) - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.METADATA); - Event.MetadataUpdateEvent mue8 = (Event.MetadataUpdateEvent) next; + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + txid = checkTxid(batch, txid); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.METADATA); + Event.MetadataUpdateEvent mue8 = (Event.MetadataUpdateEvent) batch.getEvents()[0]; Assert.assertTrue(mue8.getPath().equals("/file5")); Assert.assertTrue(mue8.getMetadataType() == Event.MetadataUpdateEvent.MetadataType.ACLS); @@ -305,7 +346,7 @@ public class TestDFSInotifyEventInputStream { // and we should not have been behind at all when eventsBehind was set // either, since there were few enough events that they should have all // been read to the client during the first poll() call - Assert.assertTrue(eis.getEventsBehindEstimate() == eventsBehind); + Assert.assertTrue(eis.getTxidsBehindEstimate() == eventsBehind); } finally { cluster.shutdown(); @@ -329,13 +370,14 @@ public class TestDFSInotifyEventInputStream { } cluster.getDfsCluster().shutdownNameNode(0); cluster.getDfsCluster().transitionToActive(1); - Event next = null; + EventBatch batch = null; // we can read all of the edits logged by the old active from the new // active for (int i = 0; i < 10; i++) { - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); - Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" + + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE); + Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" + i)); } Assert.assertTrue(eis.poll() == null); @@ -369,11 +411,12 @@ public class TestDFSInotifyEventInputStream { // make sure that the old active can't read any further than the edits // it logged itself (it has no idea whether the in-progress edits from // the other writer have actually been committed) - Event next = null; + EventBatch batch = null; for (int i = 0; i < 10; i++) { - next = waitForNextEvent(eis); - Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); - Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir" + + batch = waitForNextEvents(eis); + Assert.assertEquals(1, batch.getEvents().length); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE); + Assert.assertTrue(((Event.CreateEvent) batch.getEvents()[0]).getPath().equals("/dir" + i)); } Assert.assertTrue(eis.poll() == null); @@ -414,13 +457,13 @@ public class TestDFSInotifyEventInputStream { }, 1, TimeUnit.SECONDS); // a very generous wait period -- the edit will definitely have been // processed by the time this is up - Event next = eis.poll(5, TimeUnit.SECONDS); - Assert.assertTrue(next != null); - Assert.assertTrue(next.getEventType() == Event.EventType.CREATE); - Assert.assertTrue(((Event.CreateEvent) next).getPath().equals("/dir")); + EventBatch batch = eis.poll(5, TimeUnit.SECONDS); + Assert.assertNotNull(batch); + Assert.assertEquals(1, batch.getEvents().length); + Assert.assertTrue(batch.getEvents()[0].getEventType() == Event.EventType.CREATE); + Assert.assertEquals("/dir", ((Event.CreateEvent) batch.getEvents()[0]).getPath()); } finally { cluster.shutdown(); } } - }