Hi Csaba,
I deeply appreciate your help and support!
Thanks so much, it is now running correctly and following good practices.

Let me take advantage of this email to ask about another step in the same pipeline.
The CSV files will be converted to .avro, and I have 2 different schemas:
FTP files matching *02*.csv use *schema_02.avsc*,
and *01*.csv files use *schema_01.avsc*.
Each schema has a different structure.
See my attempt below:
class FtpGcsDoFn extends DoFn<FtpInput, String> {

    @ProcessElement
    public void processElement(@Element FtpInput f,
            OutputReceiver<String> outputReceiver, ProcessContext c) {
        ArgsOptions option = c.getPipelineOptions().as(ArgsOptions.class);

        ....

        String pathSavedFile = saveFile(download, f.getName(), option);
        outputReceiver.output(pathSavedFile); // save on GCS

        option.setCurrentSchema(String.format("schema_%s.avsc", f.getName()));
    }
}
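One small detail in the line above: String.format("schema_%s.avsc", f.getName()) would produce something like schema_extract_01_jan.csv.avsc, not schema_01.avsc. A hypothetical helper (not from the pipeline above, and assuming every file name carries a "01" or "02" marker as in *01*.csv / *02*.csv) could extract just the marker:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: derive the .avsc file name from the FTP file name,
// assuming the file name always contains a "01" or "02" marker.
class SchemaNames {
    private static final Pattern MARKER = Pattern.compile("0[12]");

    static String schemaFor(String fileName) {
        Matcher m = MARKER.matcher(fileName);
        if (!m.find()) {
            throw new IllegalArgumentException("no schema marker in " + fileName);
        }
        // m.group() is the first "01" or "02" found in the file name
        return String.format("schema_%s.avsc", m.group());
    }
}
```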


...


    .apply("Ftp download", ParDo.of(FtpTransferFileFn.from(options.getFtpHost())))
    .apply("Read csv", TextIO.readAll()
        .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
    .apply("Read line", ParDo.of(new DoFn<String, GenericRecord>() {
        @ProcessElement
        public void processElement(ProcessContext c) throws IOException {
            ArgsOptions opt = c.getPipelineOptions().as(ArgsOptions.class);
            String nameFileSchema = opt.getCurrentSchema().get();
            // ** This is not working: it always returns the last
            // option.setCurrentSchema(...) set in FtpGcsDoFn
            Schema schema = new Schema.Parser().parse(new File(nameFileSchema));
            String[] rowValues = c.element().split(",");

            // create the Avro GenericRecord
            GenericRecord genericRecord = new GenericData.Record(schema);
            List<Schema.Field> fields = schema.getFields();
            for (int index = 0; index < fields.size(); ++index) {
                Schema.Field field = fields.get(index);
                genericRecord.put(field.name(), rowValues[index]);
            }
            c.output(genericRecord); // GenericRecord is NOT serialized :'(
        }
    }))
    .apply("Write Avro formatted data",
        AvroIO.writeGenericRecords(SCHEMA_DYNAMIC /* ?? how do I get the different schemas here? :'( */)
            .to(options.getOutput())
            .withCodec(CodecFactory.snappyCodec())
            .withSuffix(".avro"));
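On the two problems above: PipelineOptions are fixed when the pipeline is submitted, so a setCurrentSchema(...) call on one worker is never seen by the DoFns running elsewhere; and TextIO.readAll() flattens every file into a single PCollection of lines, dropping the file names. Since there are only two known schemas, one possible workaround is to split the downloaded paths by file pattern and run one branch per schema. This is only a sketch under assumptions: SCHEMA_01/SCHEMA_02 would be parsed once at pipeline-construction time from the two .avsc files, and toGenericRecord(line, schema) is a hypothetical CSV-to-record converter.

```java
// Sketch only, not tested end-to-end.
PCollection<String> paths = input.apply("Ftp download",
    ParDo.of(FtpTransferFileFn.from(options.getFtpHost())));

// Keep only the *01* files in this branch.
PCollection<String> paths01 = paths.apply("Only *01*",
    Filter.by(p -> p.contains("01")));

paths01
    .apply("Read csv 01", TextIO.readAll()
        .withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
    .apply("To record 01", MapElements
        .into(TypeDescriptor.of(GenericRecord.class))
        .via(line -> toGenericRecord(line, SCHEMA_01)))
    // An AvroCoder with an explicit schema fixes the
    // "GenericRecord is not serialized" problem:
    .setCoder(AvroCoder.of(SCHEMA_01))
    .apply("Write avro 01", AvroIO.writeGenericRecords(SCHEMA_01)
        .to(options.getOutput() + "_01")
        .withCodec(CodecFactory.snappyCodec())
        .withSuffix(".avro"));

// ... and the same chain for the *02* paths with SCHEMA_02.
```

Because each branch has a single fixed schema, AvroIO.writeGenericRecords(...) no longer needs a dynamic schema at all.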


Thanks & Regards

On Fri, Feb 1, 2019 at 6:53 AM Csaba Kassai <cs...@aliz.ai> wrote:

> Hi,
>
> you can output the path where you saved the files on GCS in your first
> DoFn and use the TextIO.readAll() method.
> Also it is better to initialise the FTPClient in the @Setup method instead
> of every time you process an element in the @ProcessElement method.
> Something like this:
> PCollection<FtpInput> input = ...
> PCollection<String> fileTransfers = input.apply("Transfer FTP",
> ParDo.of(FtpToGcsDoFn.from(options.getFtpHost())))
>                                          .apply("Read File",
> TextIO.readAll()) // This is not correct ...
>                                          .apply("Read CSV LINES ", .....)
>                                          .apply("Convert to AVRO".....)
>                                          .apply("Save in AVRO",...)
>
> public class FtpToGcsDoFn extends DoFn<FtpInput, String> {
>
>     private final ValueProvider<String> ftpHost;
>     private FTPClient ftpClient;
>
>     public static FtpToGcsDoFn from(ValueProvider<String> ftpHost) {
>         return new FtpToGcsDoFn(ftpHost);
>     }
>
>     private FtpToGcsDoFn(ValueProvider<String> ftpHost) {
>         this.ftpHost = ftpHost;
>     }
>
>     @Setup
>     public void setup() {
>         ftpClient = new FTPClient();
>         ftpClient.connect(ftpHost.get());
>
>     }
>
>     @ProcessElement
>     public void processElement(@Element FtpInput f, OutputReceiver<String>
> outputReceiver) {
>
>         ByteArrayOutputStream download = new ByteArrayOutputStream();
>         boolean result = ftpClient.retrieveFile(f.getName(), download);
>         String destinationPath = saveCSV(download, f.getName()); // save
> CSV in Storage Google cloud
>         outputReceiver.output(destinationPath);
>     }
> }
>
> Regards,
> Csabi
>
>
>
> On Fri, 1 Feb 2019 at 08:55, Henrique Molina <henrique.mol...@gmail.com>
> wrote:
>
>> Dear all,
>> I am using an FTP client to download some files dynamically; the files are
>> CSV (this part is working fine).
>> As the next step I need to open the files and read the lines.
>>
>> Could somebody help me with good practices for this approach?
>> I am using Java > Google Dataflow > Apache Beam 2.9.0
>>
>> PCollection<String> fileTransfers= pipeline.apply("Transfer FTP", new
>> DoFn<FtpInput, String>{
>>
>>         @ProcessElement
>>
>> public void processElement(ProcessContext c) {
>>
>> ArgsOptions opt= c.getPipelineOptions().as(ArgsOptions.class);
>>
>> FTPClient ftp = new FTPClient();
>>
>>                   ftp.connect(opt.getFtpHost());
>>
>>                          ByteArrayOutputStream download = new
>> ByteArrayOutputStream();
>>
>>                          boolean result= ftp.retrieveFile(f.getName(),
>> download);
>>
>>                          saveCSV(download); // save CSV in Storage Google
>> cloud
>>
>>                          c.output("???");
>>
>>             ...
>> })
>> .apply("Read File", TextIO.read().from("")); // This is not correct ...
>> .apply("Read CSV LINES ", .....);
>> .appply("Convert to AVRO".....) ;
>> .apply("Save in AVRO",...);
>>
>> What I found on the Internet are samples using the easy way:
>> starting the pipeline with TextIO.read().from("hardcoded path").
>> But I can't find an example for my situation.
>> Has someone already faced this challenge?
>>
>> Thanks in advance
>>
>>
