[
https://issues.apache.org/jira/browse/FLINK-39438?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dion Ricky Saputra updated FLINK-39438:
---------------------------------------
Description:
*Background*
Currently, the MaxCompute sink operation mode (upsert vs append) is implicitly
determined based on the destination table type in MaxCompute. The writer
selects either {{BatchUpsertWriter}} or {{BatchAppendWriter}} using internal
logic (see {{MaxComputeWriter}}). This behavior limits flexibility,
especially when schema evolution is enabled.
When schema evolution is used, tables are automatically created if they do not
exist. These tables are created as transactional tables by default, which
forces the pipeline to always use the upsert mode. As a result, users cannot
explicitly control the sink behavior even if append mode is desired.
*Problem*
* Sink operation mode cannot be explicitly configured via pipeline YAML.
* Behavior is tightly coupled with MaxCompute table type.
* Schema evolution auto-creation enforces transactional tables → always upsert.
* No way to override this behavior for append use cases.
*Proposed Solution*
Introduce a new configuration option {{sink.operation}} to explicitly control
the sink mode.
*Example implementation:*
{code:java}
// Explicit sink mode; defaults to UPSERT so existing pipelines keep their behavior.
public static final ConfigOption<MaxComputeOptions.SinkOperation> SINK_OPERATION =
        ConfigOptions.key("sink.operation")
                .enumType(MaxComputeOptions.SinkOperation.class)
                .defaultValue(MaxComputeOptions.SinkOperation.UPSERT)
                .withDescription(
                        "The sink operation type. Supported values are 'upsert' and 'append'; the default is 'upsert'.");
{code}
This option should be used in {{MaxComputeWriter}} to decide whether to use:
* {{BatchUpsertWriter}}, or
* {{BatchAppendWriter}}
instead of relying on table type inference.
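A minimal sketch of the selection logic described above, for illustration only. The types {{SinkOperation}}, {{RecordWriter}}, {{UpsertWriterStub}}, {{AppendWriterStub}} and {{WriterSelectionSketch}} are hypothetical stand-ins and not the actual Flink CDC classes; the real {{MaxComputeWriter}} may be structured differently.

```java
// Hypothetical sketch: none of these types are the real Flink CDC classes.
import java.util.Objects;

/** Stand-in for MaxComputeOptions.SinkOperation (assumed to have these two values). */
enum SinkOperation { UPSERT, APPEND }

/** Stand-in for the common writer abstraction. */
interface RecordWriter {
    String describe();
}

/** Stand-in for BatchUpsertWriter. */
final class UpsertWriterStub implements RecordWriter {
    public String describe() { return "BatchUpsertWriter"; }
}

/** Stand-in for BatchAppendWriter. */
final class AppendWriterStub implements RecordWriter {
    public String describe() { return "BatchAppendWriter"; }
}

final class WriterSelectionSketch {
    private WriterSelectionSketch() {}

    /** Picks the writer from the configured operation only; the table type is not consulted. */
    static RecordWriter select(SinkOperation operation) {
        Objects.requireNonNull(operation, "operation");
        switch (operation) {
            case APPEND:
                return new AppendWriterStub();
            case UPSERT:
                return new UpsertWriterStub();
            default:
                throw new IllegalArgumentException("Unsupported sink operation: " + operation);
        }
    }

    public static void main(String[] args) {
        System.out.println(select(SinkOperation.APPEND).describe());
        System.out.println(select(SinkOperation.UPSERT).describe());
    }
}
```

How the writer should react when {{append}} is requested for a transactional table is not covered by this sketch and would need to be decided in the actual change.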
*Example YAML Configuration*
{code:yaml}
sink:
  type: maxcompute
  name: maxcompute
  access-id: ${secret_values.maxcompute_access_id}
  access-key: ${secret_values.maxcompute_access_key}
  endpoint: ${secret_values.maxcompute_endpoint}
  project: maxcompute_project
  quota.name: res_grp
  sink.operation: append
{code}
*Expected Behavior*
* Users can explicitly define sink behavior ({{append}} or {{upsert}})
via configuration.
* Default behavior remains {{upsert}} for backward compatibility.
* Writer selection logic respects the configured option instead of table type.
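The defaulting behavior listed above could be checked with a small resolver like the one below. This is a sketch under assumptions: {{SinkMode}} and {{SinkOperationParsingSketch}} are hypothetical names, the sink options are modeled as a plain string map rather than Flink's configuration classes, and case-insensitive matching is an assumption of the sketch, not something this issue specifies.

```java
// Hypothetical sketch: a plain-Java model of resolving 'sink.operation' with a default.
import java.util.Locale;
import java.util.Map;

/** Stand-in for the proposed enum of sink operations. */
enum SinkMode { UPSERT, APPEND }

final class SinkOperationParsingSketch {
    static final String KEY = "sink.operation";

    private SinkOperationParsingSketch() {}

    /** Returns the configured mode, or UPSERT when the key is absent or blank. */
    static SinkMode resolve(Map<String, String> sinkOptions) {
        String raw = sinkOptions.get(KEY);
        if (raw == null || raw.trim().isEmpty()) {
            return SinkMode.UPSERT; // default keeps existing pipelines unchanged
        }
        try {
            // Assumption: values are matched case-insensitively.
            return SinkMode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
        } catch (IllegalArgumentException e) {
            throw new IllegalArgumentException(
                    "Unsupported value for '" + KEY + "': " + raw
                            + " (expected 'upsert' or 'append')", e);
        }
    }
}
```

Rejecting unknown values with an explicit error, rather than silently falling back to upsert, is a design choice of this sketch so that a typo in the pipeline YAML does not go unnoticed.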
*Acceptance Criteria*
* New config {{sink.operation}} is added and documented.
* {{MaxComputeWriter}} uses this config to select writer implementation.
* Backward compatibility is preserved (default = upsert).
* YAML configuration correctly overrides inferred behavior.
> Add configurable sink operation (upsert/append) for MaxCompute sink in Flink
> CDC
> --------------------------------------------------------------------------------
>
> Key: FLINK-39438
> URL: https://issues.apache.org/jira/browse/FLINK-39438
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Affects Versions: cdc-3.5.0
> Reporter: Dion Ricky Saputra
> Priority: Major
--
This message was sent by Atlassian Jira
(v8.20.10#820010)