Hello again,

We recently upgraded from Flink 1.12.3 to 1.14.0, hoping it would solve our issue with checkpointing when data sources have finished. We need checkpointing to work in order to trigger Flink's GenericWriteAheadSink class.
Firstly, the constant mentioned in FLIP-147 that enables the feature (ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH) isn't available as far as we can see. It's not in ConfigConstants or CheckpointConfig, for example; we can see the constant in the 1.15 sources on Google, but not in the version we were expecting (1.14.0). So instead we enabled it with the configuration key directly:

    Configuration conf = new Configuration();
    conf.setBoolean("execution.checkpointing.checkpoints-after-tasks-finish.enabled", true);
    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    env.enableCheckpointing(30 * 1000);
    ...

Previously we had to put long Thread.sleep(x) calls in to keep the sources alive while checkpoints were taken. When we enable this feature via the explicit string and remove those hacks, we start seeing these errors:

    INFO [flink-akka.actor.default-dispatcher-7] o.a.f.r.e.ExecutionGraph Source: Order JDBC Source (1/1) (e015c4f0910fb27e15fec063616ab785) switched from RUNNING to FINISHED.
    [some lines removed for brevity]
    INFO [flink-akka.actor.default-dispatcher-7] o.a.f.r.c.CheckpointCoordinator Triggering Checkpoint 5 for job 53d42ae669fad6cc8df2fe8f5706b48d failed due to {}
    org.apache.flink.runtime.checkpoint.CheckpointException: TaskManager received a checkpoint request for unknown task e015c4f0910fb27e15fec063616ab785. Failure reason: Task local checkpoint failure.
    at org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:966) ~[flink-runtime-1.14.0.jar:1.14.0]
    at sun.reflect.GeneratedMethodAccessor58.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_91]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_91]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316) ~[na:na]
    at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[na:na]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314) ~[na:na]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217) ~[na:na]
    at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163) ~[na:na]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[na:na]
    at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[na:na]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) ~[scala-library-2.11.12.jar:na]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) ~[scala-library-2.11.12.jar:na]
    at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) ~[na:na]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) ~[scala-library-2.11.12.jar:na]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[scala-library-2.11.12.jar:na]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) ~[scala-library-2.11.12.jar:na]
    at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[na:na]
    at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[na:na]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[na:na]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[na:na]
    at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[na:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[na:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[na:na]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[na:na]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[na:1.8.0_91]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[na:1.8.0_91]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[na:1.8.0_91]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[na:1.8.0_91]

FYI, if we don't enable this feature we see a different error, consistent with the older version of Flink:

    INFO [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator Failed to trigger checkpoint for job 532b20d72d407bf82570c86f672ccf2c because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.

Can anyone advise whether this feature is indeed available and working in 1.14.0, and how to enable it correctly?

Thanks,

James.

________________________________
From: Austin Cawley-Edwards <austin.caw...@gmail.com>
Sent: 04 November 2021 18:46
To: James Sandys-Lumsdaine <jas...@hotmail.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: GenericWriteAheadSink, declined checkpoint for a finished source

Hi James,

You are correct that since Flink 1.14 [1] (which included FLIP-147 [2]) there is support for checkpointing after some tasks have finished, which sounds like it will solve this use case. You may also want to look at the JDBC sink [3], which also supports batching, as well as some other nice things like retries and batch intervals.
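For reference, the 1.14 JDBC sink mentioned above is used roughly as follows. This is only a sketch: the table, columns, getters, and connection URL are invented for illustration and none of them come from this thread; check the linked connector docs for the exact builder options.

```java
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;

// Assumes a DataStream<MyObj> named "stream" and a POJO MyObj with
// getId()/getName() accessors (illustrative names, not from the thread).
stream.addSink(JdbcSink.sink(
        "INSERT INTO my_table (id, name) VALUES (?, ?)",
        (ps, obj) -> {
            ps.setInt(1, obj.getId());
            ps.setString(2, obj.getName());
        },
        JdbcExecutionOptions.builder()
                .withBatchSize(1000)       // group inserts into batches
                .withBatchIntervalMs(200)  // flush at least every 200 ms
                .withMaxRetries(3)         // retry transient failures
                .build(),
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:sqlserver://localhost;databaseName=mydb")
                .withDriverName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
                .build()));
```

Unlike a write-ahead sink, this batches on size/time rather than on checkpoints, so it does not depend on a final checkpoint being taken after the sources finish.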
Hope that helps,

Austin

[1]: https://flink.apache.org/news/2021/09/29/release-1.14.0.html#checkpointing-and-bounded-streams
[2]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
[3]: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/

On Wed, Nov 3, 2021 at 12:25 PM James Sandys-Lumsdaine <jas...@hotmail.com<mailto:jas...@hotmail.com>> wrote:

Hello,

I have a Flink workflow where I need to upload the output data into a legacy SQL Server database, so I have read the section in the Flink book about data sinks and utilizing the GenericWriteAheadSink base class. I am currently using Flink 1.12.3, although we plan to upgrade to 1.14 shortly.

Firstly, given that I will be generating a large amount of data, I feel it is best to use the GenericWriteAheadSink base class so I can bulk copy all the data into my SQL Server database rather than attempt a row-by-row insertion, which would be too slow. Hopefully this is a good use case for this class, or is there now a better approach?

Secondly, one thing I noticed is that my JDBC source emits ~50,000 rows but the program actually exits before a final checkpoint is taken, so I miss many of the final rows; I have to put in a Thread.sleep(5000) before allowing the JDBC source to exit. This might be related to FLINK-21215, as I see the following error:

    org.apache.flink.util.SerializedThrowable: Task NameSource: Trade JDBC Source (1/1)#0 Failure reason: Checkpoint was declined (tasks not ready)

With the extra Thread.sleep(5000) I see all the rows handled by the sendValues() method.
I have included the test code below, which just logs the "insertions" for now (it doesn't do real db access) but demonstrates the problem:

    private void checkpointTest() throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);
        env.enableCheckpointing(500);

        MyJDBCSource myJDBCSource =
                new MyJDBCSource(tradesDBConnectionParams, fromDttm, toDttm, asOf);
        DataStream<MyObj> jdbcStreamIn = env.addSource(myJDBCSource, "My JDBC Source");

        jdbcStreamIn.transform(
                "SqlServerSink",
                TypeInformation.of(MyObj.class),
                new SqlServerBulkCopySink(
                        new FileCheckpointCommitter("c:\\temp\\FlinkTemp"),
                        TypeExtractor.createTypeInfo(MyObj.class)
                                .createSerializer(new ExecutionConfig()),
                        UUID.randomUUID().toString()));

        env.execute();
    }

    private static class SqlServerBulkCopySink extends GenericWriteAheadSink<MyObj> {
        public SqlServerBulkCopySink(CheckpointCommitter committer,
                                     TypeSerializer<MyObj> serializer,
                                     String jobID) throws Exception {
            super(committer, serializer, jobID);
        }

        @Override
        protected boolean sendValues(Iterable<MyObj> objects, long checkpointId, long timestamp) {
            logger.info("Sending {},{}-----------------------------------------------",
                    checkpointId, timestamp);
            for (MyObj myObj : objects)
                logger.info(" {},{}: {}", checkpointId, timestamp, myObj);
            // this will eventually be a bulk copy insert into the SQL Server database
            return true;
        }
    }

Am I right in thinking the latest versions of Flink will not suffer from this problem, or am I hitting something else? To be clear, I am expecting a checkpoint to be invoked by Flink to cover all the data I want to insert into my DB; how else would I do the final bulk copy if my sendValues() is not called?

I have more questions about my data sink but I will wait to hear your answers.

Many thanks in advance,

James.
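For context on why the final rows go missing: a write-ahead sink buffers records per pending checkpoint and only hands them to sendValues() once that checkpoint completes, so anything received after the last completed checkpoint is never flushed if the job exits first. Here is a minimal stand-alone sketch of that pattern (plain Java, no Flink dependencies; the class and method names are illustrative, not Flink's actual internals):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Illustrative sketch of the buffer-then-commit behaviour of a
// write-ahead sink: records are staged per checkpoint and only
// "sent" (e.g. bulk-copied to the database) once that checkpoint
// is confirmed complete.
class WriteAheadBufferSketch<T> {
    // Records staged under each not-yet-completed checkpoint id.
    private final TreeMap<Long, List<T>> pending = new TreeMap<>();
    // Records received since the last checkpoint barrier.
    private final List<T> current = new ArrayList<>();

    // Called for every incoming record.
    void invoke(T value) {
        current.add(value);
    }

    // Called when a checkpoint barrier passes: stage the buffer.
    void snapshot(long checkpointId) {
        pending.put(checkpointId, new ArrayList<>(current));
        current.clear();
    }

    // Called when the checkpoint completes: flush everything staged
    // up to and including that checkpoint id.
    List<T> notifyCheckpointComplete(long checkpointId) {
        List<T> toSend = new ArrayList<>();
        for (Map.Entry<Long, List<T>> e :
                pending.headMap(checkpointId, true).entrySet()) {
            toSend.addAll(e.getValue());
        }
        // Clearing the view removes the flushed entries from "pending".
        pending.headMap(checkpointId, true).clear();
        return toSend; // what sendValues() would receive
    }

    // Records still buffered; these are lost if the job exits now.
    int unflushedCount() {
        int n = current.size();
        for (List<T> l : pending.values()) n += l.size();
        return n;
    }
}
```

In this model, a record arriving after the last completed checkpoint stays in the buffer forever unless one more checkpoint is triggered and completes, which is exactly why the Thread.sleep hack (keeping the source alive long enough for a final checkpoint) appeared to fix the problem.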