Gert,
Are you working on this patch? I fixed this issue some time ago and am
attaching the patch.
-----Original Message-----
From: Gert Vanthienen (JIRA) [mailto:[EMAIL PROTECTED]
Sent: Monday, July 16, 2007 1:17 PM
To: servicemix-dev@geronimo.apache.org
Subject: [jira] Created: (SM-1004) File poller deletes files, even if
errors occur while processing
File poller deletes files, even if errors occur while processing
----------------------------------------------------------------
Key: SM-1004
URL: https://issues.apache.org/activemq/browse/SM-1004
Project: ServiceMix
Issue Type: Bug
Components: servicemix-file
Affects Versions: 3.1.1
Reporter: Gert Vanthienen
Other poller components (e.g. FTP poller) leave the message where it is
in case of error/fault, so it can be retried afterwards (e.g. services
unavailable, XML message content not yet complete, ...).
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
The information contained in this electronic message and any attachments to this message are intended for the exclusive use of the addressee(s) and may contain proprietary, confidential or privileged information. If you are not the intended recipient, you should not disseminate, distribute or copy this e-mail. Please notify the sender immediately and destroy all copies of this message and any attachments.
WARNING: Computer viruses can be transmitted via email. The recipient should
check this email and any attachments for the presence of viruses. The company
accepts no liability for any damage caused by any virus transmitted by this
email.
www.wipro.com
------------------------------------------------------------------------
Index: E:/Rabi/work/workspace/servicemix/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java
===================================================================
--- E:/Rabi/work/workspace/servicemix/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java (revision 555678)
+++ E:/Rabi/work/workspace/servicemix/deployables/bindingcomponents/servicemix-file/src/main/java/org/apache/servicemix/file/FilePollerEndpoint.java (working copy)
@@ -41,217 +41,229 @@
import java.util.concurrent.locks.Lock;
/**
- * A polling endpoint which looks for a file or files in a directory
- * and sends the files into the JBI bus as messages, deleting the files
- * by default when they are processed.
- *
+ * A polling endpoint which looks for a file or files in a directory and sends
+ * the files into the JBI bus as messages, deleting the files by default when
+ * they are processed.
+ *
* @org.apache.xbean.XBean element="poller"
- *
+ *
* @version $Revision$
*/
-public class FilePollerEndpoint extends PollingEndpoint implements
FileEndpointType {
+public class FilePollerEndpoint extends PollingEndpoint implements
+ FileEndpointType {
- private File file;
- private FileFilter filter;
- private boolean deleteFile = true;
- private boolean recursive = true;
- private boolean autoCreateDirectory = true;
- private FileMarshaler marshaler = new DefaultFileMarshaler();
- private LockManager lockManager;
+ private File file;
- public FilePollerEndpoint() {
- }
+ private FileFilter filter;
- public FilePollerEndpoint(ServiceUnit serviceUnit, QName service, String endpoint) {
- super(serviceUnit, service, endpoint);
- }
+ private boolean deleteFile = true;
- public FilePollerEndpoint(DefaultComponent component, ServiceEndpoint endpoint) {
- super(component, endpoint);
- }
+ private boolean recursive = true;
- public void poll() throws Exception {
- pollFileOrDirectory(file);
- }
+ private boolean autoCreateDirectory = true;
- public void validate() throws DeploymentException {
- super.validate();
- if (file == null) {
- throw new DeploymentException("You must specify a file property");
- }
- if (isAutoCreateDirectory() && !file.exists()) {
- file.mkdirs();
- }
- if (lockManager == null) {
- lockManager = createLockManager();
- }
- }
-
- protected LockManager createLockManager() {
- return new SimpleLockManager();
- }
+ private FileMarshaler marshaler = new DefaultFileMarshaler();
+ private LockManager lockManager;
- // Properties
- //-------------------------------------------------------------------------
- public File getFile() {
- return file;
- }
+ public FilePollerEndpoint() {
+ }
- /**
- * Sets the file to poll, which can be a directory or a file.
- *
- * @param file
- */
- public void setFile(File file) {
- this.file = file;
- }
+ public FilePollerEndpoint(ServiceUnit serviceUnit, QName service,
+ String endpoint) {
+ super(serviceUnit, service, endpoint);
+ }
- /**
- * @return the lockManager
- */
- public LockManager getLockManager() {
- return lockManager;
- }
+ public FilePollerEndpoint(DefaultComponent component,
+ ServiceEndpoint endpoint) {
+ super(component, endpoint);
+ }
- /**
- * @param lockManager the lockManager to set
- */
- public void setLockManager(LockManager lockManager) {
- this.lockManager = lockManager;
- }
+ public void poll() throws Exception {
+ pollFileOrDirectory(file);
+ }
- public FileFilter getFilter() {
- return filter;
- }
+ public void validate() throws DeploymentException {
+ super.validate();
+ if (file == null) {
+ throw new DeploymentException("You must specify a file
property");
+ }
+ if (isAutoCreateDirectory() && !file.exists()) {
+ file.mkdirs();
+ }
+ if (lockManager == null) {
+ lockManager = createLockManager();
+ }
+ }
- /**
- * Sets the optional filter to choose which files to process
- */
- public void setFilter(FileFilter filter) {
- this.filter = filter;
- }
+ protected LockManager createLockManager() {
+ return new SimpleLockManager();
+ }
- /**
- * Returns whether or not we should delete the file when its processed
- */
- public boolean isDeleteFile() {
- return deleteFile;
- }
+ // Properties
+ //
-------------------------------------------------------------------------
+ public File getFile() {
+ return file;
+ }
- public void setDeleteFile(boolean deleteFile) {
- this.deleteFile = deleteFile;
- }
+ /**
+ * Sets the file to poll, which can be a directory or a file.
+ *
+ * @param file
+ */
+ public void setFile(File file) {
+ this.file = file;
+ }
- public boolean isRecursive() {
- return recursive;
- }
+ /**
+ * @return the lockManager
+ */
+ public LockManager getLockManager() {
+ return lockManager;
+ }
- public void setRecursive(boolean recursive) {
- this.recursive = recursive;
- }
+ /**
+ * @param lockManager
+ * the lockManager to set
+ */
+ public void setLockManager(LockManager lockManager) {
+ this.lockManager = lockManager;
+ }
- public boolean isAutoCreateDirectory() {
- return autoCreateDirectory;
- }
+ public FileFilter getFilter() {
+ return filter;
+ }
- public void setAutoCreateDirectory(boolean autoCreateDirectory) {
- this.autoCreateDirectory = autoCreateDirectory;
- }
+ /**
+ * Sets the optional filter to choose which files to process
+ */
+ public void setFilter(FileFilter filter) {
+ this.filter = filter;
+ }
- public FileMarshaler getMarshaler() {
- return marshaler;
- }
+ /**
+ * Returns whether or not we should delete the file when its processed
+ */
+ public boolean isDeleteFile() {
+ return deleteFile;
+ }
- public void setMarshaler(FileMarshaler marshaler) {
- this.marshaler = marshaler;
- }
+ public void setDeleteFile(boolean deleteFile) {
+ this.deleteFile = deleteFile;
+ }
- // Implementation methods
- //-------------------------------------------------------------------------
+ public boolean isRecursive() {
+ return recursive;
+ }
+ public void setRecursive(boolean recursive) {
+ this.recursive = recursive;
+ }
- protected void pollFileOrDirectory(File fileOrDirectory) {
- pollFileOrDirectory(fileOrDirectory, true);
- }
+ public boolean isAutoCreateDirectory() {
+ return autoCreateDirectory;
+ }
- protected void pollFileOrDirectory(File fileOrDirectory, boolean processDir) {
- if (!fileOrDirectory.isDirectory()) {
- pollFile(fileOrDirectory); // process the file
- }
- else if (processDir) {
- logger.debug("Polling directory " + fileOrDirectory);
- File[] files = fileOrDirectory.listFiles(getFilter());
- for (int i = 0; i < files.length; i++) {
- pollFileOrDirectory(files[i], isRecursive()); // self-recursion
- }
- }
- else {
- logger.debug("Skipping directory " + fileOrDirectory);
- }
- }
+ public void setAutoCreateDirectory(boolean autoCreateDirectory) {
+ this.autoCreateDirectory = autoCreateDirectory;
+ }
- protected void pollFile(final File aFile) {
- if (logger.isDebugEnabled()) {
- logger.debug("Scheduling file " + aFile + " for processing");
- }
- getExecutor().execute(new Runnable() {
- public void run() {
- String uri = file.toURI().relativize(aFile.toURI()).toString();
- Lock lock = lockManager.getLock(uri);
- if (lock.tryLock()) {
- try {
- processFileAndDelete(aFile);
- }
- finally {
- lock.unlock();
- }
- } else {
- if (logger.isDebugEnabled()) {
- logger.debug("Unable to acquire lock on " + aFile);
- }
- }
- }
- });
- }
+ public FileMarshaler getMarshaler() {
+ return marshaler;
+ }
- protected void processFileAndDelete(File aFile) {
- try {
- if (logger.isDebugEnabled()) {
- logger.debug("Processing file " + aFile);
- }
- if (aFile.exists()) {
- processFile(aFile);
- if (isDeleteFile()) {
- if (!aFile.delete()) {
- throw new IOException("Could not delete file " +
aFile);
- }
- }
- }
- }
- catch (Exception e) {
- logger.error("Failed to process file: " + aFile + ". Reason: " +
e, e);
- }
- }
+ public void setMarshaler(FileMarshaler marshaler) {
+ this.marshaler = marshaler;
+ }
- protected void processFile(File aFile) throws Exception {
- String name = aFile.getCanonicalPath();
- InputStream in = new BufferedInputStream(new FileInputStream(aFile));
- InOnly exchange = getExchangeFactory().createInOnlyExchange();
- configureExchangeTarget(exchange);
- NormalizedMessage message = exchange.createMessage();
- exchange.setInMessage(message);
- marshaler.readMessage(exchange, message, in, name);
- sendSync(exchange);
- in.close();
- }
+ // Implementation methods
+ //
-------------------------------------------------------------------------
- public String getLocationURI() {
- return file.toURI().toString();
- }
+ protected void pollFileOrDirectory(File fileOrDirectory) {
+ pollFileOrDirectory(fileOrDirectory, true);
+ }
- public void process(MessageExchange exchange) throws Exception {
- // Do nothing. In our case, this method should never be called
- // as we only send synchronous InOnly exchange
- }
+ protected void pollFileOrDirectory(File fileOrDirectory, boolean
processDir) {
+ if (!fileOrDirectory.isDirectory()) {
+ pollFile(fileOrDirectory); // process the file
+ } else if (processDir) {
+ logger.debug("Polling directory " + fileOrDirectory);
+ File[] files = fileOrDirectory.listFiles(getFilter());
+ for (int i = 0; i < files.length; i++) {
+ pollFileOrDirectory(files[i], isRecursive());
// self-recursion
+ }
+ } else {
+ logger.debug("Skipping directory " + fileOrDirectory);
+ }
+ }
+
+ protected void pollFile(final File aFile) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Scheduling file " + aFile + " for
processing");
+ }
+ getExecutor().execute(new Runnable() {
+ public void run() {
+ String uri =
file.toURI().relativize(aFile.toURI()).toString();
+ Lock lock = lockManager.getLock(uri);
+ if (lock.tryLock()) {
+ boolean unlock = true;
+ try {
+ unlock =
processFileAndDelete(aFile);
+ } finally {
+ if (unlock) {
+ lock.unlock();
+ }
+ }
+ } else {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Unable to acquire lock
on " + aFile);
+ }
+ }
+ }
+ });
+ }
+
+ protected boolean processFileAndDelete(File aFile) {
+ boolean unlock = true;
+ try {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Processing file " + aFile);
+ }
+ if (aFile.exists()) {
+ processFile(aFile);
+ unlock = false;
+ if (isDeleteFile()) {
+ if (!aFile.delete()) {
+ throw new IOException("Could not
delete file " + aFile);
+ }
+ unlock = true;
+ }
+ }
+ } catch (Exception e) {
+ logger.error("Failed to process file: " + aFile + ". Reason:
" + e,
+ e);
+ }
+ return unlock;
+ }
+
+ protected void processFile(File aFile) throws Exception {
+ String name = aFile.getCanonicalPath();
+ InputStream in = new BufferedInputStream(new
FileInputStream(aFile));
+ InOnly exchange = getExchangeFactory().createInOnlyExchange();
+ configureExchangeTarget(exchange);
+ NormalizedMessage message = exchange.createMessage();
+ exchange.setInMessage(message);
+ marshaler.readMessage(exchange, message, in, name);
+ sendSync(exchange);
+ in.close();
+ }
+
+ public String getLocationURI() {
+ return file.toURI().toString();
+ }
+
+ public void process(MessageExchange exchange) throws Exception {
+ // Do nothing. In our case, this method should never be called
+ // as we only send synchronous InOnly exchange
+ }
}