Hi Dawid,

thank you very much for working on and proposing this FLIP. This is an excellent design document that shows the deep research you have conducted. Both the linked resources and the examples are very helpful for getting the big picture.

The sink upsert materializer is a long-standing problem for Flink SQL pipelines. Only a few people really understand why it exists and why it is so expensive. I also know that some users have simply disabled it because they are fine with the output results (potentially ignoring corner cases, intentionally or by accident).

In general the FLIP is in very good shape, and you definitely get my +1 on this. Just some last questions:

1) Will we also support the ON CONFLICT syntax for append-only inputs?

2) Regarding naming, I find the config option "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval" too long. Could we simplify it to something less internal, maybe "table.exec.sink.upserts.compaction-interval"?
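
For illustration, setting it per session with the shorter name would then look something like the statement below (this uses the simplified name suggested above, which is only a proposal, not something the FLIP currently defines):

SET 'table.exec.sink.upserts.compaction-interval' = '1 s';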

Cheers,
Timo


On 08.01.26 12:42, Dawid Wysakowicz wrote:

Today there are still many retract sources, such as some sources in the
Flink CDC project (e.g., PG CDC, MySQL CDC), Paimon, Hudi, and some
formats, etc. These can be further divided into two categories.
One is like Debezium: there is only a single UPDATE record in the physical
storage, and the corresponding Flink source connector further splits it
into UA/UB. The other is where UA and UB are already two separate changelog
records in the physical storage.
For the former, we could generate a watermark boundary before the source
just like checkpoint barrier, so that UB and UA are guaranteed to fall
within the same boundary. This should actually be supportable. It’s okay if
we don’t support it in the first version, but it may affect the overall
design—for example, how to generate the system watermark boundary.
For the latter, it’s probably more troublesome. I think it’s also fine not
to support it. What do you think?


When designing the FLIP I considered the Debezium case, and as you correctly pointed out it's actually not much of a problem. The only requirement is that the watermark is generated before the message is split. I'd start without support for those sources, and we can improve on that later on.
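
To sketch that requirement (my own simplified notation, not taken from the FLIP):

physical storage:   UPDATE(id=1, v: 'a' -> 'b')
source output:      | barrier |  -U(id=1, 'a')  +U(id=1, 'b')  | barrier |

Because the boundary is injected before the split, the -U/+U pair can never straddle a barrier; if the split happened first, a barrier could slip in between the two records and the sink would see the retraction without its matching update within one boundary.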

3. Oh, what I meant is how about renaming it to something like
"table.exec.sink.upsert-materialize-barrier-mode.compaction-interval"?
Because I think it may not be a “watermark”; it’s a compaction barrier, and
this compaction can be 1) replaced by watermark, or 2) replaced by
checkpoint, or 3) generated by the Flink system internally. What do you
think?


Fine by me. We can call
it "table.exec.sink.upsert-materialize-barrier-mode.compaction-interval".

4. I’m also wondering whether we even need the state for “a single result per key for a cross-watermark-boundary handover”?


I am pretty sure we do, in order to adhere to the ON CONFLICT ERROR behaviour. Bear in mind that two "active" upsert keys for the same sink primary key may arrive as part of two separate barriers.
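
A sketch of the situation I have in mind (hypothetical records, reusing the source/sink tables from the example I gave earlier in this thread, quoted further down, where the sink's primary key is name):

barrier 1:   +I(id=1, name='Apple', value='ABC')    -- 'Apple' becomes active in the sink
barrier 2:   +I(id=2, name='Apple', value='DEF')    -- a different source row targets the same sink key

Without the single compacted result per key kept across boundaries, in barrier 2 we could not tell that 'Apple' is already occupied by a row with a different source id, so ON CONFLICT ERROR could never be raised.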

On Thu, 8 Jan 2026 at 05:14, Xuyang <[email protected]> wrote:

1. I agree with that; at least it won’t be worse. Currently, for data that contains non-deterministic functions and no upsert key (UK), SUM cannot properly handle correcting out-of-order records.


2. It’s fine if we don’t support it at first. Let me add some more context.
Today there are still many retract sources, such as some sources in the
Flink CDC project (e.g., PG CDC, MySQL CDC), Paimon, Hudi, and some
formats, etc. These can be further divided into two categories.
One is like Debezium: there is only a single UPDATE record in the physical
storage, and the corresponding Flink source connector further splits it
into UA/UB. The other is where UA and UB are already two separate changelog
records in the physical storage.
For the former, we could generate a watermark boundary before the source
just like checkpoint barrier, so that UB and UA are guaranteed to fall
within the same boundary. This should actually be supportable. It’s okay if
we don’t support it in the first version, but it may affect the overall
design—for example, how to generate the system watermark boundary.
For the latter, it’s probably more troublesome. I think it’s also fine not
to support it. What do you think?


3. Oh, what I meant is how about renaming it to something like
"table.exec.sink.upsert-materialize-barrier-mode.compaction-interval"?
Because I think it may not be a “watermark”; it’s a compaction barrier, and
this compaction can be 1) replaced by watermark, or 2) replaced by
checkpoint, or 3) generated by the Flink system internally. What do you
think?


4. I’m also wondering whether we even need the state for “a single result per key for a cross-watermark-boundary handover”?





--

     Best!
     Xuyang



On 2026-01-07 19:54:47, "Dawid Wysakowicz" <[email protected]> wrote:
1. Without an upsert key, how do we determine the order of multiple records
within the same watermark boundary when non-deterministic functions are
involved? For example, if we receive data like the “disorder (2)” case
below, and the upsert key is lost after a join, what will the final output
be (without internal consistency issues)?

If you have non-deterministic functions like in your example, the retraction does not work anyway. For a retraction to work when there is no primary key defined, we require all columns to be deterministic:

https://github.com/apache/flink/blob/fec9336e7d99500aeb9097e441d8da0e6bde5943/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/optimize/StreamNonDeterministicUpdatePlanVisitor.java#L142
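
As a rough illustration of the kind of query that check targets (the table names are made up and connector options are omitted):

-- cdc_orders is an updating (CDC) input and out_no_pk has no primary key;
-- NOW() is non-deterministic, so the -U emitted for an update could never
-- reproduce the value that was originally written, hence the requirement above
INSERT INTO out_no_pk
SELECT id, amount, NOW() AS seen_at
FROM cdc_orders;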

2. To address internal consistency issues caused by emitting -U, we need
each watermark boundary to contain paired -U/+U. That means we have to
track what the source sends, and only emit the "watermark boundary" after
the +U has been sent, right? For an upsert source, this is easy because we
have ChangelogNormalize to output the -U/+U pairs. But for a retract
source, we may need to introduce a new stateful operator. This seems
unavoidable unless the source outputs -U and +U together.

Yes, this assumption does not hold for retracting sources. So far we don't
have any such sources. I'll introduce a check that would fail for a
combination of a retracting source and DO ERROR/DO NOTHING.

3. About
"table.exec.sink.upsert-materialize-barrier-mode.watermark-interval", it’s
not actually called “watermark internal”, because it doesn't have
watermark
semantics to drop late data. It’s actually a compaction barrier, right?

I think I don't fully understand this point. Could you please explain it
one more time? Are you suggesting a different name?

4. The state in SUM is used to handle rollback under out-of-order
scenarios. Since we resolve out-of-orderness within a watermark boundary,
does that mean we don’t need state anymore? More precisely, we only need a
"temporary" state that lives within each watermark boundary. (Or what I can
think of is: you’re using this persistent state to support the subsequent
`ON CONFLICT conflict_action`.)

Yes, I think that is more or less correct. We need the "temporary" state within a boundary plus a single result per key for the cross-watermark-boundary handover. I explain that in the FLIP.
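
Roughly, per sink primary key (my simplified reading, see the FLIP for the exact design):

within the current boundary:  all active upsert-keyed rows seen since the last barrier (cleared at the barrier)
across boundaries:            at most one compacted row, the handover result of the previous boundary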

Best,
Dawid

On Mon, 5 Jan 2026 at 09:39, Xuyang <[email protected]> wrote:

Thank you for the explanation. I think I understand what you mean now. I
have a few questions I’d like to confirm:
1. Without an upsert key, how do we determine the order of multiple records within the same watermark boundary when non-deterministic functions are involved? For example, if we receive data like the “disorder (2)” case below, and the upsert key is lost after a join, what will the final output be (without internal consistency issues)?


+U(id=1, level=20, attr='b1', rand='1.5')
+U(id=1, level=10, attr='a1', rand='1') // originally +I; I slightly
modified it
-U(id=1, level=10, attr='a1', rand='2')


The watermark is an internal consistency boundary. You will always have a UB for the old value and a UA for the new value.
2. To address internal consistency issues caused by emitting -U, we need
each watermark boundary to contain paired -U/+U. That means we have to
track what the source sends, and only emit the "watermark boundary" after the +U has been sent, right? For an upsert source, this is easy because we have ChangelogNormalize to output the -U/+U pairs. But for a retract source, we may need to introduce a new stateful operator. This seems unavoidable unless the source outputs -U and +U together.


3. About
"table.exec.sink.upsert-materialize-barrier-mode.watermark-interval",
it’s
not actually called “watermark internal”, because it doesn't have
watermark
semantics to drop late data. It’s actually a compaction barrier, right?


4. The state in SUM is used to handle rollback under out-of-order
scenarios. Since we resolve out-of-orderness within a watermark boundary, does that mean we don’t need state anymore? More precisely, we only need a "temporary" state that lives within each watermark boundary. (Or what I can think of is: you’re using this persistent state to support the subsequent `ON CONFLICT conflict_action`.)



--

     Best!
     Xuyang



On 2025-12-29 17:51:40, "Dawid Wysakowicz" <[email protected]> wrote:
But I’d like to add a clarification. Take the “Changelog Disorder” example described in the FLIP (
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder
).
Let’s look at disorder (2) and disorder (3). Under the default ON CONFLICT ERROR, IIUC, the expected behavior is that Flink should fail. However, those inserts and updates actually all come from the same PK on table s1 (id = 1). From a relational-algebra perspective, this does not violate the PK constraint; it only happens because we shuffle by level and end up with out-of-order issues under multiple parallelisms. In other words, if we run this SQL in batch, the PK conflict will not happen and ON CONFLICT ERROR should not fail. If the streaming job fails, users will be confused. For disorder issues introduced by Flink internally, I believe Flink should handle them internally.

Let's first clarify this point, because I think it's vital for understanding the proposal, so we must be on the same page before we talk about the other points.

No, the example you pointed out would not throw an error in `ON CONFLICT ERROR`. As you pointed out yourself, those changes come from the same PK on table s1, therefore you will not have two active records with (id = 1) in the sink on the watermark boundary. The watermark is an internal consistency boundary. You will always have a UB for the old value and a UA for the new value. Therefore you will only ever have a single value after the compaction. We would throw only if we try to upsert into a single row in the sink from two rows from s1 with different ids, e.g. {source_id=1, sink_id=1}, {source_id=2, sink_id=1}.

Before I talk about 3, let me talk about 4 first. If I’m not mistaken, we need a deterministic boundary to determine that upstream data will no longer be updated, so that we can output the “final” result.

No, that's not what I understand as internal consistency. Internal consistency means that a single change in the source produces a single change in the sink, without incorrect intermediate states caused by processing UB separately from UA. I don't want to process multiple source changes in a single batch. I'd rather call that "external" consistency or snapshot processing, as you are suggesting, but in my mind this is an orthogonal topic from what I am trying to solve here.

On Tue, 23 Dec 2025 at 12:28, Xuyang <[email protected]> wrote:

Hi, Dawid. Thank you for your detailed explanation and the update. Let me share my thoughts.
1. In fact, I agree with what you said: for clearly problematic queries, we should fail fast, for example the case you mentioned (writing data from a source table whose PK is id into a sink table whose PK is name). We can fail like a traditional database: PK conflict. That’s totally fine.
2. But I’d like to add a clarification. Take the “Changelog Disorder” example described in the FLIP (
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=399279158#FLIP558:ImprovementstoSinkUpsertMaterializerandchangelogdisorder-ExampleofChangelogDisorder
).
Let’s look at disorder (2) and disorder (3). Under the default ON CONFLICT ERROR, IIUC, the expected behavior is that Flink should fail. However, those inserts and updates actually all come from the same PK on table s1 (id = 1). From a relational-algebra perspective, this does not violate the PK constraint; it only happens because we shuffle by level and end up with out-of-order issues under multiple parallelisms. In other words, if we run this SQL in batch, the PK conflict will not happen and ON CONFLICT ERROR should not fail. If the streaming job fails, users will be confused. For disorder issues introduced by Flink internally, I believe Flink should handle them internally.
4. Before I talk about 3, let me talk about 4 first. If I’m not mistaken, we need a deterministic boundary to determine that upstream data will no longer be updated, so that we can output the “final” result. I think our disagreement is about where that “data boundary” is. In this FLIP, the boundary is described as: 1) watermark or 2) checkpoint. But I think such a boundary is tied to the table, for example, “the creation of a specific historical snapshot version of a table.” Since data in the snapshot is immutable, we can output results at that point. What do you think?
3. If we must choose one of the introduced options, I lean toward Option 1, because we already have a clear definition for watermarks defined on a table: “allowing for consistent results despite out-of-order or late events” (
https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/time_attributes/#event-time
).
This “drop late events” semantic does not exist for checkpoints. However, my concern is that in most scenarios, a CDC source may produce multiple updates for the same PK over a long time span, so the watermark would have to be defined very large, which will cause the job to produce no output for a long time.



--

     Best!
     Xuyang



At 2025-12-17 16:52:26, "Dawid Wysakowicz" <[email protected]>
wrote:
Hey Gustavo, Xuyang
I tried incorporating your suggestions into the FLIP. Please take
another
look.
Best,
Dawid

On Fri, 12 Dec 2025 at 16:05, Dawid Wysakowicz <[email protected]> wrote:

1. The default behavior changes if no ON CONFLICT is defined. I am a little concerned that this may cause errors in a large number of existing cases.

I can be convinced to leave the default behaviour as it is now. I am worried, though, that the current behaviour of SUM is very rarely what people actually want. As mentioned in the FLIP, I wholeheartedly believe there are very few, if any, real-world scenarios where you need the deduplicate behaviour. I try to elaborate a bit more in 2).

2. Regarding ON CONFLICT ERROR, in the context of CDC streams, it is expected that the vast majority of cases cannot generate only one record with one primary key. The only solutions I can think of are append-only top1, deduplication, or aggregating the first row.

I disagree with that statement. I don't think CDC streams change anything in that regard. Maybe there is some misunderstanding about what one record means in this context.

I agree that almost certainly there will be a sequence of UA/UB records for a single sink primary key.

My claim is that users almost never want a situation where they have more than one "active" upsert key/record for one sink's primary key. I tried to explain that in the FLIP, but let me try to give one more example here.

Imagine two tables (connector options omitted for brevity):

CREATE TABLE source (
   id BIGINT PRIMARY KEY NOT ENFORCED,
   name STRING,
   `value` STRING  -- backticks because VALUE is a reserved keyword
);

CREATE TABLE sink (
   name STRING PRIMARY KEY NOT ENFORCED,
   `value` STRING
);

INSERT INTO sink SELECT name, `value` FROM source;

=== Input
(1, "Apple", "ABC")
(2, "Apple", "DEF")

In the scenario above a SUM is inserted, which will deduplicate the rows and override the value for "Apple" with "DEF". In my opinion this is entirely wrong; instead, an exception should be thrown, because there is actually a constraint violation.

I am absolutely more than happy to be proved wrong. If you do have a real-world scenario where the deduplication logic is actually correct and expected, please do share it. So far I have not seen one, nor was I able to come up with one. And yet I am not suggesting removing the deduplication logic entirely; users can still use it with ON CONFLICT DEDUPLICATE.
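
To make that concrete, and assuming the clause ends up attached to the INSERT statement (the exact syntax and placement are defined in the FLIP, not here), opting back into the legacy behaviour could look roughly like:

INSERT INTO sink
SELECT name, `value` FROM source
ON CONFLICT DEDUPLICATE;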

3. The special watermark generation interval affects the visibility of results. How can users configure this generation interval?


That's a fair question; I'll try to elaborate on it in the FLIP. I can see two options:
1. We piggyback on existing watermarks in the query; if there are no watermarks (the tables don't have a watermark definition) we fail during planning.
2. We add a new configuration option for a specialized, generalized watermark.

Let me think some more on that and I'll come back with a more concrete proposal.


4. I believe that resolving out-of-order issues and addressing internal consistency are two separate problems. As I understand the current solution, it does not really resolve the internal consistency issue. We could first resolve the out-of-order problem. For most scenarios that require real-time response, we can directly output intermediate results promptly.


Why doesn't it solve it? It does. Given a pair of UB/UA, we won't emit the temporary state after processing the UB.
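
A minimal sketch of what I mean (hypothetical records):

within one boundary:     UB(pk=1, old)  UA(pk=1, new)
emitted at the barrier:  +U(pk=1, new)    -- the UB on its own never becomes visible downstream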

5. How can we compact data with the same custom watermark? If detailed comparisons are necessary, I think we still need to preserve all key data; we would just be compressing this data further at time t.

Yes, we need to preserve all key data, but only between two watermarks. Assuming frequent watermarks, that's for a very short time.

6. If neither this proposed solution nor the rejected solution can resolve internal consistency, we need to reconsider the differences between the two approaches.

I'll copy the explanation of why the alternative was rejected from the FLIP:

The solution can help us to solve the changelog disorder problem, but it does not help with the *internal consistency* issue. If we want to fix that as well, we still need the compaction on watermarks. At the same time it increases the size of all flowing records. Therefore it was rejected in favour of simply compacting all records once on the progression of watermarks.

Best,
Dawid





