Re: GenericWriteAheadSink, declined checkpoint for a finished source
eive(Actor.scala:537) ~[na:na]
    at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[na:na]
    at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[na:na]
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[na:na]
    at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[na:na]
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[na:na]
    at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[na:na]
    at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[na:na]
    at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) ~[na:1.8.0_91]
    at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) ~[na:1.8.0_91]
    at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) ~[na:1.8.0_91]
    at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) ~[na:1.8.0_91]

FYI, if we don't enable this feature we see a different error, consistent with the older version of Flink:

INFO [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator Failed to trigger checkpoint for job 532b20d72d407bf82570c86f672ccf2c because Some tasks of the job have already finished and checkpointing with finished tasks is not enabled. Failure reason: Not all required tasks are currently running.

Can anyone advise if this feature is indeed available and working in 1.14.0, and how to correctly enable it?

Thanks,

James.
Re: GenericWriteAheadSink, declined checkpoint for a finished source
Hi!

Thanks Austin for the answer. I agree that FLIP-147 has solved the problem; just set execution.checkpointing.checkpoints-after-tasks-finish.enabled to true to enable this feature.

The JDBC sink solves this problem in a different way: it flushes the sink when closed (see JdbcOutputFormat#close [1]). But please note that, unlike GenericWriteAheadSink, JdbcOutputFormat flushes data once its buffer threshold is reached, so users might see inconsistent data after a checkpoint failure and before the job finishes (it requires a primary key for the sink to reach eventual consistency).

[1] https://github.com/apache/flink/blob/d502353acd03428b9befa4ec970191b757b6c8c3/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/internal/JdbcOutputFormat.java#L253
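A minimal sketch of enabling that flag programmatically (assuming Flink 1.14; the same key can instead be set in flink-conf.yaml):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration config = new Configuration();
// execution.checkpointing.checkpoints-after-tasks-finish.enabled = true
config.set(ExecutionCheckpointingOptions.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH, true);
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);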
Re: GenericWriteAheadSink, declined checkpoint for a finished source
Hi James,

You are correct that since Flink 1.14 [1] (which included FLIP-147 [2]) there is support for checkpointing after some tasks have finished, which sounds like it will solve this use case.

You may also want to look at the JDBC sink [3], which also supports batching, as well as some other nice things like retries and batch intervals; a sketch follows below.

Hope that helps,
Austin

[1]: https://flink.apache.org/news/2021/09/29/release-1.14.0.html#checkpointing-and-bounded-streams
[2]: https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
[3]: https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/
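A minimal sketch of that batched JDBC sink (assuming Flink 1.14 with the flink-connector-jdbc dependency; the table, column names, connection URL, and MyObj accessors are illustrative assumptions):

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcExecutionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;

jdbcTradesStreamIn.addSink(JdbcSink.sink(
    "INSERT INTO dbo.Trades (trade_id, price) VALUES (?, ?)",
    (stmt, myObj) -> {
        stmt.setLong(1, myObj.getTradeId());
        stmt.setBigDecimal(2, myObj.getPrice());
    },
    JdbcExecutionOptions.builder()
        .withBatchSize(1000)       // flush once 1,000 rows are buffered...
        .withBatchIntervalMs(200)  // ...or every 200 ms, whichever comes first
        .withMaxRetries(3)         // retry transient insert failures
        .build(),
    new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:sqlserver://...")
        .withDriverName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
        .build()));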
GenericWriteAheadSink, declined checkpoint for a finished source
Hello,

I have a Flink workflow where I need to upload the output data into a legacy SQL Server database, and so I have read the section in the Flink book about data sinks and utilizing the GenericWriteAheadSink base class. I am currently using Flink 1.12.3, although we plan to upgrade to 1.14 shortly.

Firstly, given I will be generating a large amount of data, I feel it best to use the GenericWriteAheadSink base class so I can bulk copy all the data into my SQL Server database rather than attempt a row-by-row insertion, which would be too slow. Hopefully this is a good use case for this class, or is there now a better approach?

Secondly, one thing I noticed is my JDBC source emits ~50,000 rows but the program actually exits before a final checkpoint is taken, so I miss many of the final rows - I have to put in a Thread.sleep(5000) before allowing the JDBC source to exit. This might be related to FLINK-21215, as I see the following error:

org.apache.flink.util.SerializedThrowable: Task NameSource: Trade JDBC Source (1/1)#0 Failure reason: Checkpoint was declined (tasks not ready)

With the extra Thread.sleep(5000) I see all the rows handled by the sendValues() method.

I have included the test code below, which just logs the "insertions" for now (and doesn't do real db access) but demonstrates the problem:

private void checkpointTest() throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    env.setParallelism(1);
    env.enableCheckpointing(500);

    MyJDBCSource myJDBCSource = new MyJDBCSource(tradesDBConnectionParams, fromDttm, toDttm, asOf);
    DataStream<MyObj> jdbcTradesStreamIn = env.addSource(myJDBCSource, "My JDBC Source");

    jdbcTradesStreamIn.transform("SqlServerSink", TypeInformation.of(MyObj.class), new SqlServerBulkCopySink(
        new FileCheckpointCommitter("c:\\temp\\FlinkTemp"),
        TypeExtractor.createTypeInfo(MyObj.class).createSerializer(new ExecutionConfig()),
        UUID.randomUUID().toString()));

    env.execute();
}

private static class SqlServerBulkCopySink extends GenericWriteAheadSink<MyObj> {
    public SqlServerBulkCopySink(CheckpointCommitter committer, TypeSerializer<MyObj> serializer, String jobID) throws Exception {
        super(committer, serializer, jobID);
    }

    @Override
    protected boolean sendValues(Iterable<MyObj> objects, long checkpointId, long timestamp) {
        logger.info("Sending {},{}---", checkpointId, timestamp);
        for (MyObj myObj : objects)
            logger.info("  {},{}: {}", checkpointId, timestamp, myObj); // this will eventually be a bulk copy insert into the SQL Server database
        return true;
    }
}

Am I right in thinking the latest versions of Flink will not suffer from this problem, or am I hitting something else? To be clear, I am expecting a checkpoint to be invoked by Flink to cover all the data I want to insert into my DB - how else would I do the final bulk copy if my sendValues() is not called?

I have more questions about my data sink but I will wait to hear your answers.

Many thanks in advance,

James.
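For illustration, a hedged sketch of what sendValues() might eventually look like as a plain JDBC batched insert (a true bulk copy would instead go through the mssql-jdbc driver's SQLServerBulkCopy API; connectionUrl, the table, columns, and MyObj getters here are illustrative assumptions):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

@Override
protected boolean sendValues(Iterable<MyObj> objects, long checkpointId, long timestamp) {
    // Hypothetical batched insert for the rows buffered since the last checkpoint.
    String sql = "INSERT INTO dbo.Trades (trade_id, price) VALUES (?, ?)";
    try (Connection conn = DriverManager.getConnection(connectionUrl);
         PreparedStatement stmt = conn.prepareStatement(sql)) {
        for (MyObj myObj : objects) {
            stmt.setLong(1, myObj.getTradeId());
            stmt.setBigDecimal(2, myObj.getPrice());
            stmt.addBatch();
        }
        stmt.executeBatch();
        return true;  // tells Flink this checkpoint's data was committed
    } catch (SQLException e) {
        return false; // Flink will re-attempt this checkpoint's data later
    }
}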