lvyanquan commented on code in PR #4377:
URL: https://github.com/apache/flink-cdc/pull/4377#discussion_r3165334762
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/MaxComputeDataSinkOptions.java:
##########
@@ -107,4 +108,11 @@ public class MaxComputeDataSinkOptions {
.intType()
.defaultValue(4)
.withDescription("The number of concurrent with flush
bucket data.");
+
+ public static final ConfigOption<MaxComputeOptions.SinkOperation>
SINK_OPERATION =
Review Comment:
Please add documentation for the newly introduced sink.operation option.
This should explain:
- **upsert (default)**: Requires primary keys in schema. Creates a
transactional table and supports update/delete semantics.
- **append**: Creates a regular table regardless of primary keys. Only
supports insert operations, suitable for append-only scenarios.
This helps users understand the configuration impact and choose the
appropriate mode for their use case.
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/MaxComputeOptions.java:
##########
@@ -140,8 +147,46 @@ public Builder withSchemaOperatorUid(String
schemaOperatorUid) {
return this;
}
+ public Builder withSinkOperation(SinkOperation sinkOperation) {
+ this.sinkOperation = sinkOperation;
+ return this;
+ }
+
public MaxComputeOptions build() {
return new MaxComputeOptions(this);
}
}
+
+ /** Sink operation mode for MaxCompute: APPEND or UPSERT. */
+ public enum SinkOperation {
+ APPEND("append"),
+ UPSERT("upsert");
+
+ private final String value;
+
+ SinkOperation(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
+ return value;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ public static SinkOperation fromValue(String value) {
Review Comment:
This method is unused and could be removed.
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-maxcompute/src/main/java/org/apache/flink/cdc/connectors/maxcompute/options/MaxComputeOptions.java:
##########
@@ -140,8 +147,46 @@ public Builder withSchemaOperatorUid(String
schemaOperatorUid) {
return this;
}
+ public Builder withSinkOperation(SinkOperation sinkOperation) {
+ this.sinkOperation = sinkOperation;
+ return this;
+ }
+
public MaxComputeOptions build() {
return new MaxComputeOptions(this);
}
}
+
+ /** Sink operation mode for MaxCompute: APPEND or UPSERT. */
+ public enum SinkOperation {
+ APPEND("append"),
+ UPSERT("upsert");
+
+ private final String value;
+
+ SinkOperation(String value) {
+ this.value = value;
+ }
+
+ public String getValue() {
Review Comment:
This method is unused and could be removed.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]