[ https://issues.apache.org/jira/browse/FLINK-26427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Martijn Visser closed FLINK-26427.
----------------------------------
    Resolution: Resolved

Thanks for confirming that, [~johnboy84]. Closing this ticket.

> Streaming File Sink Uploading Smaller Versions Of The Same Part File To S3 (Race Condition)
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26427
>                 URL: https://issues.apache.org/jira/browse/FLINK-26427
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.13.1
>            Reporter: JOHN ERVINE
>            Priority: Blocker
>
> I'm experiencing some odd behaviour when writing ORC files to S3 using Flink's
> Streaming File Sink.
>  
>  
> {code:java}
> // set up the streaming execution environment
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(PARAMETER_TOOL_CONFIG.getInt("flink.checkpoint.frequency.ms"),
>     CheckpointingMode.EXACTLY_ONCE);
> env.getConfig().enableObjectReuse();
>
> Properties writerProperties = new Properties();
> writerProperties.put("orc.compress", "SNAPPY");
>
> // Order Book sink
> StreamingFileSink<ArmadaRow> orderBookSink = StreamingFileSink
>     .forBulkFormat(new Path(PARAMETER_TOOL_CONFIG.get("order.book.sink")),
>         new OrcBulkWriterFactory<>(new OrderBookRowVectorizer(F_MD_ORDER_BOOK_GLOBEX_SCHEMA),
>             writerProperties, new Configuration()))
>     .withBucketAssigner(new OrderBookBucketingAssigner())
>     .withRollingPolicy(OnCheckpointRollingPolicy.build())
>     .build();{code}
>  
> I noticed while running queries during ingest that my row counts were being
> decremented as the job progressed. Looking at S3, I can see multiple versions
> of the same part file. The example below shows that part file 15-7 has two
> versions: the first file is 20.7 MB and the last file that's committed is
> smaller at 5.1 MB. In most cases the most recent file is the larger one, but
> there are a few examples in the screenshot below where this is not the case.
>  
> !https://i.stack.imgur.com/soU4b.png|width=2173,height=603!
>  
> This looks like a typical race condition, or a failure to commit uploads to S3
> successfully, because the log below shows two commits for the same file very
> close together. The last commit is at 20:44, but the last-modified date in S3
> is 20:43. I don't see any logs indicating a failure to commit. This is
> currently a blocker for us.
>  
> {code:java}
> 2022-02-28T20:44:03.526+0000 INFO  APP=${sys:AppID} COMP=${sys:CompID} APPNAME=${sys:AppName} S3Committer:64 - Committing staging/marketdata/t_stg_globex_order_book_test2/cyc_dt=2021-11-15/inst_exch_mrkt_id=XNYM/inst_ast_sub_clas=Energy/part-15-7 with MPU ID vVhVRh5XtEDmJNrqBCAp.4vcS34FBGoQQjPsE64kBmhkSJJB8T7ZY9codF994n7FBUquF_ls9oFxwoYPl5ZHfP0rkQgJ7aPmHzlB8omIH2ZFbeFNHbXpYS27U9Gl7LOMcEhlekMog4D2eeYUUjr9oA--
> 2022-02-28T20:44:03.224+0000 INFO  APP=${sys:AppID} COMP=${sys:CompID} APPNAME=${sys:AppName} S3Committer:64 - Committing staging/marketdata/t_stg_globex_order_book_test2/cyc_dt=2021-11-15/inst_exch_mrkt_id=XNYM/inst_ast_sub_clas=Energy/part-15-7 with MPU ID jPnNvBwHtiRBLdDbH6W7duV2Fx1lxsOsPV4IfskMkPygpuVXF9DWsp4xZGxejI8mEVbcrIqF6hC9Tff9IzciK0lMUkTNrXHfRfG3tgkMwbX35T3chbXRN8Tjl0tsUF.oSBhgrGFpKxRxyi3CjRknxA--{code}
>  
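For reference, the gap between the two `Committing` log entries quoted above can be checked directly from their timestamps. A minimal, self-contained sketch (assuming only the `yyyy-MM-dd'T'HH:mm:ss.SSSZ` timestamp format shown in the log; the class name is illustrative):

```java
import java.time.Duration;
import java.time.OffsetDateTime;
import java.time.format.DateTimeFormatter;

public class CommitGap {
    public static void main(String[] args) {
        // Timestamp format used in the quoted log lines
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");

        // The two "Committing ... part-15-7" entries (log prints newest first)
        OffsetDateTime first  = OffsetDateTime.parse("2022-02-28T20:44:03.224+0000", fmt);
        OffsetDateTime second = OffsetDateTime.parse("2022-02-28T20:44:03.526+0000", fmt);

        // Compute how far apart the two commits were
        long gapMs = Duration.between(first, second).toMillis();
        System.out.println("Gap between the two commits: " + gapMs + " ms"); // 302 ms
    }
}
```

Two multipart uploads completing for the same key roughly 300 ms apart is consistent with the report's race-condition hypothesis: on a versioned bucket each completed MPU becomes a separate object version, and whichever completes last becomes the current version regardless of size.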



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
