Nathan created FLINK-28622:
------------------------------
Summary: Can't restore a flink job that uses Table API and Kafka
connector with savepoint
Key: FLINK-28622
URL: https://issues.apache.org/jira/browse/FLINK-28622
Project: Flink
Issue Type: Bug
Components: Table SQL / API
Affects Versions: 1.15.0
Reporter: Nathan
I canceled a Flink job with a savepoint, then tried to restore the job from
that savepoint using the same jar file, but the restore failed because the
savepoint state could not be mapped. Since the jar file and the Flink version
are unchanged, I would expect the execution plan and the generated operator IDs
to be the same.
Related errors:
{code:java}
Caused by: java.util.concurrent.CompletionException:
java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint
file:/root/flink-savepoints/savepoint-5f285c-c2749410db07. Cannot map
checkpoint/savepoint state for operator dd5fc1f28f42d777f818e2e8ea18c331 to the
new program, because the operator is not available in the new program. If you
want to allow to skip this, you can set the --allowNonRestoredState option on
the CLI.
Caused by: java.lang.IllegalStateException: Failed to rollback to
checkpoint/savepoint file:/root/flink-savepoints/savepoint-5f285c-c2749410db07.
Cannot map checkpoint/savepoint state for operator
dd5fc1f28f42d777f818e2e8ea18c331 to the new program, because the operator is
not available in the new program. If you want to allow to skip this, you can
set the --allowNonRestoredState option on the CLI. {code}
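For reference, the restore command that the error message hints at could be assembled roughly as below. This is only a sketch: the class name RestoreCommand, the method build, and the jar name flink-job.jar are hypothetical, while the -s and --allowNonRestoredState options are the ones the Flink CLI documents for "flink run". Note that the flag only skips the state of operator dd5fc1f28f42d777f818e2e8ea18c331 instead of restoring it, so it would not explain why the operator ID changed.

```java
import java.util.ArrayList;
import java.util.List;

public final class RestoreCommand {

    /**
     * Builds the argument list for a "flink run" invocation that restores
     * from a savepoint. Hypothetical helper, not part of Flink.
     */
    public static List<String> build(String savepointPath,
                                     String jarPath,
                                     boolean allowNonRestoredState) {
        List<String> cmd = new ArrayList<>();
        cmd.add("flink");
        cmd.add("run");
        cmd.add("-s");              // restore from the given savepoint
        cmd.add(savepointPath);
        if (allowNonRestoredState) {
            // Skips savepoint state that cannot be mapped to any operator.
            cmd.add("--allowNonRestoredState");
        }
        cmd.add(jarPath);
        return cmd;
    }

    public static void main(String[] args) {
        List<String> cmd = build(
                "file:/root/flink-savepoints/savepoint-5f285c-c2749410db07",
                "flink-job.jar",    // hypothetical jar name
                true);
        System.out.println(String.join(" ", cmd));
    }
}
```

The resulting list can be passed to a ProcessBuilder or simply printed and run by hand.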
My code:
{code:java}
import java.time.ZoneId;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public final class FlinkJob {
    public static void main(String[] args) {
        final String JOB_NAME = "FlinkJob";

        final EnvironmentSettings settings =
                EnvironmentSettings.inStreamingMode();
        final TableEnvironment tEnv = TableEnvironment.create(settings);
        tEnv.getConfig().set("pipeline.name", JOB_NAME);
        tEnv.getConfig().setLocalTimeZone(ZoneId.of("UTC"));

        // Kafka source table; the first three columns expose Kafka metadata.
        tEnv.executeSql("CREATE TEMPORARY TABLE ApiLog (" +
                "  `_timestamp` TIMESTAMP(3) METADATA FROM 'timestamp' VIRTUAL," +
                "  `_partition` INT METADATA FROM 'partition' VIRTUAL," +
                "  `_offset` BIGINT METADATA FROM 'offset' VIRTUAL," +
                "  `Data` STRING," +
                "  `Action` STRING," +
                "  `ProduceDateTime` TIMESTAMP_LTZ(6)," +
                "  `OffSet` INT" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'api.log'," +
                "  'properties.group.id' = 'flink'," +
                "  'properties.bootstrap.servers' = '<mykafkahost...>'," +
                "  'format' = 'json'," +
                "  'json.timestamp-format.standard' = 'ISO-8601'" +
                ")");

        // Print sink with the same column layout as the source.
        tEnv.executeSql("CREATE TABLE print_table (" +
                "  `_timestamp` TIMESTAMP(3)," +
                "  `_partition` INT," +
                "  `_offset` BIGINT," +
                "  `Data` STRING," +
                "  `Action` STRING," +
                "  `ProduceDateTime` TIMESTAMP(6)," +
                "  `OffSet` INT" +
                ") WITH ('connector' = 'print')");

        // Submit the streaming job: copy every row from Kafka to the print sink.
        tEnv.executeSql("INSERT INTO print_table" +
                " SELECT * FROM ApiLog");
    }
}
{code}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)