yuxiqian commented on code in PR #4060:
URL: https://github.com/apache/flink-cdc/pull/4060#discussion_r2206771566
##########
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java:
##########
@@ -135,11 +141,21 @@ void cleanup() {
System.setOut(standardOut);
}
+ private static Stream<Arguments> parameterProvider() {
+ return Stream.of(
+ Arguments.of(ValuesDataSink.SinkApi.SINK_FUNCTION, true),
+ Arguments.of(ValuesDataSink.SinkApi.SINK_FUNCTION, false),
+ Arguments.of(ValuesDataSink.SinkApi.SINK_V2, true),
+ Arguments.of(ValuesDataSink.SinkApi.SINK_V2, false));
+ }
+
/** This tests if we can append calculated columns based on existing
columns. */
- @ParameterizedTest
- @EnumSource
- void testCalculatedColumns(ValuesDataSink.SinkApi sinkApi) throws
Exception {
+ @ParameterizedTest(name = "API version: {0}, initializeMode: {1}")
+ @MethodSource(value = "parameterProvider")
+ void testCalculatedColumns(ValuesDataSink.SinkApi sinkApi, boolean
initializeMode)
Review Comment:
What is `initializeMode`? Seems it doesn't affect test cases at all.
##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java:
##########
@@ -85,6 +93,37 @@
*/
@PublicEvolving
public class SchemaMergingUtils {
+
Review Comment:
We may handle these changes in another PR and focus on supporting `DATE` and
`TIME` formats here
##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java:
##########
@@ -48,6 +48,7 @@ public class DebeziumSchemaDataTypeInference implements
SchemaDataTypeInference,
private static final long serialVersionUID = 1L;
+ public static final String DEBEZIUM_DATE_SCHEMA_NAME =
"io.debezium.time.Date";
Review Comment:
Seems redundant as it's an alias of `io.debezium.time.Date.SCHEMA_NAME`?
##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java:
##########
@@ -141,6 +143,7 @@ public void
testMySql8JsonDataTypesWithUseLegacyJsonFormat() throws Throwable {
@Test
void testMysql57TimeDataTypes() throws Throwable {
+ UniqueDatabase usedDd = fullTypesMySql57Database;
Review Comment:
Please double check if these changes are necessary, and revert these to keep
this PR minimum.
##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java:
##########
@@ -98,9 +105,21 @@ static void write(
writer.writeShort(pos, (short) o);
break;
case INTEGER:
+ writer.writeInt(pos, (int) o);
+ break;
case DATE:
+ if (o instanceof DateData) {
+ writer.writeDate(pos, (DateData) o);
+ } else {
+ writer.writeInt(pos, (int) o);
+ }
+ break;
Review Comment:
We may add some comments here, clarifying that the `writeInt` paths are kept
for compatibility.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]