Author: brandonli
Date: Wed Nov 27 23:59:23 2013
New Revision: 1546240

URL: http://svn.apache.org/r1546240
Log:
HDFS-5563. Merging change r1546235 from branch-2

Modified:
    
hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
    
hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
    
hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
    
hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
    
hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

Modified: 
hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1546240&r1=1546239&r2=1546240&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original)
+++ hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Wed Nov 27 23:59:23 2013
@@ -710,15 +710,28 @@ class OpenFileCtx {
     }
     return response;
   }
-
+  
+  /**
+   * Check the commit status with the given offset
+   * @param commitOffset the offset to commit
+   * @param channel the channel to return response
+   * @param xid the xid of the commit request
+   * @param preOpAttr the preOp attribute
+   * @param fromRead whether the commit is triggered from read request
+   * @return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
+   * COMMIT_INACTIVE_CTX, COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR
+   */
   public COMMIT_STATUS checkCommit(DFSClient dfsClient, long commitOffset,
-      Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
-    // Keep stream active
-    updateLastAccessTime();
+      Channel channel, int xid, Nfs3FileAttributes preOpAttr, boolean fromRead) {
+    if (!fromRead) {
+      Preconditions.checkState(channel != null && preOpAttr != null);
+      // Keep stream active
+      updateLastAccessTime();
+    }
     Preconditions.checkState(commitOffset >= 0);
 
     COMMIT_STATUS ret = checkCommitInternal(commitOffset, channel, xid,
-        preOpAttr);
+        preOpAttr, fromRead);
     if (LOG.isDebugEnabled()) {
       LOG.debug("Got commit status: " + ret.name());
     }
@@ -745,14 +758,10 @@ class OpenFileCtx {
     }
     return ret;
   }
-
-  /**
-   * return one commit status: COMMIT_FINISHED, COMMIT_WAIT,
-   * COMMIT_INACTIVE_CTX, COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR
-   */
+  
   @VisibleForTesting
   synchronized COMMIT_STATUS checkCommitInternal(long commitOffset,
-      Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
+      Channel channel, int xid, Nfs3FileAttributes preOpAttr, boolean fromRead) {
     if (!activeState) {
       if (pendingWrites.isEmpty()) {
         return COMMIT_STATUS.COMMIT_INACTIVE_CTX;
@@ -775,9 +784,11 @@ class OpenFileCtx {
 
     if (commitOffset > 0) {
       if (commitOffset > flushed) {
-        CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
-            preOpAttr);
-        pendingCommits.put(commitOffset, commitCtx);
+        if (!fromRead) {
+          CommitCtx commitCtx = new CommitCtx(commitOffset, channel, xid,
+              preOpAttr);
+          pendingCommits.put(commitOffset, commitCtx);
+        }
         return COMMIT_STATUS.COMMIT_WAIT;
       } else {
         return COMMIT_STATUS.COMMIT_DO_SYNC;
@@ -792,11 +803,13 @@ class OpenFileCtx {
       // do a sync here though the output stream might be closed.
       return COMMIT_STATUS.COMMIT_FINISHED;
     } else {
-      // Insert commit
-      long maxOffset = key.getKey().getMax() - 1;
-      Preconditions.checkState(maxOffset > 0);
-      CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr);
-      pendingCommits.put(maxOffset, commitCtx);
+      if (!fromRead) {
+        // Insert commit
+        long maxOffset = key.getKey().getMax() - 1;
+        Preconditions.checkState(maxOffset > 0);
+        CommitCtx commitCtx = new CommitCtx(maxOffset, channel, xid, preOpAttr);
+        pendingCommits.put(maxOffset, commitCtx);
+      }
       return COMMIT_STATUS.COMMIT_WAIT;
     }
   }

Modified: 
hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1546240&r1=1546239&r2=1546240&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original)
+++ hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Wed Nov 27 23:59:23 2013
@@ -628,6 +628,14 @@ public class RpcProgramNfs3 extends RpcP
       }
     }
     
+    // In case there is buffered data for the same file, flush it. This can be
+    // optimized later by reading from the cache.
+    int ret = writeManager.commitBeforeRead(dfsClient, handle, offset + count);
+    if (ret != Nfs3Status.NFS3_OK) {
+      LOG.warn("commitBeforeRead didn't succeed with ret=" + ret
+          + ". Read may not get most recent data.");
+    }
+
     try {
       int buffSize = Math.min(MAX_READ_TRANSFER_SIZE, count);
       byte[] readbuffer = new byte[buffSize];

Modified: 
hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1546240&r1=1546239&r2=1546240&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original)
+++ hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Wed Nov 27 23:59:23 2013
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
 import java.io.IOException;
-import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,11 +40,9 @@ import org.apache.hadoop.nfs.nfs3.respon
 import org.apache.hadoop.nfs.nfs3.response.WccData;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.VerifierNone;
-import org.apache.hadoop.util.Daemon;
 import org.jboss.netty.channel.Channel;
 
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Maps;
 
 /**
  * Manage the writes and responds asynchronously.
@@ -207,6 +204,51 @@ public class WriteManager {
     return;
   }
 
+  // Do a possible commit before read request in case there is buffered data
+  // inside DFSClient which has been flushed but not synced.
+  int commitBeforeRead(DFSClient dfsClient, FileHandle fileHandle,
+      long commitOffset) {
+    int status;
+    OpenFileCtx openFileCtx = fileContextCache.get(fileHandle);
+
+    if (openFileCtx == null) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("No opened stream for fileId:" + fileHandle.getFileId()
+            + " commitOffset=" + commitOffset
+            + ". Return success in this case.");
+      }
+      status = Nfs3Status.NFS3_OK;
+
+    } else {
+      COMMIT_STATUS ret = openFileCtx.checkCommit(dfsClient, commitOffset,
+          null, 0, null, true);
+      switch (ret) {
+      case COMMIT_FINISHED:
+      case COMMIT_INACTIVE_CTX:
+        status = Nfs3Status.NFS3_OK;
+        break;
+      case COMMIT_INACTIVE_WITH_PENDING_WRITE:
+      case COMMIT_ERROR:
+        status = Nfs3Status.NFS3ERR_IO;
+        break;
+      case COMMIT_WAIT:
+        /**
+         * This should happen rarely in some possible cases, such as read
+         * request arrives before DFSClient is able to quickly flush data to DN,
+         * or Prerequisite writes is not available. Won't wait since we don't
+         * want to block read.
+         */     
+        status = Nfs3Status.NFS3ERR_JUKEBOX;
+        break;
+      default:
+        LOG.error("Should not get commit return code:" + ret.name());
+        throw new RuntimeException("Should not get commit return code:"
+            + ret.name());
+      }
+    }
+    return status;
+  }
+  
   void handleCommit(DFSClient dfsClient, FileHandle fileHandle,
       long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) {
     int status;
@@ -219,9 +261,8 @@ public class WriteManager {
       
     } else {
       COMMIT_STATUS ret = openFileCtx.checkCommit(dfsClient, commitOffset,
-          channel, xid, preOpAttr);
+          channel, xid, preOpAttr, false);
       switch (ret) {
-      case COMMIT_DO_SYNC:
       case COMMIT_FINISHED:
       case COMMIT_INACTIVE_CTX:
         status = Nfs3Status.NFS3_OK;
@@ -234,6 +275,7 @@ public class WriteManager {
         // Do nothing. Commit is async now.
         return;
       default:
+        LOG.error("Should not get commit return code:" + ret.name());
         throw new RuntimeException("Should not get commit return code:"
             + ret.name());
       }

Modified: 
hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java?rev=1546240&r1=1546239&r2=1546240&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java (original)
+++ hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java Wed Nov 27 23:59:23 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.nfs.nfs3;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -26,8 +27,7 @@ import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.concurrent.ConcurrentNavigableMap;
 
-import junit.framework.Assert;
-
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
@@ -41,6 +41,7 @@ import org.apache.hadoop.nfs.nfs3.IdUser
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant;
 import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow;
 import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes;
+import org.apache.hadoop.nfs.nfs3.Nfs3Status;
 import org.apache.hadoop.nfs.nfs3.request.CREATE3Request;
 import org.apache.hadoop.nfs.nfs3.request.READ3Request;
 import org.apache.hadoop.nfs.nfs3.request.SetAttr3;
@@ -49,6 +50,8 @@ import org.apache.hadoop.nfs.nfs3.respon
 import org.apache.hadoop.nfs.nfs3.response.READ3Response;
 import org.apache.hadoop.oncrpc.XDR;
 import org.apache.hadoop.oncrpc.security.SecurityHandler;
+import org.jboss.netty.channel.Channel;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -140,32 +143,33 @@ public class TestWrites {
 
     // Test inactive open file context
     ctx.setActiveStatusForTest(false);
-    ret = ctx.checkCommit(dfsClient, 0, null, 1, attr);
+    Channel ch = Mockito.mock(Channel.class);
+    ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
     Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_CTX);
 
     ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
         new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
-    ret = ctx.checkCommit(dfsClient, 0, null, 1, attr);
+    ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
     Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE);
 
     // Test request with non zero commit offset
     ctx.setActiveStatusForTest(true);
     Mockito.when(fos.getPos()).thenReturn((long) 10);
-    COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr);
+    COMMIT_STATUS status = ctx.checkCommitInternal(5, null, 1, attr, false);
     Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
     // Do_SYNC state will be updated to FINISHED after data sync
-    ret = ctx.checkCommit(dfsClient, 5, null, 1, attr);
+    ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, false);
     Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
     
-    status = ctx.checkCommitInternal(10, null, 1, attr);
+    status = ctx.checkCommitInternal(10, ch, 1, attr, false);
     Assert.assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
-    ret = ctx.checkCommit(dfsClient, 10, null, 1, attr);
+    ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, false);
     Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
 
     ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
         .getPendingCommitsForTest();
     Assert.assertTrue(commits.size() == 0);
-    ret = ctx.checkCommit(dfsClient, 11, null, 1, attr);
+    ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, false);
     Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT);
     Assert.assertTrue(commits.size() == 1);
     long key = commits.firstKey();
@@ -174,7 +178,7 @@ public class TestWrites {
     // Test request with zero commit offset
     commits.remove(new Long(11));
     // There is one pending write [5,10]
-    ret = ctx.checkCommit(dfsClient, 0, null, 1, attr);
+    ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
     Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_WAIT);
     Assert.assertTrue(commits.size() == 1);
     key = commits.firstKey();
@@ -182,10 +186,79 @@ public class TestWrites {
 
     // Empty pending writes
     ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
-    ret = ctx.checkCommit(dfsClient, 0, null, 1, attr);
+    ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, false);
     Assert.assertTrue(ret == COMMIT_STATUS.COMMIT_FINISHED);
   }
 
+  @Test
+  // Validate all the commit check return codes OpenFileCtx.COMMIT_STATUS, which
+  // includes COMMIT_FINISHED, COMMIT_WAIT, COMMIT_INACTIVE_CTX,
+  // COMMIT_INACTIVE_WITH_PENDING_WRITE, COMMIT_ERROR, and COMMIT_DO_SYNC.
+  public void testCheckCommitFromRead() throws IOException {
+    DFSClient dfsClient = Mockito.mock(DFSClient.class);
+    Nfs3FileAttributes attr = new Nfs3FileAttributes();
+    HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class);
+    Mockito.when(fos.getPos()).thenReturn((long) 0);
+
+    OpenFileCtx ctx = new OpenFileCtx(fos, attr, "/dumpFilePath", dfsClient,
+        new IdUserGroup());
+
+    FileHandle h = new FileHandle(1); // fake handle for "/dumpFilePath"
+    COMMIT_STATUS ret;
+    WriteManager wm = new WriteManager(new IdUserGroup(), new Configuration());
+    assertTrue(wm.addOpenFileStream(h, ctx));
+    
+    // Test inactive open file context
+    ctx.setActiveStatusForTest(false);
+    Channel ch = Mockito.mock(Channel.class);
+    ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
+    assertEquals( COMMIT_STATUS.COMMIT_INACTIVE_CTX, ret);
+    assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
+    
+    ctx.getPendingWritesForTest().put(new OffsetRange(5, 10),
+        new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null));
+    ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
+    assertEquals(COMMIT_STATUS.COMMIT_INACTIVE_WITH_PENDING_WRITE, ret);
+    assertEquals(Nfs3Status.NFS3ERR_IO, wm.commitBeforeRead(dfsClient, h, 0));
+    
+    // Test request with non zero commit offset
+    ctx.setActiveStatusForTest(true);
+    Mockito.when(fos.getPos()).thenReturn((long) 10);
+    COMMIT_STATUS status = ctx.checkCommitInternal(5, ch, 1, attr, false);
+    assertEquals(COMMIT_STATUS.COMMIT_DO_SYNC, status);
+    // Do_SYNC state will be updated to FINISHED after data sync
+    ret = ctx.checkCommit(dfsClient, 5, ch, 1, attr, true);
+    assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
+    assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 5));
+ 
+    status = ctx.checkCommitInternal(10, ch, 1, attr, true);
+    assertTrue(status == COMMIT_STATUS.COMMIT_DO_SYNC);
+    ret = ctx.checkCommit(dfsClient, 10, ch, 1, attr, true);
+    assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
+    assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 10));
+
+    ConcurrentNavigableMap<Long, CommitCtx> commits = ctx
+        .getPendingCommitsForTest();
+    assertTrue(commits.size() == 0);
+    ret = ctx.checkCommit(dfsClient, 11, ch, 1, attr, true);
+    assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
+    assertEquals(0, commits.size()); // commit triggered by read doesn't wait
+    assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 11));
+
+    // Test request with zero commit offset
+    // There is one pending write [5,10]
+    ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
+    assertEquals(COMMIT_STATUS.COMMIT_WAIT, ret);
+    assertEquals(0, commits.size());
+    assertEquals(Nfs3Status.NFS3ERR_JUKEBOX, wm.commitBeforeRead(dfsClient, h, 0));
+
+    // Empty pending writes
+    ctx.getPendingWritesForTest().remove(new OffsetRange(5, 10));
+    ret = ctx.checkCommit(dfsClient, 0, ch, 1, attr, true);
+    assertEquals(COMMIT_STATUS.COMMIT_FINISHED, ret);
+    assertEquals(Nfs3Status.NFS3_OK, wm.commitBeforeRead(dfsClient, h, 0));
+  }
+  
   private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime)
       throws InterruptedException {
     int waitedTime = 0;

Modified: 
hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1546240&r1=1546239&r2=1546240&view=diff
==============================================================================
--- hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2.2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Nov 27 23:59:23 2013
@@ -125,6 +125,9 @@ Release 2.2.1 - UNRELEASED
 
     HDFS-5577. NFS user guide update (brandonli)
 
+    HDFS-5563. NFS gateway should commit the buffered data when read request comes
+    after write to the same file (brandonli)
+
 Release 2.2.0 - 2013-10-13
 
   INCOMPATIBLE CHANGES


Reply via email to