Hello, I have a Flink workflow where I need to upload the output data into a legacy SQL Server database, so I have read the section in the Flink book about data sinks and using the GenericWriteAheadSink base class. I am currently using Flink 1.12.3, although we plan to upgrade to 1.14 shortly.
Firstly, given that I will be generating a large amount of data, I feel it is best to use the GenericWriteAheadSink base class so I can bulk copy all the data into my SQL Server database rather than attempt a row-by-row insertion, which would be too slow. Hopefully this is a good use case for the class, or is there now a better approach?

Secondly, I noticed that my JDBC source emits ~50,000 rows but the program actually exits before a final checkpoint is taken, so I miss many of the final rows; I have to put a Thread.sleep(5000) in before allowing the JDBC source to exit. This might be related to FLINK-21215, as I see the following error:

    org.apache.flink.util.SerializedThrowable: Task Name: Source: Trade JDBC Source (1/1)#0 Failure reason: Checkpoint was declined (tasks not ready)

With the extra Thread.sleep(5000) I see all the rows handled by the sendValues() method. I have included the test code below, which just logs the "insertions" for now (it does no real DB access) but demonstrates the problem:

    private void checkpointTest() throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);
        env.enableCheckpointing(500);

        MyJDBCSource myJDBCSource = new MyJDBCSource(tradesDBConnectionParams, fromDttm, toDttm, asOf);
        DataStream<MyObj> jdbcStreamIn = env.addSource(myJDBCSource, "My JDBC Source");

        jdbcStreamIn.transform("SqlServerSink", TypeInformation.of(MyObj.class),
                new SqlServerBulkCopySink(
                        new FileCheckpointCommitter("c:\\temp\\FlinkTemp"),
                        TypeExtractor.createTypeInfo(MyObj.class).createSerializer(new ExecutionConfig()),
                        UUID.randomUUID().toString()));

        env.execute();
    }

    private static class SqlServerBulkCopySink extends GenericWriteAheadSink<MyObj> {
        public SqlServerBulkCopySink(CheckpointCommitter committer, TypeSerializer<MyObj> serializer, String jobID) throws Exception {
            super(committer, serializer, jobID);
        }

        @Override
        protected boolean sendValues(Iterable<MyObj> objects, long checkpointId, long timestamp) {
            logger.info("Sending {},{}-----------------------------------------------", checkpointId, timestamp);
            for (MyObj myObj : objects)
                logger.info("  {},{}: {}", checkpointId, timestamp, myObj);
            // this will eventually be a bulk copy insert into the SQL Server database
            return true;
        }
    }

Am I right in thinking that the latest versions of Flink will not suffer from this problem, or am I hitting something else? To be clear, I am expecting Flink to trigger a checkpoint that covers all the data I want to insert into my DB; how else would the final bulk copy happen if my sendValues() is never called for those rows?

I have more questions about my data sink, but I will wait to hear your answers.

Many thanks in advance,
James.
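P.S. For clarity on the workaround: this is roughly where the sleep sits inside MyJDBCSource. A simplified sketch only; the JDBC query and row iteration are elided:

    // Simplified sketch of MyJDBCSource.run(); the actual query/row handling is elided.
    @Override
    public void run(SourceContext<MyObj> ctx) throws Exception {
        // ... execute the JDBC query and ctx.collect(...) each of the ~50,000 rows ...

        // Without this sleep the job finishes before a final checkpoint is taken,
        // so the last rows never reach sendValues(). 5 seconds is an arbitrary value
        // that gives the 500ms checkpoint interval time to fire once more.
        Thread.sleep(5000);
    }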
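P.P.S. In case it helps to see where I am heading, here is an untested sketch of what I intend sendValues() to eventually do, using SQLServerBulkCopy from Microsoft's mssql-jdbc driver. The jdbcUrl variable, the dbo.Trades table, the two-column layout, and the MyObj getters (getId(), getPrice()) are placeholders for illustration, not our real schema:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.Set;
    import com.microsoft.sqlserver.jdbc.ISQLServerBulkData;
    import com.microsoft.sqlserver.jdbc.SQLServerBulkCopy;
    import com.microsoft.sqlserver.jdbc.SQLServerBulkCopyOptions;

    // These methods would live inside SqlServerBulkCopySink.
    @Override
    protected boolean sendValues(Iterable<MyObj> objects, long checkpointId, long timestamp) {
        // One bulk copy per completed checkpoint, instead of row-by-row inserts.
        try (Connection conn = DriverManager.getConnection(jdbcUrl);  // jdbcUrl is a placeholder
             SQLServerBulkCopy bulkCopy = new SQLServerBulkCopy(conn)) {
            SQLServerBulkCopyOptions options = new SQLServerBulkCopyOptions();
            options.setBatchSize(10_000);
            bulkCopy.setBulkCopyOptions(options);
            bulkCopy.setDestinationTableName("dbo.Trades");  // placeholder table name
            bulkCopy.writeToServer(new MyObjBulkData(objects.iterator()));
            return true;
        } catch (SQLException e) {
            logger.error("Bulk copy failed for checkpoint {}", checkpointId, e);
            return false;  // GenericWriteAheadSink keeps the data and retries it later
        }
    }

    // Adapter that streams MyObj rows to the driver; assumes two columns (Id, Price)
    // and matching MyObj getters, purely for illustration.
    private static class MyObjBulkData implements ISQLServerBulkData {
        private final Iterator<MyObj> it;
        private MyObj current;

        MyObjBulkData(Iterator<MyObj> it) {
            this.it = it;
        }

        @Override public Set<Integer> getColumnOrdinals() { return new HashSet<>(Arrays.asList(1, 2)); }
        @Override public String getColumnName(int column) { return column == 1 ? "Id" : "Price"; }
        @Override public int getColumnType(int column) { return column == 1 ? java.sql.Types.BIGINT : java.sql.Types.DOUBLE; }
        @Override public int getPrecision(int column) { return column == 1 ? 19 : 15; }
        @Override public int getScale(int column) { return 0; }

        @Override
        public Object[] getRowData() {
            return new Object[] { current.getId(), current.getPrice() };
        }

        @Override
        public boolean next() {
            if (!it.hasNext()) return false;
            current = it.next();
            return true;
        }
    }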