Hello again,

We recently upgraded from Flink 1.12.3 to 1.14.0 and we were hoping it would 
solve our issue with checkpointing with finished data sources. We need the 
checkpointing to work to trigger Flink's GenericWriteAheadSink class.

Firstly, the constant mentioned on FLIP-147 that enables the feature isn't 
available as far as we can see (ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH). It's 
not in ConfigConstants or CheckpointConfig for example. So instead we enabled 
with the following:

conf.setBoolean("execution.checkpointing.checkpoints-after-tasks-finish.enabled",
 true);
StreamExecutionEnvironment env = StreamExecutionEnvironment 
.createLocalEnvironmentWithWebUI(config)
env.enableCheckpointing(30 * 1000);
...

We can see the constant available in 1.15 on Google but not the version we were 
expecting (1.14.0).

Previously we had to have long Thread.sleep(x) in to keep the sources alive 
when checkpoints were taken. When we enable this feature using the explicit 
string and removed these hacks we start seeing these errors:


INFO  [flink-akka.actor.default-dispatcher-7] o.a.f.r.e.ExecutionGraph Source: 
Order JDBC Source (1/1) (e015c4f0910fb27e15fec063616ab785) switched from 
RUNNING to FINISHED.



[some lines removed for brevity]



INFO  [flink-akka.actor.default-dispatcher-7] o.a.f.r.c.CheckpointCoordinator 
Triggering Checkpoint 5 for job 53d42ae669fad6cc8df2fe8f5706b48d failed due to 
{}

org.apache.flink.runtime.checkpoint.CheckpointException: TaskManager received a 
checkpoint request for unknown task e015c4f0910fb27e15fec063616ab785. Failure 
reason: Task local checkpoint failure.

     at 
org.apache.flink.runtime.taskexecutor.TaskExecutor.triggerCheckpoint(TaskExecutor.java:966)
 ~[flink-runtime-1.14.0.jar:1.14.0]

     at sun.reflect.GeneratedMethodAccessor58.invoke(Unknown Source) ~[na:na]

     at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 ~[na:1.8.0_91]

     at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_91]

     at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:316)
 ~[na:na]

     at 
org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
 ~[na:na]

     at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:314)
 ~[na:na]

     at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)
 ~[na:na]

     at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)
 ~[na:na]

     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24) ~[na:na]

     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20) ~[na:na]

     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) 
~[scala-library-2.11.12.jar:na]

     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) 
~[scala-library-2.11.12.jar:na]

     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20) 
~[na:na]

     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) 
~[scala-library-2.11.12.jar:na]

     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
~[scala-library-2.11.12.jar:na]

     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) 
~[scala-library-2.11.12.jar:na]

     at akka.actor.Actor.aroundReceive(Actor.scala:537) ~[na:na]

     at akka.actor.Actor.aroundReceive$(Actor.scala:535) ~[na:na]

     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220) ~[na:na]

     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580) ~[na:na]

     at akka.actor.ActorCell.invoke(ActorCell.scala:548) ~[na:na]

     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270) ~[na:na]

     at akka.dispatch.Mailbox.run(Mailbox.scala:231) ~[na:na]

     at akka.dispatch.Mailbox.exec(Mailbox.scala:243) ~[na:na]

     at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) 
~[na:1.8.0_91]

     at 
java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) 
~[na:1.8.0_91]

     at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) 
~[na:1.8.0_91]

     at 
java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) 
~[na:1.8.0_91]

FYI, if we don't enable this feature we see a different error consistent with 
the older version of Flink:

INFO  [Checkpoint Timer] o.a.f.r.c.CheckpointCoordinator           Failed to 
trigger checkpoint for job 532b20d72d407bf82570c86f672ccf2c because Some tasks 
of the job have already finished and checkpointing with finished tasks is not 
enabled. Failure reason: Not all required tasks are currently running.

Can anyone advise if this feature is indeed available and working in 1.14.0 and 
how to correctly enable?

Thanks,

James.

________________________________
From: Austin Cawley-Edwards <austin.caw...@gmail.com>
Sent: 04 November 2021 18:46
To: James Sandys-Lumsdaine <jas...@hotmail.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: GenericWriteAheadSink, declined checkpoint for a finished source

Hi James,

You are correct that since Flink 1.14 [1] (which included FLIP-147 [2]) there 
is support for checkpointing after some tasks has finished, which sounds like 
it will solve this use case.

You may also want to look at the JDBC sink[3] which also supports batching, as 
well as some other nice things like retries and batch intervals.

Hope that helps,
Austin


[1]: 
https://flink.apache.org/news/2021/09/29/release-1.14.0.html#checkpointing-and-bounded-streams
[2]: 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-147%3A+Support+Checkpoints+After+Tasks+Finished
[3]: 
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/jdbc/

On Wed, Nov 3, 2021 at 12:25 PM James Sandys-Lumsdaine 
<jas...@hotmail.com<mailto:jas...@hotmail.com>> wrote:
Hello,

I have a Flink workflow where I need to upload the output data into a legacy 
SQL Server database and so I have read the section in the Flink book about data 
sinks and utilizing the GenericWriteAheadSink base class. I am currently using 
Flink 1.12.3 although we plan to upgrade to 1.14 shortly.

Firstly, given I will be generating a large amount of data I feel it best to 
use the GenericWriteAheadSink base class so I can bulk copy all the data into 
my SQL Server database rather than attempt a row by row insertion which would 
be too slow. Hopefully this is a good use case for this class or is there now a 
better approach?

Secondly, one thing I noticed is my JDBC source emits ~50,000 rows but the 
program actually exists before a final checkpoint is taken so I miss many of 
the final rows - I have to put in a Thread.sleep(5000) before allowing the JDBC 
source to exit. This might be related to FLINK-21215 as I see the following 
error:
org.apache.flink.util.SerializedThrowable: Task NameSource: Trade JDBC Source 
(1/1)#0 Failure reason: Checkpoint was declined (tasks not ready)
With the extra Thread.sleep(5000) I see all the rows handled by the 
sendValues() method.

I have included the test code below which just logs the "insertions" for now 
(and doesn't do real db access) but demonstrates the problem:

private void checkpointTest() throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
    final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
    env.setParallelism(1);
    env.enableCheckpointing(500);

    MyJDBCSource myJDBCSource = new MyJDBCSource(tradesDBConnectionParams, 
fromDttm, toDttm, asOf);
    DataStream<MyObj> jdbcStreamIn = env.addSource(myJDBCSource, "My JDBC 
Source");

    jdbcTradesStreamIn.transform("SqlServerSink", 
TypeInformation.of(MyObj.class), new SqlServerBulkCopySink(
            new FileCheckpointCommitter("c:\\temp\\FlinkTemp"),
            TypeExtractor.createTypeInfo(MyObj.class).createSerializer(new 
ExecutionConfig()),
            UUID.randomUUID().toString()));


    env.execute();
}

private static class SqlServerBulkCopySink extends GenericWriteAheadSink<MyObj> 
{
    public SqlServerBulkCopySink(CheckpointCommitter committer, 
TypeSerializer<MyObj> serializer, String jobID) throws Exception {
        super(committer, serializer, jobID);
    }

    @Override
    protected boolean sendValues(Iterable<MyObj> objects, long checkpointId, 
long timestamp) {
        logger.info("Sending 
{},{}-----------------------------------------------", checkpointId, timestamp);
        for (MyObj myObj: objects)
            logger.info("  {},{}: {}", checkpointId, timestamp, trade); // this 
will eventually be a bulk copy insert into the SQL Server database
        return true;
    }
}



Am I right in thinking the latest versions of Flink will not suffer from this 
problem or am I hitting something else? To be clear, I am expecting a 
checkpoint to be invoked by Flink to cover all the data I want to insert into 
my DB - how else would I do the final bulk copy if my sendValues() is not 
called?


I have more questions about my data sink but I will wait to hear your answers.


Many thanks in advance,


James.

Reply via email to