Hi, Marek. Sorry for the late reply; it was the Spring Festival holiday in China.


When the upsert key is empty (i.e. it cannot be deduced) or differs from the pk 
of the sink, Flink will generate the upsert materializer, just as it does when 
`table.exec.sink.upsert-materialize = FORCE` is set. You can see the code here[1].
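
For reference, a minimal sketch of setting this option programmatically (the 
option key and its values are the real ones; the surrounding TableEnvironment 
setup is just for illustration):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertMaterializeConfig {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // Valid values are NONE, AUTO (the default) and FORCE.
        tEnv.getConfig().set("table.exec.sink.upsert-materialize", "FORCE");
    }
}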


You're right. The upsert materializer can run into problems, especially in 
scenarios with non-deterministic functions, etc. Here are two documents that 
touch on this[2][3]. As an improvement, you could also create a jira[4] to 
continuously improve the advice given by the EXPLAIN PLAN_ADVICE[3] syntax.
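
For example, the advice can be printed with something along these lines (a 
sketch: PLAN_ADVICE is only available in newer Flink versions, and the s1, s2 
and t1 tables from the examples quoted further down this thread would have to 
be created first):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class PlanAdviceExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // CREATE TABLE statements for s1, s2 and t1 would go here.
        tEnv.executeSql(
                        "EXPLAIN PLAN_ADVICE "
                                + "INSERT INTO t1 "
                                + "SELECT s1.*, s2.attr FROM s1 JOIN s2 ON s1.id = s2.id")
                .print();
    }
}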


[1] 
https://github.com/apache/flink/blob/050503c65f5c5c18bb573748ccbf5aecce4ec1a5/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/optimize/program/FlinkChangelogModeInferenceProgram.scala#L881
[2] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/determinism/#32-non-deterministic-update-in-streaming
[3] 
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/explain/#explaindetails
[4] https://issues.apache.org/jira/projects/FLINK/summary



--

    Best!
    Xuyang




On 2024-02-08 19:53:42, "Marek Maj" <marekm...@gmail.com> wrote:

Hi Xuyang,
Thank you for the explanation. The table.exec.sink.upsert-materialize = FORCE 
config was set unnecessarily; I just redeployed the job and confirmed that with 
the default AUTO the materializer is still on.


Thank you for the example you provided. My understanding of the upsert key was 
exactly the same as yours, but I have not been able to reproduce that.


When executing the EXPLAIN CHANGELOG_MODE statement, only the information about 
using the materializer is visible; no information about the upsert key is 
printed. After slightly modifying the flink lib to add log statements in 
SinkUpsertMaterializer, the TM logs at runtime show that the variable 
inputUpsertKey is empty and the hasUpsertKey boolean value is false.


Let me describe my observations based on two examples


EXAMPLE 1
First example is taken directly from this article [1]:
-- CDC source tables:  s1 & s2
s1: id BIGINT, level BIGINT, PRIMARY KEY(id)
s2: id BIGINT, attr VARCHAR, PRIMARY KEY(id)
-- sink table: t1
t1: id BIGINT, level BIGINT, attr VARCHAR, PRIMARY KEY(id)
-- join s1 and s2 and insert the result into t1
INSERT INTO t1
SELECT
  s1.*, s2.attr
FROM s1 JOIN s2
  ON s1.level = s2.id


When I run this simplified example, which just joins two tables, I get an empty 
upsert key, exactly as stated in the article. Again, at runtime the TM logs 
show that the variable inputUpsertKey is empty and the hasUpsertKey boolean 
value is false. I do not see information about the upsert key when running the 
EXPLAIN statement, however the information about the join seems to be important:
leftInputSpec=[HasUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]


EXAMPLE 2

The second example is slightly modified: unique keys are used in the join and 
the sink PK differs from the join key:
s1: id BIGINT, level BIGINT, PRIMARY KEY(id)
s2: id BIGINT, attr VARCHAR, PRIMARY KEY(id)
t1: id BIGINT, level BIGINT, attr VARCHAR, PRIMARY KEY( level )
INSERT INTO t1
SELECT
  s1.*, s2.attr
FROM s1 JOIN s2
  ON s1.id = s2.id


This time the upsert key is defined. In the above, inputUpsertKey is set to 
'id' and the hasUpsertKey boolean value is true. Additionally, information 
about the join using a unique key is printed in the plan when executing EXPLAIN:
leftInputSpec=[JoinKeyContainsUniqueKey], rightInputSpec=[JoinKeyContainsUniqueKey]


That seems to be the correct behavior. SinkUpsertMaterializer will use that 
upsert key when comparing an incoming event with the historical events in its 
state.


According to the SinkUpsertMaterializer code, whenever the upsert key is empty 
(first example), the whole-row equaliser is used to find the last matched value 
in state. If the whole row needs to be matched, in some jobs this may 
potentially lead to an undesirable final ordering, due to using temporal joins 
(some state gets cleared when the watermark progresses, even if the global 
ttl = 0) and/or non-deterministic calculations for some columns (like adding a 
column with the value LOCALTIMESTAMP just before the sink). At the same time, 
the sink upsert materializer will still be turned on automatically, which may 
suggest to the user that it is ordering events correctly. Maybe we could add 
more documentation on these use cases?
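
To make the concern concrete, here is a small, self-contained sketch of the 
matching behaviour described above (this is only an illustration, not the real 
SinkUpsertMaterializer code): with an upsert key only the key columns are 
compared, while with an empty upsert key the whole row, including any 
non-deterministic column, has to match.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Illustration only: NOT the real SinkUpsertMaterializer, just the matching idea.
public class UpsertMatchSketch {

    // Returns the index of the state row matched by the incoming retraction, or -1.
    static int findMatch(List<Object[]> state, Object[] incoming, int[] upsertKey) {
        for (int i = state.size() - 1; i >= 0; i--) {
            Object[] candidate = state.get(i);
            boolean equal;
            if (upsertKey.length > 0) {
                // Upsert key known: compare only the key columns.
                equal = true;
                for (int k : upsertKey) {
                    equal &= Objects.equals(candidate[k], incoming[k]);
                }
            } else {
                // Empty upsert key: the whole row must be equal.
                equal = Arrays.equals(candidate, incoming);
            }
            if (equal) {
                return i;
            }
        }
        return -1; // no match: the retraction cannot be applied
    }

    public static void main(String[] args) {
        // Row layout: (id, level, attr, ts), where ts was added via LOCALTIMESTAMP.
        List<Object[]> state = new ArrayList<>();
        state.add(new Object[] {1L, 10L, "x", "12:00:00.123"});
        Object[] retraction = {1L, 10L, "x", "12:00:00.456"}; // ts re-evaluated upstream

        System.out.println(findMatch(state, retraction, new int[] {0})); // 0  (matched by id)
        System.out.println(findMatch(state, retraction, new int[0]));    // -1 (whole row differs)
    }
}

In the real operator the row comparisons are code-generated, but the consequence 
should be the same: with whole-row matching, any column whose value can differ 
between an insert and its later retraction can make the retraction miss.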


I am eager to hear what you think.


best regards
Marek


[1] 
https://www.ververica.com/blog/flink-sql-secrets-mastering-the-art-of-changelog-event-out-of-orderness


On Fri, 2 Feb 2024 at 12:30, Xuyang <xyzhong...@163.com> wrote:

Hi, Maj.


> 1. Does the materializer support jobs containing different types of joins 
> (more specifically regular and temporal joins)? 
> 2. Does the materializer support different types of input connectors: kafka 
> with both debezium-avro-confluent and avro-confluent formats and upsert-kafka 
> with avro-confluent format? All with well defined primary key (PK)

The answer to both questions is "no." The upsert materializer is only related 
to the sink and the node before the sink (usually a join, an aggregation, 
etc.).

By default (with table.exec.sink.upsert-materialize = AUTO), the upsert 
materializer will appear when the upsert key of the upstream node before the 
sink and the pk of the sink do not match. Usually, we do not need to manually 
set this parameter to FORCE.




Suppose we have a source table T1, with a schema of "a", "b", "c", and "a" is 
the pk. Downstream, "b" is used as the join key to join with table T2, and the 
result is written into table T3, where "a" is also the pk. The global 
parallelism is set to 2. 

The source will issue (+I, a1, b1, c1), (-U, a1, b1, c1), (+U, a1, b2, c2). 
Because the join key is "b", the upsert key for the join becomes "b", which 
does not match the sink's pk "a", hence a sink materializer is produced.

Since the join key is "b", (+I, a1, b1, c1) and (-U, a1, b1, c1) will be sent 
to the first parallel instance of the join, "join1", and (+U, a1, b2, c2) will 
be sent to the second parallel instance, "join2". At the same time, since the 
sink's pk is "a", all three records belong to the same sink key, so their order 
matters at the sink.




In practice, due to different processing speeds of join1 and join2, the sink 
may receive the following three possible sequences:

1. (+I, a1, b1, c1), (-U, a1, b1, c1), (+U, a1, b2, c2)
2. (+U, a1, b2, c2), (+I, a1, b1, c1), (-U, a1, b1, c1)
3. (+I, a1, b1, c1), (+U, a1, b2, c2), (-U, a1, b1, c1)

(Note that the two records (+I, a1, b1, c1) and (-U, a1, b1, c1) must stay in 
order because they are processed by the same parallel instance "join1".)




Without an upsert materializer, in cases 2 and 3 the sink would ultimately 
receive the -U, leading to data deletion.

The upsert materializer ensures that (+U, a1, b2, c2) is correctly emitted as 
the final result in cases 2 and 3.
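
A minimal toy model of that idea (not the actual Flink operator, just an 
illustration): the materializer keeps the rows it has seen per sink key and, 
when the currently visible row is retracted, re-emits the newest remaining one 
instead of forwarding the delete.

import java.util.ArrayList;
import java.util.List;

// Toy model of the per-sink-key state kept by an upsert materializer.
// This illustrates the idea only; it is not Flink's SinkUpsertMaterializer.
public class MaterializerSketch {

    private final List<String> rowsForKey = new ArrayList<>(); // state for one sink pk, e.g. a1

    // Returns what would be emitted downstream for this change, or null for "nothing".
    String process(String kind, String row) {
        switch (kind) {
            case "+I":
            case "+U":
                rowsForKey.add(row);
                return "+U " + row; // the newest version becomes visible
            case "-U":
            case "-D": {
                int idx = rowsForKey.lastIndexOf(row);
                if (idx < 0) {
                    return null; // nothing to retract
                }
                boolean wasLast = idx == rowsForKey.size() - 1;
                rowsForKey.remove(idx);
                if (!wasLast) {
                    return null; // the currently visible row is unchanged
                }
                if (rowsForKey.isEmpty()) {
                    return "-D " + row; // nothing left for this key
                }
                return "+U " + rowsForKey.get(rowsForKey.size() - 1); // re-emit newest remaining row
            }
            default:
                return null;
        }
    }

    public static void main(String[] args) {
        MaterializerSketch m = new MaterializerSketch();
        // Case 2 from above: (+U, a1, b2, c2), (+I, a1, b1, c1), (-U, a1, b1, c1)
        System.out.println(m.process("+U", "a1,b2,c2")); // +U a1,b2,c2
        System.out.println(m.process("+I", "a1,b1,c1")); // +U a1,b1,c1
        System.out.println(m.process("-U", "a1,b1,c1")); // +U a1,b2,c2  -- not a delete
    }
}

The real operator additionally has to decide which stored row a retraction 
refers to, using the upsert key if one is known or a whole-row comparison 
otherwise, which is what the rest of this thread is about.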




Regrettably, Flink currently does not have a means of online debugging. To 
confirm the logic related to the upsert materializer, you may need to clone the 
Flink repository, build and compile it, and then run the 
SinkUpsertMaterializerTest test class to observe and test it yourself.




Regarding the upsert key, you can use EXPLAIN CHANGELOG_MODE ... to view it in 
the plan.
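
For example, with the s1, s2 and t1 tables quoted earlier in this thread, 
something along these lines prints the optimized plan together with the 
changelog modes (a sketch; the CREATE TABLE statements are omitted):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class ExplainChangelogModeExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());
        // CREATE TABLE statements for s1, s2 and t1 would go here.
        tEnv.executeSql(
                        "EXPLAIN CHANGELOG_MODE "
                                + "INSERT INTO t1 "
                                + "SELECT s1.*, s2.attr FROM s1 JOIN s2 ON s1.id = s2.id")
                .print();
    }
}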




If there are any issues with the above, please correct me.







--

    Best!
    Xuyang




At 2024-01-31 20:24:57, "Marek Maj" <marekm...@gmail.com> wrote:

Hello Flink Community,
In our Flink SQL job we are experiencing undesirable behavior related to event 
reordering (more details below, in the background section).
I have a few questions related to the sink upsert materializer; the answers 
should help me understand its capabilities:


1. Does the materializer support jobs containing different types of joins (more 
specifically regular and temporal joins)? 
2. Does the materializer support different types of input connectors: kafka 
with both debezium-avro-confluent and avro-confluent formats and upsert-kafka 
with avro-confluent format? All with well defined primary key (PK)
3. What is the recommended way to debug the sink materializer? I printed the 
compiled plan for the SQL job and I can see that the upsert materializer is on, 
but I am not sure if I can extract more information about its behavior


Flink version we are using: 1.16.1


best regards
Marek




Background:
We have deployed a Flink SQL job that uses multiple joins to enrich data coming 
from the main table. Altogether we have 11 different joins used for enrichment: 
temporal joins as well as regular joins (both left and inner).
All source tables and the output table use kafka topics under the hood. The 
grain of the main table does not change: the main table and the output table 
use the same non-nullable column for their PK. Job parallelism is 16.


We are experiencing the data reordering presented below.
Data in the kafka input topic for the main table (correct order, all in the 
same partition):

(image not included)

Data in the kafka output topic after enrichment (events reordered, all in the 
same partition):

(image not included)

The highlighted event in the source becomes the last one in the output and, as 
a consequence, incorrectly overrides the enriched values with nulls.
The reordering probably happens due to one slow join where there is significant 
data skew, so the output from one parallel task of this operator is delayed 
compared to the other parallel tasks. This join uses as its key a value that is 
null in the first event and non-null in the other 4 events.



However, my understanding is that the event reordering should be corrected by 
the SinkUpsertMaterializer operator. Our configuration contains:
table.exec.sink.upsert-materialize: FORCE

table.exec.state.ttl (not configured, default = 0)



The sink upsert materializer is visible in the compiled plan info and in the 
flink dashboard job graph after deployment. It looks like it is not behaving as 
I would expect, and I would like to understand the reason.
