Copilot commented on code in PR #4277:
URL: https://github.com/apache/flink-cdc/pull/4277#discussion_r2845714553
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java:
##########
@@ -53,6 +60,66 @@ public static Types.NestedField convertCdcColumnToIcebergField(
                 column.getComment());
     }
+    /**
+     * Parse a CDC default value expression string into an Iceberg {@link Literal}.
+     *
+     * @return the parsed Literal, or null if the expression is null or cannot be parsed for the
+     *     given type.
+     */
+    @Nullable
+    public static Literal<?> parseDefaultValue(
+            @Nullable String defaultValueExpression, DataType cdcType) {
+        if (defaultValueExpression == null) {
+            return null;
+        }
+        try {
+            switch (cdcType.getTypeRoot()) {
Review Comment:
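`defaultValueExpression` is parsed without trimming. If the upstream default
contains leading/trailing whitespace (common in some JDBC metadata), the
boolean comparison fails and `Integer.parseInt`, `Long.parseLong`, and
`new BigDecimal(...)` throw `NumberFormatException`, so the default is skipped
(`Float.parseFloat` and `Double.parseDouble` tolerate surrounding whitespace).
Consider applying `trim()` (and possibly stripping surrounding parentheses)
before the switch to make parsing more robust.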
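
A minimal sketch of the normalization suggested above, assuming it would run
before the `switch`. The class `DefaultExpressionNormalizer` and its methods
are hypothetical names for illustration and are not part of the PR. The scan
is not quote-aware, so parentheses inside string literals are treated like any
other parentheses.

```java
/** Hypothetical helper (not from the PR): cleans a raw default expression before parsing. */
final class DefaultExpressionNormalizer {

    private DefaultExpressionNormalizer() {}

    /** Trims whitespace and removes parentheses that wrap the whole expression. */
    static String normalize(String raw) {
        String s = raw.trim();
        // Peel one wrapping pair at a time, e.g. " ((0)) " becomes "0".
        while (s.length() >= 2 && s.charAt(0) == '(' && wrapsWholeExpression(s)) {
            s = s.substring(1, s.length() - 1).trim();
        }
        return s;
    }

    /** True only if the parenthesis opened at index 0 is closed by the last character. */
    private static boolean wrapsWholeExpression(String s) {
        int depth = 0;
        for (int i = 0; i < s.length(); i++) {
            char c = s.charAt(i);
            if (c == '(') {
                depth++;
            } else if (c == ')') {
                depth--;
                if (depth == 0) {
                    // The first group closes here; it wraps everything only at the end.
                    return i == s.length() - 1;
                }
            }
        }
        return false; // unbalanced input: leave it untouched
    }
}
```

With this in place, `normalize(" (42) ")` yields `42`, while an expression such
as `(1)+(2)` is left unchanged because its first group does not span the whole
string.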
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java:
##########
@@ -53,6 +60,66 @@ public static Types.NestedField convertCdcColumnToIcebergField(
                 column.getComment());
     }
+    /**
+     * Parse a CDC default value expression string into an Iceberg {@link Literal}.
+     *
+     * @return the parsed Literal, or null if the expression is null or cannot be parsed for the
+     *     given type.
+     */
+    @Nullable
+    public static Literal<?> parseDefaultValue(
+            @Nullable String defaultValueExpression, DataType cdcType) {
+        if (defaultValueExpression == null) {
+            return null;
+        }
+        try {
+            switch (cdcType.getTypeRoot()) {
+                case CHAR:
+                case VARCHAR:
+                    return Literal.of(defaultValueExpression);
+                case BOOLEAN:
Review Comment:
For CHAR/VARCHAR defaults, this returns the raw `defaultValueExpression`
as-is. Some sources (e.g., Oracle, PostgreSQL) may provide quoted literals
(e.g., `'active'`) or expressions/casts (e.g., `'active'::varchar`), which
would be persisted verbatim, quotes and cast included, as the Iceberg default.
Consider normalizing string literals (strip the surrounding quotes and
unescape embedded ones) and rejecting non-literal expressions
(casts/functions) to avoid writing incorrect defaults.
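
One way this could look, as a sketch only: accept an expression when it is a
single quoted literal spanning the whole string, and reject everything else.
`StringDefaultLiterals.unquote` is a hypothetical helper, and the sketch
assumes SQL-standard escaping where an embedded quote is written as `''`;
backslash escapes and source-specific syntax are not handled. Whether
unquoted raw values should pass through unchanged is left to the caller.

```java
import java.util.Optional;

/** Hypothetical helper (not from the PR): extracts the value of a quoted SQL string literal. */
final class StringDefaultLiterals {

    private StringDefaultLiterals() {}

    /**
     * Returns the unescaped content if the expression is exactly one single-quoted
     * literal, or empty if it is anything else (cast, function call, concatenation).
     */
    static Optional<String> unquote(String expression) {
        String s = expression.trim();
        if (s.length() < 2 || s.charAt(0) != '\'' || s.charAt(s.length() - 1) != '\'') {
            return Optional.empty();
        }
        int end = s.length() - 1; // index of the closing quote
        StringBuilder out = new StringBuilder();
        for (int i = 1; i < end; i++) {
            char c = s.charAt(i);
            if (c != '\'') {
                out.append(c);
            } else if (i + 1 < end && s.charAt(i + 1) == '\'') {
                out.append('\''); // doubled quote is an escaped quote
                i++;
            } else {
                // A lone quote ends the literal early, so more than a literal is present.
                return Optional.empty();
            }
        }
        return Optional.of(out.toString());
    }
}
```

Under these assumptions, `'active'` yields `active` and `'it''s'` yields
`it's`, while `'active'::varchar` and `'a' || 'b'` are rejected so the caller
can skip the default instead of persisting the expression text.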
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/IcebergMetadataApplier.java:
##########
@@ -212,24 +237,28 @@ private void applyAddColumnEventWithPosition(Table table, AddColumnEvent event)
                     FlinkSchemaUtil.convert(
                             DataTypeUtils.toFlinkDataType(addColumn.getType())
                                     .getLogicalType());
+            Literal<?> defaultValue =
+                    IcebergTypeUtils.parseDefaultValue(
+                            addColumn.getDefaultValueExpression(), addColumn.getType());
+            if (defaultValue != null) {
+                updateSchema.addColumn(columnName, icebergType, columnComment, defaultValue);
+            } else {
+                updateSchema.addColumn(columnName, icebergType, columnComment);
+            }
             switch (columnWithPosition.getPosition()) {
                 case FIRST:
-                    updateSchema.addColumn(columnName, icebergType, columnComment);
-                    table.updateSchema().moveFirst(columnName);
+                    updateSchema.moveFirst(columnName);
                     break;
                 case LAST:
-                    updateSchema.addColumn(columnName, icebergType, columnComment);
                     break;
                 case BEFORE:
Review Comment:
In the BEFORE case, `columnWithPosition.getExistedColumnName()` is used
without a null check, while AFTER explicitly validates it. Since
`ColumnWithPosition` allows a null `existedColumnName`, consider adding a
`checkNotNull` for BEFORE as well (matching AFTER) to fail fast with a clearer
error if an invalid event is provided.
```suggestion
                case BEFORE:
                    checkNotNull(
                            columnWithPosition.getExistedColumnName(),
                            "Existing column name must be provided for BEFORE position");
```
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-iceberg/src/main/java/org/apache/flink/cdc/connectors/iceberg/sink/utils/IcebergTypeUtils.java:
##########
@@ -53,6 +60,66 @@ public static Types.NestedField convertCdcColumnToIcebergField(
                 column.getComment());
     }
+    /**
+     * Parse a CDC default value expression string into an Iceberg {@link Literal}.
+     *
+     * @return the parsed Literal, or null if the expression is null or cannot be parsed for the
+     *     given type.
+     */
+    @Nullable
+    public static Literal<?> parseDefaultValue(
+            @Nullable String defaultValueExpression, DataType cdcType) {
+        if (defaultValueExpression == null) {
+            return null;
+        }
+        try {
+            switch (cdcType.getTypeRoot()) {
+                case CHAR:
+                case VARCHAR:
+                    return Literal.of(defaultValueExpression);
+                case BOOLEAN:
+                    if ("true".equalsIgnoreCase(defaultValueExpression)) {
+                        return Literal.of(true);
+                    } else if ("false".equalsIgnoreCase(defaultValueExpression)) {
+                        return Literal.of(false);
+                    } else {
+                        LOG.warn(
+                                "Invalid boolean default value '{}', skipping default value.",
+                                defaultValueExpression);
+                        return null;
+                    }
+                case TINYINT:
+                case SMALLINT:
+                case INTEGER:
+                    return Literal.of(Integer.parseInt(defaultValueExpression));
+                case BIGINT:
+                    return Literal.of(Long.parseLong(defaultValueExpression));
+                case FLOAT:
+                    return Literal.of(Float.parseFloat(defaultValueExpression));
+                case DOUBLE:
+                    return Literal.of(Double.parseDouble(defaultValueExpression));
+                case DECIMAL:
+                    int scale = DataTypes.getScale(cdcType).orElse(0);
+                    return Literal.of(
+                            new java.math.BigDecimal(defaultValueExpression)
+                                    .setScale(scale, java.math.RoundingMode.HALF_UP));
+                default:
+                    LOG.warn(
+                            "Unsupported default value type {} for expression '{}', skipping default value.",
+                            cdcType.getTypeRoot(),
+                            defaultValueExpression);
+                    return null;
Review Comment:
The `default` branch logs a WARN for every unsupported type/expression.
Given the PR explicitly doesn't support DATE/TIME/TIMESTAMP defaults, this will
likely emit frequent warnings for common defaults like `CURRENT_TIMESTAMP`
during table creation/evolution. Consider lowering this to DEBUG or only
logging when the expression looks like a constant literal but the type is
unsupported, to reduce log noise in production.
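
The "looks like a constant literal" check could be approximated with a small
heuristic such as the sketch below. `DefaultValueLogging` and
`looksLikeConstantLiteral` are hypothetical names, and the patterns are an
assumption about what counts as a literal (numbers, single-quoted strings,
`true`/`false`); they are not taken from the PR.

```java
import java.util.regex.Pattern;

/** Hypothetical helper (not from the PR): decides how loudly to log a skipped default. */
final class DefaultValueLogging {

    // Optional sign, digits with optional fraction, optional exponent, e.g. -1.5e3
    private static final Pattern NUMERIC =
            Pattern.compile("[+-]?(\\d+(\\.\\d*)?|\\.\\d+)([eE][+-]?\\d+)?");

    // One single-quoted string; embedded quotes must be doubled, e.g. 'it''s'
    private static final Pattern QUOTED = Pattern.compile("'([^']|'')*'");

    private DefaultValueLogging() {}

    /** True for numeric, quoted-string, and boolean constants; false for function-like text. */
    static boolean looksLikeConstantLiteral(String expression) {
        String s = expression.trim();
        return NUMERIC.matcher(s).matches()
                || QUOTED.matcher(s).matches()
                || s.equalsIgnoreCase("true")
                || s.equalsIgnoreCase("false");
    }
}
```

In the `default` branch, the result could select the level: WARN when
`looksLikeConstantLiteral(defaultValueExpression)` is true (a real constant is
being dropped), DEBUG otherwise (for example `CURRENT_TIMESTAMP` or `now()`).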