[
https://issues.apache.org/jira/browse/FLINK-38734?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Nikola Stanisavljevic updated FLINK-38734:
------------------------------------------
Description:
The idea is to make the update set expression used for upserts customizable.
At the moment the update set expression always covers the complete row and
cannot be customized. If we want to update a particular column conditionally,
or do only a partial update, we need to keep the previous values in state and
use stateful processing in order to upsert correctly. If we can customize the
update set expression, we don't need to keep anything in state and can rely on
the database instead.
The option could look like this:
{code:java}
public static final ConfigOption<String> SINK_UPSERT_UPDATE_SET_EXPRESSION =
        ConfigOptions.key("sink.upsert.update-set-expression")
                .stringType()
                .noDefaultValue()
                .withDescription(
                        "Update set expression to be used in the upsert statement. "
                                + "If not specified, the default is used, which upserts the whole row.");
{code}
Usage in SQL could look like this:
{code:sql}
INSERT INTO catalog.db.table
/*+ OPTIONS( 'sink.upsert.update-set-expression' =
    'load_time = VALUES(load_time),
     epoch = LEAST(epoch, VALUES(epoch)),
     last_event_time = GREATEST(last_event_time, VALUES(last_event_time))'
) */
...
{code}
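To make the intended behaviour concrete, here is a minimal sketch of how a
MySQL-style upsert statement could be assembled with and without the proposed
option. It is an illustration only, not the code from the fork or the existing
connector: the class UpsertStatementSketch, the method buildUpsert and the
named-placeholder format (:field) are all assumptions made for this example.

```java
import java.util.Arrays;
import java.util.Optional;
import java.util.stream.Collectors;

/** Hypothetical helper, for illustration only; not part of the Flink JDBC connector. */
public class UpsertStatementSketch {

    /** Quotes an identifier MySQL-style with backticks. */
    public static String quote(String identifier) {
        return "`" + identifier + "`";
    }

    /**
     * Builds an INSERT ... ON DUPLICATE KEY UPDATE statement.
     * If updateSetExpression is empty or blank, every column is overwritten
     * with the incoming value (whole-row upsert); otherwise the given
     * expression is used verbatim as the update clause.
     */
    public static String buildUpsert(
            String table, String[] fields, Optional<String> updateSetExpression) {
        String columns =
                Arrays.stream(fields)
                        .map(UpsertStatementSketch::quote)
                        .collect(Collectors.joining(", "));
        String placeholders =
                Arrays.stream(fields).map(f -> ":" + f).collect(Collectors.joining(", "));
        // Default behaviour: overwrite each column with the new row's value.
        String wholeRowUpdate =
                Arrays.stream(fields)
                        .map(f -> quote(f) + "=VALUES(" + quote(f) + ")")
                        .collect(Collectors.joining(", "));
        String updateClause =
                updateSetExpression
                        .map(String::trim)
                        .filter(s -> !s.isEmpty())
                        .orElse(wholeRowUpdate);
        return "INSERT INTO " + quote(table) + "(" + columns + ")"
                + " VALUES (" + placeholders + ")"
                + " ON DUPLICATE KEY UPDATE " + updateClause;
    }

    public static void main(String[] args) {
        String[] fields = {"id", "load_time", "epoch"};
        // Without the option: whole-row upsert.
        System.out.println(buildUpsert("events", fields, Optional.empty()));
        // With the option: only the listed columns are updated, conditionally.
        System.out.println(
                buildUpsert(
                        "events",
                        fields,
                        Optional.of(
                                "load_time = VALUES(load_time), "
                                        + "epoch = LEAST(epoch, VALUES(epoch))")));
    }
}
```

Because the custom expression is passed through verbatim, the conditional
logic (LEAST, GREATEST) is evaluated by the database on each conflicting row,
which is what removes the need for Flink state in this proposal.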
I would appreciate input on this proposal.
As for the implementation, I already have working code for MySQL in my fork.
> Add option to customize update set expression when doing upserts with table
> sink
> --------------------------------------------------------------------------------
>
> Key: FLINK-38734
> URL: https://issues.apache.org/jira/browse/FLINK-38734
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / JDBC
> Reporter: Nikola Stanisavljevic
> Priority: Not a Priority
>