Author: davsclaus
Date: Fri Dec 19 01:26:35 2008
New Revision: 727984

URL: http://svn.apache.org/viewvc?rev=727984&view=rev
Log:
CAMEL-1195: Introduced ExclusiveReadLockStrategy with two build in strategies: 
fileLock and rename. Reworked the URI options to avoid consumer. prefix.

Added:
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/ExclusiveReadLockStrategy.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java
    
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java
   (contents, props changed)
      - copied, changed from r727639, 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
    
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadNoneStrategyTest.java
    
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadRenameStrategyTest.java
   (contents, props changed)
      - copied, changed from r727639, 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
Removed:
    
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
Modified:
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
    
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
    activemq/camel/trunk/camel-core/src/test/resources/log4j.properties

Added: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/ExclusiveReadLockStrategy.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/ExclusiveReadLockStrategy.java?rev=727984&view=auto
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/ExclusiveReadLockStrategy.java
 (added)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/ExclusiveReadLockStrategy.java
 Fri Dec 19 01:26:35 2008
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Strategy for acquiring exclusive read locks for files to be consumed.
+ * After granting the read lock it is realeased, we just want to make sure 
that when we start
+ * consuming the file its not currently in progress of being written by third 
party.
+ * <p/>
+ * Camel supports out of the box the following strategies:
+ * <ul>
+ *   <li>FileLockExclusiveReadLockStrategy using {...@link 
java.nio.channels.FileLock}</li>
+ *   <li>FileRenameExclusiveReadLockStrategy waiting until its possible to 
rename the file. Can be used on file
+ *   systems where the FileLock isn't supported.</li>
+ * </ul>
+ */
+public interface ExclusiveReadLockStrategy {
+
+    /**
+     * Acquires exclusive read lock to the file.
+     *
+     * @param file the file
+     * @return true if read lock was acquired
+     * @throws IOException can be thrown
+     */
+    boolean acquireExclusiveReadLock(File file) throws IOException;
+
+}

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=727984&r1=727983&r2=727984&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
 Fri Dec 19 01:26:35 2008
@@ -17,10 +17,6 @@
 package org.apache.camel.component.file;
 
 import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -29,7 +25,6 @@
 import org.apache.camel.Processor;
 import org.apache.camel.impl.ScheduledPollConsumer;
 import org.apache.camel.processor.DeadLetterChannel;
-import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -44,10 +39,6 @@
     private FileEndpoint endpoint;
     private boolean recursive;
     private String regexPattern = "";
-    private boolean exclusiveReadLock = true;
-
-    // TODO: move option to endpoint to get rid of consumer. prefix
-    // TODO: remove idempotent again, we should just use the idempotent DSL we 
already have
 
     public FileConsumer(final FileEndpoint endpoint, Processor processor) {
         super(endpoint, processor);
@@ -139,11 +130,6 @@
         try {
             final FileProcessStrategy processStrategy = 
endpoint.getFileStrategy();
 
-            // is we use excluse read then acquire the exclusive read (waiting 
until we got it)
-            if (exclusiveReadLock) {
-                acquireExclusiveReadLock(target);
-            }
-
             if (LOG.isDebugEnabled()) {
                 LOG.debug("About to process file: " + target + " using 
exchange: " + exchange);
             }
@@ -189,31 +175,6 @@
     }
 
     /**
-     * Acquires exclusive read lock to the given file. Will wait until the 
lock is granted.
-     * After granting the read lock it is realeased, we just want to make sure 
that when we start
-     * consuming the file its not currently in progress of being written by 
third party.
-     */
-    protected void acquireExclusiveReadLock(File file) throws IOException {
-        if (LOG.isTraceEnabled()) {
-            LOG.trace("Waiting for exclusive read lock to file: " + file);
-        }
-
-        // try to acquire rw lock on the file before we can consume it
-        FileChannel channel = new RandomAccessFile(file, "rw").getChannel();
-        try {
-            FileLock lock = channel.lock();
-            if (LOG.isTraceEnabled()) {
-                LOG.trace("Acquired exclusive read lock: " + lock + " to file: 
" + file);
-            }
-            // just release it now we dont want to hold it during the rest of 
the processing
-            lock.release();
-        } finally {
-            // must close channel
-            ObjectHelper.close(channel, "FileConsumer during acquiring of 
exclusive read lock", LOG);
-        }
-    }
-
-    /**
      * Strategy when the file was processed and a commit should be executed.
      *
      * @param processStrategy   the strategy to perform the commit
@@ -225,6 +186,12 @@
      */
     protected void processStrategyCommit(FileProcessStrategy processStrategy, 
FileExchange exchange,
                                          File file, boolean failureHandled) {
+        if (endpoint.isIdempotent()) {
+            // only add to idempotent repository if we could process the file
+            // use file.getPath as key for the idempotent repository to 
support files with same name but in different folders
+            endpoint.getIdempotentRepository().add(file.getPath());
+        }
+        
         try {
             if (LOG.isDebugEnabled()) {
                 LOG.debug("Committing file strategy: " + processStrategy + " 
for file: "
@@ -262,11 +229,9 @@
                 LOG.trace("File did not match. Will skip this file: " + file);
             }
             return false;
-        } else  if (endpoint.isIdempotent() && 
!endpoint.getIdempotentRepository().add(file.getName())) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("FileConsumer is idempotent and the file has been 
consumed before. Will skip this file: " + file);
-            }
-            // skip as we have already processed it
+        } else if (endpoint.isIdempotent() && 
endpoint.getIdempotentRepository().contains(file.getPath())) {
+            // use file.getPath as key for the idempotent repository to 
support files with same name but in different folders
+            LOG.warn("FileConsumer is idempotent and the file has been 
consumed before. Will skip this file: " + file);
             return false;
         }
 
@@ -352,12 +317,4 @@
         this.regexPattern = regexPattern;
     }
 
-    public boolean isExclusiveReadLock() {
-        return exclusiveReadLock;
-    }
-
-    public void setExclusiveReadLock(boolean exclusiveReadLock) {
-        this.exclusiveReadLock = exclusiveReadLock;
-    }
-
 }

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java?rev=727984&r1=727983&r2=727984&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
 Fri Dec 19 01:26:35 2008
@@ -78,6 +78,9 @@
     private FileFilter filter;
     private Comparator<File> sorter;
     private Comparator<FileExchange> sortBy;
+    private ExclusiveReadLockStrategy exclusiveReadLockStrategy;
+    private String readLock = "fileLock";
+    private long readLockTimeout;
 
     protected FileEndpoint(File file, String endpointUri, FileComponent 
component) {
         super(endpointUri, component);
@@ -424,6 +427,30 @@
         setSortBy(DefaultFileSorter.sortByFileLanguage(expression, reverse));
     }
 
+    public ExclusiveReadLockStrategy getExclusiveReadLockStrategy() {
+        return exclusiveReadLockStrategy;
+    }
+
+    public void setExclusiveReadLockStrategy(ExclusiveReadLockStrategy 
exclusiveReadLockStrategy) {
+        this.exclusiveReadLockStrategy = exclusiveReadLockStrategy;
+    }
+
+    public String getReadLock() {
+        return readLock;
+    }
+
+    public void setReadLock(String readLock) {
+        this.readLock = readLock;
+    }
+
+    public long getReadLockTimeout() {
+        return readLockTimeout;
+    }
+
+    public void setReadLockTimeout(long readLockTimeout) {
+        this.readLockTimeout = readLockTimeout;
+    }
+
     /**
      * A strategy method to lazily create the file strategy
      */
@@ -488,6 +515,15 @@
         if (preMoveExpression != null) {
             params.put("preMoveExpression", preMoveExpression);
         }
+        if (exclusiveReadLockStrategy != null) {
+            params.put("exclusiveReadLockStrategy", exclusiveReadLockStrategy);
+        }
+        if (readLock != null) {
+            params.put("readLock", readLock);
+        }
+        if (readLockTimeout > 0) {
+            params.put("readLockTimeout", Long.valueOf(readLockTimeout));
+        }
 
         return params;
     }

Added: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java?rev=727984&view=auto
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
 (added)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
 Fri Dec 19 01:26:35 2008
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+
+import org.apache.camel.component.file.ExclusiveReadLockStrategy;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Acquires exclusive read lock to the given file. Will wait until the lock is 
granted.
+ * After granting the read lock it is realeased, we just want to make sure 
that when we start
+ * consuming the file its not currently in progress of being written by third 
party.
+ */
+public class FileLockExclusiveReadLockStrategy implements 
ExclusiveReadLockStrategy {
+    private static final transient Log LOG = 
LogFactory.getLog(FileLockExclusiveReadLockStrategy.class);
+    private long timeout;
+
+    public boolean acquireExclusiveReadLock(File file) throws IOException {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Waiting for exclusive read lock to file: " + file);
+        }
+
+        // try to acquire rw lock on the file before we can consume it
+        FileChannel channel = new RandomAccessFile(file, "rw").getChannel();
+
+        long start = System.currentTimeMillis();
+        boolean exclusive = false;
+        try {
+            while (!exclusive) {
+                // timeout check
+                if (timeout > 0) {
+                    long delta = System.currentTimeMillis() - start;
+                    if (delta > timeout) {
+                        LOG.debug("Could not acquire read lock within " + 
timeout + " millis. Will skip the file: " + file);
+                        // we could not get the lock within the timeout 
period, so return false
+                        return false;
+                    }
+                }
+
+                // get the lock using either try lock or not depending on if 
we are using timeout or not
+                FileLock lock = timeout > 0 ? channel.tryLock() : 
channel.lock();
+                if (lock != null) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Acquired exclusive read lock: " + lock + " 
to file: " + file);
+                    }
+                    // just release it now we dont want to hold it during the 
rest of the processing
+                    lock.release();
+                    exclusive = true;
+                } else {
+                    LOG.trace("Exclusive read lock not granted. Sleeping for 
1000 millis.");
+                    try {
+                        Thread.sleep(1000);
+                    } catch (InterruptedException e) {
+                        // ignore
+                    }
+                }
+            }
+        } finally {
+            // must close channel
+            ObjectHelper.close(channel, "while acquiring exclusive read lock 
for file: " + file, LOG);
+        }
+
+        return true;
+    }
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * Sets an optional timeout period.
+     * <p/>
+     * If the readlock could not be granted within the timeperiod then the 
wait is stopped and the
+     * {...@link #acquireExclusiveReadLock(java.io.File)} returns 
<tt>false</tt>.
+     *
+     * @param timeout period in millis
+     */
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+
+}

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java?rev=727984&r1=727983&r2=727984&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
 Fri Dec 19 01:26:35 2008
@@ -19,7 +19,9 @@
 import java.util.Map;
 
 import org.apache.camel.Expression;
+import org.apache.camel.component.file.ExclusiveReadLockStrategy;
 import org.apache.camel.component.file.FileProcessStrategy;
+import org.apache.camel.util.ObjectHelper;
 
 /**
  * Factory to provide the {...@link 
org.apache.camel.component.file.FileProcessStrategy} to use.
@@ -49,9 +51,12 @@
         boolean preMove = preMoveNamePrefix != null || preMoveNamePostfix != 
null;
 
         if (isNoop) {
-            return new NoOpFileProcessStrategy(isLock);
+            NoOpFileProcessStrategy strategy = new 
NoOpFileProcessStrategy(isLock);
+            
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
+            return strategy;
         } else if (move || preMove) {
             RenameFileProcessStrategy strategy = new 
RenameFileProcessStrategy(isLock);
+            
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
             if (move) {
                 strategy.setCommitRenamer(new 
DefaultFileRenamer(moveNamePrefix, moveNamePostfix));
             }
@@ -61,6 +66,7 @@
             return strategy;
         } else if (expression != null || preMoveExpression != null) {
             RenameFileProcessStrategy strategy = new 
RenameFileProcessStrategy(isLock);
+            
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
             if (expression != null) {
                 FileExpressionRenamer renamer = new FileExpressionRenamer();
                 renamer.setExpression(expression);
@@ -73,10 +79,44 @@
             }
             return strategy;
         } else if (isDelete) {
-            return new DeleteFileProcessStrategy(isLock);
+            DeleteFileProcessStrategy strategy = new 
DeleteFileProcessStrategy(isLock);
+            
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
+            return strategy;
         } else {
             // default strategy will move to .camel subfolder
-            return new RenameFileProcessStrategy(isLock);
+            RenameFileProcessStrategy strategy = new 
RenameFileProcessStrategy(isLock);
+            
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
+            return strategy;
+        }
+    }
+
+    private static ExclusiveReadLockStrategy 
getExclusiveReadLockStrategy(Map<String, Object> params) {
+        ExclusiveReadLockStrategy strategy = (ExclusiveReadLockStrategy) 
params.get("exclusiveReadLockStrategy");
+        if (strategy != null) {
+            return strategy;
+        }
+
+        // no explicit stategy set then fallback to readLock option
+        String readLock = (String) params.get("readLock");
+        if (ObjectHelper.isNotEmpty(readLock)) {
+            if ("none".equals(readLock) || "false".equals(readLock)) {
+                return null;
+            } else if ("fileLock".equals(readLock)) {
+                FileLockExclusiveReadLockStrategy readLockStrategy = new 
FileLockExclusiveReadLockStrategy();
+                Long timeout = (Long) params.get("readLockTimeout");
+                if (timeout != null) {
+                    readLockStrategy.setTimeout(timeout);
+                }
+            } else if ("rename".equals(readLock)) {
+                FileRenameExclusiveReadLockStrategy readLockStrategy = new 
FileRenameExclusiveReadLockStrategy();
+                Long timeout = (Long) params.get("readLockTimeout");
+                if (timeout != null) {
+                    readLockStrategy.setTimeout(timeout);
+                }
+                return readLockStrategy;
+            }
         }
+        
+        return null;
     }
 }

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java?rev=727984&r1=727983&r2=727984&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java
 Fri Dec 19 01:26:35 2008
@@ -22,6 +22,7 @@
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileLock;
 
+import org.apache.camel.component.file.ExclusiveReadLockStrategy;
 import org.apache.camel.component.file.FileEndpoint;
 import org.apache.camel.component.file.FileExchange;
 import org.apache.camel.component.file.FileProcessStrategy;
@@ -39,6 +40,7 @@
     private static final transient Log LOG = 
LogFactory.getLog(FileProcessStrategySupport.class);
     private boolean lockFile;
     private FileRenamer lockFileRenamer;
+    private ExclusiveReadLockStrategy exclusiveReadLockStrategy;
 
     protected FileProcessStrategySupport() {
         this(true);
@@ -54,6 +56,15 @@
     }
 
     public boolean begin(FileEndpoint endpoint, FileExchange exchange, File 
file) throws Exception {
+        // is we use excluse read then acquire the exclusive read (waiting 
until we got it)
+        if (exclusiveReadLockStrategy != null) {
+            boolean lock = 
exclusiveReadLockStrategy.acquireExclusiveReadLock(file);
+            if (!lock) {
+                // do not begin sice we could not get the exclusive read lcok
+                return false;
+            }
+        }
+
         if (isLockFile()) {
             File newFile = lockFileRenamer.renameFile(exchange, file);
             String lockFileName = newFile.getAbsolutePath();
@@ -71,6 +82,7 @@
                 return false;
             }
         }
+
         return true;
     }
 
@@ -86,22 +98,6 @@
         }
     }
 
-    public boolean isLockFile() {
-        return lockFile;
-    }
-
-    public void setLockFile(boolean lockFile) {
-        this.lockFile = lockFile;
-    }
-
-    public FileRenamer getLockFileRenamer() {
-        return lockFileRenamer;
-    }
-
-    public void setLockFileRenamer(FileRenamer lockFileRenamer) {
-        this.lockFileRenamer = lockFileRenamer;
-    }
-
     protected void unlockFile(FileEndpoint endpoint, FileExchange exchange, 
File file) throws Exception {
         if (isLockFile()) {
             FileLock lock = ExchangeHelper.getMandatoryProperty(exchange, 
"org.apache.camel.file.lock", FileLock.class);
@@ -124,4 +120,28 @@
             }
         }
     }
+
+    public boolean isLockFile() {
+        return lockFile;
+    }
+
+    public void setLockFile(boolean lockFile) {
+        this.lockFile = lockFile;
+    }
+
+    public FileRenamer getLockFileRenamer() {
+        return lockFileRenamer;
+    }
+
+    public void setLockFileRenamer(FileRenamer lockFileRenamer) {
+        this.lockFileRenamer = lockFileRenamer;
+    }
+
+    public ExclusiveReadLockStrategy getExclusiveReadLockStrategy() {
+        return exclusiveReadLockStrategy;
+    }
+
+    public void setExclusiveReadLockStrategy(ExclusiveReadLockStrategy 
exclusiveReadLockStrategy) {
+        this.exclusiveReadLockStrategy = exclusiveReadLockStrategy;
+    }
 }

Added: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java?rev=727984&view=auto
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java
 (added)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java
 Fri Dec 19 01:26:35 2008
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.camel.component.file.ExclusiveReadLockStrategy;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Acquires exclusive read lock to the given file. Will wait until the lock is 
granted.
+ * After granting the read lock it is realeased, we just want to make sure 
that when we start
+ * consuming the file its not currently in progress of being written by third 
party.
+ */
+public class FileRenameExclusiveReadLockStrategy implements 
ExclusiveReadLockStrategy {
+    private static final transient Log LOG = 
LogFactory.getLog(FileRenameExclusiveReadLockStrategy.class);
+    private long timeout;
+
+    public boolean acquireExclusiveReadLock(File file) throws IOException {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Waiting for exclusive read lock to file: " + file);
+        }
+
+        // the trick is to try to rename the file, if we can rename then we 
have exclusive read
+        // since its a remote file we can not use java.nio to get a RW lock
+        String originalName = file.getName();
+        File tempFile = new File(file.getParent(), originalName + 
".camelExclusiveReadLock");
+
+        long start = System.currentTimeMillis();
+
+        boolean exclusive = false;
+        while (!exclusive) {
+            // timeout check
+            if (timeout > 0) {
+                long delta = System.currentTimeMillis() - start;
+                if (delta > timeout) {
+                    LOG.debug("Could not acquire read lock within " + timeout 
+ " millis. Will skip the file: " + file);
+                    // we could not get the lock within the timeout period, so 
return false
+                    return false;
+                }
+            }
+
+            exclusive = file.renameTo(tempFile);
+            if (exclusive) {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Acquired exclusive read lock to file: " + file);
+                }
+                // rename it back so we can read it
+                tempFile.renameTo(file);
+            } else {
+                LOG.trace("Exclusive read lock not granted. Sleeping for 1000 
millis.");
+                try {
+                    Thread.sleep(1000);
+                } catch (InterruptedException e) {
+                    // ignore
+                }
+            }
+        }
+
+        return true;
+    }
+
+    public long getTimeout() {
+        return timeout;
+    }
+
+    /**
+     * Sets an optional timeout period.
+     * <p/>
+     * If the readlock could not be granted within the timeperiod then the 
wait is stopped and the
+     * {...@link #acquireExclusiveReadLock(java.io.File)} returns 
<tt>false</tt>.
+     *
+     * @param timeout period in millis
+     */
+    public void setTimeout(long timeout) {
+        this.timeout = timeout;
+    }
+}

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?rev=727984&r1=727983&r2=727984&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
 Fri Dec 19 01:26:35 2008
@@ -718,7 +718,7 @@
         LOG.debug("Took " + delta + " millis to complete latch");
 
         if (resultMinimumWaitTime > 0 && delta < resultMinimumWaitTime) {
-            fail("Expected minimum " + resultWaitTime
+            fail("Expected minimum " + resultMinimumWaitTime
                     + " millis waiting on the result, but was faster with " + 
delta + " millis.");
         }
     }

Modified: 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=727984&r1=727983&r2=727984&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
 Fri Dec 19 01:26:35 2008
@@ -61,7 +61,9 @@
             LOG.debug("Starting to poll: " + this.getEndpoint());
         }
         try {
-            poll();
+            if (isRunAllowed()) {
+                poll();
+            }
         } catch (Exception e) {
             LOG.warn("An exception occured while polling: " + 
this.getEndpoint() + ": " + e.getMessage(), e);
             if (firstExceptionThrown == null) {

Copied: 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java
 (from r727639, 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java)
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java&r1=727639&r2=727984&rev=727984&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java
 Fri Dec 19 01:26:35 2008
@@ -16,24 +16,16 @@
  */
 package org.apache.camel.component.file;
 
-import java.io.File;
-import java.io.FileOutputStream;
-import java.nio.channels.FileLock;
-
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 
 /**
- * Unit test to verify exclusive read - that we do not poll files that is in 
progress of being written.
+ * Unit test to verify exclusive read by for manual testing.
  */
-public class FileExclusiveReadTest extends ContextTestSupport {
-
-    private static final Log LOG = 
LogFactory.getLog(FileExclusiveReadTest.class);
+public class FileExclusiveReadManuelTest extends ContextTestSupport {
 
-    private String fileUrl = 
"file://target/exclusiveread/slowfile?consumer.delay=500&consumer.exclusiveReadLock=true";
+    private String fileUrl = "file://target/exclusiveread?readLock=fileLock";
 
     @Override
     protected void setUp() throws Exception {
@@ -41,47 +33,15 @@
         super.setUp();
     }
 
-    public void testPoolIn3SecondsButNoFiles() throws Exception {
+    public void testManually() throws Exception {
         deleteDirectory("./target/exclusiveread");
-        createDirectory("./target/exclusiveread/slowfile");
         MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.setSleepForEmptyTest(10000);
         mock.expectedMessageCount(0);
 
-        Thread.sleep(3 * 1000L);
-
-        mock.assertIsSatisfied();
-    }
-
-    // TODO: Not possible to test in the same JVM (see javadoc for FileLock)
-    public void xxxtestPollFileWhileSlowFileIsBeingWritten() throws Exception {
-        deleteDirectory("./target/exclusiveread");
-        createDirectory("./target/exclusiveread/slowfile");
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMessageCount(1);
-        mock.expectedBodiesReceived("Hello WorldLine #0Line #1Line #2Bye 
Worl");
-
-        createSlowFile();
-
         mock.assertIsSatisfied();
     }
 
-    private void createSlowFile() throws Exception {
-        LOG.info("Creating a slow file ...");
-        File file = new File("./target/exclusiveread/slowfile/hello.txt");
-        FileOutputStream fos = new FileOutputStream(file);
-        FileLock lock = fos.getChannel().lock();
-        fos.write("Hello World".getBytes());
-        for (int i = 0; i < 3; i++) {
-            Thread.sleep(1000);
-            fos.write(("Line #" + i).getBytes());
-            LOG.info("Appending to slowfile");
-        }
-        fos.write("Bye World".getBytes());
-        lock.release();
-        fos.close();
-        LOG.info("... done creating slowfile");
-    }
-
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
@@ -90,4 +50,4 @@
         };
     }
 
-}
+}
\ No newline at end of file

Propchange: 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java
------------------------------------------------------------------------------
    svn:mergeinfo = 

Added: 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadNoneStrategyTest.java
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadNoneStrategyTest.java?rev=727984&view=auto
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadNoneStrategyTest.java
 (added)
+++ 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadNoneStrategyTest.java
 Fri Dec 19 01:26:35 2008
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.channels.FileLock;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Unit test to verify exclusive read option using *none*
+ */
+public class FileExclusiveReadNoneStrategyTest extends ContextTestSupport {
+
+    private static final Log LOG = 
LogFactory.getLog(FileExclusiveReadNoneStrategyTest.class);
+    private String fileUrl = 
"file://target/exclusiveread/slowfile?noop=true&consumer.delay=500&readLock=none";
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:start").process(new MySlowFileProcessor());
+                from(fileUrl + "&readLockTimeout=1000").to("mock:result");
+            }
+        };
+    }
+
+    public void testPollFileWhileSlowFileIsBeingWritten() throws Exception {
+        deleteDirectory("./target/exclusiveread");
+        createDirectory("./target/exclusiveread/slowfile");
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("Hello World");
+        mock.expectedMessageCount(1);
+
+        // send a message to seda:start to trigger the creating of the 
slowfile to poll
+        template.sendBody("seda:start", "Create the slow file");
+
+        mock.assertIsSatisfied();
+
+        String body = 
mock.getReceivedExchanges().get(0).getIn().getBody(String.class);
+        LOG.debug("Body is: " + body);
+        assertFalse("Should not wait and read the entire file", 
body.endsWith("Bye World"));
+    }
+
+    private class MySlowFileProcessor implements Processor {
+
+        public void process(Exchange exchange) throws Exception {
+            LOG.info("Creating a slow fil with no locks...");
+            File file = new File("./target/exclusiveread/slowfile/hello.txt");
+            FileOutputStream fos = new FileOutputStream(file);
+            fos.write("Hello World".getBytes());
+            for (int i = 0; i < 3; i++) {
+                Thread.sleep(1000);
+                fos.write(("Line #" + i).getBytes());
+                LOG.info("Appending to slowfile");
+            }
+            fos.write("Bye World".getBytes());
+            fos.close();
+            LOG.info("... done creating slowfile");
+        }
+    }
+
+}
\ No newline at end of file

Copied: 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadRenameStrategyTest.java
 (from r727639, 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java)
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadRenameStrategyTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadRenameStrategyTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java&r1=727639&r2=727984&rev=727984&view=diff
==============================================================================
--- 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
 (original)
+++ 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadRenameStrategyTest.java
 Fri Dec 19 01:26:35 2008
@@ -21,6 +21,8 @@
 import java.nio.channels.FileLock;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.commons.logging.Log;
@@ -29,16 +31,14 @@
 /**
  * Unit test to verify exclusive read - that we do not poll files that is in 
progress of being written.
  */
-public class FileExclusiveReadTest extends ContextTestSupport {
+public class FileExclusiveReadRenameStrategyTest extends ContextTestSupport {
 
-    private static final Log LOG = 
LogFactory.getLog(FileExclusiveReadTest.class);
-
-    private String fileUrl = 
"file://target/exclusiveread/slowfile?consumer.delay=500&consumer.exclusiveReadLock=true";
+    private static final Log LOG = 
LogFactory.getLog(FileExclusiveReadRenameStrategyTest.class);
+    private String fileUrl = 
"file://target/exclusiveread/slowfile?consumer.delay=500&readLock=rename";
 
     @Override
-    protected void setUp() throws Exception {
-        disableJMX();
-        super.setUp();
+    public boolean isUseRouteBuilder() {
+        return false;
     }
 
     public void testPoolIn3SecondsButNoFiles() throws Exception {
@@ -46,48 +46,103 @@
         createDirectory("./target/exclusiveread/slowfile");
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(0);
+        mock.setMinimumResultWaitTime(3000);
 
         Thread.sleep(3 * 1000L);
 
         mock.assertIsSatisfied();
     }
 
-    // TODO: Not possible to test in the same JVM (see javadoc for FileLock)
-    public void xxxtestPollFileWhileSlowFileIsBeingWritten() throws Exception {
+    public void testPollFileWhileSlowFileIsBeingWritten() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:start").process(new MySlowFileProcessor());
+                from(fileUrl).to("mock:result");
+            }
+        });
+        context.start();
+
         deleteDirectory("./target/exclusiveread");
         createDirectory("./target/exclusiveread/slowfile");
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(1);
-        mock.expectedBodiesReceived("Hello WorldLine #0Line #1Line #2Bye 
Worl");
+        mock.expectedBodiesReceived("Hello WorldLine #0Line #1Line #2Bye 
World");
+        mock.setMinimumResultWaitTime(3000);
 
-        createSlowFile();
+        // send a message to seda:start to trigger the creating of the 
slowfile to poll
+        template.sendBody("seda:start", "Create the slow file");
 
         mock.assertIsSatisfied();
     }
 
-    private void createSlowFile() throws Exception {
-        LOG.info("Creating a slow file ...");
-        File file = new File("./target/exclusiveread/slowfile/hello.txt");
-        FileOutputStream fos = new FileOutputStream(file);
-        FileLock lock = fos.getChannel().lock();
-        fos.write("Hello World".getBytes());
-        for (int i = 0; i < 3; i++) {
-            Thread.sleep(1000);
-            fos.write(("Line #" + i).getBytes());
-            LOG.info("Appending to slowfile");
-        }
-        fos.write("Bye World".getBytes());
-        lock.release();
-        fos.close();
-        LOG.info("... done creating slowfile");
+    public void testPollFileWhileSlowFileIsBeingWrittenWithTimeout() throws 
Exception {
+        context.addRoutes(new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("seda:start").process(new MySlowFileProcessor());
+                from(fileUrl + "&readLockTimeout=1000").to("mock:result");
+            }
+        });
+        context.start();
+
+        deleteDirectory("./target/exclusiveread");
+        createDirectory("./target/exclusiveread/slowfile");
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(0);
+        mock.setMinimumResultWaitTime(2000);
+        mock.setResultWaitTime(5000);
+
+        // send a message to seda:start to trigger the creating of the 
slowfile to poll
+        template.sendBody("seda:start", "Create the slow file");
+
+        mock.assertIsSatisfied();
     }
 
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
+    public void testPollFileWhileSlowFileIsBeingWrittenWithTimeoutAndNoop() 
throws Exception {
+        // to test that if noop and thus idempotent we will retry to consume 
the file
+        // the 2nd. time since the first time we could not get the read lock 
due timeout
+        // so the file should only be marked in the idempotent repository if 
we could process it
+        context.addRoutes(new RouteBuilder() {
+            @Override
             public void configure() throws Exception {
-                from(fileUrl).to("mock:result");
+                from("seda:start").process(new MySlowFileProcessor());
+                from(fileUrl + 
"&readLockTimeout=1000&noop=true").to("mock:result");
             }
-        };
+        });
+        context.start();
+
+        deleteDirectory("./target/exclusiveread");
+        createDirectory("./target/exclusiveread/slowfile");
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+        mock.setMinimumResultWaitTime(3000);
+        mock.setResultWaitTime(5000);
+
+        // send a message to seda:start to trigger the creating of the 
slowfile to poll
+        template.sendBody("seda:start", "Create the slow file");
+
+        mock.assertIsSatisfied();
+    }
+
+    private class MySlowFileProcessor implements Processor {
+
+        public void process(Exchange exchange) throws Exception {
+            LOG.info("Creating a slow fil ...");
+            File file = new File("./target/exclusiveread/slowfile/hello.txt");
+            FileOutputStream fos = new FileOutputStream(file);
+            FileLock lock = fos.getChannel().lock();
+            fos.write("Hello World".getBytes());
+            for (int i = 0; i < 3; i++) {
+                Thread.sleep(1000);
+                fos.write(("Line #" + i).getBytes());
+                LOG.info("Appending to slowfile");
+            }
+            fos.write("Bye World".getBytes());
+            lock.release();
+            fos.close();
+            LOG.info("... done creating slowfile");
+        }
     }
 
 }

Propchange: 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadRenameStrategyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: 
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadRenameStrategyTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/camel/trunk/camel-core/src/test/resources/log4j.properties
URL: 
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=727984&r1=727983&r2=727984&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/resources/log4j.properties 
(original)
+++ activemq/camel/trunk/camel-core/src/test/resources/log4j.properties Fri Dec 
19 01:26:35 2008
@@ -22,6 +22,7 @@
 
 log4j.logger.org.apache.activemq.spring=WARN
 log4j.logger.org.apache.camel=DEBUG
+#log4j.logger.org.apache.camel.component=TRACE
 log4j.logger.org.apache.camel.impl.converter=WARN
 
 # CONSOLE appender not used by default


Reply via email to