lvyanquan commented on code in PR #4081:
URL: https://github.com/apache/flink-cdc/pull/4081#discussion_r2339022229
##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java:
##########
@@ -197,4 +202,101 @@ public static org.apache.flink.table.types.DataType
toFlinkDataType(DataType typ
throw new IllegalArgumentException("Illegal type: " + type);
}
}
+
+ /**
+ * Convert Flink's internal {@link org.apache.flink.table.types.DataType}
to CDC's {@link
+ * DataType}.
+ */
+ public static DataType
fromFlinkDataType(org.apache.flink.table.types.DataType flinkType) {
+ LogicalType logicalType = flinkType.getLogicalType();
+ List<org.apache.flink.table.types.DataType> children =
flinkType.getChildren();
+ DataType dataType;
+ switch (logicalType.getTypeRoot()) {
+ case CHAR:
+ dataType = DataTypes.CHAR(getLength(logicalType));
+ break;
+ case VARCHAR:
+ dataType = DataTypes.VARCHAR(getLength(logicalType));
+ break;
+ case BOOLEAN:
+ dataType = DataTypes.BOOLEAN();
+ break;
+ case BINARY:
+ dataType = DataTypes.BINARY(getLength(logicalType));
+ break;
+ case VARBINARY:
+ dataType = DataTypes.VARBINARY(getLength(logicalType));
+ break;
+ case DECIMAL:
+ dataType = DataTypes.DECIMAL(getPrecision(logicalType),
getScale(logicalType));
+ break;
+ case TINYINT:
+ dataType = DataTypes.TINYINT();
+ break;
+ case SMALLINT:
+ dataType = DataTypes.SMALLINT();
+ break;
+ case INTEGER:
+ dataType = DataTypes.INT();
+ break;
+ case BIGINT:
+ dataType = DataTypes.BIGINT();
+ break;
+ case FLOAT:
+ dataType = DataTypes.FLOAT();
+ break;
+ case DOUBLE:
+ dataType = DataTypes.DOUBLE();
+ break;
+ case DATE:
+ dataType = DataTypes.DATE();
+ break;
+ case TIME_WITHOUT_TIME_ZONE:
+ dataType = DataTypes.TIME(getPrecision(logicalType));
+ break;
+ case TIMESTAMP_WITHOUT_TIME_ZONE:
+ dataType = DataTypes.TIMESTAMP(getPrecision(logicalType));
+ break;
+ case TIMESTAMP_WITH_TIME_ZONE:
+ dataType = DataTypes.TIMESTAMP_TZ(getPrecision(logicalType));
+ break;
+ case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
+ dataType = DataTypes.TIMESTAMP_LTZ(getPrecision(logicalType));
+ break;
+ case ARRAY:
+ Preconditions.checkState(children != null &&
!children.isEmpty());
Review Comment:
I tend to keep the status quo as this class is copy from
https://github.com/apache/flink-cdc/blob/db27ba6ae8ccc0be1e6464835790613d02639ecd/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/types/utils/DataTypeUtils.java#L40
and add some changes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]