Created https://issues.apache.org/jira/browse/BEAM-3867.
Thanks, Cham

On Fri, Mar 16, 2018 at 3:00 PM Eugene Kirpichov <kirpic...@google.com> wrote:

> Reading cannot be parallelized, but processing can be - so there is value
> in having our file-based sources automatically decompress .tar and .tar.gz.
> (Also, I suspect that many people use Beam even for cases with a modest
> amount of data that don't have or need parallelism, just for the sake of
> the convenience of Beam's APIs and IOs.)
>
> On Fri, Mar 16, 2018 at 2:50 PM Chamikara Jayalath <chamik...@google.com> wrote:
>
>> FWIW, if you have a concat gzip file [1], TextIO and other file-based
>> sources should be able to read it. But we don't support tar files. Is it
>> possible to perform the tar extraction before running the pipeline? That
>> step probably cannot be parallelized, so there is not much value in
>> performing it within the pipeline anyway (other than easy access to
>> various file systems).
>>
>> - Cham
>>
>> [1] https://stackoverflow.com/questions/8005114/fast-concatenation-of-multiple-gzip-files
>>
>> On Fri, Mar 16, 2018 at 12:26 PM Sajeevan Achuthan <achuthan.sajee...@gmail.com> wrote:
>>
>>> Eugene - Yes, you are correct. I tried with a text file & the Beam
>>> wordcount example. The TextIO reader reads some illegal characters, as
>>> seen below.
>>>
>>> here’s: 1
>>> addiction: 1
>>> new: 1
>>> we: 1
>>> mood: 1
>>> an: 1
>>> incredible: 1
>>> swings,: 1
>>> known: 1
>>> choices.: 1
>>> ^@eachsaj^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@eachsaj^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@They’re: 1
>>> already: 2
>>> today: 1
>>> the: 3
>>> generation: 1
>>> wordcount-00002
>>>
>>> thanks
>>> Saj
>>>
>>> On 16 March 2018 at 17:45, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>
>>>> To clarify: I think natively supporting .tar and .tar.gz would be quite
>>>> useful. I'm just saying that currently we don't.
>>>>
>>>> On Fri, Mar 16, 2018 at 10:44 AM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>
>>>>> The code behaves as I expected, and the output is corrupt.
>>>>> Beam unzipped the .gz, but then interpreted the .tar as a text file,
>>>>> and split the .tar file by \n.
>>>>> E.g. the first file of the output starts with lines:
>>>>> A20171012.1145+0200-1200+0200_epg10-1_node.xml/0000755000175000017500000000000013252764467016513 5ustar
>>>>> eachsajeachsajA20171012.1145+0200-1200+0200_epg10-1_node.xml/data0000644000175000017500000000360513252764467017353 0ustar eachsajeachsaj<?xml version="1.0" encoding="UTF-8"?>
>>>>>
>>>>> which are clearly not the expected input.
>>>>>
>>>>> On Fri, Mar 16, 2018 at 10:39 AM Sajeevan Achuthan <achuthan.sajee...@gmail.com> wrote:
>>>>>
>>>>>> Eugene, I ran the code and it works fine. I am very confident in
>>>>>> this case. I appreciate you guys for the great work.
>>>>>>
>>>>>> The code is supposed to show that Beam TextIO can read the doubly
>>>>>> compressed file and write the output without any processing, so I
>>>>>> ignored the processing steps. I agree with you that further
>>>>>> processing is not easy in this case.
>>>>>>
>>>>>> import org.apache.beam.sdk.Pipeline;
>>>>>> import org.apache.beam.sdk.io.TextIO;
>>>>>> import org.apache.beam.sdk.options.PipelineOptions;
>>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>>> import org.apache.beam.sdk.transforms.ParDo;
>>>>>>
>>>>>> public class ReadCompressedTextFile {
>>>>>>
>>>>>>   public static void main(String[] args) {
>>>>>>     PipelineOptions options =
>>>>>>         PipelineOptionsFactory.fromArgs(args).withValidation().create();
>>>>>>     Pipeline p = Pipeline.create(options);
>>>>>>
>>>>>>     p.apply("ReadLines", TextIO.read().from("./dataset.tar.gz"))
>>>>>>         .apply(ParDo.of(new DoFn<String, String>() {
>>>>>>           @ProcessElement
>>>>>>           public void processElement(ProcessContext c) {
>>>>>>             // Just pass all content through to "/tmp/filout/outputfile".
>>>>>>             c.output(c.element());
>>>>>>           }
>>>>>>         }))
>>>>>>         .apply(TextIO.write().to("/tmp/filout/outputfile"));
>>>>>>
>>>>>>     p.run().waitUntilFinish();
>>>>>>   }
>>>>>> }
>>>>>>
>>>>>> The full code, data file & output contents are attached.
>>>>>>
>>>>>> thanks
>>>>>> Saj
>>>>>>
>>>>>> On 16 March 2018 at 16:56, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>>
>>>>>>> Sajeevan - I'm quite confident that TextIO can handle .gz, but
>>>>>>> cannot properly handle .tar. Did you run this code? Did your test
>>>>>>> .tar.gz file contain multiple files? Did you obtain the expected
>>>>>>> output, identical to the input except for the order of lines?
>>>>>>> (Also, the ParDo in this code doesn't do anything - it outputs its
>>>>>>> input - so it can be removed.)
>>>>>>>
>>>>>>> On Fri, Mar 16, 2018 at 9:06 AM Sajeevan Achuthan <achuthan.sajee...@gmail.com> wrote:
>>>>>>>
>>>>>>>> Hi Guys,
>>>>>>>>
>>>>>>>> TextIO can handle tar.gz-type doubly compressed files. See the
>>>>>>>> test code below.
>>>>>>>>
>>>>>>>> PipelineOptions options =
>>>>>>>>     PipelineOptionsFactory.fromArgs(args).withValidation().create();
>>>>>>>> Pipeline p = Pipeline.create(options);
>>>>>>>>
>>>>>>>> *p.apply("ReadLines", TextIO.read().from("/dataset.tar.gz"))*
>>>>>>>>     .apply(ParDo.of(new DoFn<String, String>() {
>>>>>>>>       @ProcessElement
>>>>>>>>       public void processElement(ProcessContext c) {
>>>>>>>>         c.output(c.element());
>>>>>>>>       }
>>>>>>>>     }))
>>>>>>>>     .apply(TextIO.write().to("/tmp/filout/outputfile"));
>>>>>>>>
>>>>>>>> p.run().waitUntilFinish();
>>>>>>>>
>>>>>>>> Thanks
>>>>>>>> /Saj
>>>>>>>>
>>>>>>>> On 16 March 2018 at 04:29, Pablo Estrada <pabl...@google.com> wrote:
>>>>>>>>
>>>>>>>>> Hi!
>>>>>>>>> Quick questions:
>>>>>>>>> - which SDK are you using?
>>>>>>>>> - is this batch or streaming?
>>>>>>>>>
>>>>>>>>> As JB mentioned, TextIO is able to work with compressed files
>>>>>>>>> that contain text. Nothing currently handles the double
>>>>>>>>> decompression that I believe you're looking for.
>>>>>>>>> TextIO for Java is also able to "watch" a directory for new
>>>>>>>>> files. If you're able to (outside of your pipeline) decompress
>>>>>>>>> your first zip file into a directory that your pipeline is
>>>>>>>>> watching, you may be able to use that as a workaround. Does that
>>>>>>>>> sound like a good approach?
>>>>>>>>> Finally, if you want to implement a transform that does all your
>>>>>>>>> logic, then that sounds like SplittableDoFn material; in that
>>>>>>>>> case, someone who knows SDF better can give you guidance (or
>>>>>>>>> clarify if my suggestions are not correct).
>>>>>>>>> Best
>>>>>>>>> -P.
>>>>>>>>>
>>>>>>>>> On Thu, Mar 15, 2018, 8:09 PM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>>>>>
>>>>>>>>>> Hi
>>>>>>>>>>
>>>>>>>>>> TextIO supports compressed files. Do you want to read the files
>>>>>>>>>> as text?
>>>>>>>>>>
>>>>>>>>>> Can you detail the use case a bit?
>>>>>>>>>>
>>>>>>>>>> Thanks
>>>>>>>>>> Regards
>>>>>>>>>> JB
>>>>>>>>>>
>>>>>>>>>> On 15 March 2018, at 18:28, Shirish Jamthe <sjam...@google.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> My input is a tar.gz or .zip file which contains thousands of
>>>>>>>>>>> tar.gz files and other files.
>>>>>>>>>>> I would like to extract the tar.gz files from the tar.
>>>>>>>>>>>
>>>>>>>>>>> Is there a transform that can do that? I couldn't find one.
>>>>>>>>>>> If not, is it in the works? Any pointers to start work on it?
>>>>>>>>>>>
>>>>>>>>>>> thanks
>>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Got feedback? go/pabloem-feedback
>>>>>>>>> <https://goto.google.com/pabloem-feedback>
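[Editor's note on Cham's concat-gzip remark above: the behaviour can be checked outside Beam with the plain JDK, whose java.util.zip.GZIPInputStream transparently reads concatenated gzip members as a single stream - which is why TextIO handles concat gzip files but not tar. A minimal sketch; the class name and sample strings are illustrative, not from the thread:]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class ConcatGzipDemo {

    // Compress one chunk of bytes into a complete, standalone gzip member.
    static byte[] gzip(byte[] data) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            gz.write(data);
        }
        return bos.toByteArray();
    }

    // Concatenate two independent gzip members and read them back as one
    // stream - the same property TextIO relies on for concat gzip files.
    public static String readConcatenated() throws IOException {
        ByteArrayOutputStream concat = new ByteArrayOutputStream();
        concat.write(gzip("hello\n".getBytes(StandardCharsets.UTF_8)));
        concat.write(gzip("world\n".getBytes(StandardCharsets.UTF_8)));
        try (GZIPInputStream in = new GZIPInputStream(
                new ByteArrayInputStream(concat.toByteArray()))) {
            return new String(in.readAllBytes(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.print(readConcatenated());  // both members' contents, in order
    }
}
```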
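[Editor's note on why the wordcount output above is corrupt: a .tar stream is a sequence of 512-byte header blocks plus block-padded file data, so splitting it on '\n' mixes header bytes - the "ustar" names and the ^@ (NUL) padding runs - into the text. A toy stdlib-only parser below illustrates that layout; it skips checksum validation and long-name extensions, so for real pre-pipeline extraction a library such as Apache Commons Compress would be the tool (that library choice is a suggestion, not from the thread):]

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class TarEntryLister {

    // List entry names from an uncompressed tar stream. (For a .tar.gz,
    // wrap the source in java.util.zip.GZIPInputStream first.)
    static List<String> listEntries(InputStream raw) throws IOException {
        List<String> names = new ArrayList<>();
        DataInputStream in = new DataInputStream(raw);
        byte[] header = new byte[512];
        while (true) {
            in.readFully(header);          // each header is one 512-byte block
            if (header[0] == 0) {
                break;                     // an all-zero block marks end of archive
            }
            // Bytes 0-99: NUL-terminated entry name.
            String name = new String(header, 0, 100,
                    StandardCharsets.US_ASCII).split("\0")[0];
            // Bytes 124-135: entry size as an octal string.
            long size = Long.parseLong(
                    new String(header, 124, 12, StandardCharsets.US_ASCII).trim(), 8);
            names.add(name);
            // File data is padded up to the next 512-byte boundary.
            in.readFully(new byte[(int) ((size + 511) / 512 * 512)]);
        }
        return names;
    }
}
```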