help with right transform to read tgz file

2018-12-21 Thread Sridevi Nookala
Hi,

I am new to Apache Beam.
I am trying to write a simple pipeline using the Apache Beam Java SDK.
The pipeline will read a bunch of tgz files.
Each tgz file contains multiple CSV files with data.

public static final void main(String args[]) throws Exception {

    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline pipeline = Pipeline.create(options);

    PCollection<MatchResult.Metadata> matches =
        pipeline.apply(FileIO.match().filepattern("/tmp/beam5/*.tgz"));

    PCollection<ReadableFile> compGz =
        matches.apply(FileIO.readMatches().withCompression(Compression.GZIP));
    PCollection<String> contents = compGz.apply(FlatMapElements
        // uses imports from TypeDescriptors
        .into(TypeDescriptors.strings())
        .via((ReadableFile f) -> {
            try {
                return Arrays.asList(
                    f.readFullyAsUTF8String().replaceAll("^@", "").split("\\r?\\n|\\r", -1));
            } catch (IOException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            return null;
        }));
    PDone ret =
        contents.apply(TextIO.write().to("/tmp/beam6/output.txt").withoutSharding());

}

Instead of returning a flat list of strings, I tried parsing the output of
f.readFullyAsUTF8String() into a CSVFileBean, but that does not seem to work.

Basically, the above program is crude.

I am looking for suggestions on the right transform to turn each tgz into
individual CSV bean POJOs that hold the name of the CSV file and its contents.

I am stuck decoding the tgz from readFullyAsUTF8String().

Eventually I need to take each CSV bean and combine them.

E.g. test1.tgz has foo_time1.csv, bar_time1.csv and
     test2.tgz has foo_time2.csv, bar_time2.csv

So I need to extract these CSVs and combine all the foos and all the bars,

and possibly manipulate the foos and bars by adding columns and transforming
them, then send them out to destinations, which can be a filesystem or Kafka.

Thanks,
any help is appreciated.
Sri







Re: help with right transform to read tgz file

2018-12-26 Thread Jeff Klukas
The general approach in your example looks reasonable to me. I don't think
there's anything built in to Beam to help with parsing the tar file format
and I don't know how robust the method of replacing "^@" and then splitting
on newlines will be. I'd likely use Apache's commons-compress library for
walking through the bytes of the tar file, pulling out the file names and
associated contents.
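
To illustrate what walking through the bytes of the tar file involves, here
is a minimal sketch that uses only the JDK. It is not the commons-compress
approach suggested above: it assumes plain ustar-style 512-byte headers, ASCII
entry names of at most 100 bytes, and files small enough to hold in memory.
The class TgzWalkSketch and the methods readTgz, readTar and writeTgz are
hypothetical names for this example. writeTgz fills in only the header fields
this reader looks at (no checksum), so it is good for the demo and nothing
else.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

class TgzWalkSketch {
    private static final int BLOCK = 512; // tar headers and data are padded to 512-byte blocks

    /** Gunzips the bytes, then returns entry name -> UTF-8 contents. */
    static Map<String, String> readTgz(byte[] tgz) {
        try (InputStream tar = new GZIPInputStream(new ByteArrayInputStream(tgz))) {
            return readTar(tar);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Walks an already-decompressed tar stream, keeping regular files only. */
    static Map<String, String> readTar(InputStream tar) throws IOException {
        Map<String, String> entries = new LinkedHashMap<>();
        DataInputStream in = new DataInputStream(tar);
        byte[] header = new byte[BLOCK];
        while (true) {
            try {
                in.readFully(header);
            } catch (EOFException e) {
                break; // stream ended without the usual zero blocks
            }
            if (header[0] == 0) {
                break; // an empty name marks the end-of-archive blocks
            }
            String name = field(header, 0, 100);
            int size = Integer.parseInt(field(header, 124, 12).trim(), 8); // octal size field
            byte type = header[156];
            byte[] body = new byte[size];
            in.readFully(body);
            in.readFully(new byte[(BLOCK - size % BLOCK) % BLOCK]); // consume block padding
            if (type == '0' || type == 0) { // type flag for a regular file
                entries.put(name, new String(body, StandardCharsets.UTF_8));
            }
        }
        return entries;
    }

    /** Reads a NUL-terminated ASCII header field. */
    private static String field(byte[] block, int offset, int length) {
        int end = offset;
        while (end < offset + length && block[end] != 0) {
            end++;
        }
        return new String(block, offset, end - offset, StandardCharsets.US_ASCII);
    }

    /** Demo helper: builds a tiny in-memory .tgz with just name, size and type set. */
    static byte[] writeTgz(Map<String, String> files) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            for (Map.Entry<String, String> f : files.entrySet()) {
                byte[] header = new byte[BLOCK];
                byte[] name = f.getKey().getBytes(StandardCharsets.US_ASCII);
                byte[] body = f.getValue().getBytes(StandardCharsets.UTF_8);
                byte[] size = String.format("%011o", body.length).getBytes(StandardCharsets.US_ASCII);
                System.arraycopy(name, 0, header, 0, name.length);
                System.arraycopy(size, 0, header, 124, size.length);
                header[156] = '0';
                out.write(header);
                out.write(body);
                out.write(new byte[(BLOCK - body.length % BLOCK) % BLOCK]);
            }
            out.write(new byte[2 * BLOCK]); // end-of-archive marker
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) {
        Map<String, String> files = new LinkedHashMap<>();
        files.put("foo_time1.csv", "a,b\n1,2\n");
        files.put("bar_time1.csv", "x,y\n3,4\n");
        readTgz(writeTgz(files)).forEach((name, contents) ->
            System.out.println(name + ": " + contents.length() + " chars"));
    }
}
```

The header fields are padded with NUL bytes, which is probably where the "^@"
characters in the decoded string come from. Reading the entries as bytes and
decoding only each file body avoids having to strip them afterwards.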

You should be able to put all of that logic into a single FlatMapElements
invocation as in your example. I'd suggest returning KV<String, String>
where the key is the file name (or perhaps just the prefix of the file name
if that's what you want to combine on) and the value is the line of content.
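
The keying idea can be sketched without Beam. This is only an illustration
under assumptions: Map.Entry stands in for Beam's KV, a local groupingBy
stands in for whatever grouping or combining step the pipeline would use, and
the prefix is assumed to be the part of the file name before the first
underscore. KeyByPrefixSketch, prefixOf, keyedLines and groupByPrefix are
hypothetical names.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

class KeyByPrefixSketch {
    /** "foo_time1.csv" -> "foo"; any leading directory in the entry name is dropped. */
    static String prefixOf(String entryName) {
        String base = entryName.substring(entryName.lastIndexOf('/') + 1);
        int underscore = base.indexOf('_');
        return underscore < 0 ? base : base.substring(0, underscore);
    }

    /** Emits one (prefix, line) pair per non-empty line of one CSV entry. */
    static List<Map.Entry<String, String>> keyedLines(String entryName, String contents) {
        String key = prefixOf(entryName);
        List<Map.Entry<String, String>> out = new ArrayList<>();
        for (String line : contents.split("\\r?\\n|\\r")) {
            if (!line.isEmpty()) {
                out.add(new SimpleImmutableEntry<>(key, line));
            }
        }
        return out;
    }

    /** Local stand-in for grouping by key: prefix -> all lines seen for that prefix. */
    static Map<String, List<String>> groupByPrefix(List<Map.Entry<String, String>> pairs) {
        return pairs.stream().collect(Collectors.groupingBy(
            Map.Entry::getKey,
            TreeMap::new,
            Collectors.mapping(Map.Entry::getValue, Collectors.toList())));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> pairs = new ArrayList<>();
        pairs.addAll(keyedLines("foo_time1.csv", "1,2\n3,4\n"));
        pairs.addAll(keyedLines("bar_time1.csv", "a,b\n"));
        pairs.addAll(keyedLines("foo_time2.csv", "5,6\n"));
        System.out.println(groupByPrefix(pairs));
    }
}
```

With the file names from the question, the lines of foo_time1.csv and
foo_time2.csv end up under the key "foo" and the bar files under "bar".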

It looks like Beam does include some capabilities for parsing CSV lines,
but I have no experience using them. It looks like they're building upon
Apache commons-csv, so you might consider using that library directly if
you need to parse fields out of the CSV and do transformations.
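
As a rough picture of what parsing fields and adding a column involves, here
is a JDK-only sketch. It is a stand-in for simple cases, not a replacement for
commons-csv: it assumes comma separators, double-quoted fields with ""
escapes, and no line breaks inside a field. CsvLineSketch, parseLine, escape
and appendColumn are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

class CsvLineSketch {
    /** Splits one CSV line into fields, honoring double quotes and "" escapes. */
    static List<String> parseLine(String line) {
        List<String> fields = new ArrayList<>();
        StringBuilder current = new StringBuilder();
        boolean quoted = false;
        for (int i = 0; i < line.length(); i++) {
            char c = line.charAt(i);
            if (quoted) {
                if (c == '"' && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                    current.append('"'); // escaped quote inside a quoted field
                    i++;
                } else if (c == '"') {
                    quoted = false; // closing quote
                } else {
                    current.append(c);
                }
            } else if (c == '"') {
                quoted = true; // opening quote
            } else if (c == ',') {
                fields.add(current.toString());
                current.setLength(0);
            } else {
                current.append(c);
            }
        }
        fields.add(current.toString()); // last field has no trailing comma
        return fields;
    }

    /** Quotes a field only when it contains a comma, a quote, or a line break. */
    static String escape(String field) {
        boolean needsQuotes = field.contains(",") || field.contains("\"")
            || field.contains("\n") || field.contains("\r");
        return needsQuotes ? "\"" + field.replace("\"", "\"\"") + "\"" : field;
    }

    /** Returns the line with one extra column appended. */
    static String appendColumn(String line, String value) {
        List<String> fields = new ArrayList<>(parseLine(line));
        fields.add(value);
        return fields.stream().map(CsvLineSketch::escape).collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        // Tag a row with the name of the CSV file it came from.
        System.out.println(appendColumn("1,\"x,y\"", "foo_time1.csv"));
    }
}
```

Appending the source file name as a column, as in main, is one way to keep
track of where a row came from once the foo files have been combined.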
