Author: brandonli
Date: Thu Nov  7 21:49:13 2013
New Revision: 1539834

URL: http://svn.apache.org/r1539834
Log:
HDFS-5364. Add OpenFileCtx cache. Contributed by Brandon Li
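
In short: this patch replaces WriteManager's unbounded openFileMap with a
new OpenFileCtxCache that caps the number of cached write streams at
Nfs3Constant.MAX_OPEN_FILES and evicts idle or inactive streams, both on
insertion and from a background StreamMonitor daemon. A minimal sketch of
the new cache API, modeled on TestOpenFileCtxCache below (the mocks and
the handle id are illustrative only, not part of the patch):

    // Illustrative sketch; classes and methods are from this patch,
    // the concrete values are made up.
    Configuration conf = new Configuration();
    conf.setInt(Nfs3Constant.MAX_OPEN_FILES, 2);       // cache capacity

    DFSClient dfsClient = Mockito.mock(DFSClient.class);
    HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
    Mockito.when(fos.getPos()).thenReturn((long) 0);
    Nfs3FileAttributes attr = new Nfs3FileAttributes();

    OpenFileCtxCache cache =
        new OpenFileCtxCache(conf, Nfs3Constant.OUTPUT_STREAM_TIMEOUT_DEFAULT);
    OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
        new IdUserGroup());

    boolean admitted = cache.put(new FileHandle(1), ctx); // false if full and
                                                          // nothing evictable
    OpenFileCtx cached = cache.get(new FileHandle(1));    // lookup on WRITE
    cache.shutdown();              // stops StreamMonitor, cleans all entries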

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java?rev=1539834&r1=1539833&r2=1539834&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java Thu Nov  7 21:49:13 2013
@@ -23,33 +23,47 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.nfs.mount.Mountd;
+import org.apache.hadoop.mount.MountdBase;
 import org.apache.hadoop.nfs.nfs3.Nfs3Base;
 import org.apache.hadoop.util.StringUtils;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * Nfs server. Supports NFS v3 using {@link RpcProgramNfs3}.
  * Currently Mountd program is also started inside this class.
  * Only TCP server is supported and UDP is not supported.
  */
 public class Nfs3 extends Nfs3Base {
+  private Mountd mountd;
+  
   static {
     Configuration.addDefaultResource("hdfs-default.xml");
     Configuration.addDefaultResource("hdfs-site.xml");
   }
   
   public Nfs3(List<String> exports) throws IOException {
-    super(new Mountd(exports), new RpcProgramNfs3());
+    super(new RpcProgramNfs3());
+    mountd = new Mountd(exports);
   }
 
+  @VisibleForTesting
   public Nfs3(List<String> exports, Configuration config) throws IOException {
-    super(new Mountd(exports, config), new RpcProgramNfs3(config), config);
+    super(new RpcProgramNfs3(config), config);
+    mountd = new Mountd(exports, config);
   }
 
+  public Mountd getMountd() {
+    return mountd;
+  }
+  
   public static void main(String[] args) throws IOException {
     StringUtils.startupShutdownMessage(Nfs3.class, args, LOG);
     List<String> exports = new ArrayList<String>();
     exports.add("/");
+    
     final Nfs3 nfsServer = new Nfs3(exports);
+    nfsServer.mountd.start(true); // Start mountd
     nfsServer.start(true);
   }
 }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1539834&r1=1539833&r2=1539834&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Thu Nov  7 21:49:13 2013
@@ -24,7 +24,6 @@ import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.nio.ByteBuffer;
 import java.nio.channels.ClosedChannelException;
-import java.security.InvalidParameterException;
 import java.util.EnumSet;
 import java.util.Iterator;
 import java.util.Map.Entry;
@@ -96,7 +95,7 @@ class OpenFileCtx {
   
   // It's updated after each sync to HDFS
   private Nfs3FileAttributes latestAttr;
-
+  
   private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites;
   
   private final ConcurrentNavigableMap<Long, CommitCtx> pendingCommits;
@@ -165,10 +164,22 @@ class OpenFileCtx {
     return System.currentTimeMillis() - lastAccessTime > streamTimeout;
   }
   
+  long getLastAccessTime() {
+    return lastAccessTime;  
+  }
+  
   public long getNextOffset() {
     return nextOffset.get();
   }
   
+  boolean getActiveState() {
+    return this.activeState;
+  }
+  
+  boolean hasPendingWork() {
+    return (pendingWrites.size() != 0 || pendingCommits.size() != 0);
+  }
+  
   // Increase or decrease the memory occupation of non-sequential writes
   private long updateNonSequentialWriteInMemory(long count) {
     long newValue = nonSequentialWriteInMemory.addAndGet(count);
@@ -792,19 +803,18 @@ class OpenFileCtx {
    * @return true, remove stream; false, keep stream
    */
   public synchronized boolean streamCleanup(long fileId, long streamTimeout) {
-    if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) {
-      throw new InvalidParameterException("StreamTimeout" + streamTimeout
-          + "ms is less than MINIMIUM_STREAM_TIMEOUT "
-          + WriteManager.MINIMIUM_STREAM_TIMEOUT + "ms");
+    Preconditions
+        .checkState(streamTimeout >= Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
+    if (!activeState) {
+      return true;
     }
     
     boolean flag = false;
     // Check the stream timeout
     if (checkStreamTimeout(streamTimeout)) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("closing stream for fileId:" + fileId);
+        LOG.debug("stream can be closed for fileId:" + fileId);
       }
-      cleanup();
       flag = true;
     }
     return flag;
@@ -975,7 +985,7 @@ class OpenFileCtx {
     FileHandle handle = writeCtx.getHandle();
     if (LOG.isDebugEnabled()) {
       LOG.debug("do write, fileId: " + handle.getFileId() + " offset: "
-          + offset + " length:" + count + " stableHow:" + stableHow.getValue());
+          + offset + " length:" + count + " stableHow:" + stableHow.name());
     }
 
     try {
@@ -1056,7 +1066,7 @@ class OpenFileCtx {
     }
   }
 
-  private synchronized void cleanup() {
+  synchronized void cleanup() {
     if (!activeState) {
       LOG.info("Current OpenFileCtx is already inactive, no need to cleanup.");
       return;
@@ -1064,7 +1074,7 @@ class OpenFileCtx {
     activeState = false;
 
     // stop the dump thread
-    if (dumpThread != null) {
+    if (dumpThread != null && dumpThread.isAlive()) {
       dumpThread.interrupt();
       try {
         dumpThread.join(3000);
@@ -1146,4 +1156,10 @@ class OpenFileCtx {
   void setActiveStatusForTest(boolean activeState) {
     this.activeState = activeState;
   }
+  
+  @Override
+  public String toString() {
+    return String.format("activeState: %b asyncStatus: %b nextOffset: %d",
+        activeState, asyncStatus, nextOffset.get());
+  }
 }
\ No newline at end of file
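
A note on the streamCleanup() change above: the method now only decides
whether a stream may be closed (true for inactive or timed-out streams)
and no longer tears the stream down itself; cleanup() was made
package-visible so that the new OpenFileCtxCache can run the slow teardown
outside its own lock. A hedged sketch of the intended call sequence,
mirroring OpenFileCtxCache.scan() below (the lock object and map names are
taken from the cache internals; the scaffolding is illustrative):

    // Decide with a cheap check, act outside the cache lock.
    if (ctx.streamCleanup(handle.getFileId(), streamTimeout)) {
      OpenFileCtx removed;
      synchronized (cache) {                   // the OpenFileCtxCache monitor
        removed = openFileMap.remove(handle);  // re-checked under the lock
      }
      if (removed != null) {
        removed.cleanup();  // closes the HDFS stream, stops the dump thread
      }
    }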

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java?rev=1539834&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java Thu Nov  7 21:49:13 2013
@@ -0,0 +1,270 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.nfs3;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentMap;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.nfs.nfs3.FileHandle;
+import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
+import org.apache.hadoop.util.Daemon;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Maps;
+
+/**
+ * A cache that saves OpenFileCtx objects for different users. Each cache
+ * entry maintains the write context for a single file.
+ */
+class OpenFileCtxCache {
+  private static final Log LOG = LogFactory.getLog(OpenFileCtxCache.class);
+  // Inserts into and deletes from openFileMap are synchronized
+  private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps
+      .newConcurrentMap();
+
+  private final int maxStreams;
+  private final long streamTimeout;
+  private final StreamMonitor streamMonitor;
+
+  OpenFileCtxCache(Configuration config, long streamTimeout) {
+    maxStreams = config.getInt(Nfs3Constant.MAX_OPEN_FILES,
+        Nfs3Constant.MAX_OPEN_FILES_DEFAULT);
+    LOG.info("Maximum open streams is " + maxStreams);
+    this.streamTimeout = streamTimeout;
+    streamMonitor = new StreamMonitor();
+  }
+
+  /**
+   * The entry to evict is chosen by the following rules:<br>
+   * 1. If an OpenFileCtx has any pending task, it is not chosen.<br>
+   * 2. If there is an inactive OpenFileCtx, the first one found is evicted.<br>
+   * 3. Among the remaining entries, the idlest one is selected. If it has
+   * been idle longer than OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT, it is evicted;
+   * otherwise the whole eviction request fails.
+   */
+  @VisibleForTesting
+  Entry<FileHandle, OpenFileCtx> getEntryToEvict() {
+    Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet()
+        .iterator();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("openFileMap size:" + openFileMap.size());
+    }
+
+    Entry<FileHandle, OpenFileCtx> idlest = null;
+    
+    while (it.hasNext()) {
+      Entry<FileHandle, OpenFileCtx> pairs = it.next();
+      OpenFileCtx ctx = pairs.getValue();
+      if (!ctx.getActiveState()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Got one inactive stream: " + ctx);
+        }
+        return pairs;
+      }
+      if (ctx.hasPendingWork()) {
+        // Always skip files with pending work.
+        continue;
+      }
+      if (idlest == null) {
+        idlest = pairs;
+      } else {
+        if (ctx.getLastAccessTime() < idlest.getValue().getLastAccessTime()) {
+          idlest = pairs;
+        }
+      }
+    }
+
+    if (idlest == null) {
+      LOG.warn("No eviction candidate. All streams have pending work.");
+      return null;
+    } else {
+      long idleTime = System.currentTimeMillis()
+          - idlest.getValue().getLastAccessTime();
+      if (idleTime < Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("idlest stream's idle time:" + idleTime);
+        }
+        LOG.warn("All opened streams are busy, can't remove any from cache.");
+        return null;
+      } else {
+        return idlest;
+      }
+    }
+  }
+
+  boolean put(FileHandle h, OpenFileCtx context) {
+    OpenFileCtx toEvict = null;
+    synchronized (this) {
+      Preconditions.checkState(openFileMap.size() <= this.maxStreams,
+          "stream cache size " + openFileMap.size()
+              + "  is larger than maximum" + this.maxStreams);
+      if (openFileMap.size() == this.maxStreams) {
+        Entry<FileHandle, OpenFileCtx> pairs = getEntryToEvict();
+        if (pairs == null) {
+          return false;
+        } else {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Evict stream ctx: " + pairs.getValue());
+          }
+          toEvict = openFileMap.remove(pairs.getKey());
+          Preconditions.checkState(toEvict == pairs.getValue(),
+              "The deleted entry is not the same as odlest found.");
+        }
+      }
+      openFileMap.put(h, context);
+    }
+    
+    // Cleanup the old stream outside the lock
+    if (toEvict != null) {
+      toEvict.cleanup();
+    }
+    return true;
+  }
+
+  @VisibleForTesting
+  void scan(long streamTimeout) {
+    ArrayList<OpenFileCtx> ctxToRemove = new ArrayList<OpenFileCtx>();
+    Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet()
+        .iterator();
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("openFileMap size:" + openFileMap.size());
+    }
+
+    while (it.hasNext()) {
+      Entry<FileHandle, OpenFileCtx> pairs = it.next();
+      FileHandle handle = pairs.getKey();
+      OpenFileCtx ctx = pairs.getValue();
+      if (!ctx.streamCleanup(handle.getFileId(), streamTimeout)) {
+        continue;
+      }
+
+      // Check it again inside lock before removing
+      synchronized (this) {
+        OpenFileCtx ctx2 = openFileMap.get(handle);
+        if (ctx2 != null) {
+          if (ctx2.streamCleanup(handle.getFileId(), streamTimeout)) {
+            openFileMap.remove(handle);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("After remove stream " + handle.getFileId()
+                  + ", the stream number:" + openFileMap.size());
+            }
+            ctxToRemove.add(ctx2);
+          }
+        }
+      }
+    }
+
+    // Invoke the cleanup outside the lock
+    for (OpenFileCtx ofc : ctxToRemove) {
+      ofc.cleanup();
+    }
+  }
+
+  OpenFileCtx get(FileHandle key) {
+    return openFileMap.get(key);
+  }
+
+  int size() {
+    return openFileMap.size();
+  }
+
+  void start() {
+    streamMonitor.start();
+  }
+
+  // Evict all entries
+  void cleanAll() {
+    ArrayList<OpenFileCtx> cleanedContext = new ArrayList<OpenFileCtx>();
+    synchronized (this) {
+      Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet()
+          .iterator();
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("openFileMap size:" + openFileMap.size());
+      }
+
+      while (it.hasNext()) {
+        Entry<FileHandle, OpenFileCtx> pairs = it.next();
+        OpenFileCtx ctx = pairs.getValue();
+        it.remove();
+        cleanedContext.add(ctx);
+      }
+    }
+
+    // Invoke the cleanup outside the lock
+    for (OpenFileCtx ofc : cleanedContext) {
+      ofc.cleanup();
+    }
+  }
+
+  void shutdown() {
+    // stop the dump thread
+    if (streamMonitor != null && streamMonitor.isAlive()) {
+      streamMonitor.shouldRun(false);
+      streamMonitor.interrupt();
+      try {
+        streamMonitor.join(3000);
+      } catch (InterruptedException e) {
+      }
+    }
+    
+    cleanAll();
+  }
+
+  /**
+   * StreamMonitor wakes up periodically to find and close idle streams.
+   */
+  class StreamMonitor extends Daemon {
+    private final static int rotation = 5 * 1000; // 5 seconds
+    private long lastWakeupTime = 0;
+    private boolean shouldRun = true;
+    
+    void shouldRun(boolean shouldRun) {
+      this.shouldRun = shouldRun;
+    }
+    
+    @Override
+    public void run() {
+      while (shouldRun) {
+        scan(streamTimeout);
+
+        // Check if it can sleep
+        try {
+          long workedTime = System.currentTimeMillis() - lastWakeupTime;
+          if (workedTime < rotation) {
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("StreamMonitor can still have a sleep:"
+                  + ((rotation - workedTime) / 1000));
+            }
+            Thread.sleep(rotation - workedTime);
+          }
+          lastWakeupTime = System.currentTimeMillis();
+
+        } catch (InterruptedException e) {
+          LOG.info("StreamMonitor got interrupted");
+          return;
+        }
+      }
+    }
+  }
+}
\ No newline at end of file
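
The eviction priority in getEntryToEvict() can be seen in a small worked
example. The sketch below is standalone and hypothetical (CtxState, the
demo class, and the sample data are invented for illustration; the
timeout is assumed to be 10 seconds, mirroring the old
MINIMIUM_STREAM_TIMEOUT). Only the rule order is taken from the code
above: an inactive entry wins immediately, entries with pending work are
never chosen, and otherwise the idlest entry is evicted only once it has
been idle past the minimum stream timeout.

    import java.util.Arrays;
    import java.util.List;

    public class EvictionRuleDemo {
      // Assumed value of OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT (10 seconds)
      static final long MIN_TIMEOUT = 10 * 1000;

      // Hypothetical stand-in for the state getEntryToEvict() consults.
      static class CtxState {
        final String name;
        final boolean active;
        final boolean pendingWork;
        final long lastAccessTime;
        CtxState(String name, boolean active, boolean pendingWork, long t) {
          this.name = name; this.active = active;
          this.pendingWork = pendingWork; this.lastAccessTime = t;
        }
      }

      static CtxState pick(List<CtxState> entries, long now) {
        CtxState idlest = null;
        for (CtxState c : entries) {
          if (!c.active) {
            return c;                  // rule 2: inactive wins immediately
          }
          if (c.pendingWork) {
            continue;                  // rule 1: never evict pending work
          }
          if (idlest == null || c.lastAccessTime < idlest.lastAccessTime) {
            idlest = c;                // rule 3: track the idlest candidate
          }
        }
        if (idlest != null && now - idlest.lastAccessTime >= MIN_TIMEOUT) {
          return idlest;               // idle long enough to evict
        }
        return null;                   // the whole eviction request fails
      }

      public static void main(String[] args) {
        long now = System.currentTimeMillis();
        List<CtxState> entries = Arrays.asList(
            new CtxState("a", true, true, now - 60000),   // busy: skipped
            new CtxState("b", true, false, now - 30000),  // idle 30s
            new CtxState("c", true, false, now - 5000));  // idle 5s
        System.out.println(pick(entries, now).name);      // prints "b"
      }
    }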

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1539834&r1=1539833&r2=1539834&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Thu Nov  7 21:49:13 2013
@@ -214,6 +214,11 @@ public class RpcProgramNfs3 extends RpcP
     }
   }
   
+  @Override
+  public void startDaemons() {
+    writeManager.startAsyncDataSerivce();
+  }
+  
   /******************************************************
    * RPC call handlers
    ******************************************************/
@@ -778,7 +783,8 @@ public class RpcProgramNfs3 extends RpcP
 
     int createMode = request.getMode();
     if ((createMode != Nfs3Constant.CREATE_EXCLUSIVE)
-        && request.getObjAttr().getUpdateFields().contains(SetAttrField.SIZE)) {
+        && request.getObjAttr().getUpdateFields().contains(SetAttrField.SIZE)
+        && request.getObjAttr().getSize() != 0) {
       LOG.error("Setting file size is not supported when creating file: "
           + fileName + " dir fileId:" + dirHandle.getFileId());
       return new CREATE3Response(Nfs3Status.NFS3ERR_INVAL);
@@ -831,6 +837,23 @@ public class RpcProgramNfs3 extends RpcP
       postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
       dirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(preOpDirAttr),
           dfsClient, dirFileIdPath, iug);
+      
+      // Add open stream
+      OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr,
+          writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug);
+      fileHandle = new FileHandle(postOpObjAttr.getFileId());
+      if (!writeManager.addOpenFileStream(fileHandle, openFileCtx)) {
+        LOG.warn("Can't add more stream, close it."
+            + " Future write will become append");
+        fos.close();
+        fos = null;
+      } else {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Opened stream for file:" + fileName + ", fileId:"
+              + fileHandle.getFileId());
+        }
+      }
+      
     } catch (IOException e) {
       LOG.error("Exception", e);
       if (fos != null) {
@@ -859,16 +882,6 @@ public class RpcProgramNfs3 extends RpcP
       }
     }
     
-    // Add open stream
-    OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr, writeDumpDir
-        + "/" + postOpObjAttr.getFileId(), dfsClient, iug);
-    fileHandle = new FileHandle(postOpObjAttr.getFileId());
-    writeManager.addOpenFileStream(fileHandle, openFileCtx);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("open stream for file:" + fileName + ", fileId:"
-          + fileHandle.getFileId());
-    }
-    
     return new CREATE3Response(Nfs3Status.NFS3_OK, fileHandle, postOpObjAttr,
         dirWcc);
   }
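
Two behavioral points in this hunk: startDaemons() now starts the async
data service when the RPC program comes up (the lazy start on the first
WRITE is removed from WriteManager below), and CREATE registers the new
stream inside the try block, so a full cache degrades gracefully rather
than failing the call. Condensed from the code above (all names are from
this patch, nothing new):

    // If the cache rejects the stream, CREATE still succeeds: the stream
    // is closed, and a later WRITE reopens the file via the append path
    // in WriteManager.handleWrite().
    if (!writeManager.addOpenFileStream(fileHandle, openFileCtx)) {
      fos.close();   // future writes to this file become appends
      fos = null;
    }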

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1539834&r1=1539833&r2=1539834&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Thu Nov  7 21:49:13 2013
@@ -18,8 +18,6 @@
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
 import java.io.IOException;
-import java.util.Iterator;
-import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
@@ -29,11 +27,12 @@ import org.apache.hadoop.fs.CommonConfig
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
 import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS;
+import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
+import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.nfs.NfsFileType;
 import org.apache.hadoop.nfs.nfs3.FileHandle;
 import org.apache.hadoop.nfs.nfs3.IdUserGroup;
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
-import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
 import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
 import org.apache.hadoop.nfs.nfs3.Nfs3Status;
 import org.apache.hadoop.nfs.nfs3.request.WRITE3Request;
@@ -56,69 +55,70 @@ public class WriteManager {
 
   private final Configuration config;
   private final IdUserGroup iug;
-  private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps
-      .newConcurrentMap();
-
+ 
   private AsyncDataService asyncDataService;
   private boolean asyncDataServiceStarted = false;
 
-  private final StreamMonitor streamMonitor;
-  
+  private final int maxStreams;
+
   /**
    * The time limit to wait for accumulate reordered sequential writes to the
    * same file before the write is considered done.
    */
   private long streamTimeout;
-  
-  public static final long DEFAULT_STREAM_TIMEOUT = 10 * 60 * 1000; //10 minutes
-  public static final long MINIMIUM_STREAM_TIMEOUT = 10 * 1000; //10 seconds
-  
-  void addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
-    openFileMap.put(h, ctx);
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("After add the new stream " + h.getFileId()
-          + ", the stream number:" + openFileMap.size());
+
+  private final OpenFileCtxCache fileContextCache;
+
+  static public class MultipleCachedStreamException extends IOException {
+    private static final long serialVersionUID = 1L;
+
+    public MultipleCachedStreamException(String msg) {
+      super(msg);
     }
   }
 
+  boolean addOpenFileStream(FileHandle h, OpenFileCtx ctx) {
+    return fileContextCache.put(h, ctx);
+  }
+  
   WriteManager(IdUserGroup iug, final Configuration config) {
     this.iug = iug;
     this.config = config;
-    
-    streamTimeout = config.getLong("dfs.nfs3.stream.timeout",
-        DEFAULT_STREAM_TIMEOUT);
+    streamTimeout = config.getLong(Nfs3Constant.OUTPUT_STREAM_TIMEOUT,
+        Nfs3Constant.OUTPUT_STREAM_TIMEOUT_DEFAULT);
     LOG.info("Stream timeout is " + streamTimeout + "ms.");
-    if (streamTimeout < MINIMIUM_STREAM_TIMEOUT) {
+    if (streamTimeout < Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT) {
       LOG.info("Reset stream timeout to minimum value "
-          + MINIMIUM_STREAM_TIMEOUT + "ms.");
-      streamTimeout = MINIMIUM_STREAM_TIMEOUT;
+          + Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT + "ms.");
+      streamTimeout = Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT;
     }
-    
-    this.streamMonitor = new StreamMonitor();
+    maxStreams = config.getInt(Nfs3Constant.MAX_OPEN_FILES,
+        Nfs3Constant.MAX_OPEN_FILES_DEFAULT);
+    LOG.info("Maximum open streams is "+ maxStreams);
+    this.fileContextCache = new OpenFileCtxCache(config, streamTimeout);
   }
 
-  private void startAsyncDataSerivce() {
-    streamMonitor.start();
+  void startAsyncDataSerivce() {
+    if (asyncDataServiceStarted) {
+      return;
+    }
+    fileContextCache.start();
     this.asyncDataService = new AsyncDataService();
     asyncDataServiceStarted = true;
   }
 
-  private void shutdownAsyncDataService() {
-    asyncDataService.shutdown();
+  void shutdownAsyncDataService() {
+    if (!asyncDataServiceStarted) {
+      return;
+    }
     asyncDataServiceStarted = false;
-    streamMonitor.interrupt();
+    asyncDataService.shutdown();
+    fileContextCache.shutdown();
   }
 
   void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel,
       int xid, Nfs3FileAttributes preOpAttr) throws IOException {
-    // First write request starts the async data service
-    if (!asyncDataServiceStarted) {
-      startAsyncDataSerivce();
-    }
-
-    long offset = request.getOffset();
     int count = request.getCount();
-    WriteStableHow stableHow = request.getStableHow();
     byte[] data = request.getData().array();
     if (data.length < count) {
       WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL);
@@ -129,13 +129,12 @@ public class WriteManager {
 
     FileHandle handle = request.getHandle();
     if (LOG.isDebugEnabled()) {
-      LOG.debug("handleWrite fileId: " + handle.getFileId() + " offset: "
-          + offset + " length:" + count + " stableHow:" + stableHow.getValue());
+      LOG.debug("handleWrite " + request);
     }
 
     // Check if there is a stream to write
     FileHandle fileHandle = request.getHandle();
-    OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
+    OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
     if (openFileCtx == null) {
       LOG.info("No opened stream for fileId:" + fileHandle.getFileId());
 
@@ -150,6 +149,15 @@ public class WriteManager {
         fos = dfsClient.append(fileIdPath, bufferSize, null, null);
 
         latestAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug);
+      } catch (RemoteException e) {
+        IOException io = e.unwrapRemoteException();
+        if (io instanceof AlreadyBeingCreatedException) {
+          LOG.warn("Can't append file:" + fileIdPath
+              + ". Possibly the file is being closed. Drop the request:"
+              + request + ", wait for the client to retry...");
+          return;
+        }
+        throw e;
       } catch (IOException e) {
         LOG.error("Can't apapend to file:" + fileIdPath + ", error:" + e);
         if (fos != null) {
@@ -170,9 +178,26 @@ public class WriteManager {
           Nfs3Constant.FILE_DUMP_DIR_DEFAULT);
       openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/"
           + fileHandle.getFileId(), dfsClient, iug);
-      addOpenFileStream(fileHandle, openFileCtx);
+
+      if (!addOpenFileStream(fileHandle, openFileCtx)) {
+        LOG.info("Can't add new stream. Close it. Tell client to retry.");
+        try {
+          fos.close();
+        } catch (IOException e) {
+          LOG.error("Can't close stream for fileId:" + handle.getFileId());
+        }
+        // Notify client to retry
+        WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
+        WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_JUKEBOX,
+            fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
+        Nfs3Utils.writeChannel(channel,
+            response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
+            xid);
+        return;
+      }
+
       if (LOG.isDebugEnabled()) {
-        LOG.debug("opened stream for file:" + fileHandle.getFileId());
+        LOG.debug("Opened stream for appending file:" + 
fileHandle.getFileId());
       }
     }
 
@@ -185,7 +210,7 @@ public class WriteManager {
   void handleCommit(DFSClient dfsClient, FileHandle fileHandle,
      long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
     int status;
-    OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
+    OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
 
     if (openFileCtx == null) {
       LOG.info("No opened stream for fileId:" + fileHandle.getFileId()
@@ -238,7 +263,7 @@ public class WriteManager {
     String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle);
     Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug);
     if (attr != null) {
-      OpenFileCtx openFileCtx = openFileMap.get(fileHandle);
+      OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
       if (openFileCtx != null) {
         attr.setSize(openFileCtx.getNextOffset());
         attr.setUsed(openFileCtx.getNextOffset());
@@ -253,8 +278,8 @@ public class WriteManager {
     Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug);
 
     if ((attr != null) && (attr.getType() == NfsFileType.NFSREG.toValue())) {
-      OpenFileCtx openFileCtx = openFileMap
-          .get(new FileHandle(attr.getFileId()));
+      OpenFileCtx openFileCtx = fileContextCache.get(new FileHandle(attr
+          .getFileId()));
 
       if (openFileCtx != null) {
         attr.setSize(openFileCtx.getNextOffset());
@@ -263,56 +288,9 @@ public class WriteManager {
     }
     return attr;
   }
-   
-  @VisibleForTesting
-  ConcurrentMap<FileHandle, OpenFileCtx> getOpenFileMap() {
-    return this.openFileMap;
-  }
-  
-  /**
-   * StreamMonitor wakes up periodically to find and closes idle streams.
-   */
-  class StreamMonitor extends Daemon {
-    private int rotation = 5 * 1000; // 5 seconds
-    private long lastWakeupTime = 0;
-
-    @Override
-    public void run() {
-      while (true) {
-        Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet()
-            .iterator();
-        if (LOG.isTraceEnabled()) {
-          LOG.trace("openFileMap size:" + openFileMap.size());
-        }
-        while (it.hasNext()) {
-          Entry<FileHandle, OpenFileCtx> pairs = it.next();
-          OpenFileCtx ctx = pairs.getValue();
-          if (ctx.streamCleanup((pairs.getKey()).getFileId(), streamTimeout)) {
-            it.remove();
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("After remove stream " + pairs.getKey().getFileId()
-                  + ", the stream number:" + openFileMap.size());
-            }
-          }
-        }
-
-        // Check if it can sleep
-        try {
-          long workedTime = System.currentTimeMillis() - lastWakeupTime;
-          if (workedTime < rotation) {
-            if (LOG.isTraceEnabled()) {
-              LOG.trace("StreamMonitor can still have a sleep:"
-                  + ((rotation - workedTime) / 1000));
-            }
-            Thread.sleep(rotation - workedTime);
-          }
-          lastWakeupTime = System.currentTimeMillis();
 
-        } catch (InterruptedException e) {
-          LOG.info("StreamMonitor got interrupted");
-          return;
-        }
-      }
-    }
+  @VisibleForTesting
+  OpenFileCtxCache getOpenFileCtxCache() {
+    return this.fileContextCache;
   }
 }
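
One protocol detail worth spelling out in the handleWrite() changes: when
the stream cache is full and nothing can be evicted, the server now
answers NFS3ERR_JUKEBOX. In NFSv3 (RFC 1813) JUKEBOX means the server is
temporarily unable to complete the request and the client should retry
later, so a well-behaved client simply resends the WRITE once a slot
frees up. The AlreadyBeingCreatedException path serves the same purpose
for a file whose previous stream is still closing. Trimmed from the diff
above (all classes and constants appear in this patch):

    // Reply that tells the NFS client to retry the WRITE later.
    WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr);
    WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_JUKEBOX,
        fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF);
    Nfs3Utils.writeChannel(channel,
        response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()),
        xid);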

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java?rev=1539834&r1=1539833&r2=1539834&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java Thu Nov  7 21:49:13 2013
@@ -51,7 +51,7 @@ public class TestMountd {
     Nfs3 nfs3 = new Nfs3(exports, config);
     nfs3.start(false);
 
-    RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountBase()
+    RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountd()
         .getRpcProgram();
     mountd.nullOp(new XDR(), 1234, InetAddress.getByName("localhost"));
     

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java?rev=1539834&r1=1539833&r2=1539834&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java Thu Nov  7 21:49:13 2013
@@ -135,6 +135,7 @@ public class TestOutOfOrderWrite {
     @Override
     protected ChannelPipelineFactory setPipelineFactory() {
       this.pipelineFactory = new ChannelPipelineFactory() {
+        @Override
         public ChannelPipeline getPipeline() {
           return Channels.pipeline(
               RpcUtil.constructRpcFrameDecoder(),

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java?rev=1539834&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java Thu Nov  7 21:49:13 2013
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.nfs.nfs3;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.CommitCtx;
+import org.apache.hadoop.nfs.nfs3.FileHandle;
+import org.apache.hadoop.nfs.nfs3.IdUserGroup;
+import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
+import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestOpenFileCtxCache {
+  static boolean cleaned = false;
+
+  @Test
+  public void testEviction() throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+
+    // Only two entries will be in the cache
+    conf.setInt(Nfs3Constant.MAX_OPEN_FILES, 2);
+
+    DFSClient dfsClient = Mockito.mock(DFSClient.class);
+    Nfs3FileAttributes attr = new Nfs3FileAttributes();
+    HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
+    Mockito.when(fos.getPos()).thenReturn((long) 0);
+
+    OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath",
+        dfsClient, new IdUserGroup());
+    OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath",
+        dfsClient, new IdUserGroup());
+    OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath",
+        dfsClient, new IdUserGroup());
+    OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath",
+        dfsClient, new IdUserGroup());
+    OpenFileCtx context5 = new OpenFileCtx(fos, attr, "/dumpFilePath",
+        dfsClient, new IdUserGroup());
+
+    OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);
+
+    boolean ret = cache.put(new FileHandle(1), context1);
+    assertTrue(ret);
+    Thread.sleep(1000);
+    ret = cache.put(new FileHandle(2), context2);
+    assertTrue(ret);
+    ret = cache.put(new FileHandle(3), context3);
+    assertFalse(ret);
+    assertTrue(cache.size() == 2);
+
+    // Wait for the oldest stream to be evict-able, insert again
+    Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
+    assertTrue(cache.size() == 2);
+
+    ret = cache.put(new FileHandle(3), context3);
+    assertTrue(ret);
+    assertTrue(cache.size() == 2);
+    assertTrue(cache.get(new FileHandle(1)) == null);
+
+    // Test inactive entry is evicted immediately
+    context3.setActiveStatusForTest(false);
+    ret = cache.put(new FileHandle(4), context4);
+    assertTrue(ret);
+
+    // Now the cache has context2 and context4
+    // Test eviction failure if all entries have pending work.
+    context2.getPendingWritesForTest().put(new OffsetRange(0, 100),
+        new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
+    context4.getPendingCommitsForTest().put(new Long(100),
+        new CommitCtx(0, null, 0, attr));
+    Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
+    ret = cache.put(new FileHandle(5), context5);
+    assertFalse(ret);
+  }
+
+  @Test
+  public void testScan() throws IOException, InterruptedException {
+    Configuration conf = new Configuration();
+
+    // Only two entries will be in the cache
+    conf.setInt(Nfs3Constant.MAX_OPEN_FILES, 2);
+
+    DFSClient dfsClient = Mockito.mock(DFSClient.class);
+    Nfs3FileAttributes attr = new Nfs3FileAttributes();
+    HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
+    Mockito.when(fos.getPos()).thenReturn((long) 0);
+
+    OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath",
+        dfsClient, new IdUserGroup());
+    OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath",
+        dfsClient, new IdUserGroup());
+    OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath",
+        dfsClient, new IdUserGroup());
+    OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath",
+        dfsClient, new IdUserGroup());
+
+    OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100);
+
+    // Test cleaning expired entry
+    boolean ret = cache.put(new FileHandle(1), context1);
+    assertTrue(ret);
+    ret = cache.put(new FileHandle(2), context2);
+    assertTrue(ret);
+    Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT + 1);
+    cache.scan(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT);
+    assertTrue(cache.size() == 0);
+
+    // Test cleaning inactive entry
+    ret = cache.put(new FileHandle(3), context3);
+    assertTrue(ret);
+    ret = cache.put(new FileHandle(4), context4);
+    assertTrue(ret);
+    context3.setActiveStatusForTest(false);
+    cache.scan(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_DEFAULT);
+    assertTrue(cache.size() == 1);
+    assertTrue(cache.get(new FileHandle(3)) == null);
+    assertTrue(cache.get(new FileHandle(4)) != null);
+  }
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java?rev=1539834&r1=1539833&r2=1539834&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java Thu Nov  7 21:49:13 2013
@@ -186,9 +186,8 @@ public class TestWrites {
   private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime)
       throws InterruptedException {
     int waitedTime = 0;
-    ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = nfsd.getWriteManager()
-        .getOpenFileMap();
-    OpenFileCtx ctx = openFileMap.get(handle);
+    OpenFileCtx ctx = nfsd.getWriteManager()
+        .getOpenFileCtxCache().get(handle);
     assertTrue(ctx != null);
     do {
       Thread.sleep(3000);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1539834&r1=1539833&r2=1539834&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Nov  7 21:49:13 2013
@@ -596,6 +596,8 @@ Release 2.2.1 - UNRELEASED
 
     HDFS-5252. Stable write is not handled correctly in someplace. (brandonli)
 
+    HDFS-5364. Add OpenFileCtx cache. (brandonli)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES

