Hi Csaba, I deeply appreciate your help and support! Thanks so much, it is now running correctly and following the good practices.
Let me take advantage of this email to ask about another step in the same pipeline.
The CSV files must be converted to .avro, and I have two different schemas:
files coming from FTP matching *02*.csv use schema_02.avsc, and *01*.csv uses schema_01.avsc.
Each schema has a different structure. Please look at my attempt below:

class FtpGcsDoFn extends DoFn<FtpInput, String> {

    @ProcessElement
    public void processElement(@Element FtpInput f,
            OutputReceiver<String> outputReceiver, ProcessContext c) {
        ArgsOptions option = c.getPipelineOptions().as(ArgsOptions.class);
        ...
        String pathSavedFile = saveFile(download, f.getName(), option);
        outputReceiver.output(pathSavedFile); // save on GCS
        option.setCurrentSchema(String.format("schema_%s.avsc", f.getName()));
    }
}

...
.apply("Ftp download",
        ParDo.of(FtpTransferFileFn.from(options.getFtpHost())))
.apply("Read csv",
        TextIO.readAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
.apply("Read line", ParDo.of(new DoFn<String, GenericRecord>() {

    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
        ArgsOptions opt = c.getPipelineOptions().as(ArgsOptions.class);
        String nameFileSchema = opt.getCurrentSchema().get();
        // ** This is NOT working: it always returns the last value set by
        // option.setCurrentSchema(...) in the FtpGcsDoFn class
        Schema schema = new Schema.Parser().parse(new File(nameFileSchema));
        String[] rowValues = c.element().split(",");

        // create an Avro GenericRecord
        GenericRecord genericRecord = new GenericData.Record(schema);
        List<Schema.Field> fields = schema.getFields();
        for (int index = 0; index < fields.size(); ++index) {
            Schema.Field field = fields.get(index);
            genericRecord.put(field.name(), rowValues[index]);
        }
        c.output(genericRecord); // GenericRecord is NOT serialized :'(
    }
}))
.apply("Write Avro formatted data",
        AvroIO.writeGenericRecords(SCHEMA_DINAMIC ??) // how do I get the different schemas here... :`(
                .to(options.getOutput())
                .withCodec(CodecFactory.snappyCodec())
                .withSuffix(".avro"));

Thanks & Regards
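To make the question clearer, here is a rough sketch of the direction I imagine might
work: instead of mutating pipeline options at runtime, keep the source filename attached
to each line by reading with FileIO.readMatches() instead of TextIO.readAll(), split the
collection by the filename pattern, and write each branch with its own fixed schema.
CsvToGenericRecordFn and the "/01/part" output paths are placeholders of mine, I assume
getOutput() returns a plain String, and I assume the .avsc files are readable locally
when the pipeline is constructed:

import java.io.File;
import java.io.IOException;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.file.CodecFactory;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;

// Both schemas are known when the pipeline graph is built, so parse them up front.
Schema schema01 = new Schema.Parser().parse(new File("schema_01.avsc"));
Schema schema02 = new Schema.Parser().parse(new File("schema_02.avsc"));

PCollection<KV<String, String>> lines = pipeline
    .apply("Ftp download", ParDo.of(FtpTransferFileFn.from(options.getFtpHost())))
    .apply("Match files", FileIO.matchAll())
    .apply("Read matches", FileIO.readMatches())
    .apply("Tag lines with file name",
        ParDo.of(new DoFn<FileIO.ReadableFile, KV<String, String>>() {
          @ProcessElement
          public void processElement(ProcessContext c) throws IOException {
            FileIO.ReadableFile file = c.element();
            String fileName = file.getMetadata().resourceId().getFilename();
            for (String line : file.readFullyAsUTF8String().split("\r?\n")) {
              c.output(KV.of(fileName, line)); // every line remembers its file
            }
          }
        }));

// Split into one branch per schema, based on the file name pattern.
PCollectionList<KV<String, String>> branches = lines.apply("Split by pattern",
    Partition.of(2,
        (KV<String, String> kv, int n) -> kv.getKey().contains("02") ? 1 : 0));

// GenericRecord has no default coder, so set an AvroCoder per branch
// (I believe this also explains the "GenericRecord is NOT serialized" error).
PCollection<GenericRecord> records01 = branches.get(0)
    .apply("To records 01", ParDo.of(new CsvToGenericRecordFn(schema01.toString())))
    .setCoder(AvroCoder.of(schema01));

records01.apply("Write avro 01",
    AvroIO.writeGenericRecords(schema01)
        .to(options.getOutput() + "/01/part")
        .withCodec(CodecFactory.snappyCodec())
        .withSuffix(".avro"));
// ... and the same pair of transforms for branches.get(1) with schema02.

The conversion DoFn would then take the schema it needs in its constructor. Avro's
Schema class is not Serializable in Avro 1.8, so the sketch passes the schema JSON
and re-parses it once per instance in @Setup:

// Hypothetical helper: converts tagged CSV lines to GenericRecords for one schema.
static class CsvToGenericRecordFn extends DoFn<KV<String, String>, GenericRecord> {

    private final String schemaJson; // Schema itself is not Serializable in Avro 1.8
    private transient Schema schema;

    CsvToGenericRecordFn(String schemaJson) {
        this.schemaJson = schemaJson;
    }

    @Setup
    public void setup() {
        schema = new Schema.Parser().parse(schemaJson);
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        String[] values = c.element().getValue().split(",");
        GenericRecord record = new GenericData.Record(schema);
        List<Schema.Field> fields = schema.getFields();
        for (int i = 0; i < fields.size() && i < values.length; i++) {
            record.put(fields.get(i).name(), values[i]);
        }
        c.output(record);
    }
}

With only two known schemas this keeps AvroIO simple; there seem to be dynamic
destination style APIs on AvroIO as well, but two plain sinks look easier to reason
about. Is this a reasonable direction?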
On Fri, Feb 1, 2019 at 6:53 AM Csaba Kassai <cs...@aliz.ai> wrote:

> Hi,
>
> you can output the path where you saved the files on GCS in your first
> DoFn and use the TextIO.readAll() method.
> Also it is better to initialise the FTPClient in the @Setup method instead
> of every time you process an element in the @ProcessElement method.
> Something like this:
>
> PCollection<FtpInput> input = ...
> PCollection<String> fileTransfers = input
>     .apply("Transfer FTP", ParDo.of(FtpToGcsDoFn.from(options.getFtpHost())))
>     .apply("Read File", TextIO.readAll())
>     .apply("Read CSV LINES", .....)
>     .apply("Convert to AVRO", .....)
>     .apply("Save in AVRO", ...)
>
> public class FtpToGcsDoFn extends DoFn<FtpInput, String> {
>
>     private final ValueProvider<String> ftpHost;
>     private FTPClient ftpClient;
>
>     public static FtpToGcsDoFn from(ValueProvider<String> ftpHost) {
>         return new FtpToGcsDoFn(ftpHost);
>     }
>
>     private FtpToGcsDoFn(ValueProvider<String> ftpHost) {
>         this.ftpHost = ftpHost;
>     }
>
>     @Setup
>     public void setup() {
>         ftpClient = new FTPClient();
>         ftpClient.connect(ftpHost.get());
>     }
>
>     @ProcessElement
>     public void processElement(@Element FtpInput f,
>             OutputReceiver<String> outputReceiver) {
>         ByteArrayOutputStream download = new ByteArrayOutputStream();
>         boolean result = ftpClient.retrieveFile(f.getName(), download);
>         String destinationPath = saveCSV(download, f.getName()); // save CSV in Google Cloud Storage
>         outputReceiver.output(destinationPath);
>     }
> }
>
> Regards,
> Csabi
>
> On Fri, 1 Feb 2019 at 08:55, Henrique Molina <henrique.mol...@gmail.com> wrote:
>
>> Dear all,
>> I'm using an FTP client to download some files dynamically, and the files
>> are CSV (this part works fine).
>> The next step is to open those files and read their lines.
>>
>> Could somebody help me with the good practices for this approach?
>> I'm using Java > Google Dataflow > Apache Beam 2.9.0
>>
>> PCollection<String> fileTransfers = pipeline
>>     .apply("Transfer FTP", ParDo.of(new DoFn<FtpInput, String>() {
>>
>>         @ProcessElement
>>         public void processElement(@Element FtpInput f, ProcessContext c) {
>>             ArgsOptions opt = c.getPipelineOptions().as(ArgsOptions.class);
>>             FTPClient ftp = new FTPClient();
>>             ftp.connect(opt.getFtpHost());
>>             ByteArrayOutputStream download = new ByteArrayOutputStream();
>>             boolean result = ftp.retrieveFile(f.getName(), download);
>>             saveCSV(download); // save CSV in Google Cloud Storage
>>             c.output("???");
>>             ...
>>         }
>>     }))
>>     .apply("Read File", TextIO.read().from("")) // This is not correct ...
>>     .apply("Read CSV LINES", .....)
>>     .apply("Convert to AVRO", .....)
>>     .apply("Save in AVRO", ...);
>>
>> What I found on the Internet are samples doing it the easy way:
>> starting the pipeline with TextIO.read().from("hardcoded path") first.
>> But I can't find an example for my situation.
>> Has someone already faced this challenge?
>>
>> Thanks in advance
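P.S. One small follow-up on the @Setup suggestion: I think it is also worth adding a
@Teardown so the connection opened in setup() is closed when the worker retires the
DoFn instance. A rough sketch, using the commons-net FTPClient API (error handling
omitted):

@Teardown
public void teardown() throws IOException {
    // close the connection opened in @Setup
    if (ftpClient != null && ftpClient.isConnected()) {
        ftpClient.logout();
        ftpClient.disconnect();
    }
}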