[ 
https://issues.apache.org/jira/browse/FLINK-39438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39438:
-----------------------------------
    Labels: pull-request-available  (was: )

> Add configurable sink operation (upsert/append) for MaxCompute sink in Flink CDC
> --------------------------------------------------------------------------------
>
>                 Key: FLINK-39438
>                 URL: https://issues.apache.org/jira/browse/FLINK-39438
>             Project: Flink
>          Issue Type: Improvement
>          Components: Flink CDC
>    Affects Versions: cdc-3.5.0
>            Reporter: Dion Ricky Saputra
>            Priority: Major
>              Labels: pull-request-available
>
> *Background*
> Currently, the MaxCompute sink operation mode (upsert vs. append) is 
> determined implicitly from the destination table type in MaxCompute. The 
> writer selects either {{BatchUpsertWriter}} or {{BatchAppendWriter}} using 
> internal logic (see {{MaxComputeWriter}}). This limits flexibility, 
> especially when schema evolution is enabled.
> When schema evolution is enabled, tables that do not exist are created 
> automatically. These tables are created as transactional tables by default, 
> which forces the pipeline to always use upsert mode. As a result, users 
> cannot choose append mode even when that is the desired behavior.
> *Problem*
>  * Sink operation mode cannot be explicitly configured via pipeline YAML.
>  * Behavior is tightly coupled with MaxCompute table type.
>  * Schema evolution auto-creation enforces transactional tables → always 
> upsert.
>  * No way to override this behavior for append use cases.
> *Proposed Solution*
> Introduce a new configuration option {{sink.operation}} to explicitly control 
> the sink mode.
> *Example implementation:*
> {code:java}
> public static final ConfigOption<MaxComputeOptions.SinkOperation> SINK_OPERATION =
>         ConfigOptions.key("sink.operation")
>                 .enumType(MaxComputeOptions.SinkOperation.class)
>                 .defaultValue(MaxComputeOptions.SinkOperation.UPSERT)
>                 .withDescription(
>                         "The sink operation type. Supported values are 'upsert' and "
>                                 + "'append'; the default is 'upsert'.");
> {code}
> This option should be used in {{MaxComputeWriter}} to decide whether to use:
>  * {{BatchUpsertWriter}}, or
>  * {{BatchAppendWriter}}
> instead of relying on table type inference.
> *Example YAML Configuration*
> {code:yaml}
> sink:
>   type: maxcompute
>   name: maxcompute
>   access-id: ${secret_values.maxcompute_access_id}
>   access-key: ${secret_values.maxcompute_access_key}
>   endpoint: ${secret_values.maxcompute_endpoint}
>   project: maxcompute_project
>   quota.name: res_grp
>   sink.operation: append
> {code}
> *Expected Behavior*
>  * Users can explicitly define sink behavior ({{append}} or {{upsert}}) via 
> configuration.
>  * Default behavior remains {{upsert}} for backward compatibility.
>  * Writer selection logic respects the configured option instead of table 
> type.
> *Acceptance Criteria*
>  * New config {{sink.operation}} is added and documented.
>  * {{MaxComputeWriter}} uses this config to select writer implementation.
>  * Backward compatibility is preserved (default = upsert).
>  * YAML configuration correctly overrides inferred behavior.
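
The writer selection described in the issue could be sketched roughly as follows. This is a minimal, self-contained illustration and not the actual Flink CDC code: the class {{SinkOperationSketch}}, the {{Writer}} interface, and the methods {{parseOperation}} and {{selectWriter}} are hypothetical names, and the two writer classes are empty stand-ins for the real {{BatchUpsertWriter}} and {{BatchAppendWriter}}. Only the option key {{sink.operation}}, its two values, and the {{upsert}} default come from the issue.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch: choose the writer from "sink.operation", not from the table type. */
final class SinkOperationSketch {

    /** Mirrors the SinkOperation enum proposed in the issue. */
    enum SinkOperation { UPSERT, APPEND }

    /** Stand-in for the real writer abstraction (assumed for this sketch). */
    interface Writer {
        String name();
    }

    /** Empty stand-in for the real BatchUpsertWriter. */
    static final class BatchUpsertWriter implements Writer {
        @Override
        public String name() { return "BatchUpsertWriter"; }
    }

    /** Empty stand-in for the real BatchAppendWriter. */
    static final class BatchAppendWriter implements Writer {
        @Override
        public String name() { return "BatchAppendWriter"; }
    }

    /** Reads "sink.operation" from a flat config map; a missing or blank value means UPSERT. */
    static SinkOperation parseOperation(Map<String, String> sinkConfig) {
        String raw = sinkConfig.get("sink.operation");
        if (raw == null || raw.trim().isEmpty()) {
            return SinkOperation.UPSERT; // default keeps existing pipelines on upsert
        }
        try {
            return SinkOperation.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Unsupported sink.operation '" + raw + "', expected 'upsert' or 'append'.", e);
        }
    }

    /** Selects the writer purely from the configured operation. */
    static Writer selectWriter(SinkOperation operation) {
        switch (operation) {
            case APPEND:
                return new BatchAppendWriter();
            case UPSERT:
                return new BatchUpsertWriter();
            default:
                throw new IllegalStateException("Unhandled sink operation: " + operation);
        }
    }
}
```

With the YAML above, {{parseOperation}} would see {{sink.operation: append}} and {{selectWriter}} would return the append writer. A configuration without the key would fall back to the upsert writer, which is how the sketch models the backward-compatibility requirement.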



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
