Hi James,

You are correct that since Flink 1.14 [1] (which included FLIP-147 [2]) there is support for checkpointing after some tasks have finished, which sounds like it will solve this use case.
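One caveat: in 1.14 that feature is disabled by default, so you need to switch it on explicitly. I believe it is something like the following (a minimal, untested sketch against the 1.14 API - please double-check the exact option name in the docs):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    Configuration config = new Configuration();
    // Allow checkpoints to continue after some (bounded) tasks have finished (FLIP-147).
    config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment(config);
    env.enableCheckpointing(500);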
You may also want to look at the JDBC sink [3], which supports batching as well as some other nice things like retries and batch intervals.
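With your MyObj stream it would look roughly like the following (again just a sketch, using the flink-connector-jdbc dependency - the table, columns, getters, and connection details are all made up, and since this sink is at-least-once the insert should ideally be idempotent, e.g. an upsert/MERGE):

    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    import org.apache.flink.connector.jdbc.JdbcSink;

    jdbcStreamIn.addSink(JdbcSink.sink(
            "INSERT INTO trades (id, price) VALUES (?, ?)",  // hypothetical table/columns
            (statement, myObj) -> {
                statement.setLong(1, myObj.getId());         // hypothetical getters on MyObj
                statement.setBigDecimal(2, myObj.getPrice());
            },
            JdbcExecutionOptions.builder()
                    .withBatchSize(1000)       // flush after 1000 buffered rows...
                    .withBatchIntervalMs(200)  // ...or after 200 ms, whichever comes first
                    .withMaxRetries(3)
                    .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:sqlserver://yourhost:1433;databaseName=yourdb")
                    .withDriverName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
                    .withUsername("user")
                    .withPassword("password")
                    .build()));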
Hope that helps,
Austin

[1]: https://flink.apache.org/news/2021/09/29/release-1.14.0.html#checkpointing-and-bounded-streams
[2]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
[3]: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/

On Wed, Nov 3, 2021 at 12:25 PM James Sandys-Lumsdaine <jas...@hotmail.com> wrote:

> Hello,
>
> I have a Flink workflow where I need to upload the output data into a
> legacy SQL Server database, so I have read the section in the Flink book
> about data sinks and utilizing the GenericWriteAheadSink base class. I am
> currently using Flink 1.12.3, although we plan to upgrade to 1.14 shortly.
>
> Firstly, given that I will be generating a large amount of data, I feel it
> best to use the GenericWriteAheadSink base class so I can bulk copy all the
> data into my SQL Server database rather than attempt a row-by-row
> insertion, which would be too slow. Hopefully this is a good use case for
> this class, or is there now a better approach?
>
> Secondly, one thing I noticed is that my JDBC source emits ~50,000 rows
> but the program actually exits before a final checkpoint is taken, so I
> miss many of the final rows - I have to put in a Thread.sleep(5000) before
> allowing the JDBC source to exit. This might be related to FLINK-21215 as
> I see the following error:
>
> org.apache.flink.util.SerializedThrowable: Task NameSource: Trade JDBC
> Source (1/1)#0 Failure reason: Checkpoint was declined (tasks not ready)
>
> With the extra Thread.sleep(5000) I see all the rows handled by the
> sendValues() method.
>
> I have included the test code below, which just logs the "insertions" for
> now (and doesn't do real db access) but demonstrates the problem:
>
> private void checkpointTest() throws Exception {
>     Configuration conf = new Configuration();
>     conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
>     final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>     env.setParallelism(1);
>     env.enableCheckpointing(500);
>
>     MyJDBCSource myJDBCSource =
>             new MyJDBCSource(tradesDBConnectionParams, fromDttm, toDttm, asOf);
>     DataStream<MyObj> jdbcStreamIn = env.addSource(myJDBCSource, "My JDBC Source");
>
>     jdbcStreamIn.transform("SqlServerSink", TypeInformation.of(MyObj.class),
>             new SqlServerBulkCopySink(
>                     new FileCheckpointCommitter("c:\\temp\\FlinkTemp"),
>                     TypeExtractor.createTypeInfo(MyObj.class)
>                             .createSerializer(new ExecutionConfig()),
>                     UUID.randomUUID().toString()));
>
>     env.execute();
> }
>
> private static class SqlServerBulkCopySink extends GenericWriteAheadSink<MyObj> {
>     public SqlServerBulkCopySink(CheckpointCommitter committer,
>             TypeSerializer<MyObj> serializer, String jobID) throws Exception {
>         super(committer, serializer, jobID);
>     }
>
>     @Override
>     protected boolean sendValues(Iterable<MyObj> objects, long checkpointId,
>             long timestamp) {
>         logger.info("Sending {},{}-----------------------------------------------",
>                 checkpointId, timestamp);
>         for (MyObj myObj : objects)
>             // This will eventually be a bulk copy insert into the SQL Server database.
>             logger.info("  {},{}: {}", checkpointId, timestamp, myObj);
>         return true;
>     }
> }
>
> Am I right in thinking the latest versions of Flink will not suffer from
> this problem, or am I hitting something else? To be clear, I am expecting
> a checkpoint to be invoked by Flink to cover all the data I want to insert
> into my DB - how else would I do the final bulk copy if my sendValues() is
> not called?
>
> I have more questions about my data sink but I will wait to hear your
> answers.
>
> Many thanks in advance,
>
> James.