+1. I fixed it. One thing I noticed with the VFS transport is that it needs some refactoring. Maybe we can rewrite it completely at some point in the future, possibly replacing the busy-loop polling and adding proper streaming, large file transfer support, etc.
Rajika On Fri, Aug 9, 2013 at 2:21 AM, Hiranya Jayathilaka <[email protected]>wrote: > Hi Rajika, > > On Aug 8, 2013, at 10:58 PM, [email protected] wrote: > > Author: rajikak > Date: Fri Aug 9 05:58:20 2013 > New Revision: 1512144 > > URL: http://svn.apache.org/r1512144 > Log: > fixed SYNAPSE-502. > > Modified: > > > synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java > > > synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSOutTransportInfo.java > > > synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java > > > synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java > > Modified: > synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java > URL: > http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java?rev=1512144&r1=1512143&r2=1512144&view=diff > > ============================================================================== > --- > synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java > (original) > +++ > synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSConstants.java > Fri Aug 9 05:58:20 2013 > @@ -20,7 +20,7 @@ > package org.apache.synapse.transport.vfs; > > public final class VFSConstants { > - > + > // vfs transport prefix (e.g. 
used in an out EPR etc) > public static final String VFS_PREFIX = "vfs:"; > > @@ -41,8 +41,8 @@ public final class VFSConstants { > public static final String TRANSPORT_FILE_FILE_URI = > "transport.vfs.FileURI"; > public static final String TRANSPORT_FILE_FILE_NAME_PATTERN = > "transport.vfs.FileNamePattern"; > public static final String TRANSPORT_FILE_CONTENT_TYPE = > "transport.vfs.ContentType"; > - public static final String TRANSPORT_FILE_LOCKING = > "transport.vfs.Locking"; > - public static final String TRANSPORT_FILE_LOCKING_ENABLED = "enable"; > > + public static final String TRANSPORT_FILE_LOCKING = > "transport.vfs.Locking"; > + public static final String TRANSPORT_FILE_LOCKING_ENABLED = "enable"; > public static final String TRANSPORT_FILE_LOCKING_DISABLED = "disable"; > > public static final String REPLY_FILE_URI = > "transport.vfs.ReplyFileURI"; > @@ -53,15 +53,15 @@ public final class VFSConstants { > > public static final String DEFAULT_XML_RESPONSE_FILE = "response.xml"; > public static final String DEFAULT_NON_XML_RESPONSE_FILE = > "response.dat"; > - > + > public static final String STREAMING = "transport.vfs.Streaming"; > - > + > public static final String MAX_RETRY_COUNT = > "transport.vfs.MaxRetryCount"; > public static final String RECONNECT_TIMEOUT = > "transport.vfs.ReconnectTimeout"; > public static final String APPEND = "transport.vfs.Append"; > public static final int DEFAULT_MAX_RETRY_COUNT = 3; > public static final long DEFAULT_RECONNECT_TIMEOUT = 30000; > - > + > // transport header property names used by the VFS transport > public static final String FILE_PATH = "FILE_PATH"; > public static final String FILE_NAME = "FILE_NAME"; > @@ -99,6 +99,11 @@ public final class VFSConstants { > public static final String > TRANSPORT_FAILED_RECORD_NEXT_RETRY_DURATION = > "transport.vfs.FailedRecordNextRetryDuration"; > > + /** > + * Should use a temp file when uploading ? 
> + */ > + public static final String TRANSPORT_FILE_USE_TEMP_FILE = > "transport.vfs.UseTempFile"; > + > public static final int DEFAULT_NEXT_RETRY_DURATION = 3000; // 3 > seconds > > /** > > Modified: > synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSOutTransportInfo.java > URL: > http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSOutTransportInfo.java?rev=1512144&r1=1512143&r2=1512144&view=diff > > ============================================================================== > --- > synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSOutTransportInfo.java > (original) > +++ > synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSOutTransportInfo.java > Fri Aug 9 05:58:20 2013 > @@ -43,22 +43,23 @@ public class VFSOutTransportInfo impleme > private long reconnectTimeout = 30000; > private boolean append; > private boolean fileLocking; > + private boolean isUseTempFile = false; > > /** > * Constructs the VFSOutTransportInfo containing the information about > the file to which the > * response has to be submitted to. 
> - * > + * > * @param outFileURI URI of the file to which the message is delivered > */ > VFSOutTransportInfo(String outFileURI, boolean fileLocking) { > - > + > if (outFileURI.startsWith(VFSConstants.VFS_PREFIX)) { > this.outFileURI = > outFileURI.substring(VFSConstants.VFS_PREFIX.length()); > } else { > this.outFileURI = outFileURI; > } > > - Map<String,String> properties = > BaseUtils.getEPRProperties(outFileURI); > + Map<String, String> properties = > BaseUtils.getEPRProperties(outFileURI); > if (properties.containsKey(VFSConstants.MAX_RETRY_COUNT)) { > String strMaxRetryCount = > properties.get(VFSConstants.MAX_RETRY_COUNT); > maxRetryCount = Integer.parseInt(strMaxRetryCount); > @@ -66,6 +67,11 @@ public class VFSOutTransportInfo impleme > maxRetryCount = VFSConstants.DEFAULT_MAX_RETRY_COUNT; > } > > + if > (properties.containsKey(VFSConstants.TRANSPORT_FILE_USE_TEMP_FILE)) { > + String useTempFile = > properties.get(VFSConstants.TRANSPORT_FILE_USE_TEMP_FILE); > + isUseTempFile = Boolean.valueOf(useTempFile).booleanValue(); > + } > + > if (properties.containsKey(VFSConstants.RECONNECT_TIMEOUT)) { > String strReconnectTimeout = > properties.get(VFSConstants.RECONNECT_TIMEOUT); > reconnectTimeout = Long.parseLong(strReconnectTimeout) * 1000; > @@ -122,10 +128,14 @@ public class VFSOutTransportInfo impleme > return reconnectTimeout; > } > > + public boolean isUseTempFile() { > + return isUseTempFile; > + } > + > public void setReconnectTimeout(long reconnectTimeout) { > this.reconnectTimeout = reconnectTimeout; > } > - > + > public boolean isAppend() { > return append; > } > > Modified: > synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java > URL: > http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java?rev=1512144&r1=1512143&r2=1512144&view=diff > > 
============================================================================== > --- > synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java > (original) > +++ > synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSTransportSender.java > Fri Aug 9 05:58:20 2013 > @@ -38,6 +38,7 @@ import org.apache.commons.vfs2.FileSyste > import org.apache.commons.vfs2.FileType; > import org.apache.commons.vfs2.impl.StandardFileSystemManager; > > +import java.io.File; > import java.io.IOException; > > /** > @@ -127,7 +128,9 @@ public class VFSTransportSender extends > int maxRetryCount = vfsOutInfo.getMaxRetryCount(); > long reconnectionTimeout = > vfsOutInfo.getReconnectTimeout(); > boolean append = vfsOutInfo.isAppend(); > - > + boolean isUseTempFile = vfsOutInfo.isUseTempFile(); > + String tempTargetFileName, actualTargetFileName = null; > + > while (wasError) { > > try { > @@ -152,7 +155,7 @@ public class VFSTransportSender extends > try { > Thread.sleep(reconnectionTimeout); > } catch (InterruptedException e2) { > - e2.printStackTrace(); > + Thread.currentThread().interrupt(); > } > } > } > @@ -160,9 +163,14 @@ public class VFSTransportSender extends > if (replyFile.exists()) { > > if (replyFile.getType() == FileType.FOLDER) { > + if (isUseTempFile) { > + tempTargetFileName = > VFSUtils.generateTempFileName(); > + actualTargetFileName = > VFSUtils.getFileName(msgCtx, vfsOutInfo); > + } else { > + tempTargetFileName = > VFSUtils.getFileName(msgCtx, vfsOutInfo); > + } > // we need to write a file containing the message > to this folder > - FileObject responseFile = > fsManager.resolveFile(replyFile, > - VFSUtils.getFileName(msgCtx, vfsOutInfo)); > + FileObject responseFile = > fsManager.resolveFile(replyFile, tempTargetFileName); > > // if file locking is not disabled acquire the lock > // before uploading the file > @@ -180,7 +188,16 @@ public class VFSTransportSender 
extends > populateResponseFile(responseFile, > msgCtx,append, false); > } > > + if (isUseTempFile) { > + > > responseFile.moveTo(fsManager.resolveFile(replyFile, > actualTargetFileName)); > + } > + > } else if (replyFile.getType() == FileType.FILE) { > + if (isUseTempFile) { > + tempTargetFileName = > VFSUtils.generateTempFileName(); > + actualTargetFileName = > replyFile.getURL().toString(); > + replyFile = getTempFileObject(fsManager, > replyFile, tempTargetFileName); > + } > > // if file locking is not disabled acquire the lock > // before uploading the file > @@ -192,11 +209,20 @@ public class VFSTransportSender extends > populateResponseFile(replyFile, msgCtx, > append, false); > } > > + if (isUseTempFile) { > + > > replyFile.moveTo(fsManager.resolveFile(actualTargetFileName)); > + } > } else { > handleException("Unsupported reply file type : " + > replyFile.getType() + > " for file : " + > vfsOutInfo.getOutFileURI()); > } > } else { > + if (isUseTempFile) { > + tempTargetFileName = > VFSUtils.generateTempFileName(); > + actualTargetFileName = > replyFile.getURL().toString(); > + replyFile = getTempFileObject(fsManager, > replyFile, tempTargetFileName); > + } > + > // if file locking is not disabled acquire the lock > before uploading the file > if (vfsOutInfo.isFileLockingEnabled()) { > acquireLockForSending(replyFile, vfsOutInfo); > @@ -207,6 +233,10 @@ public class VFSTransportSender extends > replyFile.createFile(); > populateResponseFile(replyFile, msgCtx, append, > false); > } > + > + if (isUseTempFile) { > + > > replyFile.moveTo(fsManager.resolveFile(actualTargetFileName)); > + } > } > } catch (FileSystemException e) { > handleException("Error resolving reply file : " + > @@ -278,4 +308,12 @@ public class VFSTransportSender extends > } > } > } > + > + private FileObject getTempFileObject(FileSystemManager fsManager, > + FileObject originalFileObj, > + String fileName) throws > FileSystemException { > + FileObject pareFileObject = 
originalFileObj.getParent(); > + String parentURL = pareFileObject.getURL().toString(); > + return > fsManager.resolveFile(parentURL.concat(File.separator).concat(fileName)); > + } > > > This function implementation looks a little problematic. File.separator > returns the file separator character used by the OS on which Synapse runs. > It might not be applicable to the remote file system. So how about doing > this instead: > > FileObject parentFileObject = originalFileObj.getParent(); > return fsManager.resolveFile(parentFileObject, fileName); > > Thanks, > Hiranya > > > } > > Modified: > synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java > URL: > http://svn.apache.org/viewvc/synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java?rev=1512144&r1=1512143&r2=1512144&view=diff > > ============================================================================== > --- > synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java > (original) > +++ > synapse/trunk/java/modules/transports/core/vfs/src/main/java/org/apache/synapse/transport/vfs/VFSUtils.java > Fri Aug 9 05:58:20 2013 > @@ -33,10 +33,7 @@ import java.io.IOException; > import java.io.InputStream; > import java.io.OutputStream; > import java.text.SimpleDateFormat; > -import java.util.Arrays; > -import java.util.Date; > -import java.util.Map; > -import java.util.Random; > +import java.util.*; > import java.util.regex.Pattern; > import java.util.regex.Matcher; > > @@ -114,12 +111,12 @@ public class VFSUtils extends BaseUtils > * @return boolean true if the lock has been acquired or false if not > */ > public synchronized static boolean acquireLock(FileSystemManager > fsManager, FileObject fo) { > - > + > // generate a random lock value to ensure that there are no two > parties > // processing the same file > Random random = new Random(); > byte[] lockValue = 
String.valueOf(random.nextLong()).getBytes(); > - > + > try { > // check whether there is an existing lock for this item, if > so it is assumed > // to be processed by an another listener (downloading) or a > sender (uploading) > @@ -214,6 +211,13 @@ public class VFSUtils extends BaseUtils > return url; > } > > + /** > + * Generate a unique file name to be used as a temp file > + * @return the temp file name > + */ > + public static String generateTempFileName() { > + return "file".concat(UUID.randomUUID().toString()); > + } > > private static boolean verifyLock(byte[] lockValue, FileObject > lockObject) { > try { > > > > -- > Hiranya Jayathilaka > Mayhem Lab/RACE Lab; > Dept. of Computer Science, UCSB; http://cs.ucsb.edu > E-mail: [email protected] <[email protected]>; Mobile: +1 (805) 895-7443 > Blog: > http://techfeast-hiranya.**blogspot.com<http://techfeast-hiranya.blogspot.com/> > >
