Reading cannot be parallelized, but processing can be - so there is value in having our file-based sources automatically decompress .tar and .tar.gz. (Also, I suspect that many people use Beam even for cases with a modest amount of data that don't have or need parallelism, just for the convenience of Beam's APIs and IOs.)
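[Editor's note: for a sense of what tar-aware decompression would have to do - and why reading a .tar as text produces the header garbage shown further down the thread - here is a minimal pure-JDK sketch of reading ustar-format entries (name in header bytes 0-99, size as ASCII octal at bytes 124-135, data padded to 512-byte blocks). The class and method names are illustrative, not Beam code.]

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

/** Minimal reader for an uncompressed tar stream (ustar header layout). */
public class TarSketch {

  /** Returns a map of entry name -> entry contents, in archive order. */
  public static Map<String, byte[]> readEntries(InputStream in) throws IOException {
    Map<String, byte[]> entries = new LinkedHashMap<>();
    byte[] header = new byte[512];
    // A tar stream ends with all-zero blocks; stop on the first one.
    while (readFully(in, header) && header[0] != 0) {
      // Entry name: bytes 0..99, NUL-terminated ASCII.
      String name = cString(header, 0, 100);
      // Entry size: bytes 124..135, ASCII octal.
      long size = Long.parseLong(cString(header, 124, 12).trim(), 8);
      byte[] data = new byte[(int) size];
      readFully(in, data);
      // Entry data is padded up to the next 512-byte block boundary.
      long toSkip = (512 - size % 512) % 512;
      while (toSkip > 0) {
        long skipped = in.skip(toSkip);
        if (skipped <= 0) break;
        toSkip -= skipped;
      }
      entries.put(name, data);
    }
    return entries;
  }

  private static String cString(byte[] buf, int off, int len) {
    int end = off;
    while (end < off + len && buf[end] != 0) end++;
    return new String(buf, off, end - off, StandardCharsets.US_ASCII);
  }

  private static boolean readFully(InputStream in, byte[] buf) throws IOException {
    int n = 0;
    while (n < buf.length) {
      int r = in.read(buf, n, buf.length - n);
      if (r < 0) return false;
      n += r;
    }
    return true;
  }
}
```

A production extractor would also verify the checksum field (bytes 148-155) and handle GNU long-name and pax extension entries; a library such as Apache Commons Compress covers those cases.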
On Fri, Mar 16, 2018 at 2:50 PM Chamikara Jayalath <chamik...@google.com> wrote:

> FWIW, if you have a concat gzip file [1], TextIO and other file-based
> sources should be able to read it. But we don't support tar files. Is it
> possible to perform tar extraction before running the pipeline? This step
> probably cannot be parallelized, so there is not much value in performing
> it within the pipeline anyway (other than easy access to various file
> systems).
>
> - Cham
>
> [1] https://stackoverflow.com/questions/8005114/fast-concatenation-of-multiple-gzip-files
>
> On Fri, Mar 16, 2018 at 12:26 PM Sajeevan Achuthan <achuthan.sajee...@gmail.com> wrote:
>
>> Eugene - Yes, you are correct. I tried with a text file and the Beam
>> wordcount example. The TextIO reader reads some illegal characters, as
>> seen below.
>>
>> here’s: 1
>> addiction: 1
>> new: 1
>> we: 1
>> mood: 1
>> an: 1
>> incredible: 1
>> swings,: 1
>> known: 1
>> choices.: 1
>> ^@eachsaj^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@eachsaj^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@^@They’re: 1
>> already: 2
>> today: 1
>> the: 3
>> generation: 1
>> wordcount-00002
>>
>> thanks
>> Saj
>>
>> On 16 March 2018 at 17:45, Eugene Kirpichov <kirpic...@google.com> wrote:
>>
>>> To clarify: I think natively supporting .tar and .tar.gz would be quite
>>> useful. I'm just saying that currently we don't.
>>>
>>> On Fri, Mar 16, 2018 at 10:44 AM Eugene Kirpichov <kirpic...@google.com> wrote:
>>>
>>>> The code behaves as I expected, and the output is corrupt.
>>>> Beam unzipped the .gz, but then interpreted the .tar as a text file,
>>>> and split the .tar file by \n.
>>>> E.g. the first file of the output starts with lines:
>>>>
>>>> A20171012.1145+0200-1200+0200_epg10-1_node.xml/0000755000175000017500000000000013252764467016513
>>>> 5ustar
>>>> eachsajeachsajA20171012.1145+0200-1200+0200_epg10-1_node.xml/data0000644000175000017500000000360513252764467017353
>>>> 0ustar eachsajeachsaj<?xml version="1.0" encoding="UTF-8"?>
>>>>
>>>> which are clearly not the expected input.
>>>>
>>>> On Fri, Mar 16, 2018 at 10:39 AM Sajeevan Achuthan <achuthan.sajee...@gmail.com> wrote:
>>>>
>>>>> Eugene, I ran the code and it works fine. I am very confident in this
>>>>> case. I appreciate you guys for the great work.
>>>>>
>>>>> The code was supposed to show that Beam TextIO can read the doubly
>>>>> compressed files and write the output without any processing, so I
>>>>> ignored the processing steps. I agree with you that further processing
>>>>> is not easy in this case.
>>>>>
>>>>> import org.apache.beam.sdk.Pipeline;
>>>>> import org.apache.beam.sdk.io.TextIO;
>>>>> import org.apache.beam.sdk.options.PipelineOptions;
>>>>> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>>>>> import org.apache.beam.sdk.transforms.DoFn;
>>>>> import org.apache.beam.sdk.transforms.ParDo;
>>>>>
>>>>> public class ReadCompressedTextFile {
>>>>>
>>>>>   public static void main(String[] args) {
>>>>>     PipelineOptions options =
>>>>>         PipelineOptionsFactory.fromArgs(args).withValidation().create();
>>>>>     Pipeline p = Pipeline.create(options);
>>>>>
>>>>>     p.apply("ReadLines", TextIO.read().from("./dataset.tar.gz"))
>>>>>         .apply(ParDo.of(new DoFn<String, String>() {
>>>>>           @ProcessElement
>>>>>           public void processElement(ProcessContext c) {
>>>>>             // Just pass all content through to "/tmp/filout/outputfile"
>>>>>             c.output(c.element());
>>>>>           }
>>>>>         }))
>>>>>         .apply(TextIO.write().to("/tmp/filout/outputfile"));
>>>>>
>>>>>     p.run().waitUntilFinish();
>>>>>   }
>>>>> }
>>>>>
>>>>> The full code, data file & output contents are attached.
>>>>>
>>>>> thanks
>>>>> Saj
>>>>>
>>>>> On 16 March 2018 at 16:56, Eugene Kirpichov <kirpic...@google.com> wrote:
>>>>>
>>>>>> Sajeevan - I'm quite confident that TextIO can handle .gz, but cannot
>>>>>> properly handle .tar. Did you run this code? Did your test .tar.gz
>>>>>> file contain multiple files? Did you obtain the expected output,
>>>>>> identical to the input except for the order of lines?
>>>>>> (Also, the ParDo in this code doesn't do anything - it outputs its
>>>>>> input - so it can be removed.)
>>>>>>
>>>>>> On Fri, Mar 16, 2018 at 9:06 AM Sajeevan Achuthan <achuthan.sajee...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Guys,
>>>>>>>
>>>>>>> TextIO can handle tar.gz-type doubly compressed files. See the test
>>>>>>> code.
>>>>>>>
>>>>>>> PipelineOptions options =
>>>>>>>     PipelineOptionsFactory.fromArgs(args).withValidation().create();
>>>>>>> Pipeline p = Pipeline.create(options);
>>>>>>>
>>>>>>> p.apply("ReadLines", TextIO.read().from("/dataset.tar.gz"))
>>>>>>>     .apply(ParDo.of(new DoFn<String, String>() {
>>>>>>>       @ProcessElement
>>>>>>>       public void processElement(ProcessContext c) {
>>>>>>>         c.output(c.element());
>>>>>>>       }
>>>>>>>     }))
>>>>>>>     .apply(TextIO.write().to("/tmp/filout/outputfile"));
>>>>>>>
>>>>>>> p.run().waitUntilFinish();
>>>>>>>
>>>>>>> Thanks
>>>>>>> /Saj
>>>>>>>
>>>>>>> On 16 March 2018 at 04:29, Pablo Estrada <pabl...@google.com> wrote:
>>>>>>>
>>>>>>>> Hi!
>>>>>>>> Quick questions:
>>>>>>>> - which SDK are you using?
>>>>>>>> - is this batch or streaming?
>>>>>>>>
>>>>>>>> As JB mentioned, TextIO is able to work with compressed files that
>>>>>>>> contain text. Nothing currently handles the double decompression
>>>>>>>> that I believe you're looking for.
>>>>>>>> TextIO for Java is also able to "watch" a directory for new files.
>>>>>>>> If you're able to (outside of your pipeline) decompress your first
>>>>>>>> zip file into a directory that your pipeline is watching, you may be
>>>>>>>> able to use that as a workaround. Does that sound like a good
>>>>>>>> option?
>>>>>>>> Finally, if you want to implement a transform that does all your
>>>>>>>> logic, then that sounds like SplittableDoFn material; in that case,
>>>>>>>> someone who knows SDF better can give you guidance (or clarify if my
>>>>>>>> suggestions are not correct).
>>>>>>>> Best
>>>>>>>> -P.
>>>>>>>>
>>>>>>>> On Thu, Mar 15, 2018, 8:09 PM Jean-Baptiste Onofré <j...@nanthrax.net> wrote:
>>>>>>>>
>>>>>>>>> Hi,
>>>>>>>>>
>>>>>>>>> TextIO supports compressed files. Do you want to read the files as
>>>>>>>>> text?
>>>>>>>>>
>>>>>>>>> Can you detail the use case a bit?
>>>>>>>>>
>>>>>>>>> Thanks
>>>>>>>>> Regards
>>>>>>>>> JB
>>>>>>>>>
>>>>>>>>> On 15 March 2018 at 18:28, Shirish Jamthe <sjam...@google.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Hi,
>>>>>>>>>>
>>>>>>>>>> My input is a tar.gz or .zip file which contains thousands of
>>>>>>>>>> tar.gz files and other files.
>>>>>>>>>> I would like to extract the tar.gz files from the tar.
>>>>>>>>>>
>>>>>>>>>> Is there a transform that can do that? I couldn't find one.
>>>>>>>>>> If not, is it in the works? Any pointers to start work on it?
>>>>>>>>>>
>>>>>>>>>> thanks
>>>>>>>>>>
>>>>>>>> --
>>>>>>>> Got feedback? go/pabloem-feedback
>>>>>>>> <https://goto.google.com/pabloem-feedback>
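[Editor's note: on Cham's point about concatenated gzip files [1]: `java.util.zip.GZIPInputStream` treats a concatenation of complete gzip members as a single continuous stream, which is what makes that workaround viable for TextIO. A small self-contained check (class and method names are illustrative):]

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

/** Shows that a concatenation of gzip members decompresses as one stream. */
public class ConcatGzipDemo {

  /** Gzips each chunk as a separate member and concatenates the raw bytes. */
  public static byte[] concatGzip(String... chunks) throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    for (String chunk : chunks) {
      GZIPOutputStream gz = new GZIPOutputStream(out);
      gz.write(chunk.getBytes(StandardCharsets.UTF_8));
      gz.finish(); // complete this member without closing the underlying stream
    }
    return out.toByteArray();
  }

  /** Decompresses the whole concatenation in a single pass. */
  public static String gunzipAll(byte[] compressed) throws IOException {
    GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed));
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    byte[] buf = new byte[4096];
    int n;
    while ((n = in.read(buf)) > 0) {
      out.write(buf, 0, n);
    }
    return new String(out.toByteArray(), StandardCharsets.UTF_8);
  }
}
```

This is also why `cat a.gz b.gz > all.gz` produces a file that gzip-aware readers decompress to the concatenation of the two originals, as described in [1].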