Hi! Thanks, Austin, for the answer. I agree that FLIP-147 solves this problem; just set execution.checkpointing.checkpoints-after-tasks-finish.enabled to true to enable the feature.
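In case you prefer to set it in code rather than in flink-conf.yaml, here is a minimal sketch. The option is passed as a plain string key copied from above, so please double-check it against the docs for your Flink version:

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Enable checkpoints after some tasks have finished (Flink 1.14+, off by default in 1.14).
    Configuration conf = new Configuration();
    conf.setBoolean("execution.checkpointing.checkpoints-after-tasks-finish.enabled", true);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(conf);
    env.enableCheckpointing(500);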
The JDBC sink solves this problem in a different way: it flushes any buffered records when the sink is closed (see JdbcOutputFormat#close [1]). But please note that, unlike GenericWriteAheadSink, JdbcOutputFormat also flushes data whenever its buffer threshold is reached, so users might see inconsistent data in the database between a checkpoint failure and the end of the job (the sink needs a primary key to reach eventual consistency). A rough sketch of wiring up the batched JDBC sink is at the bottom of this mail.

[1] https://github.com/apache/flink/blob/d502353acd03428b9befa4ec970191b757b6c8c3/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java#L253

Austin Cawley-Edwards <austin.caw...@gmail.com> wrote on Fri, Nov 5, 2021 at 2:47 AM:

> Hi James,
>
> You are correct that since Flink 1.14 [1] (which included FLIP-147 [2])
> there is support for checkpointing after some tasks have finished, which
> sounds like it will solve this use case.
>
> You may also want to look at the JDBC sink [3], which also supports
> batching, as well as some other nice things like retries and batch
> intervals.
>
> Hope that helps,
> Austin
>
> [1]: https://flink.apache.org/news/2021/09/29/release-1.14.0.html#checkpointing-and-bounded-streams
> [2]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
> [3]: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/
>
> On Wed, Nov 3, 2021 at 12:25 PM James Sandys-Lumsdaine <jas...@hotmail.com> wrote:
>
>> Hello,
>>
>> I have a Flink workflow where I need to upload the output data into a
>> legacy SQL Server database, and so I have read the section in the Flink
>> book about data sinks and utilizing the GenericWriteAheadSink base class.
>> I am currently using Flink 1.12.3, although we plan to upgrade to 1.14
>> shortly.
>>
>> Firstly, given I will be generating a large amount of data, I feel it is
>> best to use the GenericWriteAheadSink base class so I can bulk copy all
>> the data into my SQL Server database rather than attempt a row-by-row
>> insertion, which would be too slow. Hopefully this is a good use case for
>> this class - or is there now a better approach?
>>
>> Secondly, one thing I noticed is that my JDBC source emits ~50,000 rows
>> but the program actually exits before a final checkpoint is taken, so I
>> miss many of the final rows - I have to put in a Thread.sleep(5000) before
>> allowing the JDBC source to exit. This might be related to FLINK-21215, as
>> I see the following error:
>>
>> org.apache.flink.util.SerializedThrowable: Task NameSource: Trade JDBC
>> Source (1/1)#0 Failure reason: Checkpoint was declined (tasks not ready)
>>
>> With the extra Thread.sleep(5000) I see all the rows handled by the
>> sendValues() method.
>>
>> I have included the test code below, which just logs the "insertions" for
>> now (and doesn't do real db access) but demonstrates the problem:
>>
>> private void checkpointTest() throws Exception {
>>     Configuration conf = new Configuration();
>>     conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
>>     final StreamExecutionEnvironment env =
>>         StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>>     env.setParallelism(1);
>>     env.enableCheckpointing(500);
>>
>>     MyJDBCSource myJDBCSource =
>>         new MyJDBCSource(tradesDBConnectionParams, fromDttm, toDttm, asOf);
>>     DataStream<MyObj> jdbcStreamIn = env.addSource(myJDBCSource, "My JDBC Source");
>>
>>     jdbcStreamIn.transform("SqlServerSink", TypeInformation.of(MyObj.class),
>>         new SqlServerBulkCopySink(
>>             new FileCheckpointCommitter("c:\\temp\\FlinkTemp"),
>>             TypeExtractor.createTypeInfo(MyObj.class).createSerializer(new ExecutionConfig()),
>>             UUID.randomUUID().toString()));
>>
>>     env.execute();
>> }
>>
>> private static class SqlServerBulkCopySink extends GenericWriteAheadSink<MyObj> {
>>     public SqlServerBulkCopySink(CheckpointCommitter committer,
>>                                  TypeSerializer<MyObj> serializer,
>>                                  String jobID) throws Exception {
>>         super(committer, serializer, jobID);
>>     }
>>
>>     @Override
>>     protected boolean sendValues(Iterable<MyObj> objects, long checkpointId, long timestamp) {
>>         logger.info("Sending {},{}-----------------------------------------------",
>>             checkpointId, timestamp);
>>         for (MyObj myObj : objects)
>>             logger.info("  {},{}: {}", checkpointId, timestamp, myObj);
>>         // this will eventually be a bulk copy insert into the SQL Server database
>>         return true;
>>     }
>> }
>>
>> Am I right in thinking the latest versions of Flink will not suffer from
>> this problem, or am I hitting something else? To be clear, I am expecting
>> a checkpoint to be invoked by Flink to cover all the data I want to insert
>> into my DB - how else would I do the final bulk copy if my sendValues() is
>> not called?
>>
>> I have more questions about my data sink, but I will wait to hear your
>> answers.
>>
>> Many thanks in advance,
>>
>> James.
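As referenced above, here is a rough sketch of the batched JDBC sink Austin mentioned, adapted to the example in the quoted mail. The SQL statement, the MyObj getters, and the connection URL are only placeholders, and the builder options should be checked against the 1.14 JDBC connector docs:

    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    import org.apache.flink.connector.jdbc.JdbcSink;

    // Batched JDBC sink: rows are buffered and flushed when the batch size is
    // reached or the batch interval expires, with retries on failure.
    jdbcStreamIn.addSink(JdbcSink.sink(
        "INSERT INTO trades (id, price) VALUES (?, ?)",    // placeholder statement
        (statement, myObj) -> {
            statement.setLong(1, myObj.getId());           // placeholder getters on MyObj
            statement.setBigDecimal(2, myObj.getPrice());
        },
        JdbcExecutionOptions.builder()
            .withBatchSize(1000)
            .withBatchIntervalMs(200)
            .withMaxRetries(3)
            .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
            .withUrl("jdbc:sqlserver://localhost;databaseName=trades")   // placeholder URL
            .withDriverName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
            .build()));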