Hi!

Thanks Austin for the answer. I agree that FLIP-147 has solved the problem,
just set execution.checkpointing.checkpoints-after-tasks-finish.enabled to
true to enable this feature.

JDBC sinks solves this problem in a different way. It flushes the sink when
closed (see JdbcOutputFormat#close [1]). But please note that
unlike GenericWriteAheadSink, JdbcOutputFormat flushes data once its buffer
threshold is reached, so users might see inconsistent data after a
checkpoint failure and before the job finishes (it requires a primary key
for the sink to reach the eventual consistency).

[1]
https://github.com/apache/flink/blob/d502353acd03428b9befa4ec970191b757b6c8c3/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java#L253

Austin Cawley-Edwards <austin.caw...@gmail.com> 于2021年11月5日周五 上午2:47写道:

> Hi James,
>
> You are correct that since Flink 1.14 [1] (which included FLIP-147 [2])
> there is support for checkpointing after some tasks has finished, which
> sounds like it will solve this use case.
>
> You may also want to look at the JDBC sink[3] which also supports
> batching, as well as some other nice things like retries and batch
> intervals.
>
> Hope that helps,
> Austin
>
>
> [1]:
> https://flink.apache.org/news/2021/09/29/release-1.14.0.html#checkpointing-and-bounded-streams
> [2]:
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
> [3]:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/
>
> On Wed, Nov 3, 2021 at 12:25 PM James Sandys-Lumsdaine <jas...@hotmail.com>
> wrote:
>
>> Hello,
>>
>> I have a Flink workflow where I need to upload the output data into a
>> legacy SQL Server database and so I have read the section in the Flink book
>> about data sinks and utilizing the GenericWriteAheadSink base class. I
>> am currently using Flink 1.12.3 although we plan to upgrade to 1.14
>> shortly.
>>
>> Firstly, given I will be generating a large amount of data I feel it best
>> to use the GenericWriteAheadSink base class so I can bulk copy all the data
>> into my SQL Server database rather than attempt a row by row insertion
>> which would be too slow. Hopefully this is a good use case for this class
>> or is there now a better approach?
>>
>> Secondly, one thing I noticed is my JDBC source emits ~50,000 rows but
>> the program actually exists before a final checkpoint is taken so I miss
>> many of the final rows - I have to put in a Thread.sleep(5000) before
>> allowing the JDBC source to exit. This might be related to FLINK-21215 as I
>> see the following error:
>> *org.apache.flink.util.SerializedThrowable: Task NameSource: Trade JDBC
>> Source (1/1)#0 Failure reason: Checkpoint was declined (tasks not ready)*
>> With the extra Thread.sleep(5000) I see all the rows handled by the
>> sendValues() method.
>>
>> I have included the test code below which just logs the "insertions" for
>> now (and doesn't do real db access) but demonstrates the problem:
>>
>> private void checkpointTest() throws Exception {
>>     Configuration conf = new Configuration();
>>     conf.setBoolean(ConfigConstants.*LOCAL_START_WEBSERVER*, true);
>>     final StreamExecutionEnvironment env = StreamExecutionEnvironment.
>> *createLocalEnvironmentWithWebUI*(conf);
>>     env.setParallelism(1);
>>     env.enableCheckpointing(500);
>>
>>
>>     MyJDBCSource myJDBCSource = new MyJDBCSource(tradesDBConnectionParams,
>> fromDttm, toDttm, asOf);
>>     DataStream<MyObj> jdbcStreamIn = env.addSource(myJDBCSource, "My
>> JDBC Source");
>>
>>     jdbcTradesStreamIn.transform("SqlServerSink", TypeInformation.*of*
>> (MyObj.class), new SqlServerBulkCopySink(
>>             new FileCheckpointCommitter("c:\\temp\\FlinkTemp"),
>>             TypeExtractor.*createTypeInfo*(MyObj.class).createSerializer(new
>> ExecutionConfig()),
>>             UUID.*randomUUID*().toString()));
>>
>>     env.execute();
>> }
>>
>> private static class SqlServerBulkCopySink extends 
>> GenericWriteAheadSink<MyObj>
>> {
>>     public SqlServerBulkCopySink(CheckpointCommitter committer, 
>> TypeSerializer<MyObj>
>> serializer, String jobID) throws Exception {
>>         super(committer, serializer, jobID);
>>     }
>>
>>     @Override
>>     protected boolean sendValues(Iterable<MyObj> objects, long
>> checkpointId, long timestamp) {
>>         *logger*.info("Sending
>> {},{}-----------------------------------------------", checkpointId,
>> timestamp);
>>         for (MyObj myObj: objects)
>>             *logger*.info("  {},{}: {}", checkpointId, timestamp, trade);
>> // this will eventually be a bulk copy insert into the SQL Server database
>>         return true;
>>     }
>> }
>>
>>
>>
>> Am I right in thinking the latest versions of Flink will not suffer from
>> this problem or am I hitting something else? To be clear, I am expecting a
>> checkpoint to be invoked by Flink to cover all the data I want to insert
>> into my DB - how else would I do the final bulk copy if my sendValues() is
>> not called?
>>
>>
>> I have more questions about my data sink but I will wait to hear your
>> answers.
>>
>>
>> Many thanks in advance,
>>
>>
>> James.
>>
>>
>>

Reply via email to