Author: szetszwo Date: Fri Nov 9 20:51:09 2012 New Revision: 1407625 URL: http://svn.apache.org/viewvc?rev=1407625&view=rev Log: HDFS-4161. Backport HDFS-1865 "Share LeaseChecker thread among DFSClients" and the related JIRAs: HDFS-278, HDFS-1840, HDFS-1870, HDFS-1890, HDFS-2810, HDFS-3646 and HDFS-2240.
Added: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/LeaseRenewer.java Modified: hadoop/common/branches/branch-1/CHANGES.txt hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Client.java hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLease.java hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java Modified: hadoop/common/branches/branch-1/CHANGES.txt URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/CHANGES.txt?rev=1407625&r1=1407624&r2=1407625&view=diff ============================================================================== --- hadoop/common/branches/branch-1/CHANGES.txt (original) +++ hadoop/common/branches/branch-1/CHANGES.txt Fri Nov 9 20:51:09 2012 @@ -313,6 +313,9 @@ Release 1.1.1 - Unreleased HADOOP-8995. Remove unnecessary bogus exception from Configuration.java. (Jing Zhao via suresh) + HDFS-4161. Backport HDFS-1865 "Share LeaseChecker thread among DFSClients" + and the related JIRAs: HDFS-278, HDFS-1840, HDFS-1870, HDFS-1890, HDFS-2810, + HDFS-3646 and HDFS-2240. 
(szetszwo) BUG FIXES Modified: hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Client.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Client.java?rev=1407625&r1=1407624&r2=1407625&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Client.java (original) +++ hadoop/common/branches/branch-1/src/core/org/apache/hadoop/ipc/Client.java Fri Nov 9 20:51:09 2012 @@ -114,6 +114,21 @@ public class Client { } /** + * The time after which a RPC will timeout. If ping is not enabled (via + * ipc.client.ping), then the timeout value is the same as the pingInterval. + * If ping is enabled, then there is no timeout value. + * + * @param conf Configuration + * @return the timeout period in milliseconds. -1 if no timeout value is set + */ + final public static int getTimeout(Configuration conf) { + if (!conf.getBoolean("ipc.client.ping", true)) { + return getPingInterval(conf); + } + return -1; + } + + /** * Increment this client's reference count * */ Modified: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1407625&r1=1407624&r2=1407625&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original) +++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Nov 9 20:51:09 2012 @@ -80,13 +80,12 @@ public class DFSClient implements FSCons public static final int MAX_BLOCK_ACQUIRE_FAILURES = 3; private static final int TCP_WINDOW_SIZE = 128 * 1024; // 128 KB public final ClientProtocol namenode; - private final ClientProtocol rpcNamenode; + final ClientProtocol rpcNamenode; private final InetSocketAddress nnAddress; final 
UserGroupInformation ugi; volatile boolean clientRunning = true; - Random r = new Random(); + static Random r = new Random(); final String clientName; - final LeaseChecker leasechecker = new LeaseChecker(); private Configuration conf; private long defaultBlockSize; private short defaultReplication; @@ -108,7 +107,18 @@ public class DFSClient implements FSCons */ private volatile boolean serverSupportsHdfs630 = true; private volatile boolean serverSupportsHdfs200 = true; - + final int hdfsTimeout; // timeout value for a DFS operation. + private final String authority; + + /** + * A map from file names to {@link DFSOutputStream} objects + * that are currently being written by this client. + * Note that a file can only be written by a single client. + */ + private final Map<String, DFSOutputStream> filesBeingWritten + = new HashMap<String, DFSOutputStream>(); + + /** Create a {@link NameNode} proxy */ public static ClientProtocol createNamenode(Configuration conf) throws IOException { return createNamenode(NameNode.getAddress(conf), conf); } @@ -251,14 +261,14 @@ public class DFSClient implements FSCons this.writePacketSize = conf.getInt("dfs.write.packet.size", 64*1024); this.maxBlockAcquireFailures = getMaxBlockAcquireFailures(conf); + this.hdfsTimeout = Client.getTimeout(conf); ugi = UserGroupInformation.getCurrentUser(); + this.authority = nameNodeAddr == null? 
"null": + nameNodeAddr.getHostName() + ":" + nameNodeAddr.getPort(); + String taskId = conf.get("mapred.task.id", "NONMAPREDUCE"); + this.clientName = "DFSClient_" + taskId + "_" + + r.nextInt() + "_" + Thread.currentThread().getId(); - String taskId = conf.get("mapred.task.id"); - if (taskId != null) { - this.clientName = "DFSClient_" + taskId; - } else { - this.clientName = "DFSClient_" + r.nextInt(); - } defaultBlockSize = conf.getLong("dfs.block.size", DEFAULT_BLOCK_SIZE); defaultReplication = (short) conf.getInt("dfs.replication", 3); @@ -310,20 +320,116 @@ public class DFSClient implements FSCons throw result; } } + + /** Return the lease renewer instance. The renewer thread won't start + * until the first output stream is created. The same instance will + * be returned until all output streams are closed. + */ + public synchronized LeaseRenewer getLeaseRenewer() throws IOException { + return LeaseRenewer.getInstance(authority, ugi, this); + } + + /** Get a lease and start automatic renewal */ + private void beginFileLease(final String src, final DFSOutputStream out) + throws IOException { + getLeaseRenewer().put(src, out, this); + } + + /** Stop renewal of lease for the file. */ + void endFileLease(final String src) throws IOException { + getLeaseRenewer().closeFile(src, this); + } + + /** Put a file. Only called from LeaseRenewer, where proper locking is + * enforced to consistently update its local dfsclients array and + * client's filesBeingWritten map. + */ + void putFileBeingWritten(final String src, final DFSOutputStream out) { + synchronized(filesBeingWritten) { + filesBeingWritten.put(src, out); + } + } + + /** Remove a file. Only called from LeaseRenewer. */ + void removeFileBeingWritten(final String src) { + synchronized(filesBeingWritten) { + filesBeingWritten.remove(src); + } + } + + /** Is file-being-written map empty? 
*/ + boolean isFilesBeingWrittenEmpty() { + synchronized(filesBeingWritten) { + return filesBeingWritten.isEmpty(); + } + } + + /** + * Renew leases. + * @return true if lease was renewed. May return false if this + * client has been closed or has no files open. + **/ + boolean renewLease() throws IOException { + if (clientRunning && !isFilesBeingWrittenEmpty()) { + namenode.renewLease(clientName); + return true; + } + return false; + } + + /** Abort and release resources held. Ignore all errors. */ + void abort() { + clientRunning = false; + closeAllFilesBeingWritten(true); + + try { + // remove reference to this client and stop the renewer, + // if there is no more clients under the renewer. + getLeaseRenewer().closeClient(this); + } catch (IOException ioe) { + LOG.info("Exception occurred while aborting the client. " + ioe); + } + RPC.stopProxy(rpcNamenode); // close connections to the namenode + } + + /** Close/abort all files being written. */ + private void closeAllFilesBeingWritten(final boolean abort) { + for(;;) { + final String src; + final DFSOutputStream out; + synchronized(filesBeingWritten) { + if (filesBeingWritten.isEmpty()) { + return; + } + src = filesBeingWritten.keySet().iterator().next(); + out = filesBeingWritten.remove(src); + } + if (out != null) { + try { + if (abort) { + out.abort(); + } else { + out.close(); + } + } catch(IOException ie) { + LOG.error("Failed to " + (abort? "abort": "close") + " file " + src, + ie); + } + } + } + } + /** * Close the file system, abandoning all of the leases and files being * created and close connections to the namenode. 
*/ public synchronized void close() throws IOException { if(clientRunning) { - leasechecker.close(); + closeAllFilesBeingWritten(false); clientRunning = false; - try { - leasechecker.interruptAndJoin(); - } catch (InterruptedException ie) { - } - + + getLeaseRenewer().closeClient(this); // close connections to the namenode RPC.stopProxy(rpcNamenode); } @@ -757,10 +863,10 @@ public class DFSClient implements FSCons } FsPermission masked = permission.applyUMask(FsPermission.getUMask(conf)); LOG.debug(src + ": masked=" + masked); - OutputStream result = new DFSOutputStream(src, masked, + final DFSOutputStream result = new DFSOutputStream(src, masked, overwrite, createParent, replication, blockSize, progress, buffersize, conf.getInt("io.bytes.per.checksum", 512)); - leasechecker.put(src, result); + beginFileLease(src, result); return result; } @@ -815,7 +921,7 @@ public class DFSClient implements FSCons } final DFSOutputStream result = new DFSOutputStream(src, buffersize, progress, lastBlock, stat, conf.getInt("io.bytes.per.checksum", 512)); - leasechecker.put(src, result); + beginFileLease(src, result); return result; } @@ -1392,117 +1498,6 @@ public class DFSClient implements FSCons throw new IOException("No live nodes contain current block"); } - boolean isLeaseCheckerStarted() { - return leasechecker.daemon != null; - } - - /** Lease management*/ - class LeaseChecker implements Runnable { - /** A map from src -> DFSOutputStream of files that are currently being - * written by this client. 
- */ - private final SortedMap<String, OutputStream> pendingCreates - = new TreeMap<String, OutputStream>(); - - private Daemon daemon = null; - - synchronized void put(String src, OutputStream out) { - if (clientRunning) { - if (daemon == null) { - daemon = new Daemon(this); - daemon.start(); - } - pendingCreates.put(src, out); - } - } - - synchronized void remove(String src) { - pendingCreates.remove(src); - } - - void interruptAndJoin() throws InterruptedException { - Daemon daemonCopy = null; - synchronized (this) { - if (daemon != null) { - daemon.interrupt(); - daemonCopy = daemon; - } - } - - if (daemonCopy != null) { - LOG.debug("Wait for lease checker to terminate"); - daemonCopy.join(); - } - } - - void close() { - while (true) { - String src; - OutputStream out; - synchronized (this) { - if (pendingCreates.isEmpty()) { - return; - } - src = pendingCreates.firstKey(); - out = pendingCreates.remove(src); - } - if (out != null) { - try { - out.close(); - } catch (IOException ie) { - LOG.error("Exception closing file " + src+ " : " + ie, ie); - } - } - } - } - - private void renew() throws IOException { - synchronized(this) { - if (pendingCreates.isEmpty()) { - return; - } - } - namenode.renewLease(clientName); - } - - /** - * Periodically check in with the namenode and renew all the leases - * when the lease period is half over. 
- */ - public void run() { - long lastRenewed = 0; - while (clientRunning && !Thread.interrupted()) { - if (System.currentTimeMillis() - lastRenewed > (LEASE_SOFTLIMIT_PERIOD / 2)) { - try { - renew(); - lastRenewed = System.currentTimeMillis(); - } catch (IOException ie) { - LOG.warn("Problem renewing lease for " + clientName, ie); - } - } - - try { - Thread.sleep(1000); - } catch (InterruptedException ie) { - if (LOG.isDebugEnabled()) { - LOG.debug(this + " is interrupted.", ie); - } - return; - } - } - } - - /** {@inheritDoc} */ - public String toString() { - String s = getClass().getSimpleName(); - if (LOG.isTraceEnabled()) { - return s + "@" + DFSClient.this + ": " - + StringUtils.stringifyException(new Throwable("for testing")); - } - return s; - } - } - /** Utility class to encapsulate data node info and its address. */ private static class DNAddrPair { DatanodeInfo info; @@ -3994,12 +3989,12 @@ public class DFSClient implements FSCons throw e; } closeInternal(); - leasechecker.remove(src); if (s != null) { s.close(); s = null; } + endFileLease(src); } /** @@ -4012,6 +4007,20 @@ public class DFSClient implements FSCons closed = true; } + /** + * Aborts this output stream and releases any system + * resources associated with this stream. + */ + synchronized void abort() throws IOException { + if (closed) { + return; + } + setLastException(new IOException("Lease timeout of " + + (hdfsTimeout / 1000) + " seconds expired.")); + closeThreads(); + endFileLease(src); + } + // shutdown datastreamer and responseprocessor threads. private void closeThreads() throws IOException { try { @@ -4080,6 +4089,16 @@ public class DFSClient implements FSCons while (!fileComplete) { fileComplete = namenode.complete(src, clientName); if (!fileComplete) { + if (!clientRunning || + (hdfsTimeout > 0 && + localstart + hdfsTimeout < System.currentTimeMillis())) { + String msg = "Unable to close file because dfsclient " + + " was unable to contact the HDFS servers." 
+ + " clientRunning " + clientRunning + + " hdfsTimeout " + hdfsTimeout; + LOG.info(msg); + throw new IOException(msg); + } try { Thread.sleep(400); if (System.currentTimeMillis() - localstart > 5000) { Added: hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/LeaseRenewer.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/LeaseRenewer.java?rev=1407625&view=auto ============================================================================== --- hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/LeaseRenewer.java (added) +++ hadoop/common/branches/branch-1/src/hdfs/org/apache/hadoop/hdfs/LeaseRenewer.java Fri Nov 9 20:51:09 2012 @@ -0,0 +1,457 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.net.SocketTimeoutException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream; +import org.apache.hadoop.hdfs.protocol.FSConstants; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.StringUtils; + +/** + * <p> + * Used by {@link DFSClient} for renewing file-being-written leases + * on the namenode. + * When a file is opened for write (create or append), + * namenode stores a file lease for recording the identity of the writer. + * The writer (i.e. the DFSClient) is required to renew the lease periodically. + * When the lease is not renewed before it expires, + * the namenode considers the writer as failed and then it may either let + * another writer to obtain the lease or close the file. + * </p> + * <p> + * This class also provides the following functionality: + * <ul> + * <li> + * It maintains a map from (namenode, user) pairs to lease renewers. + * The same {@link LeaseRenewer} instance is used for renewing lease + * for all the {@link DFSClient} to the same namenode and the same user. + * </li> + * <li> + * Each renewer maintains a list of {@link DFSClient}. + * Periodically the leases for all the clients are renewed. + * A client is removed from the list when the client is closed. + * </li> + * <li> + * A thread per namenode per user is used by the {@link LeaseRenewer} + * to renew the leases. 
+ * </li> + * </ul> + * </p> + */ +class LeaseRenewer { + static final Log LOG = LogFactory.getLog(LeaseRenewer.class); + + static final long LEASE_RENEWER_GRACE_DEFAULT = 60*1000L; + static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L; + + /** Get a {@link LeaseRenewer} instance */ + static LeaseRenewer getInstance(final String authority, + final UserGroupInformation ugi, final DFSClient dfsc) throws IOException { + final LeaseRenewer r = Factory.INSTANCE.get(authority, ugi); + r.addClient(dfsc); + return r; + } + + /** + * A factory for sharing {@link LeaseRenewer} objects + * among {@link DFSClient} instances + * so that there is only one renewer per authority per user. + */ + private static class Factory { + private static final Factory INSTANCE = new Factory(); + + private static class Key { + /** Namenode info */ + final String authority; + /** User info */ + final UserGroupInformation ugi; + + private Key(final String authority, final UserGroupInformation ugi) { + if (authority == null) { + throw new IllegalArgumentException("authority == null"); + } else if (ugi == null) { + throw new IllegalArgumentException("ugi == null"); + } + + this.authority = authority; + this.ugi = ugi; + } + + @Override + public int hashCode() { + return authority.hashCode() ^ ugi.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (obj != null && obj instanceof Key) { + final Key that = (Key)obj; + return this.authority.equals(that.authority) + && this.ugi.equals(that.ugi); + } + return false; + } + + @Override + public String toString() { + return ugi.getShortUserName() + "@" + authority; + } + } + + /** A map for per user per namenode renewers. */ + private final Map<Key, LeaseRenewer> renewers = new HashMap<Key, LeaseRenewer>(); + + /** Get a renewer. 
*/ + private synchronized LeaseRenewer get(final String authority, + final UserGroupInformation ugi) { + final Key k = new Key(authority, ugi); + LeaseRenewer r = renewers.get(k); + if (r == null) { + r = new LeaseRenewer(k); + renewers.put(k, r); + } + return r; + } + + /** Remove the given renewer. */ + private synchronized void remove(final LeaseRenewer r) { + final LeaseRenewer stored = renewers.get(r.factorykey); + //Since a renewer may expire, the stored renewer can be different. + if (r == stored) { + if (!r.clientsRunning()) { + renewers.remove(r.factorykey); + } + } + } + } + + /** The time in milliseconds that the map became empty. */ + private long emptyTime = Long.MAX_VALUE; + /** A fixed lease renewal time period in milliseconds */ + private long renewal = FSConstants.LEASE_SOFTLIMIT_PERIOD/2; + + /** A daemon for renewing lease */ + private Daemon daemon = null; + /** Only the daemon with currentId should run. */ + private int currentId = 0; + + /** + * A period in milliseconds that the lease renewer thread should run + * after the map became empty. + * In other words, + * if the map is empty for a time period longer than the grace period, + * the renewer should terminate. + */ + private long gracePeriod; + /** + * The time period in milliseconds + * that the renewer sleeps for each iteration. + */ + private long sleepPeriod; + + private final Factory.Key factorykey; + + /** A list of clients corresponding to this renewer. */ + private final List<DFSClient> dfsclients = new ArrayList<DFSClient>(); + + private LeaseRenewer(Factory.Key factorykey) { + this.factorykey = factorykey; + unsyncSetGraceSleepPeriod(LEASE_RENEWER_GRACE_DEFAULT); + } + + /** @return the renewal time in milliseconds. */ + private synchronized long getRenewalTime() { + return renewal; + } + + /** Add a client. */ + private synchronized void addClient(final DFSClient dfsc) { + for(DFSClient c : dfsclients) { + if (c == dfsc) { + //client already exists, nothing to do. 
+ return; + } + } + //client not found, add it + dfsclients.add(dfsc); + + //update renewal time + if (dfsc.hdfsTimeout > 0) { + final long half = dfsc.hdfsTimeout/2; + if (half < renewal) { + this.renewal = half; + } + } + } + + private synchronized boolean clientsRunning() { + for(Iterator<DFSClient> i = dfsclients.iterator(); i.hasNext(); ) { + if (!i.next().clientRunning) { + i.remove(); + } + } + return !dfsclients.isEmpty(); + } + + private synchronized long getSleepPeriod() { + return sleepPeriod; + } + + /** Set the grace period and adjust the sleep period accordingly. */ + synchronized void setGraceSleepPeriod(final long gracePeriod) { + unsyncSetGraceSleepPeriod(gracePeriod); + } + + private void unsyncSetGraceSleepPeriod(final long gracePeriod) { + if (gracePeriod < 100L) { + throw new IllegalArgumentException(gracePeriod + + " = gracePeriod < 100ms is too small."); + } + this.gracePeriod = gracePeriod; + final long half = gracePeriod/2; + this.sleepPeriod = half < LEASE_RENEWER_SLEEP_DEFAULT? + half: LEASE_RENEWER_SLEEP_DEFAULT; + } + + /** Does this renewer have nothing to renew? */ + public boolean isEmpty() { + return dfsclients.isEmpty(); + } + + /** Is the daemon running? */ + synchronized boolean isRunning() { + return daemon != null && daemon.isAlive(); + } + + /** Is the empty period longer than the grace period? */ + private synchronized boolean isRenewerExpired() { + return emptyTime != Long.MAX_VALUE + && System.currentTimeMillis() - emptyTime > gracePeriod; + } + + synchronized void put(final String src, final DFSOutputStream out, + final DFSClient dfsc) { + if (dfsc.clientRunning) { + if (!isRunning() || isRenewerExpired()) { + //start a new deamon with a new id. 
+ final int id = ++currentId; + daemon = new Daemon(new Runnable() { + @Override + public void run() { + try { + LeaseRenewer.this.run(id); + } catch(InterruptedException e) { + if (LOG.isDebugEnabled()) { + LOG.debug(LeaseRenewer.this.getClass().getSimpleName() + + " is interrupted.", e); + } + } finally { + synchronized(LeaseRenewer.this) { + Factory.INSTANCE.remove(LeaseRenewer.this); + } + } + } + }); + daemon.start(); + } + dfsc.putFileBeingWritten(src, out); + emptyTime = Long.MAX_VALUE; + } + } + + /** Close a file. */ + void closeFile(final String src, final DFSClient dfsc) { + dfsc.removeFileBeingWritten(src); + + synchronized(this) { + if (dfsc.isFilesBeingWrittenEmpty()) { + dfsclients.remove(dfsc); + } + //update emptyTime if necessary + if (emptyTime == Long.MAX_VALUE) { + for(DFSClient c : dfsclients) { + if (!c.isFilesBeingWrittenEmpty()) { + //found a non-empty file-being-written map + return; + } + } + //discover the first time that all file-being-written maps are empty. + emptyTime = System.currentTimeMillis(); + } + } + } + + /** Close the given client. */ + synchronized void closeClient(final DFSClient dfsc) { + dfsclients.remove(dfsc); + if (dfsclients.isEmpty()) { + if (!isRunning() || isRenewerExpired()) { + Factory.INSTANCE.remove(LeaseRenewer.this); + return; + } + if (emptyTime == Long.MAX_VALUE) { + //discover the first time that the client list is empty. 
+ emptyTime = System.currentTimeMillis(); + } + } + + //update renewal time + if (renewal == dfsc.hdfsTimeout/2) { + long min = FSConstants.LEASE_SOFTLIMIT_PERIOD; + for(DFSClient c : dfsclients) { + if (c.hdfsTimeout > 0) { + final long half = c.hdfsTimeout; + if (half < min) { + min = half; + } + } + } + renewal = min/2; + } + } + + void interruptAndJoin() throws InterruptedException { + Daemon daemonCopy = null; + synchronized (this) { + if (isRunning()) { + daemon.interrupt(); + daemonCopy = daemon; + } + } + + if (daemonCopy != null) { + if(LOG.isDebugEnabled()) { + LOG.debug("Wait for lease checker to terminate"); + } + daemonCopy.join(); + } + } + + private void renew() throws IOException { + final List<DFSClient> copies; + synchronized(this) { + copies = new ArrayList<DFSClient>(dfsclients); + } + //sort the client names for finding out repeated names. + Collections.sort(copies, new Comparator<DFSClient>() { + @Override + public int compare(final DFSClient left, final DFSClient right) { + return left.clientName.compareTo(right.clientName); + } + }); + String previousName = ""; + for(int i = 0; i < copies.size(); i++) { + final DFSClient c = copies.get(i); + //skip if current client name is the same as the previous name. + if (!c.clientName.equals(previousName)) { + if (!c.renewLease()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Did not renew lease for client " + + c); + } + continue; + } + + previousName = c.clientName; + } + } + } + + /** + * Periodically check in with the namenode and renew all the leases + * when the lease period is half over. 
+ */ + private void run(final int id) throws InterruptedException { + for(long lastRenewed = System.currentTimeMillis(); !Thread.interrupted(); + Thread.sleep(getSleepPeriod())) { + if (System.currentTimeMillis() - lastRenewed >= getRenewalTime()) { + try { + renew(); + lastRenewed = System.currentTimeMillis(); + } catch (SocketTimeoutException ie) { + LOG.warn("Failed to renew lease for " + clientsString() + " for " + + (getRenewalTime()/1000) + " seconds. Aborting ...", ie); + synchronized (this) { + for(DFSClient c : dfsclients) { + c.abort(); + } + } + break; + } catch (IOException ie) { + LOG.warn("Failed to renew lease for " + clientsString() + " for " + + (getRenewalTime()/1000) + " seconds. Will retry shortly ...", + ie); + } + } + + synchronized(this) { + if (id != currentId || isRenewerExpired()) { + //no longer the current daemon or expired + return; + } + + // if no clients are in running state or there is no more clients + // registered with this renewer, stop the daemon after the grace + // period. 
+ if (!clientsRunning() && emptyTime == Long.MAX_VALUE) { + emptyTime = System.currentTimeMillis(); + } + } + } + } + + @Override + public String toString() { + String s = getClass().getSimpleName() + ":" + factorykey; + if (LOG.isTraceEnabled()) { + return s + ", clients=" + clientsString() + ", " + + StringUtils.stringifyException(new Throwable("for testing")); + } + return s; + } + + /** Get the names of all clients */ + private synchronized String clientsString() { + if (dfsclients.isEmpty()) { + return "[]"; + } else { + final StringBuilder b = new StringBuilder("[").append( + dfsclients.get(0).clientName); + for(int i = 1; i < dfsclients.size(); i++) { + b.append(", ").append(dfsclients.get(i).clientName); + } + return b.append("]").toString(); + } + } +} Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java?rev=1407625&r1=1407624&r2=1407625&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java (original) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java Fri Nov 9 20:51:09 2012 @@ -165,7 +165,7 @@ public class AppendTestUtil { LOG.info("leasechecker.interruptAndJoin()"); // lose the lease on the client DistributedFileSystem dfs = (DistributedFileSystem)whichfs; - dfs.dfs.leasechecker.interruptAndJoin(); + dfs.dfs.getLeaseRenewer().interruptAndJoin(); } public static void recoverFile(MiniDFSCluster cluster, FileSystem fs, Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1407625&r1=1407624&r2=1407625&view=diff 
============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Fri Nov 9 20:51:09 2012 @@ -46,6 +46,10 @@ import org.junit.Test; public class TestDistributedFileSystem { private static final Random RAN = new Random(); + { + ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL); + } + private boolean dualPortTesting = false; private Configuration getTestConfiguration() { @@ -102,40 +106,108 @@ public class TestDistributedFileSystem { @Test public void testDFSClient() throws Exception { Configuration conf = getTestConfiguration(); + final long grace = 1000L; MiniDFSCluster cluster = null; try { cluster = new MiniDFSCluster(conf, 2, true, null); - final Path filepath = new Path("/test/LeaseChecker/foo"); + final String filepathstring = "/test/LeaseChecker/foo"; + final Path[] filepaths = new Path[4]; + for(int i = 0; i < filepaths.length; i++) { + filepaths[i] = new Path(filepathstring + i); + } final long millis = System.currentTimeMillis(); { DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem(); - assertFalse(dfs.dfs.isLeaseCheckerStarted()); + dfs.dfs.getLeaseRenewer().setGraceSleepPeriod(grace); + assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); - //create a file - FSDataOutputStream out = dfs.create(filepath); - assertTrue(dfs.dfs.isLeaseCheckerStarted()); - - //write something and close - out.writeLong(millis); - assertTrue(dfs.dfs.isLeaseCheckerStarted()); - out.close(); - assertTrue(dfs.dfs.isLeaseCheckerStarted()); + { + //create a file + final FSDataOutputStream out = dfs.create(filepaths[0]); + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + //write something + out.writeLong(millis); + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + //close + out.close(); + Thread.sleep(grace/4*3); + //within 
grace period + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + for(int i = 0; i < 3; i++) { + if (dfs.dfs.getLeaseRenewer().isRunning()) { + Thread.sleep(grace/2); + } + } + //passed grace period + assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); + } + + { + //create file1 + final FSDataOutputStream out1 = dfs.create(filepaths[1]); + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + //create file2 + final FSDataOutputStream out2 = dfs.create(filepaths[2]); + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + + //write something to file1 + out1.writeLong(millis); + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + //close file1 + out1.close(); + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + + //write something to file2 + out2.writeLong(millis); + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + //close file2 + out2.close(); + Thread.sleep(grace/4*3); + //within grace period + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + } + + { + //create file3 + final FSDataOutputStream out3 = dfs.create(filepaths[3]); + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + Thread.sleep(grace/4*3); + //passed previous grace period, should still running + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + //write something to file3 + out3.writeLong(millis); + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + //close file3 + out3.close(); + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + Thread.sleep(grace/4*3); + //within grace period + assertTrue(dfs.dfs.getLeaseRenewer().isRunning()); + for(int i = 0; i < 3; i++) { + if (dfs.dfs.getLeaseRenewer().isRunning()) { + Thread.sleep(grace/2); + } + } + //passed grace period + assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); + } + dfs.close(); } { DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem(); - assertFalse(dfs.dfs.isLeaseCheckerStarted()); + assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); //open and check the file - FSDataInputStream in = dfs.open(filepath); 
- assertFalse(dfs.dfs.isLeaseCheckerStarted()); + FSDataInputStream in = dfs.open(filepaths[0]); + assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); assertEquals(millis, in.readLong()); - assertFalse(dfs.dfs.isLeaseCheckerStarted()); + assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); in.close(); - assertFalse(dfs.dfs.isLeaseCheckerStarted()); + assertFalse(dfs.dfs.getLeaseRenewer().isRunning()); dfs.close(); } } Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java?rev=1407625&r1=1407624&r2=1407625&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java (original) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java Fri Nov 9 20:51:09 2012 @@ -656,7 +656,7 @@ public class TestFileAppend4 extends Tes // has not been completed in the NN. // Lose the leases LOG.info("Killing lease checker"); - client.leasechecker.interruptAndJoin(); + client.getLeaseRenewer().interruptAndJoin(); FileSystem fs1 = cluster.getFileSystem(); FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername( @@ -726,7 +726,7 @@ public class TestFileAppend4 extends Tes // has not been completed in the NN. 
// Lose the leases LOG.info("Killing lease checker"); - client.leasechecker.interruptAndJoin(); + client.getLeaseRenewer().interruptAndJoin(); FileSystem fs1 = cluster.getFileSystem(); FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername( Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java?rev=1407625&r1=1407624&r2=1407625&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java (original) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestFileCreation.java Fri Nov 9 20:51:09 2012 @@ -910,6 +910,53 @@ public class TestFileCreation { dfs.close(); } finally { System.out.println("testFsClose successful"); + cluster.shutdown(); + } + } + + // test closing file after cluster is shutdown + public void testFsCloseAfterClusterShutdown() throws IOException { + System.out.println("test testFsCloseAfterClusterShutdown start"); + final int DATANODE_NUM = 3; + + Configuration conf = new Configuration(); + conf.setInt("dfs.replication.min", 3); + conf.setBoolean("ipc.client.ping", false); // hdfs timeout is default 60 seconds + conf.setInt("ipc.ping.interval", 10000); // hdfs timeout is now 10 seconds + + // create cluster + MiniDFSCluster cluster = new MiniDFSCluster(conf, DATANODE_NUM, true, null); + DistributedFileSystem dfs = null; + try { + cluster.waitActive(); + dfs = (DistributedFileSystem)cluster.getFileSystem(); + + // create a new file. + final String f = DIR + "dhrubashutdown"; + final Path fpath = new Path(f); + FSDataOutputStream out = TestFileCreation.createFile(dfs, fpath, DATANODE_NUM); + out.write("something_dhruba".getBytes()); + out.sync(); // ensure that block is allocated + + // shutdown last datanode in pipeline. 
+ cluster.stopDataNode(2); + + // close file. Since we have set the minReplication to 3 but have killed one + // of the three datanodes, the close call will loop until the hdfsTimeout is + // encountered. + boolean hasException = false; + try { + out.close(); + System.out.println("testFsCloseAfterClusterShutdown: Error here"); + } catch (IOException e) { + hasException = true; + } + assertTrue("Failed to close file after cluster shutdown", hasException); + } finally { + System.out.println("testFsCloseAfterClusterShutdown successful"); + if (cluster != null) { + cluster.shutdown(); + } } } } Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLease.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLease.java?rev=1407625&r1=1407624&r2=1407625&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLease.java (original) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLease.java Fri Nov 9 20:51:09 2012 @@ -17,13 +17,21 @@ */ package org.apache.hadoop.hdfs; -import java.io.*; +import java.io.DataOutputStream; +import java.io.IOException; +import java.security.PrivilegedExceptionAction; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Assert; +import org.junit.Test; +import org.mockito.Mockito; -public class TestLease extends junit.framework.TestCase { +public class TestLease { static boolean hasLease(MiniDFSCluster cluster, Path src) { return cluster.getNameNode().getNamesystem().leaseManager .getLeaseByPath(src.toString()) != null; @@ -31,12 +39,13 @@ public class TestLease extends junit.fra final Path dir = new 
Path("/test/lease/"); + @Test public void testLease() throws Exception { Configuration conf = new Configuration(); MiniDFSCluster cluster = new MiniDFSCluster(conf, 2, true, null); try { FileSystem fs = cluster.getFileSystem(); - assertTrue(fs.mkdirs(dir)); + Assert.assertTrue(fs.mkdirs(dir)); Path a = new Path(dir, "a"); Path b = new Path(dir, "b"); @@ -44,24 +53,66 @@ public class TestLease extends junit.fra DataOutputStream a_out = fs.create(a); a_out.writeBytes("something"); - assertTrue(hasLease(cluster, a)); - assertTrue(!hasLease(cluster, b)); + Assert.assertTrue(hasLease(cluster, a)); + Assert.assertTrue(!hasLease(cluster, b)); DataOutputStream b_out = fs.create(b); b_out.writeBytes("something"); - assertTrue(hasLease(cluster, a)); - assertTrue(hasLease(cluster, b)); + Assert.assertTrue(hasLease(cluster, a)); + Assert.assertTrue(hasLease(cluster, b)); a_out.close(); b_out.close(); - assertTrue(!hasLease(cluster, a)); - assertTrue(!hasLease(cluster, b)); + Assert.assertTrue(!hasLease(cluster, a)); + Assert.assertTrue(!hasLease(cluster, b)); fs.delete(dir, true); } finally { if (cluster != null) {cluster.shutdown();} } } + + @Test + public void testFactory() throws Exception { + final String[] groups = new String[]{"supergroup"}; + final UserGroupInformation[] ugi = new UserGroupInformation[3]; + for(int i = 0; i < ugi.length; i++) { + ugi[i] = UserGroupInformation.createUserForTesting("user" + i, groups); + } + + final Configuration conf = new Configuration(); + final DFSClient c1 = createDFSClientAs(ugi[0], conf); + FSDataOutputStream out1 = createFsOut(c1, "/out1"); + final DFSClient c2 = createDFSClientAs(ugi[0], conf); + FSDataOutputStream out2 = createFsOut(c2, "/out2"); + Assert.assertEquals(c1.getLeaseRenewer(), c2.getLeaseRenewer()); + final DFSClient c3 = createDFSClientAs(ugi[1], conf); + FSDataOutputStream out3 = createFsOut(c3, "/out3"); + Assert.assertTrue(c1.getLeaseRenewer() != c3.getLeaseRenewer()); + final DFSClient c4 = 
createDFSClientAs(ugi[1], conf); + FSDataOutputStream out4 = createFsOut(c4, "/out4"); + Assert.assertEquals(c3.getLeaseRenewer(), c4.getLeaseRenewer()); + final DFSClient c5 = createDFSClientAs(ugi[2], conf); + FSDataOutputStream out5 = createFsOut(c5, "/out5"); + Assert.assertTrue(c1.getLeaseRenewer() != c5.getLeaseRenewer()); + Assert.assertTrue(c3.getLeaseRenewer() != c5.getLeaseRenewer()); + } + + private FSDataOutputStream createFsOut(DFSClient dfs, String path) + throws IOException { + return new FSDataOutputStream(dfs.create(path, true), null); + } + + static final ClientProtocol mcp = Mockito.mock(ClientProtocol.class); + static public DFSClient createDFSClientAs(UserGroupInformation ugi, + final Configuration conf) throws Exception { + return ugi.doAs(new PrivilegedExceptionAction<DFSClient>() { + @Override + public DFSClient run() throws Exception { + return new DFSClient(null, mcp, conf, null); + } + }); + } } Modified: hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java?rev=1407625&r1=1407624&r2=1407625&view=diff ============================================================================== --- hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java (original) +++ hadoop/common/branches/branch-1/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery2.java Fri Nov 9 20:51:09 2012 @@ -157,7 +157,7 @@ public class TestLeaseRecovery2 extends stm.sync(); if (triggerSoftLease) { AppendTestUtil.LOG.info("leasechecker.interruptAndJoin()"); - dfs.dfs.leasechecker.interruptAndJoin(); + dfs.dfs.getLeaseRenewer().interruptAndJoin(); } return filepath; }