Hi, if the processing logic is modified, the representation of the 
topology changes. The UIDs, which are determined by the topological 
order, may change as well, and that can cause state recovery to fail. 
For further details, see [1].
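
To illustrate the point about topological order, here is a minimal toy model in Java. It is not Flink's planner code: the class name, the node names, and the "<position>_<name>" UID format are all assumptions made for this sketch. It only shows how an identifier derived from a node's position in a topological ordering can shift when an unrelated branch is added to the graph.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: hypothetical names, not Flink's planner or its UID scheme.
class TopologicalUidSketch {

    // Gives each node the UID "<position>_<name>", where position is the
    // node's 1-based index in a topological ordering (Kahn's algorithm).
    static Map<String, String> assignUids(Map<String, List<String>> edges) {
        // Count incoming edges for every node that appears in the graph.
        Map<String, Integer> inDegree = new LinkedHashMap<>();
        for (Map.Entry<String, List<String>> e : edges.entrySet()) {
            inDegree.putIfAbsent(e.getKey(), 0);
            for (String target : e.getValue()) {
                inDegree.merge(target, 1, Integer::sum);
            }
        }

        // Start from the nodes that have no inputs (the sources).
        Deque<String> ready = new ArrayDeque<>();
        for (Map.Entry<String, Integer> e : inDegree.entrySet()) {
            if (e.getValue() == 0) {
                ready.add(e.getKey());
            }
        }

        Map<String, String> uids = new LinkedHashMap<>();
        int position = 0;
        while (!ready.isEmpty()) {
            String node = ready.poll();
            position++;
            uids.put(node, position + "_" + node);
            // A successor becomes ready once all of its inputs were visited.
            for (String next : edges.getOrDefault(node, List.of())) {
                if (inDegree.merge(next, -1, Integer::sum) == 0) {
                    ready.add(next);
                }
            }
        }
        return uids;
    }

    // One pipeline: source -> filter -> sink.
    static Map<String, List<String>> initialGraph() {
        Map<String, List<String>> g = new LinkedHashMap<>();
        g.put("KafkaSource", List.of("Filter"));
        g.put("Filter", List.of("S3Sink"));
        return g;
    }

    // Same pipeline plus a second branch that reads from the same source.
    static Map<String, List<String>> updatedGraph() {
        Map<String, List<String>> g = new LinkedHashMap<>();
        g.put("KafkaSource", List.of("Filter", "WindowAggregate"));
        g.put("Filter", List.of("S3Sink"));
        g.put("WindowAggregate", List.of("JdbcSink"));
        return g;
    }

    public static void main(String[] args) {
        System.out.println("before: " + assignUids(initialGraph()));
        System.out.println("after:  " + assignUids(updatedGraph()));
    }
}
```

In this toy model the unchanged sink moves from 3_S3Sink to 4_S3Sink once the second branch is added, so state saved under the old identifier no longer finds a matching operator.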

Currently, the Table API does not support customizing UIDs. You could 
open a feature request on JIRA [2] and then start a discussion on the 
dev mailing list.




[1] https://github.com/apache/flink/blob/92eef24d4cc531d6474252ef909fc6d431285dd9/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/ExecNodeBase.java#L243C38-L243C62

[2] https://issues.apache.org/jira/projects/FLINK/issues/




--

    Best!
    Xuyang




At 2024-05-08 06:13:29, "Talat Uyarer via user" <user@flink.apache.org> wrote:

Hi Keith,


When you add a new INSERT statement to your EXECUTE STATEMENT SET, your job 
graph changes: it now contains two independent graphs. Unfortunately, Flink 
doesn't currently provide a way to force specific UIDs for operators through 
configuration or SQL hints. This is primarily because of how Flink's internal 
planner optimizes execution plans.


Talat




On Tue, May 7, 2024 at 8:42 AM Keith Lee <leekeiabstract...@gmail.com> wrote:

Hello,


I'm running into issues restoring from a savepoint after changing a SQL statement.

[ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalStateException: Failed to rollback to checkpoint/savepoint 
file:/tmp/flink-savepoints/savepoint-52c344-3bedb8204ff0. Cannot map 
checkpoint/savepoint state for operator cbc357ccb763df2852fee8c4fc7d55f2 to the 
new program, because the operator is not available in the new program. If you 
want to allow to skip this, you can set the --allowNonRestoredState option on 
the CLI.
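
The behaviour described in this error message can be sketched as a small Java toy model. This is an illustration under stated assumptions, not Flink's restore code: the class, the method, and the operator labels are hypothetical. It shows a restore that fails when a saved operator ID is missing from the new program, and that skips such state when the allowNonRestoredState behaviour is enabled.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Toy model only: hypothetical names, not Flink's checkpoint restore logic.
class RestoreCheckSketch {

    // Returns the saved operator IDs that can be mapped to the new program.
    // An unmatched ID fails the restore unless allowNonRestoredState is true,
    // in which case the state for that ID is skipped.
    static Set<String> restorableOperators(
            List<String> savepointOperatorIds,
            Set<String> newProgramOperatorIds,
            boolean allowNonRestoredState) {
        Set<String> restorable = new LinkedHashSet<>();
        for (String id : savepointOperatorIds) {
            if (newProgramOperatorIds.contains(id)) {
                restorable.add(id);
            } else if (!allowNonRestoredState) {
                throw new IllegalStateException(
                        "Cannot map savepoint state for operator " + id
                                + " to the new program");
            }
            // Otherwise the state saved under this ID is dropped.
        }
        return restorable;
    }

    public static void main(String[] args) {
        // Hypothetical operator IDs before and after adding a second INSERT.
        List<String> saved = List.of("1_KafkaSource", "2_Filter", "3_S3Sink");
        Set<String> current = Set.of(
                "1_KafkaSource", "2_Filter", "3_WindowAggregate",
                "4_S3Sink", "5_JdbcSink");

        try {
            restorableOperators(saved, current, false);
        } catch (IllegalStateException e) {
            System.out.println("strict restore failed: " + e.getMessage());
        }
        System.out.println("lenient restore keeps: "
                + restorableOperators(saved, current, true));
    }
}
```

Note that skipping is not the same as recovering: in this sketch the operator whose ID changed starts without its previous state.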

EXECUTE STATEMENT SET was used, where an additional INSERT is added to the 
initial set with the first INSERT unmodified. I appreciate that under the hood, 
Flink does the planning and assigns random uids to operators. In this 
scenario, I'd expect the restore to be fine as the first statement remains 
unchanged. Are there any ways around this? Also, is it possible to force a uid 
using configuration or a SQL hint?


Initial SQL statement set:


EXECUTE STATEMENT SET BEGIN

INSERT INTO UserErrorExperienceS3Sink (user_id, user_session, interaction_type, interaction_target, interaction_tags, event_time)
SELECT user_id, user_session, interaction_type, interaction_target, interaction_tags, event_time
FROM UserBehaviourKafkaSource
WHERE interaction_result LIKE '%ERROR%';

END;


Updated SQL statement set:

EXECUTE STATEMENT SET BEGIN

INSERT INTO UserErrorExperienceS3Sink (user_id, user_session, interaction_type, interaction_target, interaction_tags, event_time)
SELECT user_id, user_session, interaction_type, interaction_target, interaction_tags, event_time
FROM UserBehaviourKafkaSource
WHERE interaction_result LIKE '%ERROR%';

  INSERT INTO CampaignAggregationsJDBC
  SELECT
    -- Java date patterns: 'yyyy' is the calendar year and 'dd' the day of month
    -- ('YYYY' would be the week-based year and 'DD' the day of year).
    CONCAT_WS('/', interaction_tags, interaction_result,
      DATE_FORMAT(window_start, 'yyyy-MM-dd HH:mm:ss.SSS'),
      DATE_FORMAT(window_end, 'yyyy-MM-dd HH:mm:ss.SSS')) AS id,
    interaction_tags AS campaign,
    interaction_result,
    COUNT(*) AS interaction_count,
    window_start,
    window_end
  FROM TABLE(
    TUMBLE(TABLE UserBehaviourKafkaSource, DESCRIPTOR(event_time), INTERVAL '10' SECONDS))
  GROUP BY window_start, window_end, interaction_tags, interaction_result;

END;


This was done on Flink 1.18 SQL Client.



Much appreciated

Keith
