loserwang1024 commented on code in PR #2571:
URL: https://github.com/apache/flink-cdc/pull/2571#discussion_r1828650330
##########
flink-connector-postgres-cdc/src/main/java/io/debezium/connector/postgresql/PostgresStreamingChangeEventSource.java:
##########
@@ -113,12 +112,17 @@ public PostgresStreamingChangeEventSource(
@Override
public void init() {
+ // It's not necessary to refresh schema again, which is very
time-consuming.
+ // The schema of taskContext is the reference of
PostgresSourceFetchTaskContext#schema, and
+ // has been initialized when submit StreamSplit fetch task by
+ // IncrementalSourceStreamFetcher#submitTask ->
PostgresSourceFetchTaskContext#configure.
+
// refresh the schema so we have a latest view of the DB tables
- try {
- taskContext.refreshSchema(connection, true);
Review Comment:
It's copied form debezium. Maybe we can provide a
subclass(StreamSplitReadTask) to override this method?
##########
flink-connector-postgres-cdc/src/main/java/com/ververica/cdc/connectors/postgres/source/PostgresDialect.java:
##########
@@ -214,6 +218,21 @@ public JdbcSourceFetchTaskContext createFetchTaskContext(
return new PostgresSourceFetchTaskContext(taskSourceConfig, this);
}
+ @Override
+ public FetchTask.Context createFetchTaskContext(
+ SourceSplitBase sourceSplitBase, JdbcSourceConfig
taskSourceConfig, boolean reuse) {
+ if (!reuse) {
+ return createFetchTaskContext(sourceSplitBase, taskSourceConfig);
+ }
+ if (this.taskSourceConfig == null
+ || this.fetchTaskContext == null
+ || !this.taskSourceConfig.equals(taskSourceConfig)) {
+ this.taskSourceConfig = taskSourceConfig;
+ this.fetchTaskContext = createFetchTaskContext(sourceSplitBase,
taskSourceConfig);
Review Comment:
It seem that fetchTaskContext will be shared, what happens if the first one
invoke FetchTask.Context#close to close the connection and then another task
use this fetchTaskContext?
##########
flink-cdc-base/src/main/java/com/ververica/cdc/connectors/base/dialect/DataSourceDialect.java:
##########
@@ -71,6 +71,12 @@ public interface DataSourceDialect<C extends SourceConfig>
/** The task context used for fetch task to fetch data from external
systems. */
FetchTask.Context createFetchTaskContext(SourceSplitBase sourceSplitBase,
C sourceConfig);
+ /** Try to reuse Context if reuse is true. */
Review Comment:
I wonder which situation reuse is true , and which situation resuse is
false? If always reuse, no need the param resuse?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]