http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java new file mode 100644 index 0000000..745ca7e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsDataOutputStream.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.client; + +import java.io.IOException; +import java.io.OutputStream; +import java.util.EnumSet; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.crypto.CryptoOutputStream; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.hdfs.DFSOutputStream; + +import com.google.common.base.Preconditions; + +/** + * The Hdfs implementation of {@link FSDataOutputStream}. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class HdfsDataOutputStream extends FSDataOutputStream { + public HdfsDataOutputStream(DFSOutputStream out, FileSystem.Statistics stats, + long startPosition) throws IOException { + super(out, stats, startPosition); + } + + public HdfsDataOutputStream(DFSOutputStream out, FileSystem.Statistics stats + ) throws IOException { + this(out, stats, 0L); + } + + public HdfsDataOutputStream(CryptoOutputStream out, FileSystem.Statistics stats, + long startPosition) throws IOException { + super(out, stats, startPosition); + Preconditions.checkArgument(out.getWrappedStream() instanceof DFSOutputStream, + "CryptoOutputStream should wrap a DFSOutputStream"); + } + + public HdfsDataOutputStream(CryptoOutputStream out, FileSystem.Statistics stats) + throws IOException { + this(out, stats, 0L); + } + + /** + * Get the actual number of replicas of the current block. + * + * This can be different from the designated replication factor of the file + * because the namenode does not maintain replication for the blocks which are + * currently being written to. Depending on the configuration, the client may + * continue to write to a block even if a few datanodes in the write pipeline + * have failed, or the client may add new datanodes once a datanode has + * failed. 
+ * + * @return the number of valid replicas of the current block + */ + public synchronized int getCurrentBlockReplication() throws IOException { + OutputStream wrappedStream = getWrappedStream(); + if (wrappedStream instanceof CryptoOutputStream) { + wrappedStream = ((CryptoOutputStream) wrappedStream).getWrappedStream(); + } + return ((DFSOutputStream) wrappedStream).getCurrentBlockReplication(); + } + + /** + * Sync buffered data to DataNodes (flush to disk devices). + * + * @param syncFlags + * Indicate the detailed semantic and actions of the hsync. + * @throws IOException + * @see FSDataOutputStream#hsync() + */ + public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException { + OutputStream wrappedStream = getWrappedStream(); + if (wrappedStream instanceof CryptoOutputStream) { + ((CryptoOutputStream) wrappedStream).flush(); + wrappedStream = ((CryptoOutputStream) wrappedStream).getWrappedStream(); + } + ((DFSOutputStream) wrappedStream).hsync(syncFlags); + } + + public static enum SyncFlag { + + /** + * When doing sync to DataNodes, also update the metadata (block length) in + * the NameNode. + */ + UPDATE_LENGTH, + + /** + * Sync the data to DataNode, close the current block, and allocate a new + * block + */ + END_BLOCK; + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java new file mode 100644 index 0000000..c3d2cfc --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java @@ -0,0 +1,524 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.client.impl; + +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSOutputStream; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; +import com.google.common.annotations.VisibleForTesting; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * <p> + * Used by {@link org.apache.hadoop.hdfs.DFSClient} for renewing file-being-written leases + * on the namenode. + * When a file is opened for write (create or append), + * namenode stores a file lease for recording the identity of the writer. + * The writer (i.e. the DFSClient) is required to renew the lease periodically. + * When the lease is not renewed before it expires, + * the namenode considers the writer as failed and then it may either let + * another writer to obtain the lease or close the file. + * </p> + * <p> + * This class also provides the following functionality: + * <ul> + * <li> + * It maintains a map from (namenode, user) pairs to lease renewers. + * The same {@link LeaseRenewer} instance is used for renewing lease + * for all the {@link org.apache.hadoop.hdfs.DFSClient} to the same namenode and the same user. + * </li> + * <li> + * Each renewer maintains a list of {@link org.apache.hadoop.hdfs.DFSClient}. + * Periodically the leases for all the clients are renewed. + * A client is removed from the list when the client is closed. 
+ * </li> + * <li> + * A thread per namenode per user is used by the {@link LeaseRenewer} + * to renew the leases. + * </li> + * </ul> + * </p> + */ +@InterfaceAudience.Private +public class LeaseRenewer { + static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class); + + static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L; + static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L; + + /** Get a {@link LeaseRenewer} instance */ + public static LeaseRenewer getInstance(final String authority, + final UserGroupInformation ugi, final DFSClient dfsc) throws IOException { + final LeaseRenewer r = Factory.INSTANCE.get(authority, ugi); + r.addClient(dfsc); + return r; + } + + /** + * A factory for sharing {@link LeaseRenewer} objects + * among {@link DFSClient} instances + * so that there is only one renewer per authority per user. + */ + private static class Factory { + private static final Factory INSTANCE = new Factory(); + + private static class Key { + /** Namenode info */ + final String authority; + /** User info */ + final UserGroupInformation ugi; + + private Key(final String authority, final UserGroupInformation ugi) { + if (authority == null) { + throw new HadoopIllegalArgumentException("authority == null"); + } else if (ugi == null) { + throw new HadoopIllegalArgumentException("ugi == null"); + } + + this.authority = authority; + this.ugi = ugi; + } + + @Override + public int hashCode() { + return authority.hashCode() ^ ugi.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj != null && obj instanceof Key) { + final Key that = (Key)obj; + return this.authority.equals(that.authority) + && this.ugi.equals(that.ugi); + } + return false; + } + + @Override + public String toString() { + return ugi.getShortUserName() + "@" + authority; + } + } + + /** A map for per user per namenode renewers. 
*/ + private final Map<Key, LeaseRenewer> renewers = new HashMap<Key, LeaseRenewer>(); + + /** Get a renewer. */ + private synchronized LeaseRenewer get(final String authority, + final UserGroupInformation ugi) { + final Key k = new Key(authority, ugi); + LeaseRenewer r = renewers.get(k); + if (r == null) { + r = new LeaseRenewer(k); + renewers.put(k, r); + } + return r; + } + + /** Remove the given renewer. */ + private synchronized void remove(final LeaseRenewer r) { + final LeaseRenewer stored = renewers.get(r.factorykey); + //Since a renewer may expire, the stored renewer can be different. + if (r == stored) { + if (!r.clientsRunning()) { + renewers.remove(r.factorykey); + } + } + } + } + + /** The time in milliseconds that the map became empty. */ + private long emptyTime = Long.MAX_VALUE; + /** A fixed lease renewal time period in milliseconds */ + private long renewal = HdfsConstants.LEASE_SOFTLIMIT_PERIOD / 2; + + /** A daemon for renewing lease */ + private Daemon daemon = null; + /** Only the daemon with currentId should run. */ + private int currentId = 0; + + /** + * A period in milliseconds that the lease renewer thread should run + * after the map became empty. + * In other words, + * if the map is empty for a time period longer than the grace period, + * the renewer should terminate. + */ + private long gracePeriod; + /** + * The time period in milliseconds + * that the renewer sleeps for each iteration. + */ + private long sleepPeriod; + + private final Factory.Key factorykey; + + /** A list of clients corresponding to this renewer. */ + private final List<DFSClient> dfsclients = new ArrayList<DFSClient>(); + + /** + * A stringified stack trace of the call stack when the Lease Renewer + * was instantiated. This is only generated if trace-level logging is + * enabled on this class. 
+ */ + private final String instantiationTrace; + + private LeaseRenewer(Factory.Key factorykey) { + this.factorykey = factorykey; + unsyncSetGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT); + + if (LOG.isTraceEnabled()) { + instantiationTrace = StringUtils.stringifyException( + new Throwable("TRACE")); + } else { + instantiationTrace = null; + } + } + + /** @return the renewal time in milliseconds. */ + private synchronized long getRenewalTime() { + return renewal; + } + + /** Used for testing only. */ + @VisibleForTesting + public synchronized void setRenewalTime(final long renewal) { + this.renewal = renewal; + } + + /** Add a client. */ + private synchronized void addClient(final DFSClient dfsc) { + for(DFSClient c : dfsclients) { + if (c == dfsc) { + //client already exists, nothing to do. + return; + } + } + //client not found, add it + dfsclients.add(dfsc); + + //update renewal time + final int hdfsTimeout = dfsc.getConf().getHdfsTimeout(); + if (hdfsTimeout > 0) { + final long half = hdfsTimeout/2; + if (half < renewal) { + this.renewal = half; + } + } + } + + private synchronized boolean clientsRunning() { + for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) { + if (!i.next().isClientRunning()) { + i.remove(); + } + } + return !dfsclients.isEmpty(); + } + + private synchronized long getSleepPeriod() { + return sleepPeriod; + } + + /** Set the grace period and adjust the sleep period accordingly. */ + synchronized void setGraceSleepPeriod(final long gracePeriod) { + unsyncSetGraceSleepPeriod(gracePeriod); + } + + private void unsyncSetGraceSleepPeriod(final long gracePeriod) { + if (gracePeriod < 100L) { + throw new HadoopIllegalArgumentException(gracePeriod + + " = gracePeriod < 100ms is too small."); + } + this.gracePeriod = gracePeriod; + final long half = gracePeriod/2; + this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT? + half: LEASE_RENEWER_SLEEP_DEFAULT; + } + + /** Is the daemon running? 
*/ + synchronized boolean isRunning() { + return daemon != null && daemon.isAlive(); + } + + /** Does this renewer have nothing to renew? */ + public boolean isEmpty() { + return dfsclients.isEmpty(); + } + + /** Used only by tests */ + synchronized String getDaemonName() { + return daemon.getName(); + } + + /** Is the empty period longer than the grace period? */ + private synchronized boolean isRenewerExpired() { + return emptyTime != Long.MAX_VALUE + && Time.monotonicNow() - emptyTime > gracePeriod; + } + + public synchronized void put(final long inodeId, final DFSOutputStream out, + final DFSClient dfsc) { + if (dfsc.isClientRunning()) { + if (!isRunning() || isRenewerExpired()) { + //start a new daemon with a new id. + final int id = ++currentId; + daemon = new Daemon(new Runnable() { + @Override + public void run() { + try { + if (LOG.isDebugEnabled()) { + LOG.debug("Lease renewer daemon for " + clientsString() + + " with renew id " + id + " started"); + } + LeaseRenewer.this.run(id); + } catch(InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug(LeaseRenewer.this.getClass().getSimpleName() + + " is interrupted.", e); + } + } finally { + synchronized(LeaseRenewer.this) { + Factory.INSTANCE.remove(LeaseRenewer.this); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Lease renewer daemon for " + clientsString() + + " with renew id " + id + " exited"); + } + } + } + + @Override + public String toString() { + return String.valueOf(LeaseRenewer.this); + } + }); + daemon.start(); + } + dfsc.putFileBeingWritten(inodeId, out); + emptyTime = Long.MAX_VALUE; + } + } + + @VisibleForTesting + synchronized void setEmptyTime(long time) { + emptyTime = time; + } + + /** Close a file. 
*/ + public void closeFile(final long inodeId, final DFSClient dfsc) { + dfsc.removeFileBeingWritten(inodeId); + + synchronized(this) { + if (dfsc.isFilesBeingWrittenEmpty()) { + dfsclients.remove(dfsc); + } + //update emptyTime if necessary + if (emptyTime == Long.MAX_VALUE) { + for(DFSClient c : dfsclients) { + if (!c.isFilesBeingWrittenEmpty()) { + //found a non-empty file-being-written map + return; + } + } + //discover the first time that all file-being-written maps are empty. + emptyTime = Time.monotonicNow(); + } + } + } + + /** Close the given client. */ + public synchronized void closeClient(final DFSClient dfsc) { + dfsclients.remove(dfsc); + if (dfsclients.isEmpty()) { + if (!isRunning() || isRenewerExpired()) { + Factory.INSTANCE.remove(LeaseRenewer.this); + return; + } + if (emptyTime == Long.MAX_VALUE) { + //discover the first time that the client list is empty. + emptyTime = Time.monotonicNow(); + } + } + + //update renewal time + if (renewal == dfsc.getConf().getHdfsTimeout()/2) { + long min = HdfsConstants.LEASE_SOFTLIMIT_PERIOD; + for(DFSClient c : dfsclients) { + final int timeout = c.getConf().getHdfsTimeout(); + if (timeout > 0 && timeout < min) { + min = timeout; + } + } + renewal = min/2; + } + } + + public void interruptAndJoin() throws InterruptedException { + Daemon daemonCopy = null; + synchronized (this) { + if (isRunning()) { + daemon.interrupt(); + daemonCopy = daemon; + } + } + + if (daemonCopy != null) { + if(LOG.isDebugEnabled()) { + LOG.debug("Wait for lease checker to terminate"); + } + daemonCopy.join(); + } + } + + private void renew() throws IOException { + final List<DFSClient> copies; + synchronized(this) { + copies = new ArrayList<DFSClient>(dfsclients); + } + //sort the client names for finding out repeated names. 
+ Collections.sort(copies, new Comparator<DFSClient>() { + @Override + public int compare(final DFSClient left, final DFSClient right) { + return left.getClientName().compareTo(right.getClientName()); + } + }); + String previousName = ""; + for(int i = 0; i < copies.size(); i++) { + final DFSClient c = copies.get(i); + //skip if current client name is the same as the previous name. + if (!c.getClientName().equals(previousName)) { + if (!c.renewLease()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Did not renew lease for client " + + c); + } + continue; + } + previousName = c.getClientName(); + if (LOG.isDebugEnabled()) { + LOG.debug("Lease renewed for client " + previousName); + } + } + } + } + + /** + * Periodically check in with the namenode and renew all the leases + * when the lease period is half over. + */ + private void run(final int id) throws InterruptedException { + for(long lastRenewed = Time.monotonicNow(); !Thread.interrupted(); + Thread.sleep(getSleepPeriod())) { + final long elapsed = Time.monotonicNow() - lastRenewed; + if (elapsed >= getRenewalTime()) { + try { + renew(); + if (LOG.isDebugEnabled()) { + LOG.debug("Lease renewer daemon for " + clientsString() + + " with renew id " + id + " executed"); + } + lastRenewed = Time.monotonicNow(); + } catch (SocketTimeoutException ie) { + LOG.warn("Failed to renew lease for " + clientsString() + " for " + + (elapsed/1000) + " seconds. Aborting ...", ie); + synchronized (this) { + while (!dfsclients.isEmpty()) { + DFSClient dfsClient = dfsclients.get(0); + dfsClient.closeAllFilesBeingWritten(true); + closeClient(dfsClient); + } + //Expire the current LeaseRenewer thread. + emptyTime = 0; + } + break; + } catch (IOException ie) { + LOG.warn("Failed to renew lease for " + clientsString() + " for " + + (elapsed/1000) + " seconds. 
Will retry shortly ...", ie); + } + } + + synchronized(this) { + if (id != currentId || isRenewerExpired()) { + if (LOG.isDebugEnabled()) { + if (id != currentId) { + LOG.debug("Lease renewer daemon for " + clientsString() + + " with renew id " + id + " is not current"); + } else { + LOG.debug("Lease renewer daemon for " + clientsString() + + " with renew id " + id + " expired"); + } + } + //no longer the current daemon or expired + return; + } + + // if no clients are in running state or there are no more clients + // registered with this renewer, stop the daemon after the grace + // period. + if (!clientsRunning() && emptyTime == Long.MAX_VALUE) { + emptyTime = Time.monotonicNow(); + } + } + } + } + + @Override + public String toString() { + String s = getClass().getSimpleName() + ":" + factorykey; + if (LOG.isTraceEnabled()) { + return s + ", clients=" + clientsString() + + ", created at " + instantiationTrace; + } + return s; + } + + /** Get the names of all clients */ + private synchronized String clientsString() { + if (dfsclients.isEmpty()) { + return "[]"; + } else { + final StringBuilder b = new StringBuilder("[").append( + dfsclients.get(0).getClientName()); + for(int i = 1; i < dfsclients.size(); i++) { + b.append(", ").append(dfsclients.get(i).getClientName()); + } + return b.append("]").toString(); + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/MissingEventsException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/MissingEventsException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/MissingEventsException.java new file mode 100644 index 0000000..e4b51c5 --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/inotify/MissingEventsException.java @@ -0,0 +1,54 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.inotify; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class MissingEventsException extends Exception { + private static final long serialVersionUID = 1L; + + private long expectedTxid; + private long actualTxid; + + public MissingEventsException() {} + + public MissingEventsException(long expectedTxid, long actualTxid) { + this.expectedTxid = expectedTxid; + this.actualTxid = actualTxid; + } + + public long getExpectedTxid() { + return expectedTxid; + } + + public long getActualTxid() { + return actualTxid; + } + + @Override + public String toString() { + return "We expected the next batch of events to start with transaction ID " + + expectedTxid + ", but it instead started with transaction ID " + + actualTxid + ". 
Most likely the intervening transactions were cleaned " + + "up as part of checkpointing."; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/AclException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/AclException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/AclException.java new file mode 100644 index 0000000..1210999 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/AclException.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; + +/** + * Indicates a failure manipulating an ACL. + */ +@InterfaceAudience.Private +public class AclException extends IOException { + private static final long serialVersionUID = 1L; + + /** + * Creates a new AclException. 
+ * + * @param message String message + */ + public AclException(String message) { + super(message); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java new file mode 100644 index 0000000..923cdb4 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveIterator.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdfs.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.BatchedRemoteIterator; +import org.apache.hadoop.fs.InvalidRequestException; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.htrace.Sampler; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; + +import com.google.common.base.Preconditions; + +/** + * CacheDirectiveIterator is a remote iterator that iterates cache directives. + * It supports retrying in case of namenode failover. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class CacheDirectiveIterator + extends BatchedRemoteIterator<Long, CacheDirectiveEntry> { + + private CacheDirectiveInfo filter; + private final ClientProtocol namenode; + private final Sampler<?> traceSampler; + + public CacheDirectiveIterator(ClientProtocol namenode, + CacheDirectiveInfo filter, Sampler<?> traceSampler) { + super(0L); + this.namenode = namenode; + this.filter = filter; + this.traceSampler = traceSampler; + } + + private static CacheDirectiveInfo removeIdFromFilter(CacheDirectiveInfo filter) { + CacheDirectiveInfo.Builder builder = new CacheDirectiveInfo.Builder(filter); + builder.setId(null); + return builder.build(); + } + + /** + * Used for compatibility when communicating with a server version that + * does not support filtering directives by ID. 
+ */ + private static class SingleEntry implements + BatchedEntries<CacheDirectiveEntry> { + + private final CacheDirectiveEntry entry; + + public SingleEntry(final CacheDirectiveEntry entry) { + this.entry = entry; + } + + @Override + public CacheDirectiveEntry get(int i) { + if (i > 0) { + return null; + } + return entry; + } + + @Override + public int size() { + return 1; + } + + @Override + public boolean hasMore() { + return false; + } + } + + @Override + public BatchedEntries<CacheDirectiveEntry> makeRequest(Long prevKey) + throws IOException { + BatchedEntries<CacheDirectiveEntry> entries = null; + TraceScope scope = Trace.startSpan("listCacheDirectives", traceSampler); + try { + entries = namenode.listCacheDirectives(prevKey, filter); + } catch (IOException e) { + if (e.getMessage().contains("Filtering by ID is unsupported")) { + // Retry case for old servers, do the filtering client-side + long id = filter.getId(); + filter = removeIdFromFilter(filter); + // Using id - 1 as prevId should get us a window containing the id + // This is somewhat brittle, since it depends on directives being + // returned in order of ascending ID. 
+ entries = namenode.listCacheDirectives(id - 1, filter); + for (int i=0; i<entries.size(); i++) { + CacheDirectiveEntry entry = entries.get(i); + if (entry.getInfo().getId().equals((Long)id)) { + return new SingleEntry(entry); + } + } + throw new RemoteException(InvalidRequestException.class.getName(), + "Did not find requested id " + id); + } + throw e; + } finally { + scope.close(); + } + Preconditions.checkNotNull(entries); + return entries; + } + + @Override + public Long elementToPrevKey(CacheDirectiveEntry entry) { + return entry.getInfo().getId(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java new file mode 100644 index 0000000..e9481f7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolIterator.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.BatchedRemoteIterator; +import org.apache.htrace.Sampler; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; + +/** + * CachePoolIterator is a remote iterator that iterates cache pools. + * It supports retrying in case of namenode failover. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class CachePoolIterator + extends BatchedRemoteIterator<String, CachePoolEntry> { + + private final ClientProtocol namenode; + private final Sampler traceSampler; + + public CachePoolIterator(ClientProtocol namenode, Sampler traceSampler) { + super(""); + this.namenode = namenode; + this.traceSampler = traceSampler; + } + + @Override + public BatchedEntries<CachePoolEntry> makeRequest(String prevKey) + throws IOException { + TraceScope scope = Trace.startSpan("listCachePools", traceSampler); + try { + return namenode.listCachePools(prevKey); + } finally { + scope.close(); + } + } + + @Override + public String elementToPrevKey(CachePoolEntry entry) { + return entry.getInfo().getPoolName(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java new file mode 100644 index 0000000..0141215 --- /dev/null +++ 
b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/EncryptionZoneIterator.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.BatchedRemoteIterator; +import org.apache.htrace.Sampler; +import org.apache.htrace.Trace; +import org.apache.htrace.TraceScope; + +/** + * EncryptionZoneIterator is a remote iterator that iterates over encryption + * zones. It supports retrying in case of namenode failover. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class EncryptionZoneIterator + extends BatchedRemoteIterator<Long, EncryptionZone> { + + private final ClientProtocol namenode; + private final Sampler<?> traceSampler; + + public EncryptionZoneIterator(ClientProtocol namenode, + Sampler<?> traceSampler) { + super(Long.valueOf(0)); + this.namenode = namenode; + this.traceSampler = traceSampler; + } + + @Override + public BatchedEntries<EncryptionZone> makeRequest(Long prevId) + throws IOException { + TraceScope scope = Trace.startSpan("listEncryptionZones", traceSampler); + try { + return namenode.listEncryptionZones(prevId); + } finally { + scope.close(); + } + } + + @Override + public Long elementToPrevKey(EncryptionZone entry) { + return entry.getId(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaByStorageTypeExceededException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaByStorageTypeExceededException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaByStorageTypeExceededException.java new file mode 100644 index 0000000..25084c7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/QuotaByStorageTypeExceededException.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocol; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.StorageType; + +import static org.apache.hadoop.util.StringUtils.TraditionalBinaryPrefix.long2String; + +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class QuotaByStorageTypeExceededException extends QuotaExceededException { + protected static final long serialVersionUID = 1L; + protected StorageType type; + + public QuotaByStorageTypeExceededException() {} + + public QuotaByStorageTypeExceededException(String msg) { + super(msg); + } + + public QuotaByStorageTypeExceededException(long quota, long count, StorageType type) { + super(quota, count); + this.type = type; + } + + @Override + public String getMessage() { + String msg = super.getMessage(); + if (msg == null) { + return "Quota by storage type : " + type.toString() + + " on path : " + (pathName==null ? "": pathName) + + " is exceeded. 
quota = " + long2String(quota, "B", 2) + + " but space consumed = " + long2String(count, "B", 2); + } else { + return msg; + } + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/UnresolvedPathException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/UnresolvedPathException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/UnresolvedPathException.java new file mode 100644 index 0000000..03fb704 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/UnresolvedPathException.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.protocol; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.fs.UnresolvedLinkException; +import org.apache.hadoop.fs.Path; + +/** + * Thrown when a symbolic link is encountered in a path. 
+ */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public final class UnresolvedPathException extends UnresolvedLinkException { + private static final long serialVersionUID = 1L; + private String path; // The path containing the link + private String preceding; // The path part preceding the link + private String remainder; // The path part following the link + private String linkTarget; // The link's target + + /** + * Used by RemoteException to instantiate an UnresolvedPathException. + */ + public UnresolvedPathException(String msg) { + super(msg); + } + + public UnresolvedPathException(String path, String preceding, + String remainder, String linkTarget) { + this.path = path; + this.preceding = preceding; + this.remainder = remainder; + this.linkTarget = linkTarget; + } + + /** + * Return a path with the link resolved with the target. + */ + public Path getResolvedPath() throws IOException { + // If the path is absolute we can throw out the preceding part and + // just append the remainder to the target, otherwise append each + // piece to resolve the link in path. + boolean noRemainder = (remainder == null || "".equals(remainder)); + Path target = new Path(linkTarget); + if (target.isUriPathAbsolute()) { + return noRemainder ? target : new Path(target, remainder); + } else { + return noRemainder + ?
new Path(preceding, target) + : new Path(new Path(preceding, linkTarget), remainder); + } + } + + @Override + public String getMessage() { + String msg = super.getMessage(); + if (msg != null) { + return msg; + } + String myMsg = "Unresolved path " + path; + try { + return getResolvedPath().toString(); + } catch (IOException e) { + // Ignore + } + return myMsg; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java new file mode 100644 index 0000000..c69986a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/ReplaceDatanodeOnFailure.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs.protocol.datatransfer; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; + +/** + * The setting of replace-datanode-on-failure feature. + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ReplaceDatanodeOnFailure { + /** The replacement policies */ + public enum Policy { + /** The feature is disabled in the entire site. */ + DISABLE(Condition.FALSE), + /** Never add a new datanode. */ + NEVER(Condition.FALSE), + /** @see ReplaceDatanodeOnFailure.Condition#DEFAULT */ + DEFAULT(Condition.DEFAULT), + /** Always add a new datanode when an existing datanode is removed. */ + ALWAYS(Condition.TRUE); + + private final Condition condition; + + private Policy(Condition condition) { + this.condition = condition; + } + + Condition getCondition() { + return condition; + } + } + + /** Datanode replacement condition */ + private static interface Condition { + /** Return true unconditionally. */ + static final Condition TRUE = new Condition() { + @Override + public boolean satisfy(short replication, DatanodeInfo[] existings, + int nExistings, boolean isAppend, boolean isHflushed) { + return true; + } + }; + + /** Return false unconditionally. */ + static final Condition FALSE = new Condition() { + @Override + public boolean satisfy(short replication, DatanodeInfo[] existings, + int nExistings, boolean isAppend, boolean isHflushed) { + return false; + } + }; + + /** + * DEFAULT condition: + * Let r be the replication number. + * Let n be the number of existing datanodes. + * Add a new datanode only if r >= 3 and either + * (1) floor(r/2) >= n; or + * (2) r > n and the block is hflushed/appended. 
+ */ + static final Condition DEFAULT = new Condition() { + @Override + public boolean satisfy(final short replication, + final DatanodeInfo[] existings, final int n, final boolean isAppend, + final boolean isHflushed) { + if (replication < 3) { + return false; + } else { + if (n <= (replication/2)) { + return true; + } else { + return isAppend || isHflushed; + } + } + } + }; + + /** Is the condition satisfied? */ + public boolean satisfy(short replication, DatanodeInfo[] existings, + int nExistings, boolean isAppend, boolean isHflushed); + } + + private final Policy policy; + private final boolean bestEffort; + + public ReplaceDatanodeOnFailure(Policy policy, boolean bestEffort) { + this.policy = policy; + this.bestEffort = bestEffort; + } + + /** Check if the feature is enabled. */ + public void checkEnabled() { + if (policy == Policy.DISABLE) { + throw new UnsupportedOperationException( + "This feature is disabled. Please refer to " + + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY + + " configuration property."); + } + } + + /** + * Best effort means that the client will try to replace the failed datanode + * (provided that the policy is satisfied), however, it will continue the + * write operation in case that the datanode replacement also fails. + * + * @return Suppose the datanode replacement fails. + * false: An exception should be thrown so that the write will fail. + * true : The write should be resumed with the remaining datanodes. + */ + public boolean isBestEffort() { + return bestEffort; + } + + /** Does it need a replacement according to the policy? */ + public boolean satisfy( + final short replication, final DatanodeInfo[] existings, + final boolean isAppend, final boolean isHflushed) { + final int n = existings == null? 0: existings.length; + if (n == 0 || n >= replication) { + //don't need to add datanode for any policy.
+ return false; + } else { + return policy.getCondition().satisfy( + replication, existings, n, isAppend, isHflushed); + } + } + + @Override + public String toString() { + return policy.toString(); + } + + /** Get the setting from configuration. */ + public static ReplaceDatanodeOnFailure get(final Configuration conf) { + final Policy policy = getPolicy(conf); + final boolean bestEffort = conf.getBoolean( + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_DEFAULT); + + return new ReplaceDatanodeOnFailure(policy, bestEffort); + } + + private static Policy getPolicy(final Configuration conf) { + final boolean enabled = conf.getBoolean( + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_DEFAULT); + if (!enabled) { + return Policy.DISABLE; + } + + final String policy = conf.get( + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY, + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_DEFAULT); + for(int i = 1; i < Policy.values().length; i++) { + final Policy p = Policy.values()[i]; + if (p.name().equalsIgnoreCase(policy)) { + return p; + } + } + throw new HadoopIllegalArgumentException("Illegal configuration value for " + + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY + + ": " + policy); + } + + /** Write the setting to configuration. 
*/ + public static void write(final Policy policy, + final boolean bestEffort, final Configuration conf) { + conf.setBoolean( + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY, + policy != Policy.DISABLE); + conf.set( + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY, + policy.name()); + conf.setBoolean( + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.BEST_EFFORT_KEY, + bestEffort); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java new file mode 100644 index 0000000..b159d3a --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaNotFoundException.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.hadoop.hdfs.server.datanode; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; + +/** + * Exception indicating that DataNode does not have a replica + * that matches the target block. + */ +public class ReplicaNotFoundException extends IOException { + private static final long serialVersionUID = 1L; + public final static String NON_RBW_REPLICA = "Cannot recover a non-RBW replica "; + public final static String UNFINALIZED_REPLICA = + "Cannot append to an unfinalized replica "; + public final static String UNFINALIZED_AND_NONRBW_REPLICA = + "Cannot recover append/close to a replica that's not FINALIZED and not RBW "; + public final static String NON_EXISTENT_REPLICA = + "Cannot append to a non-existent replica "; + public final static String UNEXPECTED_GS_REPLICA = + "Cannot append to a replica with unexpected generation stamp "; + + public ReplicaNotFoundException() { + super(); + } + + public ReplicaNotFoundException(ExtendedBlock b) { + super("Replica not found for " + b); + } + + public ReplicaNotFoundException(String msg) { + super(msg); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/RetryStartFileException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/RetryStartFileException.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/RetryStartFileException.java new file mode 100644 index 0000000..0bdd2a5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/RetryStartFileException.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; + +@InterfaceAudience.Private +public class RetryStartFileException extends IOException { + private static final long serialVersionUID = 1L; + + public RetryStartFileException() { + super("Preconditions for creating a file failed because of a " + + "transient error, retry create later."); + } + + public RetryStartFileException(String s) { + super(s); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index b3940b5..4ebf437 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -977,6 +977,9 @@ Release 2.8.0 - UNRELEASED HDFS-8873. Allow the directoryScanner to be rate-limited (Daniel Templeton via Colin P. McCabe) + HDFS-8053. Move DFSIn/OutputStream and related classes to + hadoop-hdfs-client. (Mingliang Liu via wheat9) + OPTIMIZATIONS HDFS-8026. 
Trace FSOutputSummer#writeChecksumChunks rather than http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml index 60029e0..c88c4c4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml @@ -74,15 +74,6 @@ </Match> <!-- - ResponseProccessor is thread that is designed to catch RuntimeException. - --> - <Match> - <Class name="org.apache.hadoop.hdfs.DataStreamer$ResponseProcessor" /> - <Method name="run" /> - <Bug pattern="REC_CATCH_EXCEPTION" /> - </Match> - - <!-- lastAppliedTxid is carefully unsynchronized in the BackupNode in a couple spots. See the comments in BackupImage for justification. --> @@ -196,14 +187,4 @@ <Method name="assertAllResultsEqual" /> <Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE" /> </Match> - - <!-- - We use a separate lock to guard cachingStrategy in order to separate - locks for p-reads from seek + read invocations. 
- --> - <Match> - <Class name="org.apache.hadoop.hdfs.DFSInputStream" /> - <Field name="cachingStrategy" /> - <Bug pattern="IS2_INCONSISTENT_SYNC" /> - </Match> </FindBugsFilter> http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java deleted file mode 100644 index 0ccacda..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java +++ /dev/null @@ -1,47 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.fs; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.protocol.LocatedBlock; - -/** - * Wrapper for {@link BlockLocation} that also includes a {@link LocatedBlock}, - * allowing more detailed queries to the datanode about a block. 
- * - */ -@InterfaceAudience.Private -@InterfaceStability.Unstable -public class HdfsBlockLocation extends BlockLocation { - - private final LocatedBlock block; - - public HdfsBlockLocation(BlockLocation loc, LocatedBlock block) - throws IOException { - // Initialize with data from passed in BlockLocation - super(loc); - this.block = block; - } - - public LocatedBlock getLocatedBlock() { - return block; - } -} http://git-wip-us.apache.org/repos/asf/hadoop/blob/bf37d3d8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java ---------------------------------------------------------------------- diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java deleted file mode 100644 index 7bba8a4..0000000 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockMissingException.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hdfs; - -import java.io.IOException; - -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; - -/** - * This exception is thrown when a read encounters a block that has no locations - * associated with it. - */ -@InterfaceAudience.Private -@InterfaceStability.Evolving -public class BlockMissingException extends IOException { - - private static final long serialVersionUID = 1L; - - private final String filename; - private final long offset; - - /** - * An exception that indicates that file was corrupted. - * @param filename name of corrupted file - * @param description a description of the corruption details - */ - public BlockMissingException(String filename, String description, long offset) { - super(description); - this.filename = filename; - this.offset = offset; - } - - /** - * Returns the name of the corrupted file. - * @return name of corrupted file - */ - public String getFile() { - return filename; - } - - /** - * Returns the offset at which this file is corrupted - * @return offset of corrupted file - */ - public long getOffset() { - return offset; - } -}