Hi Fabian,
Thanks for your reply; it helps a lot.
Best Regards,
Jie
shadowell
shadow...@126.com
On 7/8/2020 18:17, Fabian Hueske wrote:
Hi Jie,
The automatic ID generation is not done by the SQL translation component but
at a lower level, i.e., it is independent of Flink's SQL translation.
The ID generation only depends on the topology / graph structure of the
program's operators.
The ID of an operator depends on the IDs of its predecessors (and not on its
own processing logic or operator name).
So, as long as the operator graph structure of a program remains the same, it
will be compatible with an earlier savepoint.
However, preserving the operator graph structure is out of the user's control.
The operator graph is automatically generated by the SQL optimizer and slight
changes of a query can result in a different graph while other changes do not
affect the structure.
In your example, the graph structure should remain the same because there is
already a Filter operator (due to "where id == '001'") in the first query and
the second query just extends the filter predicate ("id == '001' and age >=
'28'").
If there had been no WHERE clause in the first query, the plan might have
changed.
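To make the idea concrete, here is a minimal Python sketch under a simplified model. This is an assumption for illustration, not Flink's actual hashing code: each operator's ID is a hash of its position in a topological traversal plus the IDs of its predecessors, and its description (name or logic) is ignored. The three plan shapes are hypothetical stand-ins for what the optimizer might produce for the old query, the new query, and a query without a WHERE clause.

```python
import hashlib


def assign_ids(operators):
    """Assign structural IDs to a plan (simplified, hypothetical model).

    `operators` is a list of (description, predecessor_indices) tuples in
    topological order. An operator's ID depends only on its position and on
    its predecessors' IDs; the description (name or logic) is ignored.
    """
    ids = []
    for position, (_description, predecessors) in enumerate(operators):
        digest = hashlib.sha256(str(position).encode())
        for p in predecessors:
            digest.update(ids[p].encode())
        ids.append(digest.hexdigest()[:16])
    return ids


# Hypothetical plan shapes; the optimizer's real output may differ.
old_plan = [
    ("Source: user_info", []),
    ("Filter: id == '001'", [0]),
    ("Window aggregate: TUMBLE 1 DAY", [1]),
    ("Sink", [2]),
]
new_plan = [
    ("Source: user_info", []),
    ("Filter: id == '001' and age >= '28'", [0]),
    ("Window aggregate: TUMBLE 1 DAY", [1]),
    ("Sink", [2]),
]
no_filter_plan = [
    ("Source: user_info", []),
    ("Window aggregate: TUMBLE 1 DAY", [0]),
    ("Sink", [1]),
]

# Only the filter predicate changed, so the structure and all IDs are the same.
print(assign_ids(old_plan) == assign_ids(new_plan))  # True
# Without the filter, the aggregate has a different position and predecessor,
# so its ID no longer matches the old aggregate's ID.
print(assign_ids(no_filter_plan)[1] == assign_ids(old_plan)[2])  # False
```

In this model the source keeps its ID in all three plans because nothing upstream of it changed; only operators at or downstream of a structural change get new IDs.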
In order to reason about which query changes are savepoint compatible, you need
in-depth knowledge about the optimizer's translation process.
I would not rely on being able to start a query from a savepoint of a
(slightly) modified query.
First, because it is very fragile given the query translation process, and
second, because it leads to incorrect results.
Given your example query, I would start it from scratch and add a predicate to
continue after the latest result of the previous query:
select id, name, sum(salary) from user_info
where id == '001' and age >= '28' and rowtime >= 'xxx'
group by TUMBLE(rowtime, INTERVAL '1' DAY), id, name;
If the last result of the first query was for '2020-07-07', I would set xxx to
'2020-07-08 00:00:00.000'.
Of course, this only works for queries with hard temporal boundaries, but it
gives correct results.
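A small Python sketch of how the value for xxx could be derived, assuming the day of the previous job's last emitted window is known and that the 1-day tumbling windows are aligned to midnight (no window offset or time-zone shift; treat both as assumptions). The helper name `continuation_bound` is hypothetical.

```python
from datetime import date, datetime, time, timedelta


def continuation_bound(last_window_day: date) -> str:
    """Start of the first 1-day window that the previous job did not emit.

    Assumes 1-day tumbling windows aligned to midnight, so the next window
    starts at 00:00:00.000 on the day after the last emitted window.
    """
    next_start = datetime.combine(last_window_day + timedelta(days=1), time.min)
    # Render as 'YYYY-MM-DD HH:MM:SS.sss' (millisecond precision).
    return next_start.isoformat(sep=" ", timespec="milliseconds")


print(continuation_bound(date(2020, 7, 7)))  # 2020-07-08 00:00:00.000
```

Because the bound is the start of a whole window, the restarted query never sees a partially filled window, which is why this approach only works when the query has such hard temporal boundaries.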
Best, Fabian
On Wed, Jul 8, 2020 at 04:50, shadowell wrote:
Hi Fabian,
Thanks for your information!
Actually, I am not clear about how operator IDs are auto-generated in Flink SQL
or how operator state is mapped back from a savepoint.
I hope to get some details by working through the example below.
Here are two sample SQL queries:
Old SQL:
select id, name, sum(salary) from user_info where id == '001'
group by TUMBLE(rowtime, INTERVAL '1' DAY), id, name;
New SQL:
select id, name, sum(salary) from user_info where id == '001' and age >= '28'
group by TUMBLE(rowtime, INTERVAL '1' DAY), id, name;
I only added an age condition in the new SQL. Now I want to switch the job from
the old query to the new one by triggering a savepoint. Flink will generate
operator IDs for the operators of the new SQL.
In this case, purely from a technical point of view, can the operator IDs in
the savepoint of the old SQL job match the operator IDs in the new SQL job?
My understanding is that Flink will reorder the operators and generate new IDs
for them, which may not match the old IDs.
As a result, some state could fail to be mapped back from the old job's
savepoint, which would naturally lead to inaccurate results.
Is my understanding correct?
Thanks~
Jie
On 7/7/2020 17:23, Fabian Hueske wrote:
Hi Jie Feng,
As you said, Flink translates SQL queries into streaming programs with
auto-generated operator IDs.
In order to start a SQL query from a savepoint, the operator IDs in the
savepoint must match the IDs in the newly translated program.
Right now this can only be guaranteed if you translate the same query with the
same Flink version (optimizer changes might change the structure of the
resulting plan even if the query is the same).
This is, of course, a significant limitation that the community is aware of and
plans to improve in the future.
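The matching step can be pictured with a minimal Python model (an illustration, not Flink's restore code): savepoint state is keyed by operator ID, each operator of the new program picks up the state stored under its own ID, and operators without a match start with empty state. By default, Flink refuses to start a job whose savepoint contains state that no operator in the new program claims; the `--allowNonRestoredState` option of `flink run` skips such state instead. The `allow_non_restored_state` parameter below mimics that option; the operator IDs and state values are hypothetical.

```python
def restore(savepoint, new_operator_ids, allow_non_restored_state=False):
    """Map savepoint state (operator ID -> state) onto a new program's operators.

    Returns a dict of operator ID -> restored state, where None means the
    operator starts with empty state. Raises if savepoint state would be
    dropped and allow_non_restored_state is False.
    """
    orphaned = sorted(set(savepoint) - set(new_operator_ids))
    if orphaned and not allow_non_restored_state:
        raise ValueError(f"cannot map savepoint state for operators {orphaned}")
    return {op_id: savepoint.get(op_id) for op_id in new_operator_ids}


# Hypothetical savepoint of the old job.
savepoint = {"src": {"offset": 42}, "agg-v1": {"partial_sum": 300}}

# Same structure -> same IDs -> the aggregate gets its partial sum back.
print(restore(savepoint, ["src", "agg-v1"]))

# Changed structure -> the aggregate has a new ID, so its old state is orphaned.
try:
    restore(savepoint, ["src", "agg-v2"])
except ValueError as err:
    print(err)

# Skipping orphaned state: the new aggregate silently starts from scratch.
print(restore(savepoint, ["src", "agg-v2"], allow_non_restored_state=True))
```

Note that even a successful match only means the IDs line up; it says nothing about whether the restored state is meaningful for the new query.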
I'd also like to add that it can be very difficult to assess whether it is
meaningful to start a query from a savepoint that was generated with a
different query.
A savepoint holds intermediate data that is needed to compute the result of a
query.
If you update a query, it is quite possible that the result computed by
Flink won't equal the actual result of the new query.
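A toy Python simulation shows the effect. It uses hypothetical data and models only the SUM state of a single 1-day window, not Flink internals. A savepoint is taken halfway through the window under the old query, and the new query resumes from that partial sum.

```python
# Hypothetical rows (id, name, age, salary), all in the same 1-day window.
# Age is an int here, although the queries in this thread compare it to '28'.
rows = [
    ("001", "Jie", 27, 100),
    ("001", "Jie", 28, 200),
    ("001", "Jie", 27, 300),
    ("001", "Jie", 28, 400),
]


def old_filter(row):
    return row[0] == "001"


def new_filter(row):
    return row[0] == "001" and row[2] >= 28


def window_sum(rows, keep, partial=0):
    """SUM(salary) over the kept rows, continuing from a partial sum (the window state)."""
    return partial + sum(row[3] for row in rows if keep(row))


old_result = window_sum(rows, old_filter)  # what the old query would emit
new_result = window_sum(rows, new_filter)  # what the new query should emit
# Savepoint after the first two rows, then resume with the new query.
partial = window_sum(rows[:2], old_filter)  # still contains the age-27 row
resumed = window_sum(rows[2:], new_filter, partial)
print(old_result, new_result, resumed)  # 1000 600 700
```

The resumed value matches neither query, because the restored state was computed under the old predicate.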
Best, Fabian
On Mon, Jul 6, 2020 at 10:50, shadowell wrote:
Hello everyone,
I have some questions about using Flink SQL. I hope someone can answer them
or tell me where I can find the answer.
When using the DataStream API, to ensure that a job can recover its state
from a savepoint after it has been modified, you need to specify a uid for
each operator. However, when using Flink SQL, operator uids are generated
automatically. If