Author: davsclaus
Date: Fri Dec 19 01:26:35 2008
New Revision: 727984
URL: http://svn.apache.org/viewvc?rev=727984&view=rev
Log:
CAMEL-1195: Introduced ExclusiveReadLockStrategy with two build in strategies:
fileLock and rename. Reworked the URI options to avoid consumer. prefix.
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/ExclusiveReadLockStrategy.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java
(contents, props changed)
- copied, changed from r727639,
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadNoneStrategyTest.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadRenameStrategyTest.java
(contents, props changed)
- copied, changed from r727639,
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
Removed:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
activemq/camel/trunk/camel-core/src/test/resources/log4j.properties
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/ExclusiveReadLockStrategy.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/ExclusiveReadLockStrategy.java?rev=727984&view=auto
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/ExclusiveReadLockStrategy.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/ExclusiveReadLockStrategy.java
Fri Dec 19 01:26:35 2008
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Strategy for acquiring exclusive read locks for files to be consumed.
+ * After granting the read lock it is realeased, we just want to make sure
that when we start
+ * consuming the file its not currently in progress of being written by third
party.
+ * <p/>
+ * Camel supports out of the box the following strategies:
+ * <ul>
+ * <li>FileLockExclusiveReadLockStrategy using {...@link
java.nio.channels.FileLock}</li>
+ * <li>FileRenameExclusiveReadLockStrategy waiting until its possible to
rename the file. Can be used on file
+ * systems where the FileLock isn't supported.</li>
+ * </ul>
+ */
+public interface ExclusiveReadLockStrategy {
+
+ /**
+ * Acquires exclusive read lock to the file.
+ *
+ * @param file the file
+ * @return true if read lock was acquired
+ * @throws IOException can be thrown
+ */
+ boolean acquireExclusiveReadLock(File file) throws IOException;
+
+}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=727984&r1=727983&r2=727984&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
Fri Dec 19 01:26:35 2008
@@ -17,10 +17,6 @@
package org.apache.camel.component.file;
import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.channels.FileChannel;
-import java.nio.channels.FileLock;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -29,7 +25,6 @@
import org.apache.camel.Processor;
import org.apache.camel.impl.ScheduledPollConsumer;
import org.apache.camel.processor.DeadLetterChannel;
-import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -44,10 +39,6 @@
private FileEndpoint endpoint;
private boolean recursive;
private String regexPattern = "";
- private boolean exclusiveReadLock = true;
-
- // TODO: move option to endpoint to get rid of consumer. prefix
- // TODO: remove idempotent again, we should just use the idempotent DSL we
already have
public FileConsumer(final FileEndpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -139,11 +130,6 @@
try {
final FileProcessStrategy processStrategy =
endpoint.getFileStrategy();
- // is we use excluse read then acquire the exclusive read (waiting
until we got it)
- if (exclusiveReadLock) {
- acquireExclusiveReadLock(target);
- }
-
if (LOG.isDebugEnabled()) {
LOG.debug("About to process file: " + target + " using
exchange: " + exchange);
}
@@ -189,31 +175,6 @@
}
/**
- * Acquires exclusive read lock to the given file. Will wait until the
lock is granted.
- * After granting the read lock it is realeased, we just want to make sure
that when we start
- * consuming the file its not currently in progress of being written by
third party.
- */
- protected void acquireExclusiveReadLock(File file) throws IOException {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Waiting for exclusive read lock to file: " + file);
- }
-
- // try to acquire rw lock on the file before we can consume it
- FileChannel channel = new RandomAccessFile(file, "rw").getChannel();
- try {
- FileLock lock = channel.lock();
- if (LOG.isTraceEnabled()) {
- LOG.trace("Acquired exclusive read lock: " + lock + " to file:
" + file);
- }
- // just release it now we dont want to hold it during the rest of
the processing
- lock.release();
- } finally {
- // must close channel
- ObjectHelper.close(channel, "FileConsumer during acquiring of
exclusive read lock", LOG);
- }
- }
-
- /**
* Strategy when the file was processed and a commit should be executed.
*
* @param processStrategy the strategy to perform the commit
@@ -225,6 +186,12 @@
*/
protected void processStrategyCommit(FileProcessStrategy processStrategy,
FileExchange exchange,
File file, boolean failureHandled) {
+ if (endpoint.isIdempotent()) {
+ // only add to idempotent repository if we could process the file
+ // use file.getPath as key for the idempotent repository to
support files with same name but in different folders
+ endpoint.getIdempotentRepository().add(file.getPath());
+ }
+
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Committing file strategy: " + processStrategy + "
for file: "
@@ -262,11 +229,9 @@
LOG.trace("File did not match. Will skip this file: " + file);
}
return false;
- } else if (endpoint.isIdempotent() &&
!endpoint.getIdempotentRepository().add(file.getName())) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("FileConsumer is idempotent and the file has been
consumed before. Will skip this file: " + file);
- }
- // skip as we have already processed it
+ } else if (endpoint.isIdempotent() &&
endpoint.getIdempotentRepository().contains(file.getPath())) {
+ // use file.getPath as key for the idempotent repository to
support files with same name but in different folders
+ LOG.warn("FileConsumer is idempotent and the file has been
consumed before. Will skip this file: " + file);
return false;
}
@@ -352,12 +317,4 @@
this.regexPattern = regexPattern;
}
- public boolean isExclusiveReadLock() {
- return exclusiveReadLock;
- }
-
- public void setExclusiveReadLock(boolean exclusiveReadLock) {
- this.exclusiveReadLock = exclusiveReadLock;
- }
-
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java?rev=727984&r1=727983&r2=727984&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileEndpoint.java
Fri Dec 19 01:26:35 2008
@@ -78,6 +78,9 @@
private FileFilter filter;
private Comparator<File> sorter;
private Comparator<FileExchange> sortBy;
+ private ExclusiveReadLockStrategy exclusiveReadLockStrategy;
+ private String readLock = "fileLock";
+ private long readLockTimeout;
protected FileEndpoint(File file, String endpointUri, FileComponent
component) {
super(endpointUri, component);
@@ -424,6 +427,30 @@
setSortBy(DefaultFileSorter.sortByFileLanguage(expression, reverse));
}
+ public ExclusiveReadLockStrategy getExclusiveReadLockStrategy() {
+ return exclusiveReadLockStrategy;
+ }
+
+ public void setExclusiveReadLockStrategy(ExclusiveReadLockStrategy
exclusiveReadLockStrategy) {
+ this.exclusiveReadLockStrategy = exclusiveReadLockStrategy;
+ }
+
+ public String getReadLock() {
+ return readLock;
+ }
+
+ public void setReadLock(String readLock) {
+ this.readLock = readLock;
+ }
+
+ public long getReadLockTimeout() {
+ return readLockTimeout;
+ }
+
+ public void setReadLockTimeout(long readLockTimeout) {
+ this.readLockTimeout = readLockTimeout;
+ }
+
/**
* A strategy method to lazily create the file strategy
*/
@@ -488,6 +515,15 @@
if (preMoveExpression != null) {
params.put("preMoveExpression", preMoveExpression);
}
+ if (exclusiveReadLockStrategy != null) {
+ params.put("exclusiveReadLockStrategy", exclusiveReadLockStrategy);
+ }
+ if (readLock != null) {
+ params.put("readLock", readLock);
+ }
+ if (readLockTimeout > 0) {
+ params.put("readLockTimeout", Long.valueOf(readLockTimeout));
+ }
return params;
}
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java?rev=727984&view=auto
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileLockExclusiveReadLockStrategy.java
Fri Dec 19 01:26:35 2008
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+
+import org.apache.camel.component.file.ExclusiveReadLockStrategy;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Acquires exclusive read lock to the given file. Will wait until the lock is
granted.
+ * After granting the read lock it is realeased, we just want to make sure
that when we start
+ * consuming the file its not currently in progress of being written by third
party.
+ */
+public class FileLockExclusiveReadLockStrategy implements
ExclusiveReadLockStrategy {
+ private static final transient Log LOG =
LogFactory.getLog(FileLockExclusiveReadLockStrategy.class);
+ private long timeout;
+
+ public boolean acquireExclusiveReadLock(File file) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Waiting for exclusive read lock to file: " + file);
+ }
+
+ // try to acquire rw lock on the file before we can consume it
+ FileChannel channel = new RandomAccessFile(file, "rw").getChannel();
+
+ long start = System.currentTimeMillis();
+ boolean exclusive = false;
+ try {
+ while (!exclusive) {
+ // timeout check
+ if (timeout > 0) {
+ long delta = System.currentTimeMillis() - start;
+ if (delta > timeout) {
+ LOG.debug("Could not acquire read lock within " +
timeout + " millis. Will skip the file: " + file);
+ // we could not get the lock within the timeout
period, so return false
+ return false;
+ }
+ }
+
+ // get the lock using either try lock or not depending on if
we are using timeout or not
+ FileLock lock = timeout > 0 ? channel.tryLock() :
channel.lock();
+ if (lock != null) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Acquired exclusive read lock: " + lock + "
to file: " + file);
+ }
+ // just release it now we dont want to hold it during the
rest of the processing
+ lock.release();
+ exclusive = true;
+ } else {
+ LOG.trace("Exclusive read lock not granted. Sleeping for
1000 millis.");
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+ } finally {
+ // must close channel
+ ObjectHelper.close(channel, "while acquiring exclusive read lock
for file: " + file, LOG);
+ }
+
+ return true;
+ }
+
+ public long getTimeout() {
+ return timeout;
+ }
+
+ /**
+ * Sets an optional timeout period.
+ * <p/>
+ * If the readlock could not be granted within the timeperiod then the
wait is stopped and the
+ * {...@link #acquireExclusiveReadLock(java.io.File)} returns
<tt>false</tt>.
+ *
+ * @param timeout period in millis
+ */
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+
+}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java?rev=727984&r1=727983&r2=727984&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategyFactory.java
Fri Dec 19 01:26:35 2008
@@ -19,7 +19,9 @@
import java.util.Map;
import org.apache.camel.Expression;
+import org.apache.camel.component.file.ExclusiveReadLockStrategy;
import org.apache.camel.component.file.FileProcessStrategy;
+import org.apache.camel.util.ObjectHelper;
/**
* Factory to provide the {...@link
org.apache.camel.component.file.FileProcessStrategy} to use.
@@ -49,9 +51,12 @@
boolean preMove = preMoveNamePrefix != null || preMoveNamePostfix !=
null;
if (isNoop) {
- return new NoOpFileProcessStrategy(isLock);
+ NoOpFileProcessStrategy strategy = new
NoOpFileProcessStrategy(isLock);
+
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
+ return strategy;
} else if (move || preMove) {
RenameFileProcessStrategy strategy = new
RenameFileProcessStrategy(isLock);
+
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
if (move) {
strategy.setCommitRenamer(new
DefaultFileRenamer(moveNamePrefix, moveNamePostfix));
}
@@ -61,6 +66,7 @@
return strategy;
} else if (expression != null || preMoveExpression != null) {
RenameFileProcessStrategy strategy = new
RenameFileProcessStrategy(isLock);
+
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
if (expression != null) {
FileExpressionRenamer renamer = new FileExpressionRenamer();
renamer.setExpression(expression);
@@ -73,10 +79,44 @@
}
return strategy;
} else if (isDelete) {
- return new DeleteFileProcessStrategy(isLock);
+ DeleteFileProcessStrategy strategy = new
DeleteFileProcessStrategy(isLock);
+
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
+ return strategy;
} else {
// default strategy will move to .camel subfolder
- return new RenameFileProcessStrategy(isLock);
+ RenameFileProcessStrategy strategy = new
RenameFileProcessStrategy(isLock);
+
strategy.setExclusiveReadLockStrategy(getExclusiveReadLockStrategy(params));
+ return strategy;
+ }
+ }
+
+ private static ExclusiveReadLockStrategy
getExclusiveReadLockStrategy(Map<String, Object> params) {
+ ExclusiveReadLockStrategy strategy = (ExclusiveReadLockStrategy)
params.get("exclusiveReadLockStrategy");
+ if (strategy != null) {
+ return strategy;
+ }
+
+ // no explicit stategy set then fallback to readLock option
+ String readLock = (String) params.get("readLock");
+ if (ObjectHelper.isNotEmpty(readLock)) {
+ if ("none".equals(readLock) || "false".equals(readLock)) {
+ return null;
+ } else if ("fileLock".equals(readLock)) {
+ FileLockExclusiveReadLockStrategy readLockStrategy = new
FileLockExclusiveReadLockStrategy();
+ Long timeout = (Long) params.get("readLockTimeout");
+ if (timeout != null) {
+ readLockStrategy.setTimeout(timeout);
+ }
+ } else if ("rename".equals(readLock)) {
+ FileRenameExclusiveReadLockStrategy readLockStrategy = new
FileRenameExclusiveReadLockStrategy();
+ Long timeout = (Long) params.get("readLockTimeout");
+ if (timeout != null) {
+ readLockStrategy.setTimeout(timeout);
+ }
+ return readLockStrategy;
+ }
}
+
+ return null;
}
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java?rev=727984&r1=727983&r2=727984&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileProcessStrategySupport.java
Fri Dec 19 01:26:35 2008
@@ -22,6 +22,7 @@
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
+import org.apache.camel.component.file.ExclusiveReadLockStrategy;
import org.apache.camel.component.file.FileEndpoint;
import org.apache.camel.component.file.FileExchange;
import org.apache.camel.component.file.FileProcessStrategy;
@@ -39,6 +40,7 @@
private static final transient Log LOG =
LogFactory.getLog(FileProcessStrategySupport.class);
private boolean lockFile;
private FileRenamer lockFileRenamer;
+ private ExclusiveReadLockStrategy exclusiveReadLockStrategy;
protected FileProcessStrategySupport() {
this(true);
@@ -54,6 +56,15 @@
}
public boolean begin(FileEndpoint endpoint, FileExchange exchange, File
file) throws Exception {
+ // is we use excluse read then acquire the exclusive read (waiting
until we got it)
+ if (exclusiveReadLockStrategy != null) {
+ boolean lock =
exclusiveReadLockStrategy.acquireExclusiveReadLock(file);
+ if (!lock) {
+ // do not begin sice we could not get the exclusive read lcok
+ return false;
+ }
+ }
+
if (isLockFile()) {
File newFile = lockFileRenamer.renameFile(exchange, file);
String lockFileName = newFile.getAbsolutePath();
@@ -71,6 +82,7 @@
return false;
}
}
+
return true;
}
@@ -86,22 +98,6 @@
}
}
- public boolean isLockFile() {
- return lockFile;
- }
-
- public void setLockFile(boolean lockFile) {
- this.lockFile = lockFile;
- }
-
- public FileRenamer getLockFileRenamer() {
- return lockFileRenamer;
- }
-
- public void setLockFileRenamer(FileRenamer lockFileRenamer) {
- this.lockFileRenamer = lockFileRenamer;
- }
-
protected void unlockFile(FileEndpoint endpoint, FileExchange exchange,
File file) throws Exception {
if (isLockFile()) {
FileLock lock = ExchangeHelper.getMandatoryProperty(exchange,
"org.apache.camel.file.lock", FileLock.class);
@@ -124,4 +120,28 @@
}
}
}
+
+ public boolean isLockFile() {
+ return lockFile;
+ }
+
+ public void setLockFile(boolean lockFile) {
+ this.lockFile = lockFile;
+ }
+
+ public FileRenamer getLockFileRenamer() {
+ return lockFileRenamer;
+ }
+
+ public void setLockFileRenamer(FileRenamer lockFileRenamer) {
+ this.lockFileRenamer = lockFileRenamer;
+ }
+
+ public ExclusiveReadLockStrategy getExclusiveReadLockStrategy() {
+ return exclusiveReadLockStrategy;
+ }
+
+ public void setExclusiveReadLockStrategy(ExclusiveReadLockStrategy
exclusiveReadLockStrategy) {
+ this.exclusiveReadLockStrategy = exclusiveReadLockStrategy;
+ }
}
Added:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java?rev=727984&view=auto
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java
(added)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/strategy/FileRenameExclusiveReadLockStrategy.java
Fri Dec 19 01:26:35 2008
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.strategy;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.camel.component.file.ExclusiveReadLockStrategy;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Acquires exclusive read lock to the given file. Will wait until the lock is
granted.
+ * After granting the read lock it is realeased, we just want to make sure
that when we start
+ * consuming the file its not currently in progress of being written by third
party.
+ */
+public class FileRenameExclusiveReadLockStrategy implements
ExclusiveReadLockStrategy {
+ private static final transient Log LOG =
LogFactory.getLog(FileRenameExclusiveReadLockStrategy.class);
+ private long timeout;
+
+ public boolean acquireExclusiveReadLock(File file) throws IOException {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Waiting for exclusive read lock to file: " + file);
+ }
+
+ // the trick is to try to rename the file, if we can rename then we
have exclusive read
+ // since its a remote file we can not use java.nio to get a RW lock
+ String originalName = file.getName();
+ File tempFile = new File(file.getParent(), originalName +
".camelExclusiveReadLock");
+
+ long start = System.currentTimeMillis();
+
+ boolean exclusive = false;
+ while (!exclusive) {
+ // timeout check
+ if (timeout > 0) {
+ long delta = System.currentTimeMillis() - start;
+ if (delta > timeout) {
+ LOG.debug("Could not acquire read lock within " + timeout
+ " millis. Will skip the file: " + file);
+ // we could not get the lock within the timeout period, so
return false
+ return false;
+ }
+ }
+
+ exclusive = file.renameTo(tempFile);
+ if (exclusive) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Acquired exclusive read lock to file: " + file);
+ }
+ // rename it back so we can read it
+ tempFile.renameTo(file);
+ } else {
+ LOG.trace("Exclusive read lock not granted. Sleeping for 1000
millis.");
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // ignore
+ }
+ }
+ }
+
+ return true;
+ }
+
+ public long getTimeout() {
+ return timeout;
+ }
+
+ /**
+ * Sets an optional timeout period.
+ * <p/>
+ * If the readlock could not be granted within the timeperiod then the
wait is stopped and the
+ * {...@link #acquireExclusiveReadLock(java.io.File)} returns
<tt>false</tt>.
+ *
+ * @param timeout period in millis
+ */
+ public void setTimeout(long timeout) {
+ this.timeout = timeout;
+ }
+}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java?rev=727984&r1=727983&r2=727984&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/mock/MockEndpoint.java
Fri Dec 19 01:26:35 2008
@@ -718,7 +718,7 @@
LOG.debug("Took " + delta + " millis to complete latch");
if (resultMinimumWaitTime > 0 && delta < resultMinimumWaitTime) {
- fail("Expected minimum " + resultWaitTime
+ fail("Expected minimum " + resultMinimumWaitTime
+ " millis waiting on the result, but was faster with " +
delta + " millis.");
}
}
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java?rev=727984&r1=727983&r2=727984&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/ScheduledPollConsumer.java
Fri Dec 19 01:26:35 2008
@@ -61,7 +61,9 @@
LOG.debug("Starting to poll: " + this.getEndpoint());
}
try {
- poll();
+ if (isRunAllowed()) {
+ poll();
+ }
} catch (Exception e) {
LOG.warn("An exception occured while polling: " +
this.getEndpoint() + ": " + e.getMessage(), e);
if (firstExceptionThrown == null) {
Copied:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java
(from r727639,
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java)
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java&r1=727639&r2=727984&rev=727984&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java
Fri Dec 19 01:26:35 2008
@@ -16,24 +16,16 @@
*/
package org.apache.camel.component.file;
-import java.io.File;
-import java.io.FileOutputStream;
-import java.nio.channels.FileLock;
-
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
/**
- * Unit test to verify exclusive read - that we do not poll files that is in
progress of being written.
+ * Unit test to verify exclusive read by for manual testing.
*/
-public class FileExclusiveReadTest extends ContextTestSupport {
-
- private static final Log LOG =
LogFactory.getLog(FileExclusiveReadTest.class);
+public class FileExclusiveReadManuelTest extends ContextTestSupport {
- private String fileUrl =
"file://target/exclusiveread/slowfile?consumer.delay=500&consumer.exclusiveReadLock=true";
+ private String fileUrl = "file://target/exclusiveread?readLock=fileLock";
@Override
protected void setUp() throws Exception {
@@ -41,47 +33,15 @@
super.setUp();
}
- public void testPoolIn3SecondsButNoFiles() throws Exception {
+ public void testManually() throws Exception {
deleteDirectory("./target/exclusiveread");
- createDirectory("./target/exclusiveread/slowfile");
MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.setSleepForEmptyTest(10000);
mock.expectedMessageCount(0);
- Thread.sleep(3 * 1000L);
-
- mock.assertIsSatisfied();
- }
-
- // TODO: Not possible to test in the same JVM (see javadoc for FileLock)
- public void xxxtestPollFileWhileSlowFileIsBeingWritten() throws Exception {
- deleteDirectory("./target/exclusiveread");
- createDirectory("./target/exclusiveread/slowfile");
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedMessageCount(1);
- mock.expectedBodiesReceived("Hello WorldLine #0Line #1Line #2Bye
Worl");
-
- createSlowFile();
-
mock.assertIsSatisfied();
}
- private void createSlowFile() throws Exception {
- LOG.info("Creating a slow file ...");
- File file = new File("./target/exclusiveread/slowfile/hello.txt");
- FileOutputStream fos = new FileOutputStream(file);
- FileLock lock = fos.getChannel().lock();
- fos.write("Hello World".getBytes());
- for (int i = 0; i < 3; i++) {
- Thread.sleep(1000);
- fos.write(("Line #" + i).getBytes());
- LOG.info("Appending to slowfile");
- }
- fos.write("Bye World".getBytes());
- lock.release();
- fos.close();
- LOG.info("... done creating slowfile");
- }
-
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
@@ -90,4 +50,4 @@
};
}
-}
+}
\ No newline at end of file
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadManuelTest.java
------------------------------------------------------------------------------
svn:mergeinfo =
Added:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadNoneStrategyTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadNoneStrategyTest.java?rev=727984&view=auto
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadNoneStrategyTest.java
(added)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadNoneStrategyTest.java
Fri Dec 19 01:26:35 2008
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.channels.FileLock;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Unit test to verify exclusive read option using *none*
+ */
+public class FileExclusiveReadNoneStrategyTest extends ContextTestSupport {
+
+ private static final Log LOG =
LogFactory.getLog(FileExclusiveReadNoneStrategyTest.class);
+ private String fileUrl =
"file://target/exclusiveread/slowfile?noop=true&consumer.delay=500&readLock=none";
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:start").process(new MySlowFileProcessor());
+ from(fileUrl + "&readLockTimeout=1000").to("mock:result");
+ }
+ };
+ }
+
+ public void testPollFileWhileSlowFileIsBeingWritten() throws Exception {
+ deleteDirectory("./target/exclusiveread");
+ createDirectory("./target/exclusiveread/slowfile");
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("Hello World");
+ mock.expectedMessageCount(1);
+
+ // send a message to seda:start to trigger the creating of the
slowfile to poll
+ template.sendBody("seda:start", "Create the slow file");
+
+ mock.assertIsSatisfied();
+
+ String body =
mock.getReceivedExchanges().get(0).getIn().getBody(String.class);
+ LOG.debug("Body is: " + body);
+ assertFalse("Should not wait and read the entire file",
body.endsWith("Bye World"));
+ }
+
+ private class MySlowFileProcessor implements Processor {
+
+ public void process(Exchange exchange) throws Exception {
+ LOG.info("Creating a slow fil with no locks...");
+ File file = new File("./target/exclusiveread/slowfile/hello.txt");
+ FileOutputStream fos = new FileOutputStream(file);
+ fos.write("Hello World".getBytes());
+ for (int i = 0; i < 3; i++) {
+ Thread.sleep(1000);
+ fos.write(("Line #" + i).getBytes());
+ LOG.info("Appending to slowfile");
+ }
+ fos.write("Bye World".getBytes());
+ fos.close();
+ LOG.info("... done creating slowfile");
+ }
+ }
+
+}
\ No newline at end of file
Copied:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadRenameStrategyTest.java
(from r727639,
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java)
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadRenameStrategyTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadRenameStrategyTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java&r1=727639&r2=727984&rev=727984&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadRenameStrategyTest.java
Fri Dec 19 01:26:35 2008
@@ -21,6 +21,8 @@
import java.nio.channels.FileLock;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.commons.logging.Log;
@@ -29,16 +31,14 @@
/**
* Unit test to verify exclusive read - that we do not poll files that is in
progress of being written.
*/
-public class FileExclusiveReadTest extends ContextTestSupport {
+public class FileExclusiveReadRenameStrategyTest extends ContextTestSupport {
- private static final Log LOG =
LogFactory.getLog(FileExclusiveReadTest.class);
-
- private String fileUrl =
"file://target/exclusiveread/slowfile?consumer.delay=500&consumer.exclusiveReadLock=true";
+ private static final Log LOG =
LogFactory.getLog(FileExclusiveReadRenameStrategyTest.class);
+ private String fileUrl =
"file://target/exclusiveread/slowfile?consumer.delay=500&readLock=rename";
@Override
- protected void setUp() throws Exception {
- disableJMX();
- super.setUp();
+ public boolean isUseRouteBuilder() {
+ return false;
}
public void testPoolIn3SecondsButNoFiles() throws Exception {
@@ -46,48 +46,103 @@
createDirectory("./target/exclusiveread/slowfile");
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(0);
+ mock.setMinimumResultWaitTime(3000);
Thread.sleep(3 * 1000L);
mock.assertIsSatisfied();
}
- // TODO: Not possible to test in the same JVM (see javadoc for FileLock)
- public void xxxtestPollFileWhileSlowFileIsBeingWritten() throws Exception {
+ public void testPollFileWhileSlowFileIsBeingWritten() throws Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:start").process(new MySlowFileProcessor());
+ from(fileUrl).to("mock:result");
+ }
+ });
+ context.start();
+
deleteDirectory("./target/exclusiveread");
createDirectory("./target/exclusiveread/slowfile");
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(1);
- mock.expectedBodiesReceived("Hello WorldLine #0Line #1Line #2Bye
Worl");
+ mock.expectedBodiesReceived("Hello WorldLine #0Line #1Line #2Bye
World");
+ mock.setMinimumResultWaitTime(3000);
- createSlowFile();
+ // send a message to seda:start to trigger the creating of the
slowfile to poll
+ template.sendBody("seda:start", "Create the slow file");
mock.assertIsSatisfied();
}
- private void createSlowFile() throws Exception {
- LOG.info("Creating a slow file ...");
- File file = new File("./target/exclusiveread/slowfile/hello.txt");
- FileOutputStream fos = new FileOutputStream(file);
- FileLock lock = fos.getChannel().lock();
- fos.write("Hello World".getBytes());
- for (int i = 0; i < 3; i++) {
- Thread.sleep(1000);
- fos.write(("Line #" + i).getBytes());
- LOG.info("Appending to slowfile");
- }
- fos.write("Bye World".getBytes());
- lock.release();
- fos.close();
- LOG.info("... done creating slowfile");
+ public void testPollFileWhileSlowFileIsBeingWrittenWithTimeout() throws
Exception {
+ context.addRoutes(new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("seda:start").process(new MySlowFileProcessor());
+ from(fileUrl + "&readLockTimeout=1000").to("mock:result");
+ }
+ });
+ context.start();
+
+ deleteDirectory("./target/exclusiveread");
+ createDirectory("./target/exclusiveread/slowfile");
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(0);
+ mock.setMinimumResultWaitTime(2000);
+ mock.setResultWaitTime(5000);
+
+ // send a message to seda:start to trigger the creating of the
slowfile to poll
+ template.sendBody("seda:start", "Create the slow file");
+
+ mock.assertIsSatisfied();
}
- protected RouteBuilder createRouteBuilder() throws Exception {
- return new RouteBuilder() {
+ public void testPollFileWhileSlowFileIsBeingWrittenWithTimeoutAndNoop()
throws Exception {
+ // to test that if noop and thus idempotent we will retry to consume
the file
+ // the 2nd. time since the first time we could not get the read lock
due timeout
+ // so the file should only be marked in the idempotent repository if
we could process it
+ context.addRoutes(new RouteBuilder() {
+ @Override
public void configure() throws Exception {
- from(fileUrl).to("mock:result");
+ from("seda:start").process(new MySlowFileProcessor());
+ from(fileUrl +
"&readLockTimeout=1000&noop=true").to("mock:result");
}
- };
+ });
+ context.start();
+
+ deleteDirectory("./target/exclusiveread");
+ createDirectory("./target/exclusiveread/slowfile");
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+ mock.setMinimumResultWaitTime(3000);
+ mock.setResultWaitTime(5000);
+
+ // send a message to seda:start to trigger the creating of the
slowfile to poll
+ template.sendBody("seda:start", "Create the slow file");
+
+ mock.assertIsSatisfied();
+ }
+
+ private class MySlowFileProcessor implements Processor {
+
+ public void process(Exchange exchange) throws Exception {
+ LOG.info("Creating a slow fil ...");
+ File file = new File("./target/exclusiveread/slowfile/hello.txt");
+ FileOutputStream fos = new FileOutputStream(file);
+ FileLock lock = fos.getChannel().lock();
+ fos.write("Hello World".getBytes());
+ for (int i = 0; i < 3; i++) {
+ Thread.sleep(1000);
+ fos.write(("Line #" + i).getBytes());
+ LOG.info("Appending to slowfile");
+ }
+ fos.write("Bye World".getBytes());
+ lock.release();
+ fos.close();
+ LOG.info("... done creating slowfile");
+ }
}
}
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadRenameStrategyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadRenameStrategyTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: activemq/camel/trunk/camel-core/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/resources/log4j.properties?rev=727984&r1=727983&r2=727984&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/resources/log4j.properties
(original)
+++ activemq/camel/trunk/camel-core/src/test/resources/log4j.properties Fri Dec
19 01:26:35 2008
@@ -22,6 +22,7 @@
log4j.logger.org.apache.activemq.spring=WARN
log4j.logger.org.apache.camel=DEBUG
+#log4j.logger.org.apache.camel.component=TRACE
log4j.logger.org.apache.camel.impl.converter=WARN
# CONSOLE appender not used by default