Hey,

Flink is super useful for Beam development, but I’m having trouble writing data 
to Parquet. Everything works fine on DirectRunner, DataflowRunner, and 
FlinkRunner against a local cluster (1.10.2). However, when I use FlinkRunner 
in embedded mode, only a subset of my data arrives in the file store. My local 
machine is a Mac; Beam is 2.25.0; the code is Java; the runner flavour is 
classic. A gist of the code and log content is here:
https://gist.github.com/chrishinds/aba17d314fa6561b0904ee783090947f

In this test I read ~8000 files, but I only get data back from ~1000 of them. It 
looks like I’ve received only one of what ought to have been several Parquet 
shards.

The logs also make me think this is what’s happening. Multiple temp files are 
copied to the same final filename, each with shard=0:

Will copy temporary file 
FileResult{tempFilename=/Users/chrish/Source/github.com/wrhs/out/IngestTest/.temp-beam-656dba88-5dce-4697-9c63-805067cecdba/10e28959-2db1-431f-835f-7b3192243852,
 shard=0, 
window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@2a3591c5, 
paneInfo=PaneInfo.NO_FIRING} to final location 
/Users/chrish/Source/github.com/wrhs/out/IngestTest/output-00000-of-00001.parquet
Will copy temporary file 
FileResult{tempFilename=/Users/chrish/Source/github.com/wrhs/out/IngestTest/.temp-beam-656dba88-5dce-4697-9c63-805067cecdba/7ab5a9ab-b46f-4bdd-a5c0-9c6b1fdb4638,
 shard=0, 
window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@2a3591c5, 
paneInfo=PaneInfo.NO_FIRING} to final location 
/Users/chrish/Source/github.com/wrhs/out/IngestTest/output-00000-of-00001.parquet
Will copy temporary file 
FileResult{tempFilename=/Users/chrish/Source/github.com/wrhs/out/IngestTest/.temp-beam-656dba88-5dce-4697-9c63-805067cecdba/2d3c495a-313b-4ca7-a398-5b32f321f4b7,
 shard=0, 
window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@2a3591c5, 
paneInfo=PaneInfo.NO_FIRING} to final location 
/Users/chrish/Source/github.com/wrhs/out/IngestTest/output-00000-of-00001.parquet
Will copy temporary file 
FileResult{tempFilename=/Users/chrish/Source/github.com/wrhs/out/IngestTest/.temp-beam-656dba88-5dce-4697-9c63-805067cecdba/de23ceeb-04d1-4cb6-a0a9-0507c112b93a,
 shard=0, 
window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@2a3591c5, 
paneInfo=PaneInfo.NO_FIRING} to final location 
/Users/chrish/Source/github.com/wrhs/out/IngestTest/output-00000-of-00001.parquet
Will copy temporary file 
FileResult{tempFilename=/Users/chrish/Source/github.com/wrhs/out/IngestTest/.temp-beam-656dba88-5dce-4697-9c63-805067cecdba/a40489ce-7432-443b-9828-9511ab9db0cb,
 shard=0, 
window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@2a3591c5, 
paneInfo=PaneInfo.NO_FIRING} to final location 
/Users/chrish/Source/github.com/wrhs/out/IngestTest/output-00000-of-00001.parquet
Will copy temporary file 
FileResult{tempFilename=/Users/chrish/Source/github.com/wrhs/out/IngestTest/.temp-beam-656dba88-5dce-4697-9c63-805067cecdba/c3819566-4edc-4362-b4ca-55e2e2b70f4c,
 shard=0, 
window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@2a3591c5, 
paneInfo=PaneInfo.NO_FIRING} to final location 
/Users/chrish/Source/github.com/wrhs/out/IngestTest/output-00000-of-00001.parquet
Will copy temporary file 
FileResult{tempFilename=/Users/chrish/Source/github.com/wrhs/out/IngestTest/.temp-beam-656dba88-5dce-4697-9c63-805067cecdba/3fd67eee-ea50-4b5b-bf3f-fe253dfcf3a4,
 shard=0, 
window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@2a3591c5, 
paneInfo=PaneInfo.NO_FIRING} to final location 
/Users/chrish/Source/github.com/wrhs/out/IngestTest/output-00000-of-00001.parquet
Will copy temporary file 
FileResult{tempFilename=/Users/chrish/Source/github.com/wrhs/out/IngestTest/.temp-beam-656dba88-5dce-4697-9c63-805067cecdba/7d5e4096-f5ce-4a79-aec5-e5e4293bc601,
 shard=0, 
window=org.apache.beam.sdk.transforms.windowing.GlobalWindow@2a3591c5, 
paneInfo=PaneInfo.NO_FIRING} to final location 
/Users/chrish/Source/github.com/wrhs/out/IngestTest/output-00000-of-00001.parquet
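
To show why I think this loses data, here is a minimal plain-Java sketch of 
the pattern in the log lines above. It is not Beam code, and I don't know how 
Beam actually performs the copy or which temp file wins in the real run. The 
class name, the method name, the ".temp-sketch" directory and the batch 
contents are all made up for illustration. The assumption is simply that each 
copy to the same final filename replaces the previous one.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

public class ShardCollisionSketch {

    // Writes each batch to its own temp file, then copies every temp file to
    // the SAME final name, mimicking the repeated "shard=0" entries in the log.
    // Returns the lines left in the final file once all copies are done.
    // Assumption: a later copy replaces an earlier one (REPLACE_EXISTING).
    static List<String> copyAllToOneShard(Path outDir, List<String> batches)
            throws IOException {
        Path tempDir = Files.createDirectories(outDir.resolve(".temp-sketch"));
        Path finalFile = outDir.resolve("output-00000-of-00001.txt");
        for (int i = 0; i < batches.size(); i++) {
            Path temp = tempDir.resolve("temp-" + i);
            Files.writeString(temp, batches.get(i) + "\n");
            // Each copy overwrites whatever the previous copy put there.
            Files.copy(temp, finalFile, StandardCopyOption.REPLACE_EXISTING);
        }
        return Files.readAllLines(finalFile);
    }

    public static void main(String[] args) throws IOException {
        Path outDir = Files.createTempDirectory("ingest-sketch");
        List<String> left =
                copyAllToOneShard(outDir, List.of("batch-A", "batch-B", "batch-C"));
        System.out.println(left); // prints [batch-C]
    }
}
```

Three batches go in and only the last one is left in the final file, which 
matches the "one shard out of several" symptom I see.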

I’m no Java pro, and this looks like a catastrophic failure, but maybe I’m 
doing something unspeakably stupid?


Cheers,
Chris.

