[ https://issues.apache.org/jira/browse/BEAM-12202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17549261#comment-17549261 ]

Danny McCormick commented on BEAM-12202:
----------------------------------------

This issue has been migrated to https://github.com/apache/beam/issues/20868

> writeDynamic on EMR/Spark is not finishing writes to S3, leaves out temp files
> ------------------------------------------------------------------------------
>
>                 Key: BEAM-12202
>                 URL: https://issues.apache.org/jira/browse/BEAM-12202
>             Project: Beam
>          Issue Type: Bug
>          Components: io-java-files, runner-spark
>    Affects Versions: 2.14.0, 2.28.0
>            Reporter: Pawel Walczak
>            Priority: P3
>
> Filing a bug as suggested on 
> [StackOverflow|https://stackoverflow.com/questions/67163902/writedynamic-in-apache-beam-on-emr-spark-is-not-finishing-writes-to-s3-leaves-o?noredirect=1#comment118755306_67163902].
> I have a bounded PCollection and would like to persist the output to an S3 
> bucket with a dynamic file naming scheme. Unfortunately, when running on EMR 
> with the Spark runner (tried emr-6.2.0/Spark 3.0.1/Beam 2.28.0 and 
> emr-5.30.1/Spark 2.4.5/Beam 2.14.0), after all steps in the pipeline finish 
> and the cluster terminates, the main output directory on S3 contains only 
> some of the expected content; most of it is left in the .temp-beam folder 
> and never moved to the main output dir. "Most of it" means roughly 90% of 
> the expected lines are missing from the correctly named files, and spot 
> checks indicate those lines sit in files inside the .temp-beam folder. 
> Here's the relevant part of the pipeline declaration:
> {noformat}
> import org.apache.beam.sdk.coders.StringUtf8Coder;
> import org.apache.beam.sdk.io.Compression;
> import org.apache.beam.sdk.io.FileIO;
> import org.apache.beam.sdk.io.TextIO;
> import org.apache.beam.sdk.transforms.Contextful;
> import org.apache.beam.sdk.transforms.SerializableFunction;
> import org.apache.beam.sdk.values.PCollection;
>
> PCollection<SomeObject> input; // a bounded PCollection
>
> FileIO.Write<String, SomeObject> write = FileIO.<String, SomeObject>writeDynamic()
>     // Route each element to a destination based on its key.
>     .by(SomeObject::key)
>     .withDestinationCoder(StringUtf8Coder.of())
>     .withCompression(Compression.GZIP)
>     // Name output files per destination key and shard index.
>     .withNaming((SerializableFunction<String, FileIO.Write.FileNaming>) key ->
>         (FileIO.Write.FileNaming) (window, pane, numShards, shardIndex, compression) ->
>             String.format("some_object_%s_%d.csv.gz", key, shardIndex))
>     // Serialize each element to a CSV line; each destination file gets a header.
>     .via(Contextful.fn(SomeObject::toCsvLine),
>          Contextful.fn(x -> TextIO.sink().withHeader(SomeObject.HEADER)))
>     .to("s3://some-bucket/some-output-path");
>
> input.apply("write-a-pcollection", write);
> {noformat}
> With this code I get an S3 bucket that looks like:
> * some_object_key1_0.csv.gz
> * some_object_key1_1.csv.gz
> * some_object_key2_0.csv.gz
> * some_object_key3_0.csv.gz
> * .temp-beam-<uuid> with 90% of the expected content persisted inside 
> objects named with random UUIDs
> However, when I add `.withIgnoreWindowing()` to the writeDynamic 
> configuration, the output seems to be fully correct and no .temp-beam 
> directory is left over. This method is deprecated, though, and no 
> replacement has ever been provided in the javadocs (at least I couldn't 
> find any). 
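> To illustrate, this is the only change in the workaround, a minimal sketch 
> reusing the `write` declaration above (`.withIgnoreWindowing()` is the 
> deprecated FileIO.Write method):
> {noformat}
> // Identical pipeline, but windowing is ignored during the write.
> input.apply("write-a-pcollection", write.withIgnoreWindowing());
> {noformat}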
> The input PCollection is not windowed, and forcing global windowing right 
> before the writeDynamic transform does not solve the issue either.
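> For completeness, a sketch of that global-windowing attempt (assuming the 
> standard `Window.into(new GlobalWindows())` transform applied just before 
> the write):
> {noformat}
> import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
> import org.apache.beam.sdk.transforms.windowing.Window;
>
> input
>     // Explicitly place all elements into the single global window.
>     .apply("force-global-window", Window.<SomeObject>into(new GlobalWindows()))
>     .apply("write-a-pcollection", write);
> {noformat}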
> This might be a bug in Beam, or something to consider before removing 
> `.withIgnoreWindowing()` in future versions. Kindly please investigate 
> the issue.
>  



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
