This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new c1e1aa48d [FLINK-34648][cdc] Avoid RPC timeout when applying SchemaChangeEvent to downstream external systems
c1e1aa48d is described below

commit c1e1aa48d93933c11ce9629d6f98041338f4d7a0
Author: Kunni <lvyanquan....@alibaba-inc.com>
AuthorDate: Wed Apr 24 15:57:03 2024 +0800

    [FLINK-34648][cdc] Avoid RPC timeout when applying SchemaChangeEvent to downstream external systems
    
    This closes #3128.
---
 .../parser/YamlPipelineDefinitionParserTest.java   | 25 +++++++
 .../definitions/pipeline-definition-full.yaml      |  1 +
 .../flink/cdc/common/pipeline/PipelineOptions.java | 11 +++
 .../cdc/composer/flink/FlinkPipelineComposer.java  |  5 +-
 .../flink/translator/SchemaOperatorTranslator.java | 10 ++-
 .../runtime/operators/schema/SchemaOperator.java   | 48 ++++++++++--
 .../operators/schema/SchemaOperatorFactory.java    |  7 +-
 .../schema/coordinator/SchemaRegistry.java         |  7 ++
 .../coordinator/SchemaRegistryRequestHandler.java  | 86 ++++++++++++++++++----
 .../schema/event/RefreshPendingListsRequest.java}  | 26 ++-----
 .../schema/event/RefreshPendingListsResponse.java} | 25 ++-----
 .../event/SchemaChangeProcessingResponse.java}     | 25 ++-----
 .../schema/event/SchemaChangeResultRequest.java}   | 26 ++-----
 .../operators/schema/SchemaOperatorTest.java       | 46 ++++++++++++
 .../operators/EventOperatorTestHarness.java        | 49 ++++++++++--
 .../schema/CollectingMetadataApplier.java          | 14 ++++
 16 files changed, 308 insertions(+), 103 deletions(-)

diff --git 
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
 
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
index e29ea332a..863acbdd3 100644
--- 
a/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
+++ 
b/flink-cdc-cli/src/test/java/org/apache/flink/cdc/cli/parser/YamlPipelineDefinitionParserTest.java
@@ -18,6 +18,7 @@
 package org.apache.flink.cdc.cli.parser;
 
 import org.apache.flink.cdc.common.configuration.Configuration;
+import org.apache.flink.cdc.common.pipeline.PipelineOptions;
 import org.apache.flink.cdc.composer.definition.PipelineDef;
 import org.apache.flink.cdc.composer.definition.RouteDef;
 import org.apache.flink.cdc.composer.definition.SinkDef;
@@ -31,6 +32,7 @@ import org.junit.jupiter.api.Test;
 
 import java.net.URL;
 import java.nio.file.Paths;
+import java.time.Duration;
 import java.util.Arrays;
 import java.util.Collections;
 
@@ -89,6 +91,27 @@ class YamlPipelineDefinitionParserTest {
                 .isNotEqualTo(PIPELINE_LOCAL_TIME_ZONE.defaultValue());
     }
 
+    @Test
+    void testEvaluateDefaultRpcTimeOut() throws Exception {
+        URL resource = 
Resources.getResource("definitions/pipeline-definition-minimized.yaml");
+        YamlPipelineDefinitionParser parser = new 
YamlPipelineDefinitionParser();
+        PipelineDef pipelineDef =
+                parser.parse(
+                        Paths.get(resource.toURI()),
+                        Configuration.fromMap(
+                                ImmutableMap.<String, String>builder()
+                                        .put(
+                                                
PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT
+                                                        .key(),
+                                                "1h")
+                                        .build()));
+        assertThat(
+                        pipelineDef
+                                .getConfig()
+                                
.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT))
+                .isEqualTo(Duration.ofSeconds(60 * 60));
+    }
+
     @Test
     void testValidTimeZone() throws Exception {
         URL resource = 
Resources.getResource("definitions/pipeline-definition-minimized.yaml");
@@ -202,6 +225,7 @@ class YamlPipelineDefinitionParserTest {
                                     .put("name", "source-database-sync-pipe")
                                     .put("parallelism", "4")
                                     .put("enable-schema-evolution", "false")
+                                    .put("schema-operator.rpc-timeout", "1 h")
                                     .build()));
 
     private final PipelineDef fullDefWithGlobalConf =
@@ -262,6 +286,7 @@ class YamlPipelineDefinitionParserTest {
                                     .put("name", "source-database-sync-pipe")
                                     .put("parallelism", "4")
                                     .put("enable-schema-evolution", "false")
+                                    .put("schema-operator.rpc-timeout", "1 h")
                                     .put("foo", "bar")
                                     .build()));
 
diff --git 
a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml 
b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml
index 5f0f6f8f9..e06ad904b 100644
--- a/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml
+++ b/flink-cdc-cli/src/test/resources/definitions/pipeline-definition-full.yaml
@@ -56,3 +56,4 @@ pipeline:
   name: source-database-sync-pipe
   parallelism: 4
   enable-schema-evolution: false
+  schema-operator.rpc-timeout: 1 h
diff --git 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
index 9ffabbfe2..ed5f6e9c0 100644
--- 
a/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
+++ 
b/flink-cdc-common/src/main/java/org/apache/flink/cdc/common/pipeline/PipelineOptions.java
@@ -23,12 +23,16 @@ import 
org.apache.flink.cdc.common.configuration.ConfigOptions;
 import org.apache.flink.cdc.common.configuration.description.Description;
 import org.apache.flink.cdc.common.configuration.description.ListElement;
 
+import java.time.Duration;
+
 import static 
org.apache.flink.cdc.common.configuration.description.TextElement.text;
 
 /** Predefined pipeline configuration options. */
 @PublicEvolving
 public class PipelineOptions {
 
+    public static final Duration DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT = 
Duration.ofMinutes(3);
+
     public static final ConfigOption<String> PIPELINE_NAME =
             ConfigOptions.key("name")
                     .stringType()
@@ -85,5 +89,12 @@ public class PipelineOptions {
                     .withDescription(
                             "The unique ID for schema operator. This ID will 
be used for inter-operator communications and must be unique across 
operators.");
 
+    public static final ConfigOption<Duration> 
PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT =
+            ConfigOptions.key("schema-operator.rpc-timeout")
+                    .durationType()
+                    .defaultValue(DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT)
+                    .withDescription(
+                            "The timeout time for SchemaOperator to wait 
downstream SchemaChangeEvent applying finished, the default value is 3 
minutes.");
+
     private PipelineOptions() {}
 }
diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
index 30ce1ff5e..73910d59b 100644
--- 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/FlinkPipelineComposer.java
@@ -110,7 +110,10 @@ public class FlinkPipelineComposer implements 
PipelineComposer {
                         pipelineDef
                                 .getConfig()
                                 
.get(PipelineOptions.PIPELINE_SCHEMA_CHANGE_BEHAVIOR),
-                        
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID));
+                        
pipelineDef.getConfig().get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_UID),
+                        pipelineDef
+                                .getConfig()
+                                
.get(PipelineOptions.PIPELINE_SCHEMA_OPERATOR_RPC_TIMEOUT));
         OperatorIDGenerator schemaOperatorIDGenerator =
                 new 
OperatorIDGenerator(schemaOperatorTranslator.getSchemaOperatorUid());
 
diff --git 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
index 9513c0349..a69741c7b 100644
--- 
a/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
+++ 
b/flink-cdc-composer/src/main/java/org/apache/flink/cdc/composer/flink/translator/SchemaOperatorTranslator.java
@@ -31,6 +31,7 @@ import org.apache.flink.cdc.runtime.typeutils.EventTypeInfo;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -40,10 +41,15 @@ public class SchemaOperatorTranslator {
     private final SchemaChangeBehavior schemaChangeBehavior;
     private final String schemaOperatorUid;
 
+    private final Duration rpcTimeOut;
+
     public SchemaOperatorTranslator(
-            SchemaChangeBehavior schemaChangeBehavior, String 
schemaOperatorUid) {
+            SchemaChangeBehavior schemaChangeBehavior,
+            String schemaOperatorUid,
+            Duration rpcTimeOut) {
         this.schemaChangeBehavior = schemaChangeBehavior;
         this.schemaOperatorUid = schemaOperatorUid;
+        this.rpcTimeOut = rpcTimeOut;
     }
 
     public DataStream<Event> translate(
@@ -83,7 +89,7 @@ public class SchemaOperatorTranslator {
                 input.transform(
                         "SchemaOperator",
                         new EventTypeInfo(),
-                        new SchemaOperatorFactory(metadataApplier, 
routingRules));
+                        new SchemaOperatorFactory(metadataApplier, 
routingRules, rpcTimeOut));
         stream.uid(schemaOperatorUid).setParallelism(parallelism);
         return stream;
     }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
index 3648a1e7a..d31fb026d 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
@@ -35,15 +35,18 @@ import org.apache.flink.cdc.common.types.DataTypeRoot;
 import org.apache.flink.cdc.common.utils.ChangeEventUtils;
 import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
+import 
org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
-import 
org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse;
+import 
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse;
 import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
+import 
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest;
 import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.ChainingStrategy;
@@ -67,8 +70,11 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT;
+
 /**
  * The operator will evolve schemas in {@link SchemaRegistry} for incoming 
{@link
  * SchemaChangeEvent}s and block the stream for tables before their schema 
changes finish.
@@ -89,9 +95,18 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
     private transient SchemaEvolutionClient schemaEvolutionClient;
     private transient LoadingCache<TableId, Schema> cachedSchemas;
 
+    private final long rpcTimeOutInMillis;
+
     public SchemaOperator(List<Tuple2<String, TableId>> routingRules) {
         this.routingRules = routingRules;
         this.chainingStrategy = ChainingStrategy.ALWAYS;
+        this.rpcTimeOutInMillis = 
DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT.toMillis();
+    }
+
+    public SchemaOperator(List<Tuple2<String, TableId>> routingRules, Duration 
rpcTimeOut) {
+        this.routingRules = routingRules;
+        this.chainingStrategy = ChainingStrategy.ALWAYS;
+        this.rpcTimeOutInMillis = rpcTimeOut.toMillis();
     }
 
     @Override
@@ -127,11 +142,23 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
                                 });
     }
 
+    @Override
+    public void initializeState(StateInitializationContext context) throws 
Exception {
+        if (context.isRestored()) {
+            // Multiple operators may appear during a restart process,
+            // only clear the pendingSchemaChanges when the first operator 
starts.
+            if (getRuntimeContext().getIndexOfThisSubtask() == 0) {
+                sendRequestToCoordinator(new RefreshPendingListsRequest());
+            }
+        }
+    }
+
     /**
      * This method is guaranteed to not be called concurrently with other 
methods of the operator.
      */
     @Override
-    public void processElement(StreamRecord<Event> streamRecord) {
+    public void processElement(StreamRecord<Event> streamRecord)
+            throws InterruptedException, TimeoutException {
         Event event = streamRecord.getValue();
         // Schema changes
         if (event instanceof SchemaChangeEvent) {
@@ -247,7 +274,8 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
         return Optional.empty();
     }
 
-    private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent 
schemaChangeEvent) {
+    private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent 
schemaChangeEvent)
+            throws InterruptedException, TimeoutException {
         // The request will need to send a FlushEvent or block until flushing 
finished
         SchemaChangeResponse response = requestSchemaChange(tableId, 
schemaChangeEvent);
         if (!response.getSchemaChangeEvents().isEmpty()) {
@@ -267,8 +295,18 @@ public class SchemaOperator extends 
AbstractStreamOperator<Event>
         return sendRequestToCoordinator(new SchemaChangeRequest(tableId, 
schemaChangeEvent));
     }
 
-    private ReleaseUpstreamResponse requestReleaseUpstream() {
-        return sendRequestToCoordinator(new ReleaseUpstreamRequest());
+    private void requestReleaseUpstream() throws InterruptedException, 
TimeoutException {
+        CoordinationResponse coordinationResponse =
+                sendRequestToCoordinator(new ReleaseUpstreamRequest());
+        long nextRpcTimeOutMillis = System.currentTimeMillis() + 
rpcTimeOutInMillis;
+        while (coordinationResponse instanceof SchemaChangeProcessingResponse) 
{
+            if (System.currentTimeMillis() < nextRpcTimeOutMillis) {
+                Thread.sleep(1000);
+                coordinationResponse = sendRequestToCoordinator(new 
SchemaChangeResultRequest());
+            } else {
+                throw new TimeoutException("TimeOut when requesting release 
upstream");
+            }
+        }
     }
 
     private <REQUEST extends CoordinationRequest, RESPONSE extends 
CoordinationResponse>
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
index 7ee348e79..ecf500171 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
@@ -29,6 +29,7 @@ import 
org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 
+import java.time.Duration;
 import java.util.List;
 
 /** Factory to create {@link SchemaOperator}. */
@@ -42,8 +43,10 @@ public class SchemaOperatorFactory extends 
SimpleOperatorFactory<Event>
     private final List<Tuple2<String, TableId>> routingRules;
 
     public SchemaOperatorFactory(
-            MetadataApplier metadataApplier, List<Tuple2<String, TableId>> 
routingRules) {
-        super(new SchemaOperator(routingRules));
+            MetadataApplier metadataApplier,
+            List<Tuple2<String, TableId>> routingRules,
+            Duration rpcTimeOut) {
+        super(new SchemaOperator(routingRules, rpcTimeOut));
         this.metadataApplier = metadataApplier;
         this.routingRules = routingRules;
     }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
index 82ae21b86..3361c9454 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistry.java
@@ -25,8 +25,10 @@ import 
org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
 import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent;
 import org.apache.flink.cdc.runtime.operators.schema.event.GetSchemaRequest;
 import org.apache.flink.cdc.runtime.operators.schema.event.GetSchemaResponse;
+import 
org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsRequest;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
 import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
+import 
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.SinkWriterRegisterEvent;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 import 
org.apache.flink.runtime.operators.coordination.CoordinationRequestHandler;
@@ -122,6 +124,7 @@ public class SchemaRegistry implements OperatorCoordinator, 
CoordinationRequestH
     @Override
     public void close() throws Exception {
         LOG.info("SchemaRegistry for {} closed.", operatorName);
+        requestHandler.close();
     }
 
     @Override
@@ -175,6 +178,10 @@ public class SchemaRegistry implements 
OperatorCoordinator, CoordinationRequestH
         } else if (request instanceof GetSchemaRequest) {
             return CompletableFuture.completedFuture(
                     wrap(handleGetSchemaRequest(((GetSchemaRequest) 
request))));
+        } else if (request instanceof SchemaChangeResultRequest) {
+            return requestHandler.getSchemaChangeResult();
+        } else if (request instanceof RefreshPendingListsRequest) {
+            return requestHandler.refreshPendingLists();
         } else {
             throw new IllegalArgumentException("Unrecognized 
CoordinationRequest type: " + request);
         }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
index f9dfdb781..1ee06a7e5 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/coordinator/SchemaRegistryRequestHandler.java
@@ -22,8 +22,10 @@ import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
+import 
org.apache.flink.cdc.runtime.operators.schema.event.RefreshPendingListsResponse;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamRequest;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.ReleaseUpstreamResponse;
+import 
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse;
 import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
 import 
org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
@@ -33,12 +35,16 @@ import org.slf4j.LoggerFactory;
 
 import javax.annotation.concurrent.NotThreadSafe;
 
+import java.io.Closeable;
+import java.io.IOException;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
 import static 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.RequestStatus.RECEIVED_RELEASE_REQUEST;
 import static 
org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils.wrap;
@@ -46,7 +52,7 @@ import static 
org.apache.flink.cdc.runtime.operators.schema.event.CoordinationRe
 /** A handler to deal with all requests and events for {@link SchemaRegistry}. 
*/
 @Internal
 @NotThreadSafe
-public class SchemaRegistryRequestHandler {
+public class SchemaRegistryRequestHandler implements Closeable {
     private static final Logger LOG = 
LoggerFactory.getLogger(SchemaRegistryRequestHandler.class);
 
     /** The {@link MetadataApplier} for every table. */
@@ -66,6 +72,13 @@ public class SchemaRegistryRequestHandler {
     /** Sink writers which have sent flush success events for the request. */
     private final Set<Integer> flushedSinkWriters;
 
+    /** Status of the execution of current schema change request. */
+    private boolean isSchemaChangeApplying;
+    /** Actual exception if failed to apply schema change. */
+    private Exception schemaChangeException;
+    /** Executor service to execute schema change. */
+    private final ExecutorService schemaChangeThreadPool;
+
     public SchemaRegistryRequestHandler(
             MetadataApplier metadataApplier,
             SchemaManager schemaManager,
@@ -76,17 +89,34 @@ public class SchemaRegistryRequestHandler {
         this.pendingSchemaChanges = new LinkedList<>();
         this.schemaManager = schemaManager;
         this.schemaDerivation = schemaDerivation;
+        schemaChangeThreadPool = Executors.newSingleThreadExecutor();
+        isSchemaChangeApplying = false;
     }
 
     /**
      * Apply the schema change to the external system.
      *
      * @param tableId the table need to change schema
-     * @param changeEvent the schema change
+     * @param derivedSchemaChangeEvents list of the schema changes
      */
-    private void applySchemaChange(TableId tableId, SchemaChangeEvent 
changeEvent) {
-        LOG.debug("Apply schema change {} to table {}.", changeEvent, tableId);
-        metadataApplier.applySchemaChange(changeEvent);
+    private void applySchemaChange(
+            TableId tableId, List<SchemaChangeEvent> 
derivedSchemaChangeEvents) {
+        isSchemaChangeApplying = true;
+        schemaChangeException = null;
+        try {
+            for (SchemaChangeEvent changeEvent : derivedSchemaChangeEvents) {
+                metadataApplier.applySchemaChange(changeEvent);
+                LOG.debug("Apply schema change {} to table {}.", changeEvent, 
tableId);
+            }
+            PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);
+            if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) 
{
+                startNextSchemaChangeRequest();
+            }
+        } catch (Exception e) {
+            this.schemaChangeException = e;
+        } finally {
+            this.isSchemaChangeApplying = false;
+        }
     }
 
     /**
@@ -131,7 +161,7 @@ public class SchemaRegistryRequestHandler {
     public CompletableFuture<CoordinationResponse> 
handleReleaseUpstreamRequest() {
         CompletableFuture<CoordinationResponse> response =
                 pendingSchemaChanges.get(0).getResponseFuture();
-        if (response.isDone()) {
+        if (response.isDone() && !isSchemaChangeApplying) {
             startNextSchemaChangeRequest();
         } else {
             pendingSchemaChanges.get(0).receiveReleaseRequest();
@@ -155,19 +185,25 @@ public class SchemaRegistryRequestHandler {
      * @param tableId the subtask in SchemaOperator and table that the 
FlushEvent is about
      * @param sinkSubtask the sink subtask succeed flushing
      */
-    public void flushSuccess(TableId tableId, int sinkSubtask) {
+    public void flushSuccess(TableId tableId, int sinkSubtask) throws 
InterruptedException {
         flushedSinkWriters.add(sinkSubtask);
         if (flushedSinkWriters.equals(activeSinkWriters)) {
             LOG.info(
                     "All sink subtask have flushed for table {}. Start to 
apply schema change.",
                     tableId.toString());
             PendingSchemaChange waitFlushSuccess = pendingSchemaChanges.get(0);
-            waitFlushSuccess.derivedSchemaChangeEvents.forEach(
-                    schemaChangeEvent -> applySchemaChange(tableId, 
schemaChangeEvent));
-            waitFlushSuccess.getResponseFuture().complete(wrap(new 
ReleaseUpstreamResponse()));
-
-            if (RECEIVED_RELEASE_REQUEST.equals(waitFlushSuccess.getStatus())) 
{
-                startNextSchemaChangeRequest();
+            schemaChangeThreadPool.submit(
+                    () -> applySchemaChange(tableId, 
waitFlushSuccess.derivedSchemaChangeEvents));
+            Thread.sleep(1000);
+            if (schemaChangeException != null) {
+                throw new RuntimeException("failed to apply schema change.", 
schemaChangeException);
+            }
+            if (isSchemaChangeApplying) {
+                waitFlushSuccess
+                        .getResponseFuture()
+                        .complete(wrap(new SchemaChangeProcessingResponse()));
+            } else {
+                waitFlushSuccess.getResponseFuture().complete(wrap(new 
ReleaseUpstreamResponse()));
             }
         }
     }
@@ -200,6 +236,30 @@ public class SchemaRegistryRequestHandler {
         }
     }
 
+    public CompletableFuture<CoordinationResponse> refreshPendingLists() {
+        pendingSchemaChanges.clear();
+        flushedSinkWriters.clear();
+        return CompletableFuture.completedFuture(wrap(new 
RefreshPendingListsResponse()));
+    }
+
+    public CompletableFuture<CoordinationResponse> getSchemaChangeResult() {
+        if (schemaChangeException != null) {
+            throw new RuntimeException("failed to apply schema change.", 
schemaChangeException);
+        }
+        if (isSchemaChangeApplying) {
+            return CompletableFuture.supplyAsync(() -> wrap(new 
SchemaChangeProcessingResponse()));
+        } else {
+            return CompletableFuture.supplyAsync(() -> wrap(new 
ReleaseUpstreamResponse()));
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (schemaChangeThreadPool != null) {
+            schemaChangeThreadPool.shutdown();
+        }
+    }
+
     private static class PendingSchemaChange {
         private final SchemaChangeRequest changeRequest;
         private List<SchemaChangeEvent> derivedSchemaChangeEvents;
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsRequest.java
similarity index 52%
copy from 
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
copy to 
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsRequest.java
index 7b403e130..a0496c935 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsRequest.java
@@ -15,27 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cdc.runtime.testutils.schema;
+package org.apache.flink.cdc.runtime.operators.schema.event;
 
-import org.apache.flink.cdc.common.event.SchemaChangeEvent;
-import org.apache.flink.cdc.common.sink.MetadataApplier;
+import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 
-import java.util.ArrayList;
-import java.util.List;
+/** Request to refresh the pendingSchemaChanges of {@link 
SchemaRegistryRequestHandler}. */
+public class RefreshPendingListsRequest implements CoordinationRequest {
 
-/**
- * A {@link MetadataApplier} for testing that holds all schema change events 
in a list for further
- * examination.
- */
-public class CollectingMetadataApplier implements MetadataApplier {
-    private final List<SchemaChangeEvent> schemaChangeEvents = new 
ArrayList<>();
-
-    @Override
-    public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
-        schemaChangeEvents.add(schemaChangeEvent);
-    }
-
-    public List<SchemaChangeEvent> getSchemaChangeEvents() {
-        return schemaChangeEvents;
-    }
+    private static final long serialVersionUID = 1L;
 }
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsResponse.java
similarity index 52%
copy from 
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
copy to 
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsResponse.java
index 7b403e130..ff0221deb 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/RefreshPendingListsResponse.java
@@ -15,27 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cdc.runtime.testutils.schema;
+package org.apache.flink.cdc.runtime.operators.schema.event;
 
-import org.apache.flink.cdc.common.event.SchemaChangeEvent;
-import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 
-import java.util.ArrayList;
-import java.util.List;
+/** Response to a {@link RefreshPendingListsRequest} for refreshing the pendingSchemaChanges. */
+public class RefreshPendingListsResponse implements CoordinationResponse {
 
-/**
- * A {@link MetadataApplier} for testing that holds all schema change events 
in a list for further
- * examination.
- */
-public class CollectingMetadataApplier implements MetadataApplier {
-    private final List<SchemaChangeEvent> schemaChangeEvents = new 
ArrayList<>();
-
-    @Override
-    public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
-        schemaChangeEvents.add(schemaChangeEvent);
-    }
-
-    public List<SchemaChangeEvent> getSchemaChangeEvents() {
-        return schemaChangeEvents;
-    }
+    private static final long serialVersionUID = 1L;
 }
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeProcessingResponse.java
similarity index 56%
copy from 
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
copy to 
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeProcessingResponse.java
index 7b403e130..b679ab9e5 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeProcessingResponse.java
@@ -15,27 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cdc.runtime.testutils.schema;
+package org.apache.flink.cdc.runtime.operators.schema.event;
 
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
-import org.apache.flink.cdc.common.sink.MetadataApplier;
-
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
+import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 
 /**
- * A {@link MetadataApplier} for testing that holds all schema change events 
in a list for further
- * examination.
+ * The response for {@link SchemaChangeResultRequest} or {@link ReleaseUpstreamRequest} from {@link
+ * SchemaRegistry} to {@link SchemaOperator} if the {@link SchemaChangeEvent} is not applied yet.
  */
-public class CollectingMetadataApplier implements MetadataApplier {
-    private final List<SchemaChangeEvent> schemaChangeEvents = new 
ArrayList<>();
-
-    @Override
-    public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
-        schemaChangeEvents.add(schemaChangeEvent);
-    }
+public class SchemaChangeProcessingResponse implements CoordinationResponse {
 
-    public List<SchemaChangeEvent> getSchemaChangeEvents() {
-        return schemaChangeEvents;
-    }
+    private static final long serialVersionUID = 1L;
 }
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultRequest.java
similarity index 52%
copy from 
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
copy to 
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultRequest.java
index 7b403e130..e53762c19 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResultRequest.java
@@ -15,27 +15,17 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cdc.runtime.testutils.schema;
+package org.apache.flink.cdc.runtime.operators.schema.event;
 
-import org.apache.flink.cdc.common.event.SchemaChangeEvent;
-import org.apache.flink.cdc.common.sink.MetadataApplier;
-
-import java.util.ArrayList;
-import java.util.List;
+import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
+import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 
 /**
- * A {@link MetadataApplier} for testing that holds all schema change events 
in a list for further
- * examination.
+ * Request from {@link SchemaOperator} to {@link SchemaRegistry} for getting the result of applying
+ * a schema change.
  */
-public class CollectingMetadataApplier implements MetadataApplier {
-    private final List<SchemaChangeEvent> schemaChangeEvents = new 
ArrayList<>();
-
-    @Override
-    public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
-        schemaChangeEvents.add(schemaChangeEvent);
-    }
+public class SchemaChangeResultRequest implements CoordinationRequest {
 
-    public List<SchemaChangeEvent> getSchemaChangeEvents() {
-        return schemaChangeEvents;
-    }
+    private static final long serialVersionUID = 1L;
 }
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorTest.java
index 24ee75f48..3802d14ec 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorTest.java
@@ -18,31 +18,48 @@
 package org.apache.flink.cdc.runtime.operators.schema;
 
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
+import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.DataChangeEvent;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.RowType;
 import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
+import 
org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Unit tests for the {@link SchemaOperator}. */
 public class SchemaOperatorTest {
+
+    private static final TableId CUSTOMERS =
+            TableId.tableId("my_company", "my_branch", "customers");
+    private static final Schema CUSTOMERS_SCHEMA =
+            Schema.newBuilder()
+                    .physicalColumn("id", DataTypes.INT())
+                    .physicalColumn("name", DataTypes.STRING())
+                    .physicalColumn("phone", DataTypes.BIGINT())
+                    .primaryKey("id")
+                    .build();
+
     @Test
     void testProcessElement() throws Exception {
         final int maxParallelism = 4;
@@ -93,6 +110,35 @@ public class SchemaOperatorTest {
         }
     }
 
+    @Test
+    void testProcessSchemaChangeEventWithTimeOut() throws Exception {
+        SchemaOperator schemaOperator =
+                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(1));
+        EventOperatorTestHarness<SchemaOperator, Event> harness =
+                new EventOperatorTestHarness<>(schemaOperator, 1, 
Duration.ofSeconds(3));
+        harness.open();
+        Assertions.assertThrowsExactly(
+                TimeoutException.class,
+                () ->
+                        schemaOperator.processElement(
+                                new StreamRecord<>(
+                                        new CreateTableEvent(CUSTOMERS, 
CUSTOMERS_SCHEMA))));
+    }
+
+    @Test
+    void testProcessSchemaChangeEventWithOutTimeOut() throws Exception {
+        SchemaOperator schemaOperator =
+                new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30));
+        EventOperatorTestHarness<SchemaOperator, Event> harness =
+                new EventOperatorTestHarness<>(schemaOperator, 1, 
Duration.ofSeconds(3));
+        harness.open();
+        Assertions.assertDoesNotThrow(
+                () ->
+                        schemaOperator.processElement(
+                                new StreamRecord<>(
+                                        new CreateTableEvent(CUSTOMERS, 
CUSTOMERS_SCHEMA))));
+    }
+
     private OneInputStreamOperatorTestHarness<Event, Event> createTestHarness(
             int maxParallelism, int parallelism, int subtaskIndex, OperatorID 
opID)
             throws Exception {
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
index 240eccada..b96a17b56 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/EventOperatorTestHarness.java
@@ -19,10 +19,13 @@ package org.apache.flink.cdc.runtime.testutils.operators;
 
 import org.apache.flink.cdc.common.event.CreateTableEvent;
 import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
 import org.apache.flink.cdc.common.event.TableId;
 import org.apache.flink.cdc.common.schema.Schema;
 import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
+import org.apache.flink.cdc.runtime.operators.schema.event.FlushSuccessEvent;
 import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
+import 
org.apache.flink.cdc.runtime.operators.schema.event.SinkWriterRegisterEvent;
 import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
 import org.apache.flink.cdc.runtime.testutils.schema.CollectingMetadataApplier;
 import 
org.apache.flink.cdc.runtime.testutils.schema.TestingSchemaRegistryGateway;
@@ -40,8 +43,11 @@ import org.apache.flink.streaming.runtime.tasks.StreamTask;
 import org.apache.flink.streaming.runtime.watermarkstatus.WatermarkStatus;
 import org.apache.flink.streaming.util.MockStreamConfig;
 import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.SerializedValue;
 
-import java.util.Collections;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
 import java.util.LinkedList;
 
 /**
@@ -59,6 +65,8 @@ public class EventOperatorTestHarness<OP extends 
AbstractStreamOperator<E>, E ex
         implements AutoCloseable {
     public static final OperatorID SCHEMA_OPERATOR_ID = new OperatorID(15213L, 
15513L);
 
+    public static final OperatorID SINK_OPERATOR_ID = new OperatorID(15214L, 
15514L);
+
     private final OP operator;
     private final int numOutputs;
     private final SchemaRegistry schemaRegistry;
@@ -73,8 +81,21 @@ public class EventOperatorTestHarness<OP extends 
AbstractStreamOperator<E>, E ex
                         "SchemaOperator",
                         new MockOperatorCoordinatorContext(
                                 SCHEMA_OPERATOR_ID, 
Thread.currentThread().getContextClassLoader()),
-                        new CollectingMetadataApplier(),
-                        Collections.emptyList());
+                        new CollectingMetadataApplier(null),
+                        new ArrayList<>());
+        schemaRegistryGateway = new 
TestingSchemaRegistryGateway(schemaRegistry);
+    }
+
+    public EventOperatorTestHarness(OP operator, int numOutputs, Duration 
duration) {
+        this.operator = operator;
+        this.numOutputs = numOutputs;
+        schemaRegistry =
+                new SchemaRegistry(
+                        "SchemaOperator",
+                        new MockOperatorCoordinatorContext(
+                                SCHEMA_OPERATOR_ID, 
Thread.currentThread().getContextClassLoader()),
+                        new CollectingMetadataApplier(duration),
+                        new ArrayList<>());
         schemaRegistryGateway = new 
TestingSchemaRegistryGateway(schemaRegistry);
     }
 
@@ -107,7 +128,9 @@ public class EventOperatorTestHarness<OP extends 
AbstractStreamOperator<E>, E ex
         operator.setup(
                 new MockStreamTask(schemaRegistryGateway),
                 new MockStreamConfig(new Configuration(), numOutputs),
-                new EventCollectingOutput<>(outputRecords));
+                new EventCollectingOutput<>(outputRecords, 
schemaRegistryGateway));
+        schemaRegistryGateway.sendOperatorEventToCoordinator(
+                SINK_OPERATOR_ID, new SerializedValue<>(new 
SinkWriterRegisterEvent(0)));
     }
 
     // ---------------------------------------- Helper classes 
---------------------------------
@@ -115,13 +138,29 @@ public class EventOperatorTestHarness<OP extends 
AbstractStreamOperator<E>, E ex
     private static class EventCollectingOutput<E extends Event> implements 
Output<StreamRecord<E>> {
         private final LinkedList<StreamRecord<E>> outputRecords;
 
-        public EventCollectingOutput(LinkedList<StreamRecord<E>> 
outputRecords) {
+        private final TestingSchemaRegistryGateway schemaRegistryGateway;
+
+        public EventCollectingOutput(
+                LinkedList<StreamRecord<E>> outputRecords,
+                TestingSchemaRegistryGateway schemaRegistryGateway) {
             this.outputRecords = outputRecords;
+            this.schemaRegistryGateway = schemaRegistryGateway;
         }
 
         @Override
         public void collect(StreamRecord<E> record) {
             outputRecords.add(record);
+            Event event = record.getValue();
+            if (event instanceof FlushEvent) {
+                try {
+                    schemaRegistryGateway.sendOperatorEventToCoordinator(
+                            SINK_OPERATOR_ID,
+                            new SerializedValue<>(
+                                    new FlushSuccessEvent(0, ((FlushEvent) 
event).getTableId())));
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            }
         }
 
         @Override
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
index 7b403e130..1ebb5d449 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/schema/CollectingMetadataApplier.java
@@ -20,6 +20,7 @@ package org.apache.flink.cdc.runtime.testutils.schema;
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -30,9 +31,22 @@ import java.util.List;
 public class CollectingMetadataApplier implements MetadataApplier {
     private final List<SchemaChangeEvent> schemaChangeEvents = new 
ArrayList<>();
 
+    private final Duration duration;
+
+    public CollectingMetadataApplier(Duration duration) {
+        this.duration = duration;
+    }
+
     @Override
     public void applySchemaChange(SchemaChangeEvent schemaChangeEvent) {
         schemaChangeEvents.add(schemaChangeEvent);
+        if (duration != null) {
+            try {
+                Thread.sleep(duration.toMillis()); // delay the apply by the configured duration
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
+            }
+        }
     }
 
     public List<SchemaChangeEvent> getSchemaChangeEvents() {


Reply via email to