Thank you Priyanka. Do you have any example that uses this Operator for FTP?

Regards,
Surya Vamshi

From: Priyanka Gugale [mailto:priya...@datatorrent.com]
Sent: 2016, July, 08 10:48 AM
To: users@apex.apache.org
Subject: RE: Inputs needed on File Writer


Yes, ftp is supported but not sftp.

-Priyanka
On Jul 8, 2016 7:00 PM, "Mukkamula, Suryavamshivardhan (CWM-NR)" 
<suryavamshivardhan.mukkam...@rbc.com<mailto:suryavamshivardhan.mukkam...@rbc.com>>
 wrote:
Hi Priyanka,

Thank you for your inputs.

It may be dumb question, I heard from data torrent that SFTP is not supported 
for now in my previous communications.That means FTP is supported and SFTP is 
not supported ? please clarify the difference.

Regards,
Surya Vamshi

From: Priyanka Gugale 
[mailto:priya...@datatorrent.com<mailto:priya...@datatorrent.com>]
Sent: 2016, July, 08 12:07 AM
To: users@apex.apache.org<mailto:users@apex.apache.org>
Subject: Re: Inputs needed on File Writer

Hi,

The file will be available after window is committed, you can overwrite 
committed call and start your thread after super.commit is called. You might 
want to double check if file is actually finalized before starting your thread..

For your usecase I would suggest you to use AbstractFileOutputOperator to 
directly write file to ftp.

-Priyanka

On Fri, Jul 8, 2016 at 12:41 AM, Mukkamula, Suryavamshivardhan (CWM-NR) 
<suryavamshivardhan.mukkam...@rbc.com<mailto:suryavamshivardhan.mukkam...@rbc.com>>
 wrote:
Hi ,

Can you please let me know what happen when the requestFinalize() method is 
called as per below ?

Once the output files are written to HDFS, I would like to initiate a thread 
that reads the HDFS files and copies to FTP location. So I am trying to 
understand when can I trigger the thread.

####################### File Writer ##########################################

package com.rbc.aml.cnscan.operator;

import com.datatorrent.api.Context;
import com.datatorrent.lib.io.fs.AbstractFileOutputOperator;
import com.rbc.aml.cnscan.utils.KeyValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class FileWriter extends AbstractFileOutputOperator<KeyValue<String, 
String>> {
    private static final Logger LOG = LoggerFactory.getLogger(FileWriter.class);
    private List<String> filesToFinalize = new ArrayList<>();

    @Override
    public void setup(Context.OperatorContext context) {
        super.setup(context);
        finalizeFiles();
    }

    @Override
    protected byte[] getBytesForTuple(KeyValue<String, String> tuple) {
        if (tuple.value == null) {
         LOG.debug("File to finalize {}",tuple.key);
            filesToFinalize.add(tuple.key);
            return new byte[0];
        }
        else {
            return tuple.value.getBytes();
        }
    }

    @Override
    protected String getFileName(KeyValue<String, String> tuple) {
        return tuple.key;
    }

    @Override
    public void endWindow() {
         LOG.info("end window is called, files are :{}"+filesToFinalize);
        super.endWindow();
        finalizeFiles();
    }

    private void finalizeFiles() {
         LOG.debug("Files to finalize {}",filesToFinalize.toArray());
        Iterator<String> fileIt = filesToFinalize.iterator();
        while(fileIt.hasNext()) {
            requestFinalize(fileIt.next());
            fileIt.remove();
        }
    }
}
####################################################################################################


_______________________________________________________________________

If you received this email in error, please advise the sender (by return email 
or otherwise) immediately. You have consented to receive the attached 
electronically at the above-noted email address; please retain a copy of this 
confirmation for future reference.

Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur 
immédiatement, par retour de courriel ou par un autre moyen. Vous avez accepté 
de recevoir le(s) document(s) ci-joint(s) par voie électronique à l'adresse 
courriel indiquée ci-dessus; veuillez conserver une copie de cette confirmation 
pour les fins de reference future.


_______________________________________________________________________

If you received this email in error, please advise the sender (by return email 
or otherwise) immediately. You have consented to receive the attached 
electronically at the above-noted email address; please retain a copy of this 
confirmation for future reference.

Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur 
immédiatement, par retour de courriel ou par un autre moyen. Vous avez accepté 
de recevoir le(s) document(s) ci-joint(s) par voie électronique à l'adresse 
courriel indiquée ci-dessus; veuillez conserver une copie de cette confirmation 
pour les fins de reference future.
_______________________________________________________________________
If you received this email in error, please advise the sender (by return email 
or otherwise) immediately. You have consented to receive the attached 
electronically at the above-noted email address; please retain a copy of this 
confirmation for future reference.  

Si vous recevez ce courriel par erreur, veuillez en aviser l'expéditeur 
immédiatement, par retour de courriel ou par un autre moyen. Vous avez accepté 
de recevoir le(s) document(s) ci-joint(s) par voie électronique à l'adresse 
courriel indiquée ci-dessus; veuillez conserver une copie de cette confirmation 
pour les fins de reference future.

Reply via email to