Author: davsclaus
Date: Tue Jul 29 22:53:15 2008
New Revision: 680913
URL: http://svn.apache.org/viewvc?rev=680913&view=rev
Log:
CAMEL-654 and CAMEL-760: Option exclusiveRead renamed to exclusiveReadLock and
is now default true.
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoFilesTest.java
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNonExclusiveReadTest.java
Modified:
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java?rev=680913&r1=680912&r2=680913&view=diff
==============================================================================
---
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
(original)
+++
activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/component/file/FileConsumer.java
Tue Jul 29 22:53:15 2008
@@ -51,7 +51,7 @@
private boolean generateEmptyExchangeWhenIdle;
private boolean recursive = true;
private String regexPattern = "";
- private boolean exclusiveRead;
+ private boolean exclusiveReadLock = true;
public FileConsumer(final FileEndpoint endpoint, Processor processor) {
super(endpoint, processor);
@@ -60,6 +60,8 @@
protected synchronized void poll() throws Exception {
int rc = pollFileOrDirectory(endpoint.getFile(), isRecursive());
+
+ // if no files were consumed and the generateEmptyExchangeWhenIdle option is used
then process an empty exchange
if (rc == 0 && generateEmptyExchangeWhenIdle) {
final FileExchange exchange = endpoint.createExchange((File)null);
getAsyncProcessor().process(exchange, new AsyncCallback() {
@@ -67,6 +69,7 @@
}
});
}
+
lastPollTime = System.currentTimeMillis();
}
@@ -79,11 +82,15 @@
*/
protected int pollFileOrDirectory(File fileOrDirectory, boolean
processDir) {
if (!fileOrDirectory.isDirectory()) {
- return pollFile(fileOrDirectory); // process the file
+ // process the file
+ return pollFile(fileOrDirectory);
} else if (processDir) {
+ // directory that can be recursive
int rc = 0;
if (isValidFile(fileOrDirectory)) {
- LOG.debug("Polling directory " + fileOrDirectory);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Polling directory " + fileOrDirectory);
+ }
File[] files = fileOrDirectory.listFiles();
for (File file : files) {
rc += pollFileOrDirectory(file, isRecursive()); //
self-recursion
@@ -91,7 +98,9 @@
}
return rc;
} else {
- LOG.debug("Skipping directory " + fileOrDirectory);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Skipping directory " + fileOrDirectory);
+ }
return 0;
}
}
@@ -103,6 +112,9 @@
* @return returns 1 if the file was processed, 0 otherwise.
*/
protected int pollFile(final File file) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Polling file: " + file);
+ }
if (!file.exists()) {
return 0;
@@ -124,8 +136,8 @@
endpoint.configureMessage(file, exchange.getIn());
try {
// if we use exclusive read then acquire the exclusive read lock (waiting
until we get it)
- if (exclusiveRead) {
- acquireExclusiveRead(file);
+ if (exclusiveReadLock) {
+ acquireExclusiveReadLock(file);
}
if (LOG.isDebugEnabled()) {
@@ -158,7 +170,7 @@
} else {
if (LOG.isDebugEnabled()) {
- LOG.debug(endpoint + " cannot process file: " + file);
+ LOG.debug(endpoint + " can not process file: " + file);
}
}
} catch (Throwable e) {
@@ -168,23 +180,28 @@
return 1;
}
- protected void acquireExclusiveRead(File file) throws IOException {
+ /**
+ * Acquires an exclusive read lock on the given file. Will wait until the
lock is granted.
+ * After the read lock is granted it is released; we just want to make sure
that when we start
+ * consuming the file it's not currently in progress of being written by a
third party.
+ */
+ protected void acquireExclusiveReadLock(File file) throws IOException {
if (LOG.isTraceEnabled()) {
- LOG.trace("Waiting for exclusive lock to file: " + file);
+ LOG.trace("Waiting for exclusive read lock to file: " + file);
}
// try to acquire rw lock on the file before we can consume it
FileChannel channel = new RandomAccessFile(file, "rw").getChannel();
try {
FileLock lock = channel.lock();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Acquired exclusive lock: " + lock + " to file: " +
file);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Acquired exclusive read lock: " + lock + " to file:
" + file);
}
// just release it now, we don't want to hold it during the rest of
the processing
lock.release();
} finally {
// must close channel
- ObjectHelper.close(channel, "FileConsumer during acquiring of
exclusive lock", LOG);
+ ObjectHelper.close(channel, "FileConsumer during acquiring of
exclusive read lock", LOG);
}
}
@@ -341,11 +358,11 @@
this.unchangedSize = unchangedSize;
}
- public boolean isExclusiveRead() {
- return exclusiveRead;
+ public boolean isExclusiveReadLock() {
+ return exclusiveReadLock;
}
- public void setExclusiveRead(boolean exclusiveRead) {
- this.exclusiveRead = exclusiveRead;
+ public void setExclusiveReadLock(boolean exclusiveReadLock) {
+ this.exclusiveReadLock = exclusiveReadLock;
}
}
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java?rev=680913&r1=680912&r2=680913&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
Tue Jul 29 22:53:15 2008
@@ -128,7 +128,7 @@
pollDirectory(getFullFileName(ftpFile));
}
} else {
- LOG.debug("Unsupported type of FTPFile: " + ftpFile + " (not a
file or directory). Is skipped.");
+ LOG.debug("Unsupported type of FTPFile: " + ftpFile + " (not a
file or directory). It is skipped.");
}
}
@@ -155,8 +155,8 @@
String fullFileName = getFullFileName(ftpFile);
// if we use exclusive read then acquire the exclusive read lock (waiting
until we get it)
- if (exclusiveRead) {
- acquireExclusiveRead(client, ftpFile);
+ if (exclusiveReadLock) {
+ acquireExclusiveReadLock(client, ftpFile);
}
// retrieve the file
@@ -188,7 +188,7 @@
boolean deleted = client.deleteFile(ftpFile.getName());
if (!deleted) {
// ignore just log a warning
- LOG.warn("Could not delete file: " + ftpFile.getName() + "
from: " + remoteServer());
+ LOG.warn("Can not delete file: " + ftpFile.getName() + "
from: " + remoteServer());
}
} else if (isMoveFile()) {
String fromName = ftpFile.getName();
@@ -206,7 +206,7 @@
if (lastPathIndex != -1) {
String directory = toName.substring(0, lastPathIndex);
if (!FtpUtils.buildDirectory(client, directory)) {
- LOG.warn("Couldn't build directory: " + directory
+ " (could be because of denied permissions)");
+ LOG.warn("Can not build directory: " + directory +
" (maybe because of denied permissions)");
}
}
}
@@ -214,7 +214,7 @@
// try to rename
boolean success = client.rename(fromName, toName);
if (!success) {
- LOG.warn("Could not move file: " + fromName + " to: " +
toName);
+ LOG.warn("Can not move file: " + fromName + " to: " +
toName);
}
}
@@ -222,26 +222,26 @@
}
}
- protected void acquireExclusiveRead(FTPClient client, FTPFile ftpFile)
throws IOException {
+ protected void acquireExclusiveReadLock(FTPClient client, FTPFile ftpFile)
throws IOException {
if (LOG.isTraceEnabled()) {
- LOG.trace("Waiting for exclusive lock to file: " + ftpFile);
+ LOG.trace("Waiting for exclusive read lock to file: " + ftpFile);
}
// the trick is to try to rename the file, if we can rename then we
have exclusive read
// since it's a remote file we can not use java.nio to get a RW lock
String originalName = ftpFile.getName();
- String newName = originalName + ".camelExclusiveRead";
+ String newName = originalName + ".camelExclusiveReadLock";
boolean exclusive = false;
while (!exclusive) {
exclusive = client.rename(originalName, newName);
if (exclusive) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Acquired exclusive lock to file: " +
originalName);
+ LOG.debug("Acquired exclusive read lock to file: " +
originalName);
}
// rename it back so we can read it
client.rename(newName, originalName);
} else {
- LOG.trace("Exclusive lock not granted. Sleeping for 1000
millis.");
+ LOG.trace("Exclusive read lock not granted. Sleeping for 1000
millis.");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java?rev=680913&r1=680912&r2=680913&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/RemoteFileConsumer.java
Tue Jul 29 22:53:15 2008
@@ -31,7 +31,7 @@
protected boolean recursive = true;
protected String regexPattern;
protected boolean setNames = true;
- protected boolean exclusiveRead = true;
+ protected boolean exclusiveReadLock = true;
protected boolean deleteFile;
protected String moveNamePrefix;
protected String moveNamePostfix;
@@ -132,12 +132,12 @@
this.setNames = setNames;
}
- public boolean isExclusiveRead() {
- return exclusiveRead;
+ public boolean isExclusiveReadLock() {
+ return exclusiveReadLock;
}
- public void setExclusiveRead(boolean exclusiveRead) {
- this.exclusiveRead = exclusiveRead;
+ public void setExclusiveReadLock(boolean exclusiveReadLock) {
+ this.exclusiveReadLock = exclusiveReadLock;
}
public boolean isDeleteFile() {
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=680913&r1=680912&r2=680913&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
Tue Jul 29 22:53:15 2008
@@ -169,8 +169,8 @@
String fullFileName = getFullFileName(sftpFile);
// if we use exclusive read then acquire the exclusive read lock (waiting
until we get it)
- if (exclusiveRead) {
- acquireExclusiveRead(sftpFile);
+ if (exclusiveReadLock) {
+ acquireExclusiveReadLock(sftpFile);
}
// retrieve the file
@@ -215,7 +215,7 @@
if (lastPathIndex != -1) {
String directory = toName.substring(0, lastPathIndex);
if (!SftpUtils.buildDirectory(channel, directory)) {
- LOG.warn("Couldn't build directory: " + directory
+ " (could be because of denied permissions)");
+ LOG.warn("Can not build directory: " + directory +
" (maybe because of denied permissions)");
}
}
}
@@ -225,7 +225,7 @@
channel.rename(fromName, toName);
} catch (SftpException e) {
// ignore just log a warning
- LOG.warn("Could not move file: " + fromName + " to: " +
toName);
+ LOG.warn("Can not move file: " + fromName + " to: " +
toName);
}
}
@@ -244,15 +244,15 @@
}
}
- protected void acquireExclusiveRead(ChannelSftp.LsEntry sftpFile) throws
SftpException {
+ protected void acquireExclusiveReadLock(ChannelSftp.LsEntry sftpFile)
throws SftpException {
if (LOG.isTraceEnabled()) {
- LOG.trace("Waiting for exclusive lock to file: " + sftpFile);
+ LOG.trace("Waiting for exclusive read lock to file: " + sftpFile);
}
// the trick is to try to rename the file, if we can rename then we
have exclusive read
// since it's a remote file we can not use java.nio to get a RW access
String originalName = sftpFile.getFilename();
- String newName = originalName + ".camelExclusiveRead";
+ String newName = originalName + ".camelExclusiveReadLock";
boolean exclusive = false;
while (!exclusive) {
try {
@@ -264,12 +264,12 @@
if (exclusive) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Acquired exclusive lock to file: " +
originalName);
+ LOG.debug("Acquired exclusive read lock to file: " +
originalName);
}
// rename it back so we can read it
channel.rename(newName, originalName);
} else {
- LOG.trace("Exclusive lock not granted. Sleeping for 1000
millis");
+ LOG.trace("Exclusive read lock not granted. Sleeping for 1000
millis");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Modified:
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java?rev=680913&r1=680912&r2=680913&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
Tue Jul 29 22:53:15 2008
@@ -20,7 +20,6 @@
import java.io.FileOutputStream;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,7 +31,7 @@
private static final Log LOG =
LogFactory.getLog(FromFtpExclusiveReadTest.class);
private String port = "20090";
- private String ftpUrl = "ftp://[EMAIL PROTECTED]:" + port +
"/slowfile?password=admin&consumer.exclusiveRead=true&consumer.delay=500";
+ private String ftpUrl = "ftp://[EMAIL PROTECTED]:" + port +
"/slowfile?password=admin&consumer.exclusiveReadLock=true&consumer.delay=500";
public String getPort() {
return port;
Modified:
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoFilesTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoFilesTest.java?rev=680913&r1=680912&r2=680913&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoFilesTest.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNoFilesTest.java
Tue Jul 29 22:53:15 2008
@@ -17,7 +17,6 @@
package org.apache.camel.component.file.remote;
import java.io.File;
-import java.io.FileOutputStream;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
@@ -32,7 +31,7 @@
private static final Log LOG =
LogFactory.getLog(FromFtpExclusiveReadTest.class);
private String port = "20020";
- private String ftpUrl = "ftp://[EMAIL PROTECTED]:" + port +
"/slowfile?password=admin&binary=false&consumer.exclusiveRead=true&consumer.delay=500";
+ private String ftpUrl = "ftp://[EMAIL PROTECTED]:" + port +
"/slowfile?password=admin&binary=false&consumer.exclusiveReadLock=true&consumer.delay=500";
public String getPort() {
return port;
Modified:
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNonExclusiveReadTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNonExclusiveReadTest.java?rev=680913&r1=680912&r2=680913&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNonExclusiveReadTest.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpNonExclusiveReadTest.java
Tue Jul 29 22:53:15 2008
@@ -32,7 +32,7 @@
private static final Log LOG =
LogFactory.getLog(FromFtpExclusiveReadTest.class);
private String port = "20027";
- private String ftpUrl = "ftp://[EMAIL PROTECTED]:" + port +
"/slowfile?password=admin&consumer.exclusiveRead=false&consumer.delay=500";
+ private String ftpUrl = "ftp://[EMAIL PROTECTED]:" + port +
"/slowfile?password=admin&consumer.exclusiveReadLock=false&consumer.delay=500";
public String getPort() {
return port;