Hi Keith,

When you add a new INSERT statement to your EXECUTE STATEMENT SET, the job
graph changes: it now contains two independent graphs, and the auto-generated
operator IDs no longer match the ones stored in the savepoint. Unfortunately,
Flink doesn't currently provide a way to directly force specific UIDs for
operators through configuration or SQL hints. This is primarily due to how
Flink's internal planner optimizes execution plans.
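
If losing the previously accumulated state is acceptable, the SQL Client
equivalent of the --allowNonRestoredState flag mentioned in the error can be
set before running the updated statement set. A rough sketch (assuming the
Flink 1.18 SQL Client and the savepoint path from your error message):

  -- point the next job at the existing savepoint
  SET 'execution.savepoint.path' = 'file:/tmp/flink-savepoints/savepoint-52c344-3bedb8204ff0';
  -- equivalent of --allowNonRestoredState: skip state that cannot be mapped
  -- to any operator in the new program instead of failing the restore
  SET 'execution.savepoint.ignore-unclaimed-state' = 'true';

  EXECUTE STATEMENT SET BEGIN
    -- unchanged INSERT plus the new INSERT
  END;

Keep in mind that because the generated operator IDs changed, the state of the
unchanged INSERT may also be skipped rather than restored.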

Talat


On Tue, May 7, 2024 at 8:42 AM Keith Lee <leekeiabstract...@gmail.com>
wrote:

> Hello,
>
> I'm running into issues restoring from a savepoint after changing the SQL
> statement.
>
> [ERROR] Could not execute SQL statement. Reason:
>> java.lang.IllegalStateException: Failed to rollback to
>> checkpoint/savepoint
>> file:/tmp/flink-savepoints/savepoint-52c344-3bedb8204ff0. Cannot map
>> checkpoint/savepoint state for operator cbc357ccb763df2852fee8c4fc7d55f2 to
>> the new program, because the operator is not available in the new program.
>> If you want to allow to skip this, you can set the --allowNonRestoredState
>> option on the CLI.
>
>
> EXECUTE STATEMENT SET was used, where an additional INSERT is added to the
> initial set with the first INSERT unmodified. I appreciate that under the
> hood, Flink does the planning and assigns random uids to operators.
> Under this scenario, I'd expect the restore to be fine as the first
> statement remains unchanged; are there any ways around this? Also, is it
> possible to force a uid using configuration or a SQL hint?
>
> Initial SQL statement set:
>
> EXECUTE STATEMENT SET BEGIN
>>
>> INSERT INTO UserErrorExperienceS3Sink (user_id, user_session,
>> interaction_type, interaction_target, interaction_tags, event_time) SELECT
>> user_id, user_session, interaction_type, interaction_target,
>> interaction_tags, event_time FROM UserBehaviourKafkaSource WHERE
>> interaction_result Like '%ERROR%';
>>
>> END
>
>
> Updated SQL statement set:
>
> EXECUTE STATEMENT SET BEGIN
>>
>> INSERT INTO UserErrorExperienceS3Sink (user_id, user_session,
>> interaction_type, interaction_target, interaction_tags, event_time) SELECT
>> user_id, user_session, interaction_type, interaction_target,
>> interaction_tags, event_time FROM UserBehaviourKafkaSource WHERE
>> interaction_result Like '%ERROR%';
>>
>>   INSERT INTO CampaignAggregationsJDBC
>>   SELECT
>>     CONCAT_WS('/', interaction_tags, interaction_result,
>> DATE_FORMAT(window_start, 'YYYY-MM-DD HH:mm:ss.SSS'),
>> DATE_FORMAT(window_end, 'YYYY-MM-DD HH:mm:ss.SSS')) AS id,
>>     interaction_tags as campaign,
>>     interaction_result,
>>     COUNT(*) AS interaction_count,
>>     window_start,
>>     window_end
>>   FROM TABLE(
>>     TUMBLE(TABLE UserBehaviourKafkaSource, DESCRIPTOR(event_time),
>> INTERVAL '10' SECONDS))
>>   GROUP BY window_start, window_end, interaction_tags,
>> interaction_result;
>>
>> END;
>
>
> This was done on Flink 1.18 SQL Client.
>
> Much appreciated
> Keith
>
