Hello,

I have a Flink workflow whose output I need to upload into a legacy SQL Server 
database, so I have read the section in the Flink book about data sinks and the 
GenericWriteAheadSink base class. I am currently using Flink 1.12.3, although 
we plan to upgrade to 1.14 shortly.

Firstly, given that I will be generating a large amount of data, I feel it is 
best to use the GenericWriteAheadSink base class so that I can bulk copy all 
the data into my SQL Server database, rather than attempt a row-by-row 
insertion, which would be too slow. Hopefully this is a good use case for this 
class - or is there now a better approach?

Secondly, one thing I noticed is that my JDBC source emits ~50,000 rows but the 
program actually exits before a final checkpoint is taken, so I miss many of 
the final rows - I have to put in a Thread.sleep(5000) before allowing the JDBC 
source to exit. This might be related to FLINK-21215, as I see the following 
error:
org.apache.flink.util.SerializedThrowable: Task NameSource: Trade JDBC Source (1/1)#0 Failure reason: Checkpoint was declined (tasks not ready)
With the extra Thread.sleep(5000) I see all the rows handled by the 
sendValues() method.
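
For reference, the workaround is nothing more than delaying the end of the 
source's run() method (sketch below, assuming MyJDBCSource is a plain 
SourceFunction; the actual query/emit loop is elided):

// Inside MyJDBCSource (a SourceFunction<MyObj>) - the workaround is just to delay
// returning from run() so one more checkpoint can complete before the source finishes.
@Override
public void run(SourceContext<MyObj> ctx) throws Exception {
    // ... query SQL Server and ctx.collect(row) for each of the ~50,000 rows (elided) ...

    Thread.sleep(5000); // without this, the job exits before the final checkpoint
}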

I have included the test code below, which just logs the "insertions" for now 
(it does no real database access) but demonstrates the problem:

private void checkpointTest() throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    final StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    env.setParallelism(1);
    env.enableCheckpointing(500);

    MyJDBCSource myJDBCSource =
            new MyJDBCSource(tradesDBConnectionParams, fromDttm, toDttm, asOf);
    DataStream<MyObj> jdbcStreamIn = env.addSource(myJDBCSource, "My JDBC Source");

    jdbcStreamIn.transform("SqlServerSink",
            TypeInformation.of(MyObj.class),
            new SqlServerBulkCopySink(
                    new FileCheckpointCommitter("c:\\temp\\FlinkTemp"),
                    TypeExtractor.createTypeInfo(MyObj.class).createSerializer(new ExecutionConfig()),
                    UUID.randomUUID().toString()));

    env.execute();
}

private static class SqlServerBulkCopySink extends GenericWriteAheadSink<MyObj> {
    public SqlServerBulkCopySink(CheckpointCommitter committer,
                                 TypeSerializer<MyObj> serializer,
                                 String jobID) throws Exception {
        super(committer, serializer, jobID);
    }

    @Override
    protected boolean sendValues(Iterable<MyObj> objects, long checkpointId, long timestamp) {
        logger.info("Sending {},{}-----------------------------------------------",
                checkpointId, timestamp);
        for (MyObj myObj : objects) {
            // this will eventually be a bulk copy insert into the SQL Server database
            logger.info("  {},{}: {}", checkpointId, timestamp, myObj);
        }
        return true;
    }
}
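
For context, once the logging is replaced with real database access, my plan is 
for sendValues() to do the bulk copy roughly like this. This is only a sketch, 
assuming the mssql-jdbc driver (com.microsoft.sqlserver.jdbc.SQLServerBulkCopy); 
the sqlServerJdbcUrl field, the "dbo.Trades" table name and the toBulkData() 
adapter are placeholders of mine that I still need to write:

    // Rough sketch of the eventual sendValues() - not working code.
    // Assumes com.microsoft.sqlserver.jdbc.SQLServerBulkCopy plus java.sql.Connection/DriverManager;
    // sqlServerJdbcUrl, "dbo.Trades" and toBulkData() are placeholders.
    @Override
    protected boolean sendValues(Iterable<MyObj> objects, long checkpointId, long timestamp) {
        try (Connection connection = DriverManager.getConnection(sqlServerJdbcUrl);
             SQLServerBulkCopy bulkCopy = new SQLServerBulkCopy(connection)) {
            bulkCopy.setDestinationTableName("dbo.Trades");
            // toBulkData() would adapt the buffered MyObj records into a form
            // writeToServer() accepts (e.g. an ISQLServerBulkData implementation).
            bulkCopy.writeToServer(toBulkData(objects));
            return true;  // checkpoint's data persisted, safe to commit
        } catch (SQLException e) {
            logger.error("Bulk copy for checkpoint {} failed", checkpointId, e);
            return false; // do not commit; my understanding is the data is retried later
        }
    }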



Am I right in thinking that the latest versions of Flink will not suffer from 
this problem, or am I hitting something else? To be clear, I am expecting Flink 
to take a checkpoint covering all the data I want to insert into my DB - 
otherwise, how would the final bulk copy happen if sendValues() is never called 
for the last rows?
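
For what it's worth, my (possibly wrong) understanding is that 1.14 adds 
support for checkpoints after tasks have finished (FLIP-147), but that it has 
to be enabled explicitly. Is something like the following, added to my local 
environment setup above, the switch I would need once we upgrade?

// My assumption for Flink 1.14+ - enable checkpoints after tasks have finished (FLIP-147).
// Please correct me if this is not the right switch.
conf.setBoolean("execution.checkpointing.checkpoints-after-tasks-finish.enabled", true);
final StreamExecutionEnvironment env =
        StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);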


I have more questions about my data sink, but I will wait to hear your answers.


Many thanks in advance,


James.
