Hi,
in your first DoFn you can output the GCS path where you saved each file,
and then use the TextIO.readAll() method to read the files at those paths.
Also, it is better to initialise the FTPClient once in the @Setup method instead
of every time you process an element in the @ProcessElement method.
Something like this:
PCollection<FtpInput> input = ...;
PCollection<String> lines = input
    .apply("Transfer FTP", ParDo.of(FtpToGcsDoFn.from(options.getFtpHost())))
    // Each element is a GCS path emitted by the DoFn; readAll() emits the lines of every file.
    .apply("Read File", TextIO.readAll());
lines
    .apply("Read CSV LINES ", .....)
    .apply("Convert to AVRO", .....)
    .apply("Save in AVRO", ...);
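To see why emitting only the paths is enough, here is a plain-Java analogy (not Beam code, and not how TextIO works internally). A hypothetical in-memory Map stands in for the GCS bucket, `save` plays the role of `saveCSV` by storing the content and returning only its path, and `readAll` turns every path into the lines of that file, which is the shape of the "Read File" step.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Plain-Java analogy (not Beam code): step one stores each downloaded file and
 * emits only its path; step two expands every path into the lines of its file.
 */
public class PathsThenLines {

    // Hypothetical in-memory stand-in for the GCS bucket: path -> file content.
    static final Map<String, String> STORAGE = new LinkedHashMap<>();

    /** Step one: store the content and emit only its path (like saveCSV). */
    static String save(String path, String content) {
        STORAGE.put(path, content);
        return path;
    }

    /** Step two: turn every path into the lines of the file it names. */
    static List<String> readAll(List<String> paths) {
        List<String> lines = new ArrayList<>();
        for (String path : paths) {
            for (String line : STORAGE.get(path).split("\n")) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        List<String> paths = new ArrayList<>();
        paths.add(save("gs://bucket/a.csv", "id,name\n1,alice"));
        paths.add(save("gs://bucket/b.csv", "id,name\n2,bob"));
        System.out.println(readAll(paths));
        // prints [id,name, 1,alice, id,name, 2,bob]
    }
}
```

Note that each file's header row (if your CSV files have one) arrives as an ordinary line, so the CSV step has to skip it. Unlike this loop, Beam gives no ordering guarantee for the lines coming out of readAll().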
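import java.io.ByteArrayOutputStream;
import java.io.IOException;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.commons.net.ftp.FTPClient;

public class FtpToGcsDoFn extends DoFn<FtpInput, String> {
    private final ValueProvider<String> ftpHost;
    // FTPClient is not serializable, so it is created on the worker in @Setup.
    private transient FTPClient ftpClient;

    public static FtpToGcsDoFn from(ValueProvider<String> ftpHost) {
        return new FtpToGcsDoFn(ftpHost);
    }

    private FtpToGcsDoFn(ValueProvider<String> ftpHost) {
        this.ftpHost = ftpHost;
    }

    // Called once per DoFn instance, so the connection is reused across elements.
    @Setup
    public void setup() throws IOException {
        ftpClient = new FTPClient();
        ftpClient.connect(ftpHost.get());
        // Log in here if your server requires it, e.g. ftpClient.login(user, password).
    }

    @ProcessElement
    public void processElement(@Element FtpInput f, OutputReceiver<String> outputReceiver)
            throws IOException {
        ByteArrayOutputStream download = new ByteArrayOutputStream();
        boolean result = ftpClient.retrieveFile(f.getName(), download);
        if (!result) {
            throw new IOException("Could not download " + f.getName() + ": " + ftpClient.getReplyString());
        }
        // saveCSV is your own helper: it saves the CSV in Google Cloud Storage and returns its path.
        String destinationPath = saveCSV(download, f.getName());
        outputReceiver.output(destinationPath);
    }

    // Close the connection when the DoFn instance is discarded.
    @Teardown
    public void teardown() throws IOException {
        if (ftpClient != null && ftpClient.isConnected()) {
            ftpClient.disconnect();
        }
    }
}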
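saveCSV is not shown in this thread. As a minimal sketch of the path it could return, the hypothetical helper below builds a gs:// path from an assumed bucket, an assumed prefix and the FTP file name. Only the string handling is shown; the actual upload to GCS is left out.

```java
/**
 * Hypothetical helper for building the gs:// path that saveCSV returns.
 * The bucket and prefix are assumptions; the thread does not show them.
 */
public class GcsPaths {

    /** Joins bucket, optional prefix and FTP file name into gs://bucket/prefix/name. */
    public static String destinationPath(String bucket, String prefix, String fileName) {
        // Trim slashes so "in/" and "/in" both give a single separator.
        String cleanPrefix = prefix.replaceAll("^/+|/+$", "");
        // Drop leading slashes but keep the FTP directory part of the name.
        String cleanName = fileName.replaceAll("^/+", "");
        return cleanPrefix.isEmpty()
                ? "gs://" + bucket + "/" + cleanName
                : "gs://" + bucket + "/" + cleanPrefix + "/" + cleanName;
    }

    public static void main(String[] args) {
        System.out.println(destinationPath("my-bucket", "ftp/incoming/", "/pub/report.csv"));
        // prints gs://my-bucket/ftp/incoming/pub/report.csv
    }
}
```

Keeping the FTP directory part in the object name, rather than only the base name, avoids two FTP files with the same name overwriting each other in the bucket.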
Regards,
Csabi
On Fri, 1 Feb 2019 at 08:55, Henrique Molina wrote:
> Dear all,
> I am using an FTP client to download some files dynamically; the files are
> CSV (this part is working fine).
> In the next step I need to open the files and read their lines.
>
> Could somebody help me apply good practices to this approach?
> I am using Java > Google Dataflow > Apache Beam 2.9.0.
>
> PCollection<String> fileTransfers = pipeline.apply("Transfer FTP", ParDo.of(new DoFn<FtpInput, String>() {
>     @ProcessElement
>     public void processElement(ProcessContext c) throws IOException {
>         ArgsOptions opt = c.getPipelineOptions().as(ArgsOptions.class);
>         FTPClient ftp = new FTPClient();
>         ftp.connect(opt.getFtpHost());
>         FtpInput f = c.element();
>         ByteArrayOutputStream download = new ByteArrayOutputStream();
>         boolean result = ftp.retrieveFile(f.getName(), download);
>         saveCSV(download); // save CSV in Google Cloud Storage
>         c.output("???");
>         ...
>     }
> }))
> .apply("Read File", TextIO.read().from("")) // This is not correct ...
> .apply("Read CSV LINES ", .....)
> .apply("Convert to AVRO", .....)
> .apply("Save in AVRO", ...);
>
> What I found on the Internet are samples using the easy way:
> start the pipeline with TextIO.read().from("hardcoded path").
> But I can't find any example for my situation.
> Has someone already faced this challenge?
>
> Thanks in advance