Pietro created FLINK-34722:
------------------------------
Summary: Support conditional upserts with Postgres JDBC sink
Key: FLINK-34722
URL: https://issues.apache.org/jira/browse/FLINK-34722
Project: Flink
Issue Type: Improvement
Components: Connectors / JDBC
Affects Versions: jdbc-3.1.2
Reporter: Pietro
The default dialect used by the JDBC sink for PostgreSQL databases currently does
not support custom _WHERE_ conditions in upsert statements.
The upsert statements returned by the
{{[getUpsertStatement()|https://github.com/apache/flink-connector-jdbc/blob/95294ffbc57c93c2af34cda94c27fc5781e06177/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java#L61]}}
method always end with a fixed conflict clause:
{code:sql}
ON CONFLICT (col1, ..., colN)
DO UPDATE SET col1=EXCLUDED.col1, ..., colN=EXCLUDED.colN
{code}
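For reference, the following is a simplified sketch of the statement shape produced today. It is not the connector's code: the class name, the {{orders}} table and its columns are illustrative, identifier quoting is omitted, and the handling of key columns (left out of the SET list, with DO NOTHING when only keys are written) is an assumption about the dialect's behavior rather than a copy of it.

{code:java}
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified, hypothetical sketch of how a Postgres-compatible dialect can assemble
// an upsert; identifier quoting and other details of the real dialect are omitted.
class UpsertStatementSketch {

    static String upsert(String table, String[] fields, String[] keys) {
        String columns = String.join(", ", fields);
        // One JDBC placeholder per field, e.g. "?, ?, ?"
        String placeholders = Arrays.stream(fields).map(f -> "?").collect(Collectors.joining(", "));
        Set<String> keySet = Set.of(keys);
        // Only non-key columns are overwritten with the incoming (EXCLUDED) values
        String updates = Arrays.stream(fields)
                .filter(f -> !keySet.contains(f))
                .map(f -> f + "=EXCLUDED." + f)
                .collect(Collectors.joining(", "));
        String action = updates.isEmpty() ? "DO NOTHING" : "DO UPDATE SET " + updates;
        // Note: there is no way to attach a WHERE condition to the DO UPDATE action
        return "INSERT INTO " + table + " (" + columns + ") VALUES (" + placeholders + ")"
                + " ON CONFLICT (" + String.join(", ", keys) + ") " + action;
    }

    public static void main(String[] args) {
        System.out.println(upsert("orders", new String[] {"id", "status", "op_ts"}, new String[] {"id"}));
        // INSERT INTO orders (id, status, op_ts) VALUES (?, ?, ?) ON CONFLICT (id)
        //   DO UPDATE SET status=EXCLUDED.status, op_ts=EXCLUDED.op_ts
    }
}
{code}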
PostgreSQL allows finer-grained control over upserts through an optional _WHERE_
condition on the DO UPDATE action, so that only conflicting rows for which the
condition is true are updated (see [ON CONFLICT
Clause|https://www.postgresql.org/docs/current/sql-insert.html#SQL-ON-CONFLICT]),
for instance:
{code:sql}
ON CONFLICT (col1, ..., colN)
DO UPDATE SET col1=EXCLUDED.col1, ..., colN=EXCLUDED.colN
WHERE colN < EXCLUDED.colN
{code}
This is useful in many cases. For instance, in a CDC scenario, a batch
reconciliation process may have written records to the destination that now risk
being overwritten by late-arriving, stale records from the streaming pipeline; a
condition on the operation timestamp would protect against these events.
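To make the CDC scenario concrete, the following in-memory simulation contrasts an unconditional upsert with one guarded by a timestamp condition (existing {{op_ts}} lower than {{EXCLUDED.op_ts}}). The schema (id, status, op_ts) and the class are hypothetical; the code only mimics the semantics of ON CONFLICT ... DO UPDATE ... WHERE, and no database is involved.

{code:java}
import java.util.HashMap;
import java.util.Map;

// In-memory illustration of INSERT ... ON CONFLICT (id) DO UPDATE ... [WHERE ...]
// semantics; the schema (id, status, op_ts) is hypothetical and no database is used.
class ConditionalUpsertDemo {

    record Row(String status, long opTs) {}

    private final Map<Integer, Row> table = new HashMap<>();
    private final boolean conditional;

    ConditionalUpsertDemo(boolean conditional) {
        this.conditional = conditional;
    }

    /** Returns true if the row was written, false if the conflicting row was left unchanged. */
    boolean upsert(int id, Row incoming) {
        Row existing = table.get(id);
        // Write on: no conflict, unconditional DO UPDATE, or "existing.op_ts < EXCLUDED.op_ts"
        if (existing == null || !conditional || existing.opTs() < incoming.opTs()) {
            table.put(id, incoming);
            return true;
        }
        return false;
    }

    Row get(int id) {
        return table.get(id);
    }

    public static void main(String[] args) {
        for (boolean conditional : new boolean[] {false, true}) {
            ConditionalUpsertDemo sink = new ConditionalUpsertDemo(conditional);
            sink.upsert(1, new Row("RECONCILED", 200)); // written by batch reconciliation
            sink.upsert(1, new Row("PENDING", 150));    // late, stale CDC event
            System.out.println((conditional ? "with WHERE: " : "without WHERE: ") + sink.get(1).status());
        }
        // Prints:
        // without WHERE: PENDING
        // with WHERE: RECONCILED
    }
}
{code}

Without the condition the stale event overwrites the reconciled row; with it, the conflicting row is simply left unchanged, which matches what PostgreSQL does when the DO UPDATE condition is not true.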
My proposal is to extend
{{[AbstractPostgresCompatibleDialect|https://github.com/apache/flink-connector-jdbc/blob/main/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/dialect/AbstractPostgresCompatibleDialect.java]}}
so that the upsert query supports user-provided _WHERE_ conditions.
I see two possible approaches, and I'd love to hear your opinion on them:
# provide the condition through an option of the JDBC sink connector;
# allow users to plug in custom dialects without having to rewrite the whole JDBC sink (I'll open a separate issue about this soon).
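A minimal sketch of approach 1, assuming a hypothetical sink option (named {{sink.upsert.condition}} here purely for illustration) whose value is appended verbatim after the DO UPDATE SET list. Neither the option nor the class below exists in flink-connector-jdbc; the statement shape follows the simplified form discussed above.

{code:java}
import java.util.Arrays;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch of approach 1; not part of flink-connector-jdbc.
class ConditionalUpsertSketch {

    // Hypothetical option key; the connector has no such option today.
    static final String UPSERT_CONDITION_OPTION = "sink.upsert.condition";

    static String upsert(String table, String[] fields, String[] keys, Optional<String> condition) {
        Set<String> keySet = Set.of(keys);
        String placeholders = Arrays.stream(fields).map(f -> "?").collect(Collectors.joining(", "));
        // Non-key columns take the incoming (EXCLUDED) values on conflict
        String updates = Arrays.stream(fields)
                .filter(f -> !keySet.contains(f))
                .map(f -> f + "=EXCLUDED." + f)
                .collect(Collectors.joining(", "));
        String base = "INSERT INTO " + table + " (" + String.join(", ", fields) + ") VALUES ("
                + placeholders + ") ON CONFLICT (" + String.join(", ", keys) + ")";
        if (updates.isEmpty()) {
            // Nothing to update, so there is no DO UPDATE action for a WHERE to attach to
            return base + " DO NOTHING";
        }
        String doUpdate = base + " DO UPDATE SET " + updates;
        // Append the user-provided predicate verbatim as raw SQL; blank values are ignored
        return condition.filter(c -> !c.isBlank())
                .map(c -> doUpdate + " WHERE " + c)
                .orElse(doUpdate);
    }

    public static void main(String[] args) {
        // Options as they might be read from the sink configuration (hypothetical)
        Map<String, String> options =
                Map.of(UPSERT_CONDITION_OPTION, "orders.op_ts < EXCLUDED.op_ts");
        String sql = upsert("orders", new String[] {"id", "status", "op_ts"}, new String[] {"id"},
                Optional.ofNullable(options.get(UPSERT_CONDITION_OPTION)));
        System.out.println(sql);
    }
}
{code}

Because the predicate is spliced in as raw SQL, it should come only from trusted job configuration, never from record data. Qualifying existing-row columns with the table name (as in {{orders.op_ts}}) keeps them clearly distinct from the {{EXCLUDED}} values.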
Thanks for your consideration.