Author: hairong
Date: Fri Dec 3 07:54:55 2010
New Revision: 1041718

URL: http://svn.apache.org/viewvc?rev=1041718&view=rev
Log:
HDFS-895. Allow hflush/sync to occur in parallel with new writes to the
file. Contributed by Todd Lipcon.
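The change below reworks DFSOutputStream.sync() so it no longer holds the stream lock for the
whole ack round trip, which is what allows one thread to keep calling write() while other
threads call sync() on the same stream. A minimal sketch of that usage pattern, assuming a
running HDFS cluster and using an illustrative class and path name that are not part of this
patch:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hypothetical driver, not part of the patch: one thread writes while
// another thread repeatedly calls sync() on the same output stream.
public class ConcurrentSyncSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(conf);
    final FSDataOutputStream out = fs.create(new Path("/tmp/concurrent-sync-sketch.dat"));

    // Syncer thread: forces queued packets out to the datanode pipeline.
    Thread syncer = new Thread() {
      public void run() {
        try {
          for (int i = 0; i < 100; i++) {
            out.sync();   // in 0.20-append, sync() is the hflush call
          }
        } catch (Exception e) {
          e.printStackTrace();
        }
      }
    };
    syncer.start();

    // Writer (main thread): keeps appending while the syncer runs.
    byte[] payload = new byte[512];
    for (int i = 0; i < 1000; i++) {
      out.write(payload);
    }

    syncer.join();
    out.close();
    fs.close();
  }
}

Before this patch the two threads would serialize on the stream lock for the duration of each
sync; with it, the writer mostly contends only for the short critical section that queues a
packet.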
Added:
    hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java
Modified:
    hadoop/common/branches/branch-0.20-append/CHANGES.txt
    hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java

Modified: hadoop/common/branches/branch-0.20-append/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/CHANGES.txt?rev=1041718&r1=1041717&r2=1041718&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-append/CHANGES.txt Fri Dec 3 07:54:55 2010
@@ -25,6 +25,9 @@ Release 0.20-append - Unreleased
     HDFS-1202. DataBlockScanner throws NPE when updated before initialized.
     (Todd Lipcon via dhruba)
 
+    HDFS-895. Allow hflush/sync to occur in parallel with new writes to
+    the file. (Todd Lipcon via hairong)
+
   IMPROVEMENTS
 
   BUG FIXES

Modified: hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1041718&r1=1041717&r2=1041718&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-append/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Dec 3 07:54:55 2010
@@ -2171,6 +2171,8 @@ public class DFSClient implements FSCons
     private DataStreamer streamer = new DataStreamer();;
     private ResponseProcessor response = null;
     private long currentSeqno = 0;
+    private long lastQueuedSeqno = -1;
+    private long lastAckedSeqno = -1;
     private long bytesCurBlock = 0; // bytes writen in current block
     private int packetSize = 0; // write packet size, including the header.
     private int chunksPerPacket = 0;
@@ -2180,7 +2182,7 @@ public class DFSClient implements FSCons
     private volatile int errorIndex = 0;
     private volatile IOException lastException = null;
     private long artificialSlowdown = 0;
-    private long lastFlushOffset = -1; // offset when flush was invoked
+    private long lastFlushOffset = 0; // offset when flush was invoked
     private boolean persistBlocks = false; // persist blocks on namenode
     private int recoveryErrorCount = 0; // number of times block recovery failed
     private int maxRecoveryErrorCount = 5; // try block recovery 5 times
@@ -2563,6 +2565,8 @@ public class DFSClient implements FSCons
               lastPacketInBlock = one.lastPacketInBlock;
 
               synchronized (ackQueue) {
+                assert ack.getSeqno() == lastAckedSeqno + 1;
+                lastAckedSeqno = ack.getSeqno();
                 ackQueue.removeFirst();
                 ackQueue.notifyAll();
               }
@@ -3178,11 +3182,9 @@ public class DFSClient implements FSCons
           if (bytesCurBlock == blockSize) {
             currentPacket.lastPacketInBlock = true;
             bytesCurBlock = 0;
-            lastFlushOffset = -1;
+            lastFlushOffset = 0;
           }
-          dataQueue.addLast(currentPacket);
-          dataQueue.notifyAll();
-          currentPacket = null;
+          enqueueCurrentPacket();
 
           // If this was the first write after reopening a file, then the above
          // write filled up any partial chunk. Tell the summer to generate full
@@ -3198,58 +3200,102 @@ public class DFSClient implements FSCons
           //LOG.debug("DFSClient writeChunk done length " + len +
           //          " checksum length " + cklen);
     }
-
+
+    private synchronized void enqueueCurrentPacket() {
+      synchronized (dataQueue) {
+        if (currentPacket == null) return;
+        dataQueue.addLast(currentPacket);
+        dataQueue.notifyAll();
+        lastQueuedSeqno = currentPacket.seqno;
+        currentPacket = null;
+      }
+    }
+
     /**
      * All data is written out to datanodes. It is not guaranteed
      * that data has been flushed to persistent store on the
      * datanode. Block allocations are persisted on namenode.
      */
-    public synchronized void sync() throws IOException {
+    public void sync() throws IOException {
+      checkOpen();
+      if (closed) {
+        throw new IOException("DFSOutputStream is closed");
+      }
       try {
-        /* Record current blockOffset. This might be changed inside
-         * flushBuffer() where a partial checksum chunk might be flushed.
-         * After the flush, reset the bytesCurBlock back to its previous value,
-         * any partial checksum chunk will be sent now and in next packet.
-         */
-        long saveOffset = bytesCurBlock;
-
-        // flush checksum buffer, but keep checksum buffer intact
-        flushBuffer(true);
-
-        LOG.debug("DFSClient flush() : saveOffset " + saveOffset +
-                  " bytesCurBlock " + bytesCurBlock +
-                  " lastFlushOffset " + lastFlushOffset);
-
-        // Flush only if we haven't already flushed till this offset.
-        if (lastFlushOffset != bytesCurBlock) {
-
-          // record the valid offset of this flush
-          lastFlushOffset = bytesCurBlock;
+        long toWaitFor;
+        synchronized (this) {
+          /* Record current blockOffset. This might be changed inside
+           * flushBuffer() where a partial checksum chunk might be flushed.
+           * After the flush, reset the bytesCurBlock back to its previous value,
+           * any partial checksum chunk will be sent now and in next packet.
+           */
+          long saveOffset = bytesCurBlock;
+          Packet oldCurrentPacket = currentPacket;
 
-          // wait for all packets to be sent and acknowledged
-          flushInternal();
-        } else {
-          // just discard the current packet since it is already been sent.
-          currentPacket = null;
+          // flush checksum buffer, but keep checksum buffer intact
+          flushBuffer(true);
+          // bytesCurBlock potentially incremented if there was buffered data
+
+          // Flush only if we haven't already flushed till this offset.
+          if (lastFlushOffset != bytesCurBlock) {
+            assert bytesCurBlock > lastFlushOffset;
+            // record the valid offset of this flush
+            lastFlushOffset = bytesCurBlock;
+            enqueueCurrentPacket();
+          } else {
+            // just discard the current packet since it is already been sent.
+            if (oldCurrentPacket == null && currentPacket != null) {
+              // If we didn't previously have a packet queued, and now we do,
+              // but we don't plan on sending it, then we should not
+              // skip a sequence number for it!
+              currentSeqno--;
+            }
+            currentPacket = null;
+          }
+          // Restore state of stream. Record the last flush offset
+          // of the last full chunk that was flushed.
+          //
+          bytesCurBlock = saveOffset;
+          toWaitFor = lastQueuedSeqno;
        }
-
-        // Restore state of stream. Record the last flush offset
-        // of the last full chunk that was flushed.
-        //
-        bytesCurBlock = saveOffset;
+        waitForAckedSeqno(toWaitFor);
 
         // If any new blocks were allocated since the last flush,
         // then persist block locations on namenode.
         //
-        if (persistBlocks) {
-          namenode.fsync(src, clientName);
+        boolean willPersist;
+        synchronized (this) {
+          willPersist = persistBlocks && !closed;
           persistBlocks = false;
         }
+
+        if (willPersist) {
+          try {
+            namenode.fsync(src, clientName);
+          } catch (IOException ioe) {
+            DFSClient.LOG.warn("Unable to persist blocks in hflush for " + src, ioe);
+            // If we got an error here, it might be because some other thread called
+            // close before our hflush completed. In that case, we should throw an
+            // exception that the stream is closed.
+            isClosed();
+            if (closed) {
+              throw new IOException("DFSOutputStream is closed");
+            }
+
+            // If we aren't closed but failed to sync, we should expose that to the
+            // caller.
+            throw ioe;
+          }
+        }
       } catch (IOException e) {
-        lastException = new IOException("IOException flush:" + e);
-        closed = true;
-        closeThreads();
-        throw e;
+        LOG.warn("Error while syncing", e);
+        synchronized (this) {
+          if (!closed) {
+            lastException = new IOException("IOException flush:" + e);
+            closed = true;
+            closeThreads();
+          }
+        }
+        throw e;
       }
     }
 
@@ -3275,57 +3321,34 @@ public class DFSClient implements FSCons
      * Waits till all existing data is flushed and confirmations
      * received from datanodes.
      */
-    private synchronized void flushInternal() throws IOException {
-      checkOpen();
+    private void flushInternal() throws IOException {
       isClosed();
+      checkOpen();
 
-      while (!closed) {
-        synchronized (dataQueue) {
-          isClosed();
-          //
-          // If there is data in the current buffer, send it across
-          //
-          if (currentPacket != null) {
-            dataQueue.addLast(currentPacket);
-            dataQueue.notifyAll();
-            currentPacket = null;
-          }
-
-          // wait for all buffers to be flushed to datanodes
-          if (!closed && dataQueue.size() != 0) {
-            try {
-              dataQueue.wait();
-            } catch (InterruptedException e) {
-            }
-            continue;
-          }
-        }
+      long toWaitFor;
+      synchronized (this) {
+        enqueueCurrentPacket();
+        toWaitFor = lastQueuedSeqno;
+      }
 
-        // wait for all acks to be received back from datanodes
-        synchronized (ackQueue) {
-          if (!closed && ackQueue.size() != 0) {
-            try {
-              ackQueue.wait();
-            } catch (InterruptedException e) {
-            }
-            continue;
-          }
-        }
+      waitForAckedSeqno(toWaitFor);
+    }
 
-        // acquire both the locks and verify that we are
-        // *really done*. In the case of error recovery,
-        // packets might move back from ackQueue to dataQueue.
-        //
-        synchronized (dataQueue) {
-          synchronized (ackQueue) {
-            if (dataQueue.size() + ackQueue.size() == 0) {
-              break;       // we are done
-            }
+    private void waitForAckedSeqno(long seqnumToWaitFor) throws IOException {
+      synchronized (ackQueue) {
+        while (!closed) {
+          isClosed();
+          if (lastAckedSeqno >= seqnumToWaitFor) {
+            break;
           }
+          try {
+            ackQueue.wait();
+          } catch (InterruptedException ie) {}
        }
       }
+      isClosed();
     }
-
+
     /**
      * Closes this output stream and releases any system
      * resources associated with this stream.
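The heart of the rework above is the enqueueCurrentPacket()/waitForAckedSeqno() pair: every
queued packet carries a monotonically increasing sequence number, the ResponseProcessor advances
lastAckedSeqno as acks come back, and each sync() or flushInternal() caller waits only until its
own last queued sequence number has been acknowledged, instead of waiting for the queues to
drain. A standalone sketch of that idea, with assumed names and none of the real packet or
pipeline machinery:

// Hypothetical, self-contained illustration of the ack-tracking pattern
// used by the patch; this is not DFSClient code.
public class AckTrackerSketch {
  private final Object ackLock = new Object();
  private long currentSeqno = 0;
  private long lastQueuedSeqno = -1;
  private long lastAckedSeqno = -1;

  /** Writer side: queue a packet and remember its sequence number. */
  public long enqueuePacket() {
    synchronized (ackLock) {
      lastQueuedSeqno = currentSeqno++;
      // (the real code also hands the packet to the streamer thread here)
      return lastQueuedSeqno;
    }
  }

  /** Response side: called when the downstream ack for a packet arrives. */
  public void ackReceived(long seqno) {
    synchronized (ackLock) {
      assert seqno == lastAckedSeqno + 1; // acks arrive in order
      lastAckedSeqno = seqno;
      ackLock.notifyAll();                // wake every waiting flusher
    }
  }

  /** Flusher side: block until our own packet has been acknowledged. */
  public void waitForAckedSeqno(long seqnoToWaitFor) throws InterruptedException {
    synchronized (ackLock) {
      while (lastAckedSeqno < seqnoToWaitFor) {
        ackLock.wait();
      }
    }
  }
}

Because each waiter compares against its own captured sequence number, later writes by other
threads never extend an earlier caller's wait, which is what makes concurrent write() and sync()
both safe and cheap.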
Added: hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java?rev=1041718&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java (added)
+++ hadoop/common/branches/branch-0.20-append/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java Fri Dec 3 07:54:55 2010
@@ -0,0 +1,248 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.junit.Test;
+
+import java.io.*;
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.apache.log4j.Level;
+
+/**
+ * This class tests the building blocks that are needed to
+ * support HDFS appends.
+ */
+public class TestMultiThreadedSync {
+  static final int blockSize = 1024*1024;
+  static final int numBlocks = 10;
+  static final int fileSize = numBlocks * blockSize + 1;
+
+  private static final int NUM_THREADS = 10;
+  private static final int WRITE_SIZE = 517;
+  private static final int NUM_WRITES_PER_THREAD = 1000;
+
+  private byte[] toWrite = null;
+
+  {
+    ((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)LeaseManager.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)FSNamesystem.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DataNode.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)DFSClient.LOG).getLogger().setLevel(Level.ALL);
+    ((Log4JLogger)InterDatanodeProtocol.LOG).getLogger().setLevel(Level.ALL);
+  }
+
+  /*
+   * creates a file but does not close it
+   */
+  private FSDataOutputStream createFile(FileSystem fileSys, Path name, int repl)
+    throws IOException {
+    FSDataOutputStream stm = fileSys.create(name, true,
+                                            fileSys.getConf().getInt("io.file.buffer.size", 4096),
+                                            (short)repl, (long)blockSize);
+    return stm;
+  }
+
+  private void initBuffer(int size) {
+    long seed = AppendTestUtil.nextLong();
+    toWrite = AppendTestUtil.randomBytes(seed, size);
+  }
+
+  private class WriterThread extends Thread {
+    private final FSDataOutputStream stm;
+    private final AtomicReference<Throwable> thrown;
+    private final int numWrites;
+    private final CountDownLatch countdown;
+
+    public WriterThread(FSDataOutputStream stm,
+      AtomicReference<Throwable> thrown,
+      CountDownLatch countdown, int numWrites) {
+      this.stm = stm;
+      this.thrown = thrown;
+      this.numWrites = numWrites;
+      this.countdown = countdown;
+    }
+
+    public void run() {
+      try {
+        countdown.await();
+        for (int i = 0; i < numWrites && thrown.get() == null; i++) {
+          doAWrite();
+        }
+      } catch (Throwable t) {
+        thrown.compareAndSet(null, t);
+      }
+    }
+
+    private void doAWrite() throws IOException {
+      stm.write(toWrite);
+      stm.sync();
+    }
+  }
+
+
+  @Test
+  public void testMultipleSyncers() throws Exception {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fs = cluster.getFileSystem();
+    Path p = new Path("/multiple-syncers.dat");
+    try {
+      doMultithreadedWrites(conf, p, NUM_THREADS, WRITE_SIZE, NUM_WRITES_PER_THREAD);
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  /**
+   * Test case where a bunch of threads are continuously calling sync() while another
+   * thread appends some data and then closes the file.
+   *
+   * The syncing threads should eventually catch an IOException stating that the stream
+   * was closed -- and not an NPE or anything like that.
+   */
+  @Test
+  public void testSyncWhileClosing() throws Throwable {
+    Configuration conf = new Configuration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 1, true, null);
+    FileSystem fs = cluster.getFileSystem();
+    Path p = new Path("/sync-and-close.dat");
+
+    final FSDataOutputStream stm = createFile(fs, p, 1);
+
+
+    ArrayList<Thread> flushers = new ArrayList<Thread>();
+    final AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+    try {
+      for (int i = 0; i < 10; i++) {
+        Thread flusher = new Thread() {
+            public void run() {
+              try {
+                while (true) {
+                  try {
+                    stm.sync();
+                  } catch (IOException ioe) {
+                    if (!ioe.toString().contains("DFSOutputStream is closed")) {
+                      throw ioe;
+                    } else {
+                      return;
+                    }
+                  }
+                }
+              } catch (Throwable t) {
+                thrown.set(t);
+              }
+            }
+          };
+        flusher.start();
+        flushers.add(flusher);
+      }
+
+      // Write some data
+      for (int i = 0; i < 10000; i++) {
+        stm.write(1);
+      }
+
+      // Close it while the flushing threads are still flushing
+      stm.close();
+
+      // Wait for the flushers to all die.
+      for (Thread t : flushers) {
+        t.join();
+      }
+
+      // They should have all gotten the expected exception, not anything
+      // else.
+      if (thrown.get() != null) {
+        throw thrown.get();
+      }
+
+    } finally {
+      fs.close();
+      cluster.shutdown();
+    }
+  }
+
+  public void doMultithreadedWrites(
+    Configuration conf, Path p, int numThreads, int bufferSize, int numWrites)
+    throws Exception {
+    initBuffer(bufferSize);
+
+    // create a new file.
+    FileSystem fs = p.getFileSystem(conf);
+    FSDataOutputStream stm = createFile(fs, p, 1);
+    System.out.println("Created file simpleFlush.dat");
+
+    // TODO move this bit to another test case
+    // There have been a couple issues with flushing empty buffers, so do
+    // some empty flushes first.
+    stm.sync();
+    stm.sync();
+    stm.write(1);
+    stm.sync();
+    stm.sync();
+
+    CountDownLatch countdown = new CountDownLatch(1);
+    ArrayList<Thread> threads = new ArrayList<Thread>();
+    AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+    for (int i = 0; i < numThreads; i++) {
+      Thread t = new WriterThread(stm, thrown, countdown, numWrites);
+      threads.add(t);
+      t.start();
+    }
+
+    // Start all the threads at the same time for maximum raciness!
+    countdown.countDown();
+
+    for (Thread t : threads) {
+      t.join();
+    }
+    if (thrown.get() != null) {
+
+      throw new RuntimeException("Deferred", thrown.get());
+    }
+    stm.close();
+    System.out.println("Closed file.");
+  }
+
+  public static void main(String args[]) throws Exception {
+    TestMultiThreadedSync test = new TestMultiThreadedSync();
+    Configuration conf = new Configuration();
+    Path p = new Path("/user/todd/test.dat");
+    long st = System.nanoTime();
+    test.doMultithreadedWrites(conf, p, 10, 511, 50000);
+    long et = System.nanoTime();
+
+    System.out.println("Finished in " + ((et - st) / 1000000) + "ms");
+  }
+
+}