[ https://issues.apache.org/jira/browse/BEAM-2490?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16061713#comment-16061713 ]
Guillermo Rodríguez Cano commented on BEAM-2490:
------------------------------------------------

Hello again,

As I commented before, and after fixing the shell's expansion, I think I am having a similar issue with both the Dataflow and Direct runners. I am not sure whether the cause is the glob operator or its combination with gzip compression.

This time I simplified my pipeline to emulate a simple grep of some JSON files:

{code:none}
with beam.Pipeline(options=pipeline_options) as p:
    raw_events = p | 'Read input' >> ReadFromText(known_args.input)
    events = raw_events | 'Generate events' >> beam.ParDo(ExtractEventsFn())
    filtered_events = (events
        | 'Filter for a specific user' >> beam.Filter(lambda e: e['user'] == '123')
        | 'Filter for a specific video' >> beam.Filter(lambda e: e['video'] == '456')
    )
    output = (filtered_events
        | 'Format output events' >> beam.Map(lambda e: '%s @ %s (%s - %s - %s)' % (datetime.fromtimestamp(e['timestamp']/1000).isoformat(), e['type'], e['user'], e['video'], e['device']))
        | 'Write results' >> WriteToText(known_args.output)
    )
{code}

When I run the pipeline on the decompressed input files, with either the Direct or the Dataflow runner, I obtain the expected result (as compared with parsing the input files on the command line with the grep command). When I run the pipeline on the gzip-compressed files, I obtain, with both runners, only a minimal subset (less than 2%) of the expected result.

With the Dataflow runner I was using Google Cloud Storage, while with the Direct runner I was using my local hard disk. For the Direct runner I had to reduce the initial set of 48 files to just the 8 files that I know the result is generated from, because the Python process used a lot of swap memory on my laptop in the uncompressed scenario.

The similarities with [~oliviernguyenquoc] are that the compressed files I am using are around the same size (about 200 MB each, 1.5-2 GB decompressed) and are text files (JSON in my case).
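A Beam-independent baseline for the comparison described above can be sketched with the Python standard library alone. This is only an illustration, assuming the compressed files are available on local disk (files in Google Cloud Storage would have to be copied down first); the function name `count_lines` and its `needle` parameter are hypothetical and do not come from the pipeline in this comment.

```python
import glob
import gzip


def count_lines(pattern, needle=None):
    """Return {path: line count} for every gzip file matching `pattern`.

    If `needle` is given, only lines containing that substring are counted,
    which roughly emulates `zcat file | grep -c needle`.
    """
    counts = {}
    for path in sorted(glob.glob(pattern)):
        # gzip.open in text mode decompresses the file as a stream,
        # so the whole file never has to fit in memory.
        with gzip.open(path, "rt", encoding="utf-8", errors="replace") as handle:
            counts[path] = sum(
                1 for line in handle if needle is None or needle in line
            )
    return counts
```

Summing the values returned for the same glob pattern that is passed to ReadFromText gives a number that the line count of the pipeline output can be checked against, per file and in total.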
Interestingly enough, I also tried loading each file as a separate PCollection directly in the source code and then merging them with a Flatten transform. I got similarly unsuccessful results with the Direct runner (I did not try the Dataflow runner). I say similar because the output was slightly different from the output I get when using the glob operator on the directory where the files are.

It feels as if Apache Beam is sampling the files when they are gzip-compressed.

> ReadFromText function is not taking all data with glob operator (*)
> --------------------------------------------------------------------
>
>                 Key: BEAM-2490
>                 URL: https://issues.apache.org/jira/browse/BEAM-2490
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-py
>    Affects Versions: 2.0.0
>         Environment: Usage with Google Cloud Platform: Dataflow runner
>            Reporter: Olivier NGUYEN QUOC
>            Assignee: Chamikara Jayalath
>             Fix For: Not applicable
>
>
> I run a very simple pipeline:
> * Read my files from Google Cloud Storage
> * Split with '\n' char
> * Write the result to Google Cloud Storage
> I have 8 files that match the pattern:
> * my_files_2016090116_20160902_060051_xxxxxxxxxx.csv.gz (229.25 MB)
> * my_files_2016090117_20160902_060051_xxxxxxxxxx.csv.gz (184.1 MB)
> * my_files_2016090118_20160902_060051_xxxxxxxxxx.csv.gz (171.73 MB)
> * my_files_2016090119_20160902_060051_xxxxxxxxxx.csv.gz (151.34 MB)
> * my_files_2016090120_20160902_060051_xxxxxxxxxx.csv.gz (129.69 MB)
> * my_files_2016090121_20160902_060051_xxxxxxxxxx.csv.gz (151.7 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (346.46 MB)
> * my_files_2016090122_20160902_060051_xxxxxxxxxx.csv.gz (222.57 MB)
> This code should take them all:
> {code:python}
> beam.io.ReadFromText(
>       "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>       skip_header_lines=1,
>       compression_type=beam.io.filesystem.CompressionTypes.GZIP
>       )
> {code}
> It runs well, but the output of this pipeline is only a 288.62 MB file
> (instead of a 1.5 GB file).
> The whole pipeline code:
> {code:python}
> data = (p | 'ReadMyFiles' >> beam.io.ReadFromText(
>               "gs://XXXX_folder1/my_files_20160901*.csv.gz",
>               skip_header_lines=1,
>               compression_type=beam.io.filesystem.CompressionTypes.GZIP
>               )
>           | 'SplitLines' >> beam.FlatMap(lambda x: x.split('\n'))
>        )
> output = (
>           data | "Write" >> beam.io.WriteToText('gs://XXX_folder2/test.csv', num_shards=1)
>          )
> {code}
> Dataflow indicates that the estimated size of the output after the
> ReadFromText step is only 602.29 MB, which does not correspond to the size of
> any single input file nor to the overall size of the files matching the pattern.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)