Hi James,

You are correct that since Flink 1.14 [1] (which includes FLIP-147 [2])
there is support for checkpointing after some tasks have finished, which
sounds like it will solve this use case.
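
If I remember correctly, in 1.14 the feature still has to be switched on
explicitly. A minimal sketch of enabling it (option name from memory, so
please double-check it against the 1.14 docs):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    Configuration config = new Configuration();
    // Allow checkpoints to keep being triggered after some tasks (e.g. a
    // bounded JDBC source) have finished - off by default in 1.14.
    config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment(config);
    env.enableCheckpointing(500);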

You may also want to look at the JDBC sink [3], which supports batching as
well as some other nice things like retries and batch intervals.
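
Very roughly, batching with the JDBC sink looks something like the sketch
below (the table, columns and getters are made up for illustration, and you
would plug in your own SQL Server URL and credentials):

    import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
    import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
    import org.apache.flink.connector.jdbc.JdbcSink;

    jdbcStreamIn.addSink(JdbcSink.sink(
            "INSERT INTO trades (id, price) VALUES (?, ?)",   // hypothetical table
            (statement, obj) -> {
                statement.setLong(1, obj.getId());            // hypothetical getters
                statement.setBigDecimal(2, obj.getPrice());
            },
            JdbcExecutionOptions.builder()
                    .withBatchSize(1000)       // flush after 1000 buffered rows...
                    .withBatchIntervalMs(200)  // ...or after 200 ms, whichever comes first
                    .withMaxRetries(3)
                    .build(),
            new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                    .withUrl("jdbc:sqlserver://<host>;databaseName=<db>")
                    .withDriverName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
                    .withUsername("<user>")
                    .withPassword("<password>")
                    .build()));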

Hope that helps,
Austin


[1]:
https://flink.apache.org/news/2021/09/29/release-1.14.0.html#checkpointing-and-bounded-streams
[2]:
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
[3]:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/

On Wed, Nov 3, 2021 at 12:25 PM James Sandys-Lumsdaine <jas...@hotmail.com>
wrote:

> Hello,
>
> I have a Flink workflow where I need to upload the output data into a
> legacy SQL Server database and so I have read the section in the Flink book
> about data sinks and utilizing the GenericWriteAheadSink base class. I am
> currently using Flink 1.12.3 although we plan to upgrade to 1.14 shortly.
>
> Firstly, given I will be generating a large amount of data I feel it best
> to use the GenericWriteAheadSink base class so I can bulk copy all the data
> into my SQL Server database rather than attempt a row by row insertion
> which would be too slow. Hopefully this is a good use case for this class
> or is there now a better approach?
>
> Secondly, one thing I noticed is my JDBC source emits ~50,000 rows but the
> program actually exits before a final checkpoint is taken, so I miss many
> of the final rows - I have to put in a Thread.sleep(5000) before allowing
> the JDBC source to exit. This might be related to FLINK-21215 as I see the
> following error:
> org.apache.flink.util.SerializedThrowable: Task NameSource: Trade JDBC
> Source (1/1)#0 Failure reason: Checkpoint was declined (tasks not ready)
> With the extra Thread.sleep(5000) I see all the rows handled by the
> sendValues() method.
>
> I have included the test code below which just logs the "insertions" for
> now (and doesn't do real db access) but demonstrates the problem:
>
> private void checkpointTest() throws Exception {
>     Configuration conf = new Configuration();
>     conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
>     final StreamExecutionEnvironment env =
>             StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
>     env.setParallelism(1);
>     env.enableCheckpointing(500);
>
>     MyJDBCSource myJDBCSource =
>             new MyJDBCSource(tradesDBConnectionParams, fromDttm, toDttm, asOf);
>     DataStream<MyObj> jdbcStreamIn = env.addSource(myJDBCSource, "My JDBC Source");
>
>     jdbcStreamIn.transform("SqlServerSink", TypeInformation.of(MyObj.class),
>             new SqlServerBulkCopySink(
>                     new FileCheckpointCommitter("c:\\temp\\FlinkTemp"),
>                     TypeExtractor.createTypeInfo(MyObj.class)
>                             .createSerializer(new ExecutionConfig()),
>                     UUID.randomUUID().toString()));
>
>     env.execute();
> }
>
> private static class SqlServerBulkCopySink extends GenericWriteAheadSink<MyObj> {
>     public SqlServerBulkCopySink(CheckpointCommitter committer,
>             TypeSerializer<MyObj> serializer, String jobID) throws Exception {
>         super(committer, serializer, jobID);
>     }
>
>     @Override
>     protected boolean sendValues(Iterable<MyObj> objects, long checkpointId,
>             long timestamp) {
>         logger.info("Sending {},{}-----------------------------------------------",
>                 checkpointId, timestamp);
>         for (MyObj myObj : objects)
>             // this will eventually be a bulk copy insert into the SQL Server database
>             logger.info("  {},{}: {}", checkpointId, timestamp, myObj);
>         return true;
>     }
> }
>
>
>
> Am I right in thinking the latest versions of Flink will not suffer from
> this problem or am I hitting something else? To be clear, I am expecting a
> checkpoint to be invoked by Flink to cover all the data I want to insert
> into my DB - how else would I do the final bulk copy if my sendValues() is
> not called?
>
>
> I have more questions about my data sink but I will wait to hear your
> answers.
>
>
> Many thanks in advance,
>
>
> James.
