Author: davsclaus
Date: Sun Jul 27 05:31:26 2008
New Revision: 680093
URL: http://svn.apache.org/viewvc?rev=680093&view=rev
Log:
CAMEL-760: Using java.nio for exclusive lock in FileConsumer
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=680093&r1=680092&r2=680093&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
Sun Jul 27 05:31:26 2008
@@ -18,12 +18,16 @@
import java.io.File;
import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Processor;
import org.apache.camel.impl.ScheduledPollConsumer;
import org.apache.camel.processor.DeadLetterChannel;
+import org.apache.camel.util.ObjectHelper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -165,30 +169,21 @@
}
protected void acquireExclusiveRead(File file) throws IOException {
- LOG.trace("Acquiring exclusive read (avoid reading file that is in
progress of being written)");
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Acquiring exclusive read (avoid reading file that is in
progress of being written) to " + file);
+ }
- // the trick is to try to rename the file, if we can rename then we
have exclusive read
- // NOTE: using java.nio (channel lokc) doesn't help us as we can have
write access but the
- // file is still in progress of being written (slow writer)
- // TODO: Seems to not work on Unix boxes (see the unit test
FileExclusiveReadTest)
- String originalName = file.getAbsolutePath();
- File newName = new File(originalName + ".camelExclusiveRead");
- boolean exclusive = false;
- while (! exclusive) {
- exclusive = file.renameTo(newName);
- if (exclusive) {
- LOG.trace("Got it renaming it back to original name");
- // rename it back
- newName.renameTo(file);
- } else {
- LOG.trace("Exclusive read not granted. Sleeping for 1000
millis.");
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- // ignore
- }
- }
+ // try to acquire rw lock on the file before we can consume it
+ FileChannel channel = new RandomAccessFile(file, "rw").getChannel();
+ try {
+ FileLock lock = channel.lock();
+ // just release it now we dont want to hold it during the rest of
the processing
+ lock.release();
+ } finally {
+ // must close channel
+ ObjectHelper.close(channel, "FileConsumer during acquiring of
exclusive read", LOG);
}
+
if (LOG.isDebugEnabled()) {
LOG.debug("Acquired exclusive read to: " + file);
}
Modified:
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java?rev=680093&r1=680092&r2=680093&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
(original)
+++
activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileExclusiveReadTest.java
Sun Jul 27 05:31:26 2008
@@ -18,6 +18,8 @@
import java.io.File;
import java.io.FileOutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileLock;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.builder.RouteBuilder;
@@ -51,13 +53,12 @@
mock.assertIsSatisfied();
}
- // TODO: Fix me on Bamboo
- public void xxxtestPollFileWhileSlowFileIsBeingWritten() throws Exception {
+ public void testPollFileWhileSlowFileIsBeingWritten() throws Exception {
deleteDirectory("./target/exclusiveread");
createDirectory("./target/exclusiveread/slowfile");
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedMessageCount(1);
- mock.expectedBodiesReceived("Hello WorldLine #0Line #1Line #2Bye
World");
+ mock.expectedBodiesReceived("Hello World");
createSlowFile();
@@ -66,15 +67,24 @@
private void createSlowFile() throws Exception {
LOG.info("Creating a slow file ...");
+
File file = new File("./target/exclusiveread/slowfile/hello.txt");
FileOutputStream fos = new FileOutputStream(file);
- fos.write("Hello World".getBytes());
- for (int i = 0; i < 3; i++) {
- Thread.sleep(1000);
- fos.write(("Line #" + i).getBytes());
+
+ // get a lock so we are the only one working on this file
+ FileLock lock = fos.getChannel().lock();
+
+ byte[] buffer = "Hello World".getBytes();
+ ByteBuffer bb = ByteBuffer.wrap(buffer);
+ for (int i = 0; i < buffer.length; i++) {
LOG.info("Appending to slowfile");
+ Thread.sleep(300);
}
- fos.write("Bye World".getBytes());
+ LOG.info("Writing to file");
+ fos.write(buffer);
+ LOG.info("Releasing lock");
+ lock.release();
+ LOG.info("Closing file");
fos.close();
LOG.info("... done creating slowfile");
}