Hello,

After bumping `table.exec.resource.default-parallelism` from 1 to 4, I am
observing the following exception when restoring a job from a savepoint with
an unmodified statement set.

> [ERROR] Could not execute SQL statement. Reason:
> java.lang.IllegalStateException: Failed to rollback to
> checkpoint/savepoint
> file:/tmp/flink-savepoints/savepoint-4392e6-575fa6b692ff. Cannot map
> checkpoint/savepoint state for operator 46ba9b22862c3bbe9373c6abee964b2a to
> the new program, because the operator is not available in the new program.
> If you want to allow to skip this, you can set the --allowNonRestoredState
> option on the CLI.

When the jobs are started without savepoints, their job graphs differ even
though identical statements are run.

The job graph has two vertices (operator chains) when the default
parallelism is 1:

> A: Source: UserBehaviourKafkaSource[68] -> (Calc[69] ->
> StreamRecordTimestampInserter[70] -> StreamingFileWriter -> Sink: end,
> Calc[71] -> LocalWindowAggregate[72])
> B: GlobalWindowAggregate[74] -> Calc[75] -> Sink:
> CampaignAggregationsJDBC[76]


It has three vertices when the default parallelism is 4:

> A: Source: UserBehaviourKafkaSource[86] -> (Calc[87] ->
> StreamRecordTimestampInserter[88] -> StreamingFileWriter, Calc[89] ->
> LocalWindowAggregate[90])
> B: Sink: end
> C: GlobalWindowAggregate[92] -> Calc[93] -> Sink:
> CampaignAggregationsJDBC[94]


Notice that the 'Sink: end' operator is split out of the source chain into
its own vertex when the parallelism is set to 4, which causes the job graph
incompatibility. EXPLAIN PLAN showed no difference between the two settings
in the abstract syntax tree, the physical plan, or the execution plan.

I have tried various `table.optimizer.*` settings. Am I missing something,
or is there a configuration that would allow the restore to work? I can
include the DDL and DML statements if that would help provide context.

I would appreciate any pointers.

Thank you
Keith
