Author: brandonli
Date: Thu Nov 7 21:49:13 2013
New Revision: 1539834

URL: http://svn.apache.org/r1539834
Log: HDFS-5364. Add OpenFileCtx cache. Contributed by Brandon Li
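At a high level, this change replaces WriteManager's unbounded per-file stream map with a bounded OpenFileCtxCache that evicts idle write contexts. The following is a minimal sketch of that caching pattern, not the committed class: Ctx and the integer handle are hypothetical stand-ins for OpenFileCtx and FileHandle, and rule 2 of the real eviction policy (inactive streams) is omitted for brevity.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative sketch only: a bounded cache that refuses new entries when
    // full, unless an idle entry can be evicted.
    class BoundedCtxCache {
      static class Ctx {
        volatile long lastAccessTime = System.currentTimeMillis();
        boolean hasPendingWork() { return false; } // simplified
        void cleanup() { /* flush and close the underlying stream */ }
      }

      private final Map<Integer, Ctx> map = new ConcurrentHashMap<>();
      private final int maxStreams;
      private final long minIdleMillis;

      BoundedCtxCache(int maxStreams, long minIdleMillis) {
        this.maxStreams = maxStreams;
        this.minIdleMillis = minIdleMillis;
      }

      /** Returns false when the cache is full and nothing is safely evictable. */
      synchronized boolean put(int handle, Ctx ctx) {
        Ctx evicted = null;
        if (map.size() >= maxStreams) {
          Integer victim = null;
          long oldest = Long.MAX_VALUE;
          for (Map.Entry<Integer, Ctx> e : map.entrySet()) {
            Ctx c = e.getValue();
            if (c.hasPendingWork()) continue;      // rule 1: skip busy entries
            if (c.lastAccessTime < oldest) {       // rule 3: pick the idlest
              oldest = c.lastAccessTime;
              victim = e.getKey();
            }
          }
          if (victim == null
              || System.currentTimeMillis() - oldest < minIdleMillis) {
            return false;                          // caller must make the client retry
          }
          evicted = map.remove(victim);
        }
        map.put(handle, ctx);
        if (evicted != null) {
          evicted.cleanup(); // the real code runs cleanup outside the lock
        }
        return true;
      }
    }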
Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java?rev=1539834&r1=1539833&r2=1539834&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/Nfs3.java Thu Nov 7 21:49:13 2013 @@ -23,33 +23,47 @@ import java.util.List; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.nfs.mount.Mountd; +import org.apache.hadoop.mount.MountdBase; import org.apache.hadoop.nfs.nfs3.Nfs3Base; import org.apache.hadoop.util.StringUtils; +import com.google.common.annotations.VisibleForTesting; + /** * Nfs server. Supports NFS v3 using {@link RpcProgramNfs3}. * Currently Mountd program is also started inside this class. * Only TCP server is supported and UDP is not supported.
*/ public class Nfs3 extends Nfs3Base { + private Mountd mountd; + static { Configuration.addDefaultResource("hdfs-default.xml"); Configuration.addDefaultResource("hdfs-site.xml"); } public Nfs3(List<String> exports) throws IOException { - super(new Mountd(exports), new RpcProgramNfs3()); + super(new RpcProgramNfs3()); + mountd = new Mountd(exports); } + @VisibleForTesting public Nfs3(List<String> exports, Configuration config) throws IOException { - super(new Mountd(exports, config), new RpcProgramNfs3(config), config); + super(new RpcProgramNfs3(config), config); + mountd = new Mountd(exports, config); } + public Mountd getMountd() { + return mountd; + } + public static void main(String[] args) throws IOException { StringUtils.startupShutdownMessage(Nfs3.class, args, LOG); List<String> exports = new ArrayList<String>(); exports.add("/"); + final Nfs3 nfsServer = new Nfs3(exports); + nfsServer.mountd.start(true); // Start mountd nfsServer.start(true); } } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java?rev=1539834&r1=1539833&r2=1539834&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java Thu Nov 7 21:49:13 2013 @@ -24,7 +24,6 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.nio.ByteBuffer; import java.nio.channels.ClosedChannelException; -import java.security.InvalidParameterException; import java.util.EnumSet; import java.util.Iterator; import java.util.Map.Entry; @@ -96,7 +95,7 @@ class OpenFileCtx { // It's updated after each sync to HDFS private Nfs3FileAttributes latestAttr; - + private final ConcurrentNavigableMap<OffsetRange, WriteCtx> pendingWrites; private final ConcurrentNavigableMap<Long, CommitCtx> pendingCommits; @@ -165,10 +164,22 @@ class OpenFileCtx { return System.currentTimeMillis() - lastAccessTime > streamTimeout; } + long getLastAccessTime() { + return lastAccessTime; + } + public long getNextOffset() { return nextOffset.get(); } + boolean getActiveState() { + return this.activeState; + } + + boolean hasPendingWork() { + return (pendingWrites.size() != 0 || pendingCommits.size() != 0); + } + // Increase or decrease the memory occupation of non-sequential writes private long updateNonSequentialWriteInMemory(long count) { long newValue = nonSequentialWriteInMemory.addAndGet(count); @@ -792,19 +803,18 @@ class OpenFileCtx { * @return true, remove stream; false, keep stream */ public synchronized boolean streamCleanup(long fileId, long streamTimeout) { - if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) { - throw new InvalidParameterException("StreamTimeout" + streamTimeout - + "ms is less than MINIMIUM_STREAM_TIMEOUT " - + WriteManager.MINIMIUM_STREAM_TIMEOUT + "ms"); + Preconditions + .checkState(streamTimeout >= Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT); + if (!activeState) { + return true; } boolean flag = false; // Check the stream timeout if (checkStreamTimeout(streamTimeout)) { if (LOG.isDebugEnabled()) { - LOG.debug("closing stream for fileId:" + fileId); + LOG.debug("stream can be closed for fileId:" + 
fileId); } - cleanup(); flag = true; } return flag; } @@ -975,7 +985,7 @@ class OpenFileCtx { FileHandle handle = writeCtx.getHandle(); if (LOG.isDebugEnabled()) { LOG.debug("do write, fileId: " + handle.getFileId() + " offset: " - + offset + " length:" + count + " stableHow:" + stableHow.getValue()); + + offset + " length:" + count + " stableHow:" + stableHow.name()); } try { @@ -1056,7 +1066,7 @@ class OpenFileCtx { } } - private synchronized void cleanup() { + synchronized void cleanup() { if (!activeState) { LOG.info("Current OpenFileCtx is already inactive, no need to cleanup."); return; @@ -1064,7 +1074,7 @@ class OpenFileCtx { activeState = false; // stop the dump thread - if (dumpThread != null) { + if (dumpThread != null && dumpThread.isAlive()) { dumpThread.interrupt(); try { dumpThread.join(3000); @@ -1146,4 +1156,10 @@ class OpenFileCtx { void setActiveStatusForTest(boolean activeState) { this.activeState = activeState; } + + @Override + public String toString() { + return String.format("activeState: %b asyncStatus: %b nextOffset: %d", + activeState, asyncStatus, nextOffset.get()); + } } \ No newline at end of file
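The OpenFileCtx change above splits stream lifecycle handling in two: streamCleanup() no longer calls cleanup() itself and only reports whether the stream may be closed, so the new cache can run the slow cleanup() outside its lock. Below is a minimal sketch of that check-under-lock, clean-up-outside-lock pattern; Ctx and closable() are simplified stand-ins for OpenFileCtx and streamCleanup(), not the committed code.

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Sketch of the locking discipline used by the new cache: decide what to
    // remove while holding the lock, but run the potentially slow cleanup()
    // (which flushes and closes an HDFS stream) after releasing it.
    class ScanSketch {
      static class Ctx {
        synchronized boolean closable(long timeoutMs) { return true; } // simplified
        void cleanup() { /* slow I/O: flush, close, release dump file */ }
      }

      private final Map<Integer, Ctx> map = new ConcurrentHashMap<>();

      void scan(long timeoutMs) {
        java.util.List<Ctx> toClean = new java.util.ArrayList<>();
        for (Map.Entry<Integer, Ctx> e : map.entrySet()) {
          if (!e.getValue().closable(timeoutMs)) {
            continue;
          }
          synchronized (this) {             // re-check under the cache lock
            Ctx current = map.get(e.getKey());
            if (current != null && current.closable(timeoutMs)) {
              map.remove(e.getKey());
              toClean.add(current);
            }
          }
        }
        for (Ctx c : toClean) {
          c.cleanup();                      // outside the lock
        }
      }
    }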
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java?rev=1539834&view=auto ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java (added) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtxCache.java Thu Nov 7 21:49:13 2013 @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.nfs.nfs3; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.nfs.nfs3.FileHandle; +import org.apache.hadoop.nfs.nfs3.Nfs3Constant; +import org.apache.hadoop.util.Daemon; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import com.google.common.collect.Maps; + +/** + * A cache that saves OpenFileCtx objects for different users. Each cache entry is + * used to maintain the writing context for a single file. + */ + class OpenFileCtxCache { + private static final Log LOG = LogFactory.getLog(OpenFileCtxCache.class); + // Inserts into and deletes from openFileMap are synchronized + private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps + .newConcurrentMap(); + + private final int maxStreams; + private final long streamTimeout; + private final StreamMonitor streamMonitor; + + OpenFileCtxCache(Configuration config, long streamTimeout) { + maxStreams = config.getInt(Nfs3Constant.MAX_OPEN_FILES, + Nfs3Constant.MAX_OPEN_FILES_DEFAULT); + LOG.info("Maximum open streams is " + maxStreams); + this.streamTimeout = streamTimeout; + streamMonitor = new StreamMonitor(); + } + + /** + * The entry to be evicted is chosen based on the following rules:<br> + * 1. if the OpenFileCtx has any pending task, it will not be chosen.<br> + * 2. if there is an inactive OpenFileCtx, the first one found is evicted. <br> + * 3. For OpenFileCtx entries that don't belong to group 1 or 2, the idlest one + * is selected. If it has been idle longer than OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT, it + * will be evicted. Otherwise, the whole eviction request fails. + */ + @VisibleForTesting + Entry<FileHandle, OpenFileCtx> getEntryToEvict() { + Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet() + .iterator(); + if (LOG.isTraceEnabled()) { + LOG.trace("openFileMap size:" + openFileMap.size()); + } + + Entry<FileHandle, OpenFileCtx> idlest = null; + + while (it.hasNext()) { + Entry<FileHandle, OpenFileCtx> pairs = it.next(); + OpenFileCtx ctx = pairs.getValue(); + if (!ctx.getActiveState()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Got one inactive stream: " + ctx); + } + return pairs; + } + if (ctx.hasPendingWork()) { + // Always skip files with pending work. + continue; + } + if (idlest == null) { + idlest = pairs; + } else { + if (ctx.getLastAccessTime() < idlest.getValue().getLastAccessTime()) { + idlest = pairs; + } + } + } + + if (idlest == null) { + LOG.warn("No eviction candidate. All streams have pending work."); + return null; + } else { + long idleTime = System.currentTimeMillis() + - idlest.getValue().getLastAccessTime(); + if (idleTime < Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT) { + if (LOG.isDebugEnabled()) { + LOG.debug("idlest stream's idle time:" + idleTime); + } + LOG.warn("All opened streams are busy, can't remove any from cache."); + return null; + } else { + return idlest; + } + } + } + + boolean put(FileHandle h, OpenFileCtx context) { + OpenFileCtx toEvict = null; + synchronized (this) { + Preconditions.checkState(openFileMap.size() <= this.maxStreams, + "stream cache size " + openFileMap.size() + + " is larger than maximum " + this.maxStreams); + if (openFileMap.size() == this.maxStreams) { + Entry<FileHandle, OpenFileCtx> pairs = getEntryToEvict(); + if (pairs == null) { + return false; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Evict stream ctx: " + pairs.getValue()); + } + toEvict = openFileMap.remove(pairs.getKey()); + Preconditions.checkState(toEvict == pairs.getValue(), + "The deleted entry is not the same as the oldest one found."); + } + } + openFileMap.put(h, context); + } + + // Cleanup the old stream outside the lock + if (toEvict != null) { + toEvict.cleanup(); + } + return true; + } + + @VisibleForTesting + void scan(long streamTimeout) { + ArrayList<OpenFileCtx> ctxToRemove = new ArrayList<OpenFileCtx>(); + Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet() + .iterator(); + if (LOG.isTraceEnabled()) { + LOG.trace("openFileMap size:" + openFileMap.size()); + } + + while (it.hasNext()) { + Entry<FileHandle, OpenFileCtx> pairs = it.next(); + FileHandle handle = pairs.getKey(); + OpenFileCtx ctx = pairs.getValue(); + if (!ctx.streamCleanup(handle.getFileId(), streamTimeout)) { + continue; + } + + // Check it again inside lock before removing + synchronized (this) { + OpenFileCtx ctx2 = openFileMap.get(handle); + if (ctx2 != null) { + if (ctx2.streamCleanup(handle.getFileId(), streamTimeout)) { + openFileMap.remove(handle); + if (LOG.isDebugEnabled()) { + LOG.debug("After remove stream " + handle.getFileId() + + ", the stream number:" + openFileMap.size()); + } + ctxToRemove.add(ctx2); + } + } + } + } + + // Invoke the cleanup outside the lock + for (OpenFileCtx ofc : ctxToRemove) { + ofc.cleanup(); + } + } + + OpenFileCtx get(FileHandle key) { + return openFileMap.get(key); + } + + int size() { + return openFileMap.size(); + } + + void start() { + streamMonitor.start(); + } + + // Evict all entries + void cleanAll() { + ArrayList<OpenFileCtx> cleanedContext = new ArrayList<OpenFileCtx>(); + synchronized (this) { + Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet() + .iterator(); + if (LOG.isTraceEnabled()) { + LOG.trace("openFileMap size:" + openFileMap.size()); + } + + while (it.hasNext()) { + Entry<FileHandle, OpenFileCtx> pairs = it.next(); + OpenFileCtx ctx = pairs.getValue(); + it.remove(); + cleanedContext.add(ctx); + } + } + + // Invoke the cleanup outside the lock + for (OpenFileCtx ofc : cleanedContext) { + ofc.cleanup(); + } + } + + void shutdown() { + // Stop the stream monitor thread + if (streamMonitor != null && streamMonitor.isAlive()) { + streamMonitor.shouldRun(false); + streamMonitor.interrupt(); + try { + streamMonitor.join(3000); + } catch (InterruptedException e) { + } + } + + cleanAll(); + }
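Note that put() deliberately returns false instead of evicting a busy stream; the callers (the CREATE handler in RpcProgramNfs3 and WriteManager.handleWrite, both below) must then close the just-opened HDFS stream and let the NFS client retry. A small sketch of a caller honoring that contract, where replyJukebox() is a hypothetical helper rather than a method in this patch:

    import java.io.IOException;
    import java.io.OutputStream;

    // Illustrative caller-side handling of a full cache; the real callers are
    // the CREATE handler in RpcProgramNfs3 and WriteManager.handleWrite below.
    class PutContractSketch {
      void addStreamOrAskRetry(OpenFileCtxCache cache, FileHandle handle,
          OpenFileCtx ctx, OutputStream fos) throws IOException {
        if (!cache.put(handle, ctx)) {
          // Cache is full and no entry is idle enough to evict: close the
          // just-opened stream so a later write falls back to append, and
          // make the NFS client retry (handleWrite answers NFS3ERR_JUKEBOX).
          fos.close();
          replyJukebox(handle);
        }
      }

      private void replyJukebox(FileHandle handle) {
        // hypothetical: build a WRITE3Response with Nfs3Status.NFS3ERR_JUKEBOX
        // and write it back on the client channel
      }
    }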
+ + /** + * StreamMonitor wakes up periodically to find and close idle streams. + */ + class StreamMonitor extends Daemon { + private final static int rotation = 5 * 1000; // 5 seconds + private long lastWakeupTime = 0; + private boolean shouldRun = true; + + void shouldRun(boolean shouldRun) { + this.shouldRun = shouldRun; + } + + @Override + public void run() { + while (shouldRun) { + scan(streamTimeout); + + // Check if it can sleep + try { + long workedTime = System.currentTimeMillis() - lastWakeupTime; + if (workedTime < rotation) { + if (LOG.isTraceEnabled()) { + LOG.trace("StreamMonitor can still have a sleep:" + + ((rotation - workedTime) / 1000)); + } + Thread.sleep(rotation - workedTime); + } + lastWakeupTime = System.currentTimeMillis(); + + } catch (InterruptedException e) { + LOG.info("StreamMonitor got interrupted"); + return; + } + } + } + } +} \ No newline at end of file Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java?rev=1539834&r1=1539833&r2=1539834&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/RpcProgramNfs3.java Thu Nov 7 21:49:13 2013 @@ -214,6 +214,11 @@ public class RpcProgramNfs3 extends RpcP } } + @Override + public void startDaemons() { + writeManager.startAsyncDataSerivce(); + } + /****************************************************** * RPC call handlers ******************************************************/ @@ -778,7 +783,8 @@ public class RpcProgramNfs3 extends RpcP int createMode = request.getMode(); if ((createMode != Nfs3Constant.CREATE_EXCLUSIVE) - && request.getObjAttr().getUpdateFields().contains(SetAttrField.SIZE)) { + && request.getObjAttr().getUpdateFields().contains(SetAttrField.SIZE) + && request.getObjAttr().getSize() != 0) { LOG.error("Setting file size is not supported when creating file: " + fileName + " dir fileId:" + dirHandle.getFileId()); return new CREATE3Response(Nfs3Status.NFS3ERR_INVAL); @@ -831,6 +837,23 @@ public class RpcProgramNfs3 extends RpcP postOpObjAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug); dirWcc = Nfs3Utils.createWccData(Nfs3Utils.getWccAttr(preOpDirAttr), dfsClient, dirFileIdPath, iug); + + // Add open stream + OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr, + writeDumpDir + "/" + postOpObjAttr.getFileId(), dfsClient, iug); + fileHandle = new FileHandle(postOpObjAttr.getFileId()); + if (!writeManager.addOpenFileStream(fileHandle, openFileCtx)) { + LOG.warn("Can't add more stream, close it."
+ + " Future write will become append"); + fos.close(); + fos = null; + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Opened stream for file:" + fileName + ", fileId:" + + fileHandle.getFileId()); + } + } + } catch (IOException e) { LOG.error("Exception", e); if (fos != null) { @@ -859,16 +882,6 @@ public class RpcProgramNfs3 extends RpcP } } - // Add open stream - OpenFileCtx openFileCtx = new OpenFileCtx(fos, postOpObjAttr, writeDumpDir - + "/" + postOpObjAttr.getFileId(), dfsClient, iug); - fileHandle = new FileHandle(postOpObjAttr.getFileId()); - writeManager.addOpenFileStream(fileHandle, openFileCtx); - if (LOG.isDebugEnabled()) { - LOG.debug("open stream for file:" + fileName + ", fileId:" - + fileHandle.getFileId()); - } - return new CREATE3Response(Nfs3Status.NFS3_OK, fileHandle, postOpObjAttr, dirWcc); } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java?rev=1539834&r1=1539833&r2=1539834&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java Thu Nov 7 21:49:13 2013 @@ -18,8 +18,6 @@ package org.apache.hadoop.hdfs.nfs.nfs3; import java.io.IOException; -import java.util.Iterator; -import java.util.Map.Entry; import java.util.concurrent.ConcurrentMap; import org.apache.commons.logging.Log; @@ -29,11 +27,12 @@ import org.apache.hadoop.fs.CommonConfig import org.apache.hadoop.hdfs.DFSClient; import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.COMMIT_STATUS; +import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.nfs.NfsFileType; import org.apache.hadoop.nfs.nfs3.FileHandle; import org.apache.hadoop.nfs.nfs3.IdUserGroup; import org.apache.hadoop.nfs.nfs3.Nfs3Constant; -import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow; import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes; import org.apache.hadoop.nfs.nfs3.Nfs3Status; import org.apache.hadoop.nfs.nfs3.request.WRITE3Request; @@ -56,69 +55,70 @@ public class WriteManager { private final Configuration config; private final IdUserGroup iug; - private final ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = Maps - .newConcurrentMap(); - + private AsyncDataService asyncDataService; private boolean asyncDataServiceStarted = false; - private final StreamMonitor streamMonitor; - + private final int maxStreams; + /** * The time limit to wait for accumulate reordered sequential writes to the * same file before the write is considered done. 
*/ private long streamTimeout; - - public static final long DEFAULT_STREAM_TIMEOUT = 10 * 60 * 1000; //10 minutes - public static final long MINIMIUM_STREAM_TIMEOUT = 10 * 1000; //10 seconds - - void addOpenFileStream(FileHandle h, OpenFileCtx ctx) { - openFileMap.put(h, ctx); - if (LOG.isDebugEnabled()) { - LOG.debug("After add the new stream " + h.getFileId() - + ", the stream number:" + openFileMap.size()); + + private final OpenFileCtxCache fileContextCache; + + static public class MultipleCachedStreamException extends IOException { + private static final long serialVersionUID = 1L; + + public MultipleCachedStreamException(String msg) { + super(msg); } } + boolean addOpenFileStream(FileHandle h, OpenFileCtx ctx) { + return fileContextCache.put(h, ctx); + } + WriteManager(IdUserGroup iug, final Configuration config) { this.iug = iug; this.config = config; - - streamTimeout = config.getLong("dfs.nfs3.stream.timeout", - DEFAULT_STREAM_TIMEOUT); + streamTimeout = config.getLong(Nfs3Constant.OUTPUT_STREAM_TIMEOUT, + Nfs3Constant.OUTPUT_STREAM_TIMEOUT_DEFAULT); LOG.info("Stream timeout is " + streamTimeout + "ms."); - if (streamTimeout < MINIMIUM_STREAM_TIMEOUT) { + if (streamTimeout < Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT) { LOG.info("Reset stream timeout to minimum value " - + MINIMIUM_STREAM_TIMEOUT + "ms."); - streamTimeout = MINIMIUM_STREAM_TIMEOUT; + + Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT + "ms."); + streamTimeout = Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT; } - - this.streamMonitor = new StreamMonitor(); + maxStreams = config.getInt(Nfs3Constant.MAX_OPEN_FILES, + Nfs3Constant.MAX_OPEN_FILES_DEFAULT); + LOG.info("Maximum open streams is "+ maxStreams); + this.fileContextCache = new OpenFileCtxCache(config, streamTimeout); } - private void startAsyncDataSerivce() { - streamMonitor.start(); + void startAsyncDataSerivce() { + if (asyncDataServiceStarted) { + return; + } + fileContextCache.start(); this.asyncDataService = new AsyncDataService(); asyncDataServiceStarted = true; } - private void shutdownAsyncDataService() { - asyncDataService.shutdown(); + void shutdownAsyncDataService() { + if (!asyncDataServiceStarted) { + return; + } asyncDataServiceStarted = false; - streamMonitor.interrupt(); + asyncDataService.shutdown(); + fileContextCache.shutdown(); } void handleWrite(DFSClient dfsClient, WRITE3Request request, Channel channel, int xid, Nfs3FileAttributes preOpAttr) throws IOException { - // First write request starts the async data service - if (!asyncDataServiceStarted) { - startAsyncDataSerivce(); - } - - long offset = request.getOffset(); int count = request.getCount(); - WriteStableHow stableHow = request.getStableHow(); byte[] data = request.getData().array(); if (data.length < count) { WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL); @@ -129,13 +129,12 @@ public class WriteManager { FileHandle handle = request.getHandle(); if (LOG.isDebugEnabled()) { - LOG.debug("handleWrite fileId: " + handle.getFileId() + " offset: " - + offset + " length:" + count + " stableHow:" + stableHow.getValue()); + LOG.debug("handleWrite " + request); } // Check if there is a stream to write FileHandle fileHandle = request.getHandle(); - OpenFileCtx openFileCtx = openFileMap.get(fileHandle); + OpenFileCtx openFileCtx = fileContextCache.get(fileHandle); if (openFileCtx == null) { LOG.info("No opened stream for fileId:" + fileHandle.getFileId()); @@ -150,6 +149,15 @@ public class WriteManager { fos = dfsClient.append(fileIdPath, bufferSize, null, null); 
latestAttr = Nfs3Utils.getFileAttr(dfsClient, fileIdPath, iug); + } catch (RemoteException e) { + IOException io = e.unwrapRemoteException(); + if (io instanceof AlreadyBeingCreatedException) { + LOG.warn("Can't append file:" + fileIdPath + + ". Possibly the file is being closed. Drop the request:" + + request + ", wait for the client to retry..."); + return; + } + throw e; } catch (IOException e) { LOG.error("Can't append to file:" + fileIdPath + ", error:" + e); if (fos != null) { @@ -170,9 +178,26 @@ public class WriteManager { Nfs3Constant.FILE_DUMP_DIR_DEFAULT); openFileCtx = new OpenFileCtx(fos, latestAttr, writeDumpDir + "/" + fileHandle.getFileId(), dfsClient, iug); - addOpenFileStream(fileHandle, openFileCtx); + + if (!addOpenFileStream(fileHandle, openFileCtx)) { + LOG.info("Can't add new stream. Close it. Tell client to retry."); + try { + fos.close(); + } catch (IOException e) { + LOG.error("Can't close stream for fileId:" + handle.getFileId()); + } + // Notify client to retry + WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr); + WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_JUKEBOX, + fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF); + Nfs3Utils.writeChannel(channel, + response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()), + xid); + return; + } + if (LOG.isDebugEnabled()) { - LOG.debug("opened stream for file:" + fileHandle.getFileId()); + LOG.debug("Opened stream for appending file:" + fileHandle.getFileId()); } } @@ -185,7 +210,7 @@ public class WriteManager { void handleCommit(DFSClient dfsClient, FileHandle fileHandle, long commitOffset, Channel channel, int xid, Nfs3FileAttributes preOpAttr) { int status; - OpenFileCtx openFileCtx = openFileMap.get(fileHandle); + OpenFileCtx openFileCtx = fileContextCache.get(fileHandle); if (openFileCtx == null) { LOG.info("No opened stream for fileId:" + fileHandle.getFileId() @@ -238,7 +263,7 @@ public class WriteManager { String fileIdPath = Nfs3Utils.getFileIdPath(fileHandle); Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug); if (attr != null) { - OpenFileCtx openFileCtx = openFileMap.get(fileHandle); + OpenFileCtx openFileCtx = fileContextCache.get(fileHandle); if (openFileCtx != null) { attr.setSize(openFileCtx.getNextOffset()); attr.setUsed(openFileCtx.getNextOffset()); @@ -253,8 +278,8 @@ public class WriteManager { Nfs3FileAttributes attr = Nfs3Utils.getFileAttr(client, fileIdPath, iug); if ((attr != null) && (attr.getType() == NfsFileType.NFSREG.toValue())) { - OpenFileCtx openFileCtx = openFileMap - .get(new FileHandle(attr.getFileId())); + OpenFileCtx openFileCtx = fileContextCache.get(new FileHandle(attr + .getFileId())); + if (openFileCtx != null) { attr.setSize(openFileCtx.getNextOffset()); @@ -263,56 +288,9 @@ public class WriteManager { } return attr; } - - @VisibleForTesting - ConcurrentMap<FileHandle, OpenFileCtx> getOpenFileMap() { - return this.openFileMap; - } - - /** - * StreamMonitor wakes up periodically to find and closes idle streams.
- */ - class StreamMonitor extends Daemon { - private int rotation = 5 * 1000; // 5 seconds - private long lastWakeupTime = 0; - - @Override - public void run() { - while (true) { - Iterator<Entry<FileHandle, OpenFileCtx>> it = openFileMap.entrySet() - .iterator(); - if (LOG.isTraceEnabled()) { - LOG.trace("openFileMap size:" + openFileMap.size()); - } - while (it.hasNext()) { - Entry<FileHandle, OpenFileCtx> pairs = it.next(); - OpenFileCtx ctx = pairs.getValue(); - if (ctx.streamCleanup((pairs.getKey()).getFileId(), streamTimeout)) { - it.remove(); - if (LOG.isDebugEnabled()) { - LOG.debug("After remove stream " + pairs.getKey().getFileId() - + ", the stream number:" + openFileMap.size()); - } - } - } - - // Check if it can sleep - try { - long workedTime = System.currentTimeMillis() - lastWakeupTime; - if (workedTime < rotation) { - if (LOG.isTraceEnabled()) { - LOG.trace("StreamMonitor can still have a sleep:" - + ((rotation - workedTime) / 1000)); - } - Thread.sleep(rotation - workedTime); - } - lastWakeupTime = System.currentTimeMillis(); - } catch (InterruptedException e) { - LOG.info("StreamMonitor got interrupted"); - return; - } - } - } + @VisibleForTesting + OpenFileCtxCache getOpenFileCtxCache() { + return this.fileContextCache; } } Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java?rev=1539834&r1=1539833&r2=1539834&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestMountd.java Thu Nov 7 21:49:13 2013 @@ -51,7 +51,7 @@ public class TestMountd { Nfs3 nfs3 = new Nfs3(exports, config); nfs3.start(false); - RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountBase() + RpcProgramMountd mountd = (RpcProgramMountd) nfs3.getMountd() .getRpcProgram(); mountd.nullOp(new XDR(), 1234, InetAddress.getByName("localhost")); Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java?rev=1539834&r1=1539833&r2=1539834&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/TestOutOfOrderWrite.java Thu Nov 7 21:49:13 2013 @@ -135,6 +135,7 @@ public class TestOutOfOrderWrite { @Override protected ChannelPipelineFactory setPipelineFactory() { this.pipelineFactory = new ChannelPipelineFactory() { + @Override public ChannelPipeline getPipeline() { return Channels.pipeline( RpcUtil.constructRpcFrameDecoder(), Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java?rev=1539834&view=auto 
============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java (added) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOpenFileCtxCache.java Thu Nov 7 21:49:13 2013 @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.nfs.nfs3; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.client.HdfsDataOutputStream; +import org.apache.hadoop.hdfs.nfs.nfs3.OpenFileCtx.CommitCtx; +import org.apache.hadoop.nfs.nfs3.FileHandle; +import org.apache.hadoop.nfs.nfs3.IdUserGroup; +import org.apache.hadoop.nfs.nfs3.Nfs3Constant; +import org.apache.hadoop.nfs.nfs3.Nfs3FileAttributes; +import org.junit.Test; +import org.mockito.Mockito; + +public class TestOpenFileCtxCache { + static boolean cleaned = false; + + @Test + public void testEviction() throws IOException, InterruptedException { + Configuration conf = new Configuration(); + + // Only two entries will be in the cache + conf.setInt(Nfs3Constant.MAX_OPEN_FILES, 2); + + DFSClient dfsClient = Mockito.mock(DFSClient.class); + Nfs3FileAttributes attr = new Nfs3FileAttributes(); + HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); + Mockito.when(fos.getPos()).thenReturn((long) 0); + + OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath", + dfsClient, new IdUserGroup()); + OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath", + dfsClient, new IdUserGroup()); + OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath", + dfsClient, new IdUserGroup()); + OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath", + dfsClient, new IdUserGroup()); + OpenFileCtx context5 = new OpenFileCtx(fos, attr, "/dumpFilePath", + dfsClient, new IdUserGroup()); + + OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100); + + boolean ret = cache.put(new FileHandle(1), context1); + assertTrue(ret); + Thread.sleep(1000); + ret = cache.put(new FileHandle(2), context2); + assertTrue(ret); + ret = cache.put(new FileHandle(3), context3); + assertFalse(ret); + assertTrue(cache.size() == 2); + + // Wait for the oldest stream to be evict-able, insert again + Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT); + assertTrue(cache.size() == 2); + + ret = cache.put(new FileHandle(3), context3); + assertTrue(ret); + assertTrue(cache.size() == 2); + assertTrue(cache.get(new FileHandle(1)) == null); + + // Test inactive 
entry is evicted immediately + context3.setActiveStatusForTest(false); + ret = cache.put(new FileHandle(4), context4); + assertTrue(ret); + + // Now the cache has context2 and context4 + // Test eviction failure if all entries have pending work. + context2.getPendingWritesForTest().put(new OffsetRange(0, 100), + new WriteCtx(null, 0, 0, 0, null, null, null, 0, false, null)); + context4.getPendingCommitsForTest().put(new Long(100), + new CommitCtx(0, null, 0, attr)); + Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT); + ret = cache.put(new FileHandle(5), context5); + assertFalse(ret); + } + + @Test + public void testScan() throws IOException, InterruptedException { + Configuration conf = new Configuration(); + + // Only two entries will be in the cache + conf.setInt(Nfs3Constant.MAX_OPEN_FILES, 2); + + DFSClient dfsClient = Mockito.mock(DFSClient.class); + Nfs3FileAttributes attr = new Nfs3FileAttributes(); + HdfsDataOutputStream fos = Mockito.mock(HdfsDataOutputStream.class); + Mockito.when(fos.getPos()).thenReturn((long) 0); + + OpenFileCtx context1 = new OpenFileCtx(fos, attr, "/dumpFilePath", + dfsClient, new IdUserGroup()); + OpenFileCtx context2 = new OpenFileCtx(fos, attr, "/dumpFilePath", + dfsClient, new IdUserGroup()); + OpenFileCtx context3 = new OpenFileCtx(fos, attr, "/dumpFilePath", + dfsClient, new IdUserGroup()); + OpenFileCtx context4 = new OpenFileCtx(fos, attr, "/dumpFilePath", + dfsClient, new IdUserGroup()); + + OpenFileCtxCache cache = new OpenFileCtxCache(conf, 10 * 60 * 100); + + // Test cleaning expired entry + boolean ret = cache.put(new FileHandle(1), context1); + assertTrue(ret); + ret = cache.put(new FileHandle(2), context2); + assertTrue(ret); + Thread.sleep(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT + 1); + cache.scan(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_MIN_DEFAULT); + assertTrue(cache.size() == 0); + + // Test cleaning inactive entry + ret = cache.put(new FileHandle(3), context3); + assertTrue(ret); + ret = cache.put(new FileHandle(4), context4); + assertTrue(ret); + context3.setActiveStatusForTest(false); + cache.scan(Nfs3Constant.OUTPUT_STREAM_TIMEOUT_DEFAULT); + assertTrue(cache.size() == 1); + assertTrue(cache.get(new FileHandle(3)) == null); + assertTrue(cache.get(new FileHandle(4)) != null); + } +} Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java?rev=1539834&r1=1539833&r2=1539834&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestWrites.java Thu Nov 7 21:49:13 2013 @@ -186,9 +186,8 @@ public class TestWrites { private void waitWrite(RpcProgramNfs3 nfsd, FileHandle handle, int maxWaitTime) throws InterruptedException { int waitedTime = 0; - ConcurrentMap<FileHandle, OpenFileCtx> openFileMap = nfsd.getWriteManager() - .getOpenFileMap(); - OpenFileCtx ctx = openFileMap.get(handle); + OpenFileCtx ctx = nfsd.getWriteManager() + .getOpenFileCtxCache().get(handle); assertTrue(ctx != null); do { Thread.sleep(3000); Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt URL: 
http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1539834&r1=1539833&r2=1539834&view=diff ============================================================================== --- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original) +++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Nov 7 21:49:13 2013 @@ -596,6 +596,8 @@ Release 2.2.1 - UNRELEASED HDFS-5252. Stable write is not handled correctly in someplace. (brandonli) + HDFS-5364. Add OpenFileCtx cache. (brandonli) + Release 2.2.0 - 2013-10-13 INCOMPATIBLE CHANGES
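Taken together, the patch degrades gracefully when the cache fills up: a WRITE on an uncached file gets NFS3ERR_JUKEBOX so the client retries later, while CREATE simply closes its stream so the first subsequent WRITE reopens the file in append mode. A rough sketch of that decision, under the assumption that the caller already knows whether put() succeeded (the authoritative logic lives in RpcProgramNfs3 and WriteManager.handleWrite above):

    // Rough sketch of the full-cache behavior introduced by this patch;
    // names here are illustrative, not from the committed code.
    class FullCachePolicySketch {
      enum Action { PROCEED, CLOSE_AND_APPEND_LATER, ASK_CLIENT_RETRY }

      Action onPutResult(boolean cached, boolean fromCreate) {
        if (cached) {
          return Action.PROCEED;            // the OpenFileCtx is now tracked
        }
        // Cache full and nothing evictable:
        return fromCreate
            ? Action.CLOSE_AND_APPEND_LATER // CREATE closes fos; a later WRITE
                                            // reopens the file in append mode
            : Action.ASK_CLIENT_RETRY;      // WRITE replies NFS3ERR_JUKEBOX
      }
    }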