yuxiqian commented on code in PR #3622:
URL: https://github.com/apache/flink-cdc/pull/3622#discussion_r1815882769
##########
flink-cdc-composer/src/test/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposerLenientITCase.java:
##########
@@ -169,6 +171,66 @@ void testSingleSplitSingleTable(ValuesDataSink.SinkApi sinkApi) throws Exception
"DataChangeEvent{tableId=default_namespace.default_schema.table1, before=[2, null, null, null, ], after=[2, null, null, null, x], op=UPDATE, meta=()}");
}
+ @ParameterizedTest
+ @EnumSource
+ void testSingleSplitSingleTableWithDefaultValue(ValuesDataSink.SinkApi sinkApi)
+ throws Exception {
+ FlinkPipelineComposer composer = FlinkPipelineComposer.ofMiniCluster();
+
+ // Setup value source
+ Configuration sourceConfig = new Configuration();
+ sourceConfig.set(
+ ValuesDataSourceOptions.EVENT_SET_ID,
+ ValuesDataSourceHelper.EventSetId.SINGLE_SPLIT_SINGLE_TABLE_WITH_DEFAULT_VALUE);
+ SourceDef sourceDef =
+ new SourceDef(ValuesDataFactory.IDENTIFIER, "Value Source", sourceConfig);
+
+ // Setup value sink
+ Configuration sinkConfig = new Configuration();
+ sinkConfig.set(ValuesDataSinkOptions.MATERIALIZED_IN_MEMORY, true);
+ sinkConfig.set(ValuesDataSinkOptions.SINK_API, sinkApi);
+ SinkDef sinkDef = new SinkDef(ValuesDataFactory.IDENTIFIER, "Value Sink", sinkConfig);
+
+ // Setup pipeline
+ Configuration pipelineConfig = new Configuration();
+ pipelineConfig.set(PipelineOptions.PIPELINE_PARALLELISM, 1);
+
+ PipelineDef pipelineDef =
+ new PipelineDef(
+ sourceDef,
+ sinkDef,
+ Collections.emptyList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ pipelineConfig);
+
+ // Execute the pipeline
+ PipelineExecution execution = composer.compose(pipelineDef);
+ execution.execute();
+
+ // Check result in ValuesDatabase
+ List<String> results = ValuesDatabase.getResults(TABLE_1);
+ assertThat(results)
+ .containsExactly(
Review Comment:
`ValuesDatabase` stores records in a `HashMap` internally, so IIUC the order of
the returned results is not guaranteed. `containsExactlyInAnyOrder` might be more
appropriate.
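To illustrate the concern, here is a minimal plain-Java sketch (no AssertJ dependency) of the kind of check `containsExactlyInAnyOrder` performs: same elements, duplicates included, in any order. The class `OrderInsensitiveCheck`, the helper `sameElementsInAnyOrder`, and the sample strings are hypothetical and only stand in for the `HashMap`-backed results; this is not how `ValuesDatabase` or AssertJ is implemented.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OrderInsensitiveCheck {

    // Hypothetical helper: true when both lists hold the same elements
    // (duplicates included), regardless of their order.
    public static boolean sameElementsInAnyOrder(List<String> actual, List<String> expected) {
        List<String> a = new ArrayList<>(actual);
        List<String> e = new ArrayList<>(expected);
        Collections.sort(a);
        Collections.sort(e);
        return a.equals(e);
    }

    public static void main(String[] args) {
        // Stand-in for a HashMap-backed store: the iteration order of
        // values() is unspecified and may differ from insertion order.
        Map<String, String> store = new HashMap<>();
        store.put("3", "row-3");
        store.put("1", "row-1");
        store.put("2", "row-2");

        List<String> results = new ArrayList<>(store.values());
        List<String> expected = Arrays.asList("row-1", "row-2", "row-3");

        // An order-sensitive comparison (results.equals(expected)) depends on
        // hash iteration order; the order-insensitive one does not.
        System.out.println(sameElementsInAnyOrder(results, expected)); // prints "true"
    }
}
```

An order-sensitive assertion such as `containsExactly` on the same data would pass or fail depending on how the keys happen to hash, which is what makes a test flaky.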