[
https://issues.apache.org/jira/browse/FLINK-39438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-39438:
-----------------------------------
Labels: pull-request-available (was: )
> Add configurable sink operation (upsert/append) for MaxCompute sink in Flink
> CDC
> --------------------------------------------------------------------------------
>
> Key: FLINK-39438
> URL: https://issues.apache.org/jira/browse/FLINK-39438
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.5.0
> Reporter: Dion Ricky Saputra
> Priority: Major
> Labels: pull-request-available
>
> *Background*
> Currently, the MaxCompute sink operation mode (upsert vs append) is
> implicitly determined based on the destination table type in MaxCompute. The
> writer selects either {{BatchUpsertWriter}} or {{BatchAppendWriter}} using
> internal logic (see {{MaxComputeWriter}}). This behavior limits
> flexibility, especially when schema evolution is enabled.
> When schema evolution is used, tables are automatically created if they do
> not exist. These tables are created as transactional tables by default, which
> forces the pipeline to always use the upsert mode. As a result, users cannot
> explicitly control the sink behavior even if append mode is desired.
> *Problem*
> * Sink operation mode cannot be explicitly configured via pipeline YAML.
> * Behavior is tightly coupled with MaxCompute table type.
> * Tables auto-created by schema evolution are transactional, so the sink
> always uses upsert.
> * No way to override this behavior for append use cases.
> *Proposed Solution*
> Introduce a new configuration option {{sink.operation}} to explicitly control
> the sink mode.
> *Example implementation:*
> {code:java}
> public static final ConfigOption<MaxComputeOptions.SinkOperation> SINK_OPERATION =
>         ConfigOptions.key("sink.operation")
>                 .enumType(MaxComputeOptions.SinkOperation.class)
>                 .defaultValue(MaxComputeOptions.SinkOperation.UPSERT)
>                 .withDescription(
>                         "The sink operation type. Supported values are 'upsert' and "
>                                 + "'append'; the default is 'upsert'.");
> {code}
> This option should be used in {{MaxComputeWriter}} to decide whether to use:
> * {{BatchUpsertWriter}}, or
> * {{BatchAppendWriter}}
> instead of relying on table type inference.
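The writer selection described above could look roughly like the following. This is a minimal sketch, not the actual connector code: the enum, the BatchWriter interface, and the two writer classes here are simplified, hypothetical stand-ins that only share names with the real MaxComputeOptions.SinkOperation, BatchUpsertWriter, and BatchAppendWriter.

```java
/** Hypothetical stand-in for MaxComputeOptions.SinkOperation. */
enum SinkOperation { UPSERT, APPEND }

/** Hypothetical minimal writer abstraction; the real writers expose a richer API. */
interface BatchWriter {
    String mode();
}

/** Stand-in for the upsert writer (assumption: reduced to a label for illustration). */
final class BatchUpsertWriter implements BatchWriter {
    @Override
    public String mode() {
        return "upsert";
    }
}

/** Stand-in for the append writer (assumption: reduced to a label for illustration). */
final class BatchAppendWriter implements BatchWriter {
    @Override
    public String mode() {
        return "append";
    }
}

final class WriterSelectionSketch {
    /** Chooses the writer from the configured operation rather than from the table type. */
    static BatchWriter createWriter(SinkOperation operation) {
        switch (operation) {
            case APPEND:
                return new BatchAppendWriter();
            case UPSERT:
            default:
                return new BatchUpsertWriter();
        }
    }

    public static void main(String[] args) {
        System.out.println(createWriter(SinkOperation.APPEND).mode());
        System.out.println(createWriter(SinkOperation.UPSERT).mode());
    }
}
```

The point of the sketch is that the table type no longer appears in the decision: the configured value alone determines which writer is created.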
> *Example YAML Configuration*
> {code:yaml}
> sink:
>   type: maxcompute
>   name: maxcompute
>   access-id: ${secret_values.maxcompute_access_id}
>   access-key: ${secret_values.maxcompute_access_key}
>   endpoint: ${secret_values.maxcompute_endpoint}
>   project: maxcompute_project
>   quota.name: res_grp
>   sink.operation: append
> {code}
> *Expected Behavior*
> * Users can explicitly define sink behavior ({{append}} or
> {{upsert}}) via configuration.
> * Default behavior remains {{upsert}} for backward compatibility.
> * Writer selection logic respects the configured option instead of table
> type.
> *Acceptance Criteria*
> * New config {{sink.operation}} is added and documented.
> * {{MaxComputeWriter}} uses this config to select writer implementation.
> * Backward compatibility is preserved (default = upsert).
> * YAML configuration correctly overrides inferred behavior.
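The backward-compatibility and override criteria above can be illustrated with a small resolver over the sink options. This is a sketch under stated assumptions: it treats the sink block as a plain string map and uses a hypothetical SinkOperationResolver class. In the connector itself the value would presumably be read through the proposed ConfigOption rather than parsed by hand.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper: resolves 'sink.operation' from raw sink options. */
final class SinkOperationResolver {
    /** Hypothetical stand-in for MaxComputeOptions.SinkOperation. */
    enum Operation { UPSERT, APPEND }

    static final String KEY = "sink.operation";

    /** Returns UPSERT when the key is absent or blank, so existing pipelines keep their behavior. */
    static Operation resolve(Map<String, String> sinkOptions) {
        String raw = sinkOptions.get(KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return Operation.UPSERT;
        }
        try {
            // Accept 'append', 'Append', 'APPEND', and so on (an assumption of this sketch).
            return Operation.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Unsupported value for '" + KEY + "': " + raw
                            + " (expected 'upsert' or 'append')",
                    e);
        }
    }
}
```

A pipeline that omits the key resolves to upsert, one that sets it to append resolves to append, and any other value fails fast with a descriptive error instead of silently falling back.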