Author: davsclaus
Date: Sat Jul 26 06:37:46 2008
New Revision: 679990
URL: http://svn.apache.org/viewvc?rev=679990&view=rev
Log:
CAMEL-654: Introduced exclusiveRead option to camel-ftp. This option is enabled
by default and fixes the problem that the ftp consumer can poll files that are
still being written. With this option the ftp consumer will wait until the file
has been completely written, so that it can poll it safely. Beware that the
option is enabled by default.
Added:
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
activemq/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java?rev=679990&r1=679989&r2=679990&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/FtpConsumer.java
Sat Jul 26 06:37:46 2008
@@ -39,6 +39,7 @@
private boolean recursive = true;
private String regexPattern;
private boolean setNames = true;
+ private boolean exclusiveRead = true;
public FtpConsumer(FtpEndpoint endpoint, Processor processor, FTPClient
client) {
super(endpoint, processor);
@@ -46,27 +47,30 @@
this.client = client;
}
- public FtpConsumer(FtpEndpoint endpoint, Processor processor, FTPClient
client, ScheduledExecutorService executor) {
+ public FtpConsumer(FtpEndpoint endpoint, Processor processor, FTPClient
client,
+ ScheduledExecutorService executor) {
super(endpoint, processor, executor);
this.endpoint = endpoint;
this.client = client;
}
protected void connectIfNecessary() throws IOException {
- // TODO: is there a way to avoid copy-pasting the reconnect logic?
if (!client.isConnected()) {
- LOG.warn("FtpConsumer's client isn't connected, trying to
reconnect...");
+ LOG.debug("Not connected, trying to reconnect.");
endpoint.connect(client);
- LOG.info("Connected to " + endpoint.getConfiguration());
+ LOG.info("Connected to " +
endpoint.getConfiguration().remoteServerInformation());
}
}
protected void disconnect() throws IOException {
- LOG.info("FtpConsumer's client is being explicitly disconnected");
+ LOG.debug("Disconnecting from " +
endpoint.getConfiguration().remoteServerInformation());
endpoint.disconnect(client);
}
protected void poll() throws Exception {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Polling " + endpoint.getConfiguration());
+ }
connectIfNecessary();
// If the attempt to connect isn't successful, then the thrown
// exception will signify that we couldn't poll
@@ -86,12 +90,12 @@
} catch (FTPConnectionClosedException e) {
// If the server disconnected us, then we must manually disconnect
// the client before attempting to reconnect
- LOG.warn("Disconnecting due to exception: " + e.toString());
+ LOG.warn("Disconnecting due to exception: " + e.getMessage());
disconnect();
// Rethrow to signify that we didn't poll
throw e;
} catch (RuntimeCamelException e) {
- LOG.warn("Caught RuntimeCamelException: " + e.toString());
+ LOG.warn("Caught RuntimeCamelException: " + e.getMessage(), e);
LOG.warn("Hoping an explicit disconnect/reconnect will solve the
problem");
disconnect();
// Rethrow to signify that we didn't poll
@@ -111,8 +115,7 @@
pollDirectory(getFullFileName(ftpFile));
}
} else {
- // TODO: Type can be symbolic link etc. so what should we do?
- LOG.warn("Unsupported type of FTPFile: " + ftpFile + " not a
file or directory");
+ LOG.debug("Unsupported type of FTPFile: " + ftpFile + " (not a
file or directory). Is skipped.");
}
}
@@ -125,29 +128,71 @@
}
private void pollFile(FTPFile ftpFile) throws Exception {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Polling file: " + ftpFile);
+ }
+
+ long ts = ftpFile.getTimestamp().getTimeInMillis();
// TODO do we need to adjust the TZ? can we?
- if (ftpFile.getTimestamp().getTimeInMillis() > lastPollTime) {
- if (isMatched(ftpFile)) {
- String fullFileName = getFullFileName(ftpFile);
- final ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
- client.retrieveFile(ftpFile.getName(), byteArrayOutputStream);
- RemoteFileExchange exchange =
endpoint.createExchange(fullFileName, byteArrayOutputStream);
-
- if (isSetNames()) {
- // set the filename in the special header filename marker
to the ftp filename
- String ftpBasePath = endpoint.getConfiguration().getFile();
- String relativePath =
fullFileName.substring(ftpBasePath.length() + 1);
- relativePath = relativePath.replaceFirst("/", "");
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Setting exchange filename to " +
relativePath);
- }
- exchange.getIn().setHeader(FileComponent.HEADER_FILE_NAME,
relativePath);
+ if (ts > lastPollTime && isMatched(ftpFile)) {
+ String remoteServer =
endpoint.getConfiguration().remoteServerInformation();
+ String fullFileName = getFullFileName(ftpFile);
+
+ // is we use excluse read then acquire the exclusive read (waiting
until we got it)
+ if (exclusiveRead) {
+ acquireExclusiveRead(client, ftpFile);
+ }
+
+ // retrieve the file
+ final ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
+ client.retrieveFile(ftpFile.getName(), byteArrayOutputStream);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Retrieved file: " + ftpFile.getName() + " from: " +
remoteServer);
+ }
+
+ RemoteFileExchange exchange =
endpoint.createExchange(fullFileName, byteArrayOutputStream);
+
+ if (isSetNames()) {
+ // set the filename in the special header filename marker to
the ftp filename
+ String ftpBasePath = endpoint.getConfiguration().getFile();
+ String relativePath =
fullFileName.substring(ftpBasePath.length() + 1);
+ relativePath = relativePath.replaceFirst("/", "");
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Setting exchange filename to " + relativePath);
}
+ exchange.getIn().setHeader(FileComponent.HEADER_FILE_NAME,
relativePath);
+ }
- getProcessor().process(exchange);
+ getProcessor().process(exchange);
+ }
+ }
+
+ protected void acquireExclusiveRead(FTPClient client, FTPFile ftpFile)
throws IOException {
+ LOG.trace("Acquiring exclusive read (avoid reading file that is in
progress of being written)");
+
+ // the trick is to try to rename the file, if we can rename then we
have exclusive read
+ // since its a remote file we can not use java.nio to get a RW access
+ String originalName = ftpFile.getName();
+ String newName = originalName + ".camel";
+ boolean exclusive = false;
+ while (! exclusive) {
+ exclusive = client.rename(originalName, newName);
+ if (exclusive) {
+ // rename it back so we can read it
+ client.rename(newName, originalName);
+ } else {
+ LOG.trace("Exclusive read not granted. Sleeping for 1000
millis");
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // ignore
+ }
}
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Acquired exclusive read to: " + originalName);
+ }
}
protected boolean isMatched(FTPFile file) {
@@ -189,4 +234,12 @@
public void setSetNames(boolean setNames) {
this.setNames = setNames;
}
+
+ public boolean isExclusiveRead() {
+ return exclusiveRead;
+ }
+
+ public void setExclusiveRead(boolean exclusiveRead) {
+ this.exclusiveRead = exclusiveRead;
+ }
}
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java?rev=679990&r1=679989&r2=679990&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpConsumer.java
Sat Jul 26 06:37:46 2008
@@ -30,7 +30,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
public class SftpConsumer extends RemoteFileConsumer<RemoteFileExchange> {
private static final transient Log LOG =
LogFactory.getLog(SftpConsumer.class);
private final SftpEndpoint endpoint;
@@ -41,6 +40,7 @@
private ChannelSftp channel;
private Session session;
private boolean setNames;
+ private boolean exclusiveRead = true;
public SftpConsumer(SftpEndpoint endpoint, Processor processor, Session
session) {
super(endpoint, processor);
@@ -57,30 +57,32 @@
protected void connectIfNecessary() throws JSchException {
if (channel == null || !channel.isConnected()) {
if (session == null || !session.isConnected()) {
- LOG.info("Session isn't connected, trying to recreate and
connect...");
+ LOG.debug("Session isn't connected, trying to recreate and
connect.");
session = endpoint.createSession();
session.connect();
}
- LOG.info("Channel isn't connected, trying to recreate and
connect...");
+ LOG.debug("Channel isn't connected, trying to recreate and
connect.");
channel = endpoint.createChannelSftp(session);
channel.connect();
- LOG.info("Connected to " + endpoint.getConfiguration().toString());
+ LOG.info("Connected to " +
endpoint.getConfiguration().remoteServerInformation());
}
}
protected void disconnect() throws JSchException {
if (session != null) {
- LOG.info("Session is being explicitly disconnected");
+ LOG.debug("Session is being explicitly disconnected");
session.disconnect();
}
if (channel != null) {
- LOG.info("Channel is being explicitly disconnected");
+ LOG.debug("Channel is being explicitly disconnected");
channel.disconnect();
}
}
protected void poll() throws Exception {
- // TODO: is there a way to avoid copy-pasting the reconnect logic?
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Polling " + endpoint.getConfiguration());
+ }
connectIfNecessary();
// If the attempt to connect isn't successful, then the thrown
// exception will signify that we couldn't poll
@@ -97,15 +99,15 @@
} catch (JSchException e) {
// If the connection has gone stale, then we must manually
disconnect
// the client before attempting to reconnect
- LOG.warn("Disconnecting due to exception: " + e.toString());
+ LOG.warn("Disconnecting due to exception: " + e.getMessage());
disconnect();
// Rethrow to signify that we didn't poll
throw e;
} catch (SftpException e) {
// Still not sure if/when these come up and what we should do
about them
// client.disconnect();
- LOG.warn("Caught SftpException:" + e.toString());
- LOG.warn("Doing nothing for now, need to determine an appropriate
action");
+ LOG.warn("Caught SftpException:" + e.getMessage(), e);
+ LOG.warn("Hoping an explicit disconnect/reconnect will solve the
problem");
// Rethrow to signify that we didn't poll
throw e;
}
@@ -136,23 +138,73 @@
}
private void pollFile(ChannelSftp.LsEntry sftpFile) throws Exception {
- if (sftpFile.getAttrs().getMTime() * 1000L > lastPollTime) {
- if (isMatched(sftpFile)) {
- final ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
- channel.get(sftpFile.getFilename(), byteArrayOutputStream);
- RemoteFileExchange exchange =
endpoint.createExchange(getFullFileName(sftpFile), byteArrayOutputStream);
-
- if (isSetNames()) {
- String relativePath =
getFullFileName(sftpFile).substring(endpoint.getConfiguration().getFile().length());
- if (relativePath.startsWith("/")) {
- relativePath = relativePath.substring(1);
- }
- exchange.getIn().setHeader(FileComponent.HEADER_FILE_NAME,
relativePath);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Polling file: " + sftpFile);
+ }
+
+ long ts = sftpFile.getAttrs().getMTime() * 1000L;
+
+ // TODO do we need to adjust the TZ? can we?
+ if (ts > lastPollTime && isMatched(sftpFile)) {
+ String remoteServer =
endpoint.getConfiguration().remoteServerInformation();
+
+ // is we use excluse read then acquire the exclusive read (waiting
until we got it)
+ if (exclusiveRead) {
+ acquireExclusiveRead(sftpFile);
+ }
+
+ // retrieve the file
+ final ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
+ channel.get(sftpFile.getFilename(), byteArrayOutputStream);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Retrieved file: " + sftpFile.getFilename() + "
from: " + remoteServer);
+ }
+
+ RemoteFileExchange exchange =
endpoint.createExchange(getFullFileName(sftpFile), byteArrayOutputStream);
+
+ if (isSetNames()) {
+ String relativePath =
getFullFileName(sftpFile).substring(endpoint.getConfiguration().getFile().length());
+ if (relativePath.startsWith("/")) {
+ relativePath = relativePath.substring(1);
}
+ exchange.getIn().setHeader(FileComponent.HEADER_FILE_NAME,
relativePath);
+ }
- getProcessor().process(exchange);
+ getProcessor().process(exchange);
+ }
+ }
+
+ protected void acquireExclusiveRead(ChannelSftp.LsEntry sftpFile) throws
SftpException {
+ LOG.trace("Acquiring exclusive read (avoid reading file that is in
progress of being written)");
+
+ // the trick is to try to rename the file, if we can rename then we
have exclusive read
+ // since its a remote file we can not use java.nio to get a RW access
+ String originalName = sftpFile.getFilename();
+ String newName = originalName + ".camel";
+ boolean exclusive = false;
+ while (! exclusive) {
+ try {
+ channel.rename(originalName, newName);
+ exclusive = true;
+ } catch (SftpException e) {
+ // ignore we can not rename it
+ }
+
+ if (exclusive) {
+ // rename it back so we can read it
+ channel.rename(newName, originalName);
+ } else {
+ LOG.trace("Exclusive read not granted. Sleeping for 1000
millis");
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ // ignore
+ }
}
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Acquired exclusive read to: " + originalName);
+ }
}
protected boolean isMatched(ChannelSftp.LsEntry sftpFile) {
@@ -194,4 +246,12 @@
public void setSetNames(boolean setNames) {
this.setNames = setNames;
}
+
+ public boolean isExclusiveRead() {
+ return exclusiveRead;
+ }
+
+ public void setExclusiveRead(boolean exclusiveRead) {
+ this.exclusiveRead = exclusiveRead;
+ }
}
Modified:
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java?rev=679990&r1=679989&r2=679990&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/main/java/org/apache/camel/component/file/remote/SftpEndpoint.java
Sat Jul 26 06:37:46 2008
@@ -46,7 +46,6 @@
protected Session createSession() throws JSchException {
final JSch jsch = new JSch();
final Session session =
jsch.getSession(getConfiguration().getUsername(), getConfiguration().getHost());
- // TODO there's got to be a better way to deal with accepting new
hosts...
session.setUserInfo(new UserInfo() {
public String getPassphrase() {
return null;
Added:
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java?rev=679990&view=auto
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
(added)
+++
activemq/camel/trunk/components/camel-ftp/src/test/java/org/apache/camel/component/file/remote/FromFtpExclusiveReadTest.java
Sat Jul 26 06:37:46 2008
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.file.remote;
+
+import java.io.File;
+import java.io.FileOutputStream;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Unit test to verify exclusive read - that we do not poll files that is in
progress of being written.
+ */
+public class FromFtpExclusiveReadTest extends FtpServerTestSupport {
+
+ private static final Log LOG =
LogFactory.getLog(FromFtpExclusiveReadTest.class);
+
+ private String ftpUrl = "ftp://[EMAIL PROTECTED]:" + getPort() +
"/slowfile?password=admin&binary=false&consumer.exclusiveRead=true&consumer.delay=500";
+
+ public String getPort() {
+ return "20019";
+ }
+
+ public void testPoolIn3SecondsButNoFiles() throws Exception {
+ deleteDirectory("./res/home");
+ createDirectory("./res/home/slowfile");
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(0);
+
+ Thread.sleep(3 * 1000L);
+
+ mock.assertIsSatisfied();
+ }
+
+ public void testPollFileWhileSlowFileIsBeingWritten() throws Exception {
+ deleteDirectory("./res/home");
+ createDirectory("./res/home/slowfile");
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+ mock.expectedBodiesReceived("Hello WorldLine #0Line #1Line #2Bye
World");
+
+ createSlowFile();
+
+ mock.assertIsSatisfied();
+ }
+
+ private void createSlowFile() throws Exception {
+ LOG.info("Creating a slow file ...");
+ File file = new File("./res/home/slowfile/hello.txt");
+ FileOutputStream fos = new FileOutputStream(file);
+ fos.write("Hello World".getBytes());
+ for (int i = 0; i < 3; i++) {
+ Thread.sleep(1000);
+ fos.write(("Line #" + i).getBytes());
+ LOG.info("Appending to slowfile");
+ }
+ fos.write("Bye World".getBytes());
+ fos.close();
+ LOG.info("... done creating slowfile");
+ }
+
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from(ftpUrl).to("mock:result");
+ }
+ };
+ }
+
+ private static void createDirectory(String s) {
+ File file = new File(s);
+ file.mkdirs();
+ }
+}
Modified:
activemq/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties?rev=679990&r1=679989&r2=679990&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties
(original)
+++
activemq/camel/trunk/components/camel-ftp/src/test/resources/log4j.properties
Sat Jul 26 06:37:46 2008
@@ -22,17 +22,17 @@
# uncomment the following to enable camel debugging
log4j.logger.org.apache.camel.component.file=DEBUG
-#log4j.logger.org.apache.mina=WARN
-#log4j.logger.org.apache.ftpserver=WARN
+log4j.logger.org.apache.mina=WARN
+log4j.logger.org.apache.ftpserver=WARN
# CONSOLE appender not used by default
log4j.appender.out=org.apache.log4j.ConsoleAppender
log4j.appender.out.layout=org.apache.log4j.PatternLayout
-log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
-#log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} -
%m%n
+#log4j.appender.out.layout.ConversionPattern=[%30.30t] %-30.30c{1} %-5p %m%n
+log4j.appender.out.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} -
%m%n
# File appender
log4j.appender.file=org.apache.log4j.FileAppender
log4j.appender.file.layout=org.apache.log4j.PatternLayout
-log4j.appender.file.layout.ConversionPattern=%d %-5p %c{1} - %m %n
+log4j.appender.file.layout.ConversionPattern=%d [%-15.15t] %-5p %-30.30c{1} -
%m%n
log4j.appender.file.file=target/camel-ftp-test.log
\ No newline at end of file