[ https://issues.apache.org/jira/browse/BEAM-11341?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17548989#comment-17548989 ]

Danny McCormick commented on BEAM-11341:
----------------------------------------

This issue has been migrated to https://github.com/apache/beam/issues/20599

> Pipeline using GenerateSequence not working with SDF
> ----------------------------------------------------
>
>                 Key: BEAM-11341
>                 URL: https://issues.apache.org/jira/browse/BEAM-11341
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-flink
>    Affects Versions: 2.25.0
>            Reporter: Kyle Weaver
>            Priority: P3
>
> Originally reported by Tao Li on the mailing list: 
> https://lists.apache.org/thread.html/r20ec55733a0b54018e23e43fe2ca3359b5adf7ad1f98c5ff7a35254a%40%3Cuser.beam.apache.org%3E
> ----
> I am running into a problem with 
> “org.apache.beam:beam-runners-flink-1.11:2.25.0” and 
> “org.apache.beam:beam-runners-flink-1.10:2.25.0”. I am doing some local 
> testing with the Flink runners in embedded mode. The problem is that I cannot
> save data into local files using those artifact versions. However, when I
> switched to “org.apache.beam:beam-runners-flink-1.10:2.24.0”, it worked fine
> and output files were saved successfully.
>  
> I am basically generating unbounded data in memory using the GenerateSequence
> transform and saving it into local files. Here is the code that generates
> unbounded data in memory:
>  
> pipeline.apply(GenerateSequence.from(0).withRate(1, new Duration(10)))
>   .apply(Window.into[java.lang.Long](FixedWindows.of(Duration.standardSeconds(1))))
>  
> I compared the logs and noticed that no write operation appears in the logs
> with “beam-runners-flink-1.11:2.25.0” and “beam-runners-flink-1.10:2.25.0”.
> With the working version “beam-runners-flink-1.10:2.24.0”, I could find the
> logs below, which clearly show the write operation:
>  
> [FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous) -> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Finalize/ParMultiDo(Finalize) -> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair with random key/ParMultiDo(AssignShard) (9/12)] INFO org.apache.beam.sdk.io.WriteFiles - Finalizing 1 file results
> [FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous) -> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Finalize/ParMultiDo(Finalize) -> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair with random key/ParMultiDo(AssignShard) (9/12)] INFO org.apache.beam.sdk.io.FileBasedSink - Will copy temporary file FileResult{tempFilename=/Users/taol/data/output/.temp-beam-819dbd7c-b9f7-4c8c-9d8b-20091d2eef94/010abb5e-92b0-4e95-a85d-30984e769fe2, shard=2, window=[2020-11-24T01:33:59.000Z..2020-11-24T01:34:00.000Z), paneInfo=PaneInfo{isFirst=true, isLast=true, timing=ON_TIME, index=0, onTimeIndex=0}} to final location /Users/taol/data/output/output-2020-11-24T01:33:59.000Z-2020-11-24T01:34:00.000Z-00002-of-00010.parquet
> [FileIO.Write/WriteFiles/GatherTempFileResults/Reshuffle.ViaRandomKey/Values/Values/Map/ParMultiDo(Anonymous) -> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Finalize/ParMultiDo(Finalize) -> FileIO.Write/WriteFiles/FinalizeTempFileBundles/Reshuffle.ViaRandomKey/Pair with random key/ParMultiDo(AssignShard) (9/12)] INFO org.apache.beam.sdk.io.FileBasedSink - Will remove known temporary file /Users/taol/data/output/.temp-beam-819dbd7c-b9f7-4c8c-9d8b-20091d2eef94/010abb5e-92b0-4e95-a85d-30984e769fe2
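
The `GenerateSequence` snippet in the report emits one element per 10 ms (`withRate(1, new Duration(10))`, where Joda's `Duration(long)` is in milliseconds) and assigns each element to a 1-second fixed window. As a rough, stdlib-only illustration (not Beam code), the sketch below simulates that assignment. It assumes idealized element timestamps exactly 10 ms apart and that fixed windows with zero offset start at `ts - (ts mod size)`; `FixedWindowSketch` and its methods are hypothetical names. Under those assumptions, each full window collects 100 elements, and a timestamp inside 01:33:59 UTC falls in the `[01:33:59.000Z, 01:34:00.000Z)` window seen in the log.

```scala
// Stdlib-only simulation of 1-second fixed windows over a 1-element-per-10-ms stream.
// Assumption: idealized timestamps exactly 10 ms apart; this is NOT Beam's implementation.
object FixedWindowSketch {
  val PeriodMillis: Long = 10L   // mirrors withRate(1, new Duration(10))
  val WindowMillis: Long = 1000L // mirrors FixedWindows.of(Duration.standardSeconds(1))

  /** Start (epoch millis) of the fixed window containing tsMillis, assuming zero offset. */
  def windowStart(tsMillis: Long): Long =
    tsMillis - java.lang.Math.floorMod(tsMillis, WindowMillis)

  /** Counts how many of n elements, starting at startMillis, land in each window. */
  def countPerWindow(startMillis: Long, n: Int): Map[Long, Int] =
    (0 until n)
      .map(i => startMillis + i * PeriodMillis)
      .groupBy(ts => windowStart(ts))
      .map { case (start, timestamps) => start -> timestamps.size }

  def main(args: Array[String]): Unit = {
    // 300 elements starting on a window boundary span exactly three 1-second windows.
    val counts = countPerWindow(startMillis = 0L, n = 300)
    counts.toSeq.sortBy(_._1).foreach { case (start, c) =>
      println(s"window [$start, ${start + WindowMillis}) ms -> $c elements")
    }
  }
}
```

Real timestamps from an unbounded source will jitter, so actual per-window counts can differ slightly from 100.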
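
The final file name in the working-version log follows a visible pattern: prefix, window start, window end, zero-padded shard index, `-of-`, zero-padded shard count, then the suffix. The sketch below rebuilds that name from the values printed in the `FileResult` log line (prefix `output`, shard 2 of 10, the one-second window, suffix `.parquet`). It is only an approximation inferred from this log, not Beam's `FileIO` naming code: the helper `windowedShardName` is hypothetical, the minimum five-digit padding is an assumption, and it omits any pane index on the assumption that a single on-time firing (`isFirst=true, isLast=true`) adds none, as the logged name suggests.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

object FileNameSketch {
  // Millisecond-precision UTC timestamps, matching the "2020-11-24T01:33:59.000Z" style in the log.
  private val Iso: DateTimeFormatter =
    DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'").withZone(ZoneOffset.UTC)

  /** Hypothetical helper: rebuilds a windowed shard file name like the one in the log. */
  def windowedShardName(prefix: String, start: Instant, end: Instant,
                        shard: Int, numShards: Int, suffix: String): String = {
    val width = math.max(5, numShards.toString.length) // assumption: pad to at least 5 digits
    val pad = (n: Int) => s"%0${width}d".format(n)
    s"$prefix-${Iso.format(start)}-${Iso.format(end)}-${pad(shard)}-of-${pad(numShards)}$suffix"
  }

  def main(args: Array[String]): Unit = {
    val name = windowedShardName(
      "output",
      Instant.parse("2020-11-24T01:33:59.000Z"),
      Instant.parse("2020-11-24T01:34:00.000Z"),
      shard = 2, numShards = 10, suffix = ".parquet")
    println(name)
  }
}
```

Running `main` prints `output-2020-11-24T01:33:59.000Z-2020-11-24T01:34:00.000Z-00002-of-00010.parquet`, the same final location name as in the log above.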



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
