yuxiqian commented on code in PR #4060:
URL: https://github.com/apache/flink-cdc/pull/4060#discussion_r2206771566


##########
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineTransformITCase.java:
##########
@@ -135,11 +141,21 @@ void cleanup() {
         System.setOut(standardOut);
     }
 
+    private static Stream<Arguments> parameterProvider() {
+        return Stream.of(
+                Arguments.of(ValuesDataSink.SinkApi.SINK_FUNCTION, true),
+                Arguments.of(ValuesDataSink.SinkApi.SINK_FUNCTION, false),
+                Arguments.of(ValuesDataSink.SinkApi.SINK_V2, true),
+                Arguments.of(ValuesDataSink.SinkApi.SINK_V2, false));
+    }
+
     /** This tests if we can append calculated columns based on existing columns. */
-    @ParameterizedTest
-    @EnumSource
-    void testCalculatedColumns(ValuesDataSink.SinkApi sinkApi) throws Exception {
+    @ParameterizedTest(name = "API version: {0}, initializeMode: {1}")
+    @MethodSource(value = "parameterProvider")
+    void testCalculatedColumns(ValuesDataSink.SinkApi sinkApi, boolean initializeMode)

Review Comment:
   What is `initializeMode`? It doesn't seem to affect the test cases at all.



##########
flink-cdc-common/src/main/java/org/apache/flink/cdc/common/utils/SchemaMergingUtils.java:
##########
@@ -85,6 +93,37 @@
  */
 @PublicEvolving
 public class SchemaMergingUtils {
+

Review Comment:
   We may handle these changes in another PR and focus on supporting `DATE` and `TIME` formats here.



##########
flink-cdc-connect/flink-cdc-source-connectors/flink-connector-debezium/src/main/java/org/apache/flink/cdc/debezium/event/DebeziumSchemaDataTypeInference.java:
##########
@@ -48,6 +48,7 @@ public class DebeziumSchemaDataTypeInference implements SchemaDataTypeInference,
 
     private static final long serialVersionUID = 1L;
 
+    public static final String DEBEZIUM_DATE_SCHEMA_NAME = "io.debezium.time.Date";

Review Comment:
   Seems redundant, as it's just an alias of `io.debezium.time.Date.SCHEMA_NAME`?
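
   To illustrate the point, here is a minimal self-contained sketch (the `DebeziumDate` class below is a stand-in for the real `io.debezium.time.Date`, which already publishes a `SCHEMA_NAME` constant with this exact value): referencing the library's constant avoids redeclaring the string literal.

   ```java
   public class SchemaNameCheck {
       // Stand-in for io.debezium.time.Date, which already exposes SCHEMA_NAME.
       static class DebeziumDate {
           static final String SCHEMA_NAME = "io.debezium.time.Date";
       }

       // Reference the existing constant instead of redeclaring the literal,
       // so the two can never drift apart.
       static boolean isDebeziumDate(String schemaName) {
           return DebeziumDate.SCHEMA_NAME.equals(schemaName);
       }

       public static void main(String[] args) {
           System.out.println(isDebeziumDate("io.debezium.time.Date")); // prints true
           System.out.println(isDebeziumDate("io.debezium.time.Time")); // prints false
       }
   }
   ```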



##########
flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/MySqlFullTypesITCase.java:
##########
@@ -141,6 +143,7 @@ public void testMySql8JsonDataTypesWithUseLegacyJsonFormat() throws Throwable {
 
     @Test
     void testMysql57TimeDataTypes() throws Throwable {
+        UniqueDatabase usedDd = fullTypesMySql57Database;

Review Comment:
   Please double-check whether these changes are necessary, and revert them to keep this PR minimal.



##########
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/serializer/data/writer/BinaryWriter.java:
##########
@@ -98,9 +105,21 @@ static void write(
                 writer.writeShort(pos, (short) o);
                 break;
             case INTEGER:
+                writer.writeInt(pos, (int) o);
+                break;
             case DATE:
+                if (o instanceof DateData) {
+                    writer.writeDate(pos, (DateData) o);
+                } else {
+                    writer.writeInt(pos, (int) o);
+                }
+                break;

Review Comment:
   We may add some comments here, clarifying that the `writeInt` paths are kept for compatibility.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
