[ 
https://issues.apache.org/jira/browse/BEAM-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16061713#comment-16061713
 ] 

Guillermo Rodríguez Cano commented on BEAM-2490:
------------------------------------------------

  Hello again,

 as I commented before, and after fixing the shell's expansion I think I am 
having a similar issue, in both Dataflow and Direct runners. I am not sure if 
it is the glob operator or the combination with the gzip compression.
I simplified my pipeline to emulate a simple grep of some JSON files this time:

{code:none}
    with beam.Pipeline(options=pipeline_options) as p:
        raw_events = p | 'Read input' >> ReadFromText(known_args.input)
        
        events = raw_events | 'Generate events' >> beam.ParDo(ExtractEventsFn())

        filtered_events = (events
                           | 'Filter for a specific user' >> beam.Filter(lambda 
e: e['user'] == '123')
                           | 'Filter for a specific video' >> 
beam.Filter(lambda e: e['video'] == '456')
                          )

        output = (filtered_events 
                  | 'Format output events' >> beam.Map(lambda e: '%s @ %s (%s - 
%s - %s)' % (datetime.fromtimestamp(e['timestamp']/1000).isoformat(), 
e['type'], e['user'], e['video'], e['device']))
                  | 'Write results' >> WriteToText(known_args.output)
                 )
{code}

When I run the pipeline with the input files decompressed with either the 
Direct or Dataflow runners I obtain the expected result (as compared with me 
parsing the input files in the command line with the grep command) while when I 
run the pipeline with the files compressed (gzip) I obtain, with both runners, 
a rather minimal subset (as in <2%) of the expected result.
When running with Dataflow runner I was using the Google Cloud Storage, while 
with the Direct runner I was using my local hard-disk (I had to reduce the 
initial subset of files from 48 files to just 8 files where I know the result 
is generated from due to high swap memory used by the Python process in the 
laptop for the uncompressed scenario).

The similarities with [~oliviernguyenquoc] are that the compressed files I am 
using are around the same size (200 MB, decompressed are about 1.5-2 GB) and 
text files (JSON in my case).

Interestingly enough I also tried loading each file as a PCollection directly 
in the source code and then merge them with a Flatten transform. I got similar 
unsuccessful results with the Direct runner (I did not try with the Dataflow 
runner). Similar because the output was slightly different than when using the 
glob operator in the directory where the files are.

It feels as if Apache Beam is sampling the files when they are gzip compressed

> ReadFromText function is not taking all data with glob operator (*) 
> --------------------------------------------------------------------
>
>                 Key: BEAM-2490
>                 URL: https://issues.apache.org/jira/browse/BEAM-2490
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py
>    Affects Versions: 2.0.0
>         Environment: Usage with Google Cloud Platform: Dataflow runner
>            Reporter: Olivier NGUYEN QUOC
>            Assignee: Chamikara Jayalath
>             Fix For: Not applicable
>
>
> I run a very simple pipeline:
> * Read my files from Google Cloud Storage
> * Split with '\n' char
> * Write in on a Google Cloud Storage
> I have 8 files that match with the pattern:
> * my_files_2016090116_20160902_060051_xxxxxxxxxx.csv.gz (229.25 MB)
> * my_files_2016090117_20160902_060051_xxxxxxxxxx.csv.gz (184.1 MB)
> * my_files_2016090118_20160902_060051_xxxxxxxxxx.csv.gz (171.73 MB)
> * my_files_2016090119_20160902_060051_xxxxxxxxxx.csv.gz (151.34 MB)
> * my_files_2016090120_20160902_060051_xxxxxxxxxx.csv.gz (129.69 MB)
> * my_files_2016090121_20160902_060051_xxxxxxxxxx.csv.gz (151.7 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (346.46 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (222.57 MB)
> This code should take them all:
> {code:python}
> beam.io.ReadFromText(
>       "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>       skip_header_lines=1,
>       compression_type=beam.io.filesystem.CompressionTypes.GZIP
>       )
> {code}
> It runs well but there is only a 288.62 MB file in output of this pipeline 
> (instead of a 1.5 GB file).
> The whole pipeline code:
> {code:python}
> data = (p | 'ReadMyFiles' >> beam.io.ReadFromText(
>           "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>           skip_header_lines=1,
>           compression_type=beam.io.filesystem.CompressionTypes.GZIP
>           )
>                        | 'SplitLines' >> beam.FlatMap(lambda x: x.split('\n'))
>                     )
> output = (
>           data| "Write" >> beam.io.WriteToText('gs://XXX_folder2/test.csv', 
> num_shards=1)
>             )
> {code}
> Dataflow indicates me that the estimated size         of the output after the 
> ReadFromText step is 602.29 MB only, which not correspond to any unique input 
> file size nor the overall file size matching with the pattern.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to