Author: hairong
Date: Fri Dec  3 07:54:55 2010
New Revision: 1041718

URL: http://svn.apache.org/viewvc?rev=1041718&view=rev
Log:
HDFS-895. Allow hflush/sync to occur in parallel with new writes to the file. 
Contributed by Todd Lipcon.
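
A rough summary of the approach: writeChunk() enqueues packets and advances a new
lastQueuedSeqno counter, the ResponseProcessor advances lastAckedSeqno as datanode
acks arrive, and sync() records the last queued seqno and waits only until that
seqno has been acknowledged, so new writes can keep queueing packets in parallel.
The code below is a minimal, self-contained sketch of that pattern only; the class
and method names are invented for illustration, and the DataStreamer/pipeline
machinery of the real patch is omitted.

    import java.io.IOException;
    import java.util.LinkedList;

    // Illustrative sketch only; not part of the patch.
    class SeqnoSyncSketch {
      private final LinkedList<Long> ackQueue = new LinkedList<Long>();
      private long lastQueuedSeqno = -1;
      private long lastAckedSeqno = -1;

      // Writer side: queue a packet and remember its sequence number.
      synchronized long enqueuePacket() {
        long seqno = ++lastQueuedSeqno;
        synchronized (ackQueue) {
          ackQueue.addLast(seqno);
        }
        return seqno;
      }

      // Response-processor side: the oldest outstanding packet was acked.
      void ackOldestPacket() {
        synchronized (ackQueue) {
          if (!ackQueue.isEmpty()) {
            lastAckedSeqno = ackQueue.removeFirst();
            ackQueue.notifyAll();
          }
        }
      }

      // sync() side: block only until everything queued so far is acked;
      // later writes may keep calling enqueuePacket() concurrently.
      void waitForAckedSeqno(long seqnumToWaitFor) throws IOException {
        synchronized (ackQueue) {
          while (lastAckedSeqno < seqnumToWaitFor) {
            try {
              ackQueue.wait();
            } catch (InterruptedException ie) {
              throw new IOException("Interrupted while waiting for acks");
            }
          }
        }
      }
    }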

Added:
    hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java
Modified:
    hadoop/common/branches/branch-0.20-append/CHANGES.txt
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

Modified: hadoop/common/branches/branch-0.20-append/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/CHANGES.txt?rev=1041718&r1=1041717&r2=1041718&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-append/CHANGES.txt Fri Dec  3 07:54:55 2010
@@ -25,6 +25,9 @@ Release 0.20-append - Unreleased
     HDFS-1202.  DataBlockScanner throws NPE when updated before 
     initialized. (Todd Lipcon via dhruba)
 
+    HDFS-895. Allow hflush/sync to occur in parallel with new writes to
+    the file. (Todd Lipcon via hairong)
+
   IMPROVEMENTS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1041718&r1=1041717&r2=1041718&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Dec  3 07:54:55 2010
@@ -2171,6 +2171,8 @@ public class DFSClient implements FSCons
     private DataStreamer streamer = new DataStreamer();;
     private ResponseProcessor response = null;
     private long currentSeqno = 0;
+    private long lastQueuedSeqno = -1;
+    private long lastAckedSeqno = -1;
     private long bytesCurBlock = 0; // bytes writen in current block
     private int packetSize = 0; // write packet size, including the header.
     private int chunksPerPacket = 0;
@@ -2180,7 +2182,7 @@ public class DFSClient implements FSCons
     private volatile int errorIndex = 0;
     private volatile IOException lastException = null;
     private long artificialSlowdown = 0;
-    private long lastFlushOffset = -1; // offset when flush was invoked
+    private long lastFlushOffset = 0; // offset when flush was invoked
     private boolean persistBlocks = false; // persist blocks on namenode
     private int recoveryErrorCount = 0; // number of times block recovery failed
     private int maxRecoveryErrorCount = 5; // try block recovery 5 times
@@ -2563,6 +2565,8 @@ public class DFSClient implements FSCons
             lastPacketInBlock = one.lastPacketInBlock;
 
             synchronized (ackQueue) {
+              assert ack.getSeqno() == lastAckedSeqno + 1;
+              lastAckedSeqno = ack.getSeqno();
               ackQueue.removeFirst();
               ackQueue.notifyAll();
             }
@@ -3178,11 +3182,9 @@ public class DFSClient implements FSCons
           if (bytesCurBlock == blockSize) {
             currentPacket.lastPacketInBlock = true;
             bytesCurBlock = 0;
-            lastFlushOffset = -1;
+            lastFlushOffset = 0;
           }
-          dataQueue.addLast(currentPacket);
-          dataQueue.notifyAll();
-          currentPacket = null;
+          enqueueCurrentPacket();
  
           // If this was the first write after reopening a file, then the above
           // write filled up any partial chunk. Tell the summer to generate full 
@@ -3198,58 +3200,102 @@ public class DFSClient implements FSCons
       //LOG.debug("DFSClient writeChunk done length " + len +
       //          " checksum length " + cklen);
     }
-  
+
+    private synchronized void enqueueCurrentPacket() {
+      synchronized (dataQueue) {
+        if (currentPacket == null) return;
+        dataQueue.addLast(currentPacket);
+        dataQueue.notifyAll();
+        lastQueuedSeqno = currentPacket.seqno;
+        currentPacket = null;
+      }
+    }
+
     /**
      * All data is written out to datanodes. It is not guaranteed 
      * that data has been flushed to persistent store on the 
      * datanode. Block allocations are persisted on namenode.
      */
-    public synchronized void sync() throws IOException {
+    public void sync() throws IOException {
+      checkOpen();
+      if (closed) {
+        throw new IOException("DFSOutputStream is closed");
+      }
       try {
-        /* Record current blockOffset. This might be changed inside
-         * flushBuffer() where a partial checksum chunk might be flushed.
-         * After the flush, reset the bytesCurBlock back to its previous value,
-         * any partial checksum chunk will be sent now and in next packet.
-         */
-        long saveOffset = bytesCurBlock;
-
-        // flush checksum buffer, but keep checksum buffer intact
-        flushBuffer(true);
-
-        LOG.debug("DFSClient flush() : saveOffset " + saveOffset +  
-                  " bytesCurBlock " + bytesCurBlock +
-                  " lastFlushOffset " + lastFlushOffset);
-        
-        // Flush only if we haven't already flushed till this offset.
-        if (lastFlushOffset != bytesCurBlock) {
-
-          // record the valid offset of this flush
-          lastFlushOffset = bytesCurBlock;
+        long toWaitFor;
+        synchronized (this) {
+          /* Record current blockOffset. This might be changed inside
+           * flushBuffer() where a partial checksum chunk might be flushed.
+           * After the flush, reset the bytesCurBlock back to its previous value,
+           * any partial checksum chunk will be sent now and in next packet.
+           */
+          long saveOffset = bytesCurBlock;
+          Packet oldCurrentPacket = currentPacket;
 
-          // wait for all packets to be sent and acknowledged
-          flushInternal();
-        } else {
-          // just discard the current packet since it is already been sent.
-          currentPacket = null;
+          // flush checksum buffer, but keep checksum buffer intact
+          flushBuffer(true);
+          // bytesCurBlock potentially incremented if there was buffered data
+
+          // Flush only if we haven't already flushed till this offset.
+          if (lastFlushOffset != bytesCurBlock) {
+            assert bytesCurBlock > lastFlushOffset;
+            // record the valid offset of this flush
+            lastFlushOffset = bytesCurBlock;
+            enqueueCurrentPacket();
+          } else {
+            // just discard the current packet since it is already been sent.
+            if (oldCurrentPacket == null && currentPacket != null) {
+              // If we didn't previously have a packet queued, and now we do,
+              // but we don't plan on sending it, then we should not
+              // skip a sequence number for it!
+              currentSeqno--;
+            }
+            currentPacket = null;
+          }
+          // Restore state of stream. Record the last flush offset 
+          // of the last full chunk that was flushed.
+          //
+          bytesCurBlock = saveOffset;
+          toWaitFor = lastQueuedSeqno;
         }
-        
-        // Restore state of stream. Record the last flush offset 
-        // of the last full chunk that was flushed.
-        //
-        bytesCurBlock = saveOffset;
+        waitForAckedSeqno(toWaitFor);
 
         // If any new blocks were allocated since the last flush, 
         // then persist block locations on namenode. 
         //
-        if (persistBlocks) {
-          namenode.fsync(src, clientName);
+        boolean willPersist;
+        synchronized (this) {
+          willPersist = persistBlocks && !closed;
           persistBlocks = false;
         }
+        if (willPersist) {
+          try {
+            namenode.fsync(src, clientName);
+          } catch (IOException ioe) {
+            DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
+            // If we got an error here, it might be because some other thread called
+            // close before our hflush completed. In that case, we should throw an
+            // exception that the stream is closed.
+            isClosed();
+            if (closed) {
+              throw new IOException("DFSOutputStream is closed");
+            }
+
+            // If we aren't closed but failed to sync, we should expose that to the
+            // caller.
+            throw ioe;
+          }
+        }
       } catch (IOException e) {
-          lastException = new IOException("IOException flush:" + e);
-          closed = true;
-          closeThreads();
-          throw e;
+        LOG.warn("Error while syncing", e);
+        synchronized (this) {
+          if (!closed) {
+            lastException = new IOException("IOException flush:" + e);
+            closed = true;
+            closeThreads();
+          }
+        }
+        throw e;
       }
     }
 
@@ -3275,57 +3321,34 @@ public class DFSClient implements FSCons
      * Waits till all existing data is flushed and confirmations 
      * received from datanodes. 
      */
-    private synchronized void flushInternal() throws IOException {
-      checkOpen();
+    private void flushInternal() throws IOException {
       isClosed();
+      checkOpen();
 
-      while (!closed) {
-        synchronized (dataQueue) {
-          isClosed();
-          //
-          // If there is data in the current buffer, send it across
-          //
-          if (currentPacket != null) {
-            dataQueue.addLast(currentPacket);
-            dataQueue.notifyAll();
-            currentPacket = null;
-          }
-
-          // wait for all buffers to be flushed to datanodes
-          if (!closed && dataQueue.size() != 0) {
-            try {
-              dataQueue.wait();
-            } catch (InterruptedException e) {
-            }
-            continue;
-          }
-        }
+      long toWaitFor;
+      synchronized (this) {
+        enqueueCurrentPacket();
+        toWaitFor = lastQueuedSeqno;
+      }
 
-        // wait for all acks to be received back from datanodes
-        synchronized (ackQueue) {
-          if (!closed && ackQueue.size() != 0) {
-            try {
-              ackQueue.wait();
-            } catch (InterruptedException e) {
-            }
-            continue;
-          }
-        }
+      waitForAckedSeqno(toWaitFor);
+    }
 
-        // acquire both the locks and verify that we are
-        // *really done*. In the case of error recovery, 
-        // packets might move back from ackQueue to dataQueue.
-        //
-        synchronized (dataQueue) {
-          synchronized (ackQueue) {
-            if (dataQueue.size() + ackQueue.size() == 0) {
-              break;       // we are done
-            }
+    private void waitForAckedSeqno(long seqnumToWaitFor) throws IOException {
+      synchronized (ackQueue) {
+        while (!closed) {
+          isClosed();
+          if (lastAckedSeqno >= seqnumToWaitFor) {
+            break;
           }
+          try {
+            ackQueue.wait();
+          } catch (InterruptedException ie) {}
         }
       }
+      isClosed();
     }
-  
+ 
     /**
      * Closes this output stream and releases any system 
      * resources associated with this stream.
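
For callers, the visible effect is that FSDataOutputStream.sync() can now be
invoked from several threads while other threads keep writing to the same stream,
which is what the new test below exercises. A minimal caller-side sketch follows;
the file path, payload size, and thread/iteration counts are arbitrary example
values, and the concurrent writes rely on the stream's own internal
synchronization just as the new test does.

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class ParallelSyncExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        // Example path only; any writable HDFS location works.
        final FSDataOutputStream out = fs.create(new Path("/tmp/parallel-sync-example.dat"));
        final byte[] payload = new byte[512];

        Thread[] writers = new Thread[4];
        for (int i = 0; i < writers.length; i++) {
          writers[i] = new Thread() {
            public void run() {
              try {
                for (int j = 0; j < 100; j++) {
                  out.write(payload); // writes from several threads keep flowing
                  out.sync();         // returns once the data queued so far is acked
                }
              } catch (IOException ioe) {
                ioe.printStackTrace();
              }
            }
          };
          writers[i].start();
        }
        for (Thread t : writers) {
          t.join();
        }
        out.close();
        fs.close();
      }
    }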

Added: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java?rev=1041718&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java (added)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java Fri Dec  3 07:54:55 2010
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.junit.Test;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.log4j.Level;
+
+/**
+ * This class tests the building blocks that are needed to
+ * support HDFS appends.
+ */
+public class TestMultiThreadedSync {
+  static final int blockSize = 1024*1024;
+  static final int numBlocks = 10;
+  static final int fileSize = numBlocks * blockSize + 1;
+
+  private static final int NUM_THREADS = 10;
+  private static final int WRITE_SIZE = 517;
+  private static final int NUM_WRITES_PER_THREAD = 1000;
+  
+  private byte[] toWrite = null;
+
+  {
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  /*
+   * creates a file but does not close it
+   */ 
+  private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+    throws IOException {
+    FSDataOutputStream stm = fileSys.create(name, true,
+                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
+                                            (short)repl, (long)blockSize);
+    return stm;
+  }
+  
+  private void initBuffer(int size) {
+    long seed = AppendTestUtil.nextLong();
+    toWrite = AppendTestUtil.randomBytes(seed, size);
+  }
+
+  private class WriterThread extends Thread {
+    private final FSDataOutputStream stm;
+    private final AtomicReference<Throwable> thrown;
+    private final int numWrites;
+    private final CountDownLatch countdown;
+
+    public WriterThread(FSDataOutputStream stm,
+      AtomicReference<Throwable> thrown,
+      CountDownLatch countdown, int numWrites) {
+      this.stm = stm;
+      this.thrown = thrown;
+      this.numWrites = numWrites;
+      this.countdown = countdown;
+    }
+
+    public void run() {
+      try {
+        countdown.await();
+        for (int i = 0; i < numWrites && thrown.get() == null; i++) {
+          doAWrite();
+        }
+      } catch (Throwable t) {
+        thrown.compareAndSet(null, t);
+      }
+    }
+
+    private void doAWrite() throws IOException {
+      stm.write(toWrite);
+      stm.sync();
+    }
+  }
+
+
+  @Test
+  public void testMultipleSyncers() throws Exception {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fs = cluster.getFileSystem();
+    Path p = new Path("/multiple-syncers.dat");
+    try {
+      doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE, NUM_WRITES_PER_THREAD);
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test case where a bunch of threads are continuously calling sync() while another
+   * thread appends some data and then closes the file.
+   *
+   * The syncing threads should eventually catch an IOException stating that the stream
+   * was closed -- and not an NPE or anything like that.
+   */
+  @Test
+  public void testSyncWhileClosing() throws Throwable {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fs = cluster.getFileSystem();
+    Path p = new Path("/sync-and-close.dat");
+
+    final FSDataOutputStream stm = createFile(fs, p, 1);
+
+
+    ArrayList<Thread> flushers = new ArrayList<Thread>();
+    final AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+    try {
+      for (int i = 0; i < 10; i++) {
+        Thread flusher = new Thread() {
+            public void run() {
+              try {
+                while (true) {
+                  try {
+                    stm.sync();
+                  } catch (IOException ioe) {
+                    if (!ioe.toString().contains("DFSOutputStream is closed")) {
+                      throw ioe;
+                    } else {
+                      return;
+                    }
+                  }
+                }
+              } catch (Throwable t) {
+                thrown.set(t);
+              }
+            }
+          };
+        flusher.start();
+        flushers.add(flusher);
+      }
+
+      // Write some data
+      for (int i = 0; i < 10000; i++) {
+        stm.write(1);
+      }
+
+      // Close it while the flushing threads are still flushing
+      stm.close();
+
+      // Wait for the flushers to all die.
+      for (Thread t : flushers) {
+        t.join();
+      }
+
+      // They should have all gotten the expected exception, not anything
+      // else.
+      if (thrown.get() != null) {
+        throw thrown.get();
+      }
+
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  public void doMultithreadedWrites(
+    Configuration conf, Path p, int numThreads, int bufferSize, int numWrites)
+    throws Exception {
+    initBuffer(bufferSize);
+
+    // create a new file.
+    FileSystem fs = p.getFileSystem(conf);
+    FSDataOutputStream stm = createFile(fs, p, 1);
+    System.out.println("Created file simpleFlush.dat");
+
+    // TODO move this bit to another test case
+    // There have been a couple issues with flushing empty buffers, so do
+    // some empty flushes first.
+    stm.sync();
+    stm.sync();
+    stm.write(1);
+    stm.sync();
+    stm.sync();
+
+    CountDownLatch countdown = new CountDownLatch(1);
+    ArrayList<Thread> threads = new ArrayList<Thread>();
+    AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+    for (int i = 0; i < numThreads; i++) {
+      Thread t = new WriterThread(stm, thrown, countdown, numWrites);
+      threads.add(t);
+      t.start();
+    }
+
+    // Start all the threads at the same time for maximum raciness!
+    countdown.countDown();
+    
+    for (Thread t : threads) {
+      t.join();
+    }
+    if (thrown.get() != null) {
+      
+      throw new RuntimeException("Deferred", thrown.get());
+    }
+    stm.close();
+    System.out.println("Closed file.");
+  }
+
+  public static void main(String args[]) throws Exception {
+    TestMultiThreadedSync test = new TestMultiThreadedSync();
+    Configuration conf = new Configuration();
+    Path p = new Path("/user/todd/test.dat");
+    long st = System.nanoTime();
+    test.doMultithreadedWrites(conf, p, 10, 511, 50000);
+    long et = System.nanoTime();
+
+    System.out.println("Finished in " + ((et - st) / 1000000) + "ms");
+  }
+
+}

