[ https://issues.apache.org/jira/browse/FLINK-26427?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Martijn Visser closed FLINK-26427.
----------------------------------
    Resolution: Resolved

Thanks for confirming that [~johnboy84] - closing this ticket.

> Streaming File Sink Uploading Smaller Versions Of The Same Part File To S3 (Race Condition)
> -------------------------------------------------------------------------------------------
>
>                 Key: FLINK-26427
>                 URL: https://issues.apache.org/jira/browse/FLINK-26427
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.13.1
>            Reporter: JOHN ERVINE
>            Priority: Blocker
>
> I'm experiencing some odd behaviour when writing ORC files to S3 using Flink's Streaming File Sink.
>
> {code:java}
> // set up the streaming execution environment
> final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.enableCheckpointing(PARAMETER_TOOL_CONFIG.getInt("flink.checkpoint.frequency.ms"), CheckpointingMode.EXACTLY_ONCE);
> env.getConfig().enableObjectReuse();
>
> Properties writerProperties = new Properties();
> writerProperties.put("orc.compress", "SNAPPY");
>
> // Order Book Sink
> StreamingFileSink<ArmadaRow> orderBookSink = StreamingFileSink
>     .forBulkFormat(new Path(PARAMETER_TOOL_CONFIG.get("order.book.sink")),
>         new OrcBulkWriterFactory<>(new OrderBookRowVectorizer(F_MD_ORDER_BOOK_GLOBEX_SCHEMA), writerProperties, new Configuration()))
>     .withBucketAssigner(new OrderBookBucketingAssigner())
>     .withRollingPolicy(OnCheckpointRollingPolicy.build())
>     .build();
> {code}
>
> I noticed when running queries during ingest of the data that my row counts were being decremented as the job progressed. I've had a look at S3 and I can see multiple versions of the same part file. The example below shows that part file 15-7 has two versions. The first file is 20.7 MB and the last file that's committed is smaller at 5.1 MB.
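For context on why the smaller file survives: S3 object storage is last-writer-wins per key, so if two multipart uploads for the same part-file key both complete, the one that completes last determines the object's content regardless of size. A minimal, self-contained Java sketch of that behaviour (not AWS SDK code; the map is a hypothetical stand-in for the bucket, and the sizes mirror the 20.7 MB / 5.1 MB files above):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LastWriterWins {
    // Stand-in for an S3 bucket: object key -> object size in bytes.
    static final Map<String, Long> bucket = new ConcurrentHashMap<>();

    // Stand-in for completeMultipartUpload: replaces the key's object atomically.
    static void commit(String key, long sizeBytes) {
        bucket.put(key, sizeBytes);
    }

    public static void main(String[] args) {
        String key = "part-15-7";
        commit(key, 20_700_000L); // first MPU commits the 20.7 MB file
        commit(key, 5_100_000L);  // racing second MPU commits last, so 5.1 MB wins
        System.out.println(bucket.get(key)); // prints 5100000
    }
}
```

Whichever `commit` runs last wins, which matches the observation that the committed object is sometimes the smaller of the two versions.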
> In most cases the current file is larger, but there are a few examples in the screenshot below where this is not the case.
>
> !https://i.stack.imgur.com/soU4b.png|width=2173,height=603!
>
> This looks like a typical race condition, or a failure to upload commits to S3 successfully, because the log below shows two commits for the same file very close together. The last commit is at 20:44 but the last modified date in S3 is at 20:43. I don't see any logs indicating a failure to commit. This is currently a blocker for us.
>
> {code:java}
> 2022-02-28T20:44:03.526+0000 INFO APP=${sys:AppID} COMP=${sys:CompID} APPNAME=${sys:AppName} S3Committer:64 - Committing staging/marketdata/t_stg_globex_order_book_test2/cyc_dt=2021-11-15/inst_exch_mrkt_id=XNYM/inst_ast_sub_clas=Energy/part-15-7 with MPU ID vVhVRh5XtEDmJNrqBCAp.4vcS34FBGoQQjPsE64kBmhkSJJB8T7ZY9codF994n7FBUquF_ls9oFxwoYPl5ZHfP0rkQgJ7aPmHzlB8omIH2ZFbeFNHbXpYS27U9Gl7LOMcEhlekMog4D2eeYUUjr9oA--
> 2022-02-28T20:44:03.224+0000 INFO APP=${sys:AppID} COMP=${sys:CompID} APPNAME=${sys:AppName} S3Committer:64 - Committing staging/marketdata/t_stg_globex_order_book_test2/cyc_dt=2021-11-15/inst_exch_mrkt_id=XNYM/inst_ast_sub_clas=Energy/part-15-7 with MPU ID jPnNvBwHtiRBLdDbH6W7duV2Fx1lxsOsPV4IfskMkPygpuVXF9DWsp4xZGxejI8mEVbcrIqF6hC9Tff9IzciK0lMUkTNrXHfRfG3tgkMwbX35T3chbXRN8Tjl0tsUF.oSBhgrGFpKxRxyi3CjRknxA--
> {code}



-- This message was sent by Atlassian Jira (v8.20.1#820001)