Hi all,

Our current file transport is written using Apache VFS library. Through
that when a directory is specified as listening source via siddhi io file
source , it will start reading any file copied to it. The issue arises when
we use FTP to copy a large file(around 68mb) and we use following code
block to do so,

case Constants.READ:
    if (path.exists()) {
        byte[] bytes;
        inputStream = path.getContent().getInputStream();
        bytes = toByteArray(inputStream);
        BinaryCarbonMessage message = new
BinaryCarbonMessage(ByteBuffer.wrap(bytes), true);
        message.setProperty(org.wso2.carbon.messaging.Constants.DIRECTION,
                org.wso2.carbon.messaging.Constants.DIRECTION_RESPONSE);
        carbonMessageProcessor.receive(message, carbonCallback);
    } else {
        throw new ClientConnectorException(
                "Failed to read file: " + path.getName().getURI() + "
not found");
    }
    break;


Since the copying of file will take some time, above read will happen for
partial copied file and send those bytes to downstream. Basically reading
will not wait till the file is fully copied . In order to handle this we
are going to use following approach ,

case Constants.READ:
    if (path.exists()) {
        byte[] bytes;
        do {
            inputStream = path.getContent().getInputStream();
            bytes = toByteArray(inputStream);
            Thread.sleep(100);
        } while (bytes.length !=
toByteArray(path.getContent().getInputStream()).length);
        BinaryCarbonMessage message = new
BinaryCarbonMessage(ByteBuffer.wrap(bytes), true);
        message.setProperty(org.wso2.carbon.messaging.Constants.DIRECTION,
                org.wso2.carbon.messaging.Constants.DIRECTION_RESPONSE);
        carbonMessageProcessor.receive(message, carbonCallback);
    } else {
        throw new ClientConnectorException(
                "Failed to read file: " + path.getName().getURI() + "
not found");
    }
    break;

Above while loop will not exit until two consecutive reads will provide
same length of bytes. And we will provide default value for thread sleep
value and also allow use to change it from siddhi io file source as well.
I checked on a possibility of a locking mechanism but since we do not have
the control over writing process i think we have to go with above solution.
Please mention is there any concerns or any other way of  handling this .

Regards,
Damith

-- 
Associate Technical Lead | WSO2 Inc.
lean.enterprise.middleware

mobile: *+94728671315*
_______________________________________________
Dev mailing list
[email protected]
http://wso2.org/cgi-bin/mailman/listinfo/dev

Reply via email to