This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit 84244672399e21585217589e205d881e05562cf8
Author: yuxiqian <34335406+yuxiq...@users.noreply.github.com>
AuthorDate: Mon Dec 23 18:44:13 2024 +0800

    [FLINK-36690][cdc-runtime] Fix schema operator hanging under extreme parallelized pressure

    This closes #3680
---
 .../runtime/operators/schema/SchemaOperator.java   | 725 ---------------------
 .../schema/regular/SchemaCoordinator.java          | 499 ++++++++++++++
 .../SchemaCoordinatorProvider.java}                |  53 +-
 .../operators/schema/regular/SchemaOperator.java   | 296 +++++++++
 .../{ => regular}/SchemaOperatorFactory.java       |  18 +-
 .../{ => regular}/event/SchemaChangeRequest.java   |  23 +-
 .../{ => regular}/event/SchemaChangeResponse.java  |  72 +-
 .../schema/{ => regular}/SchemaEvolveTest.java     | 214 +++---
 .../schema/{ => regular}/SchemaOperatorTest.java   |  60 +-
 9 files changed, 1063 insertions(+), 897 deletions(-)

diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
deleted file mode 100644
index 6ada032d5..000000000
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperator.java
+++ /dev/null
@@ -1,725 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.cdc.runtime.operators.schema;
-
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.annotation.VisibleForTesting;
-import org.apache.flink.cdc.common.data.DecimalData;
-import org.apache.flink.cdc.common.data.LocalZonedTimestampData;
-import org.apache.flink.cdc.common.data.RecordData;
-import org.apache.flink.cdc.common.data.StringData;
-import org.apache.flink.cdc.common.data.TimestampData;
-import org.apache.flink.cdc.common.data.ZonedTimestampData;
-import org.apache.flink.cdc.common.event.DataChangeEvent;
-import org.apache.flink.cdc.common.event.DropTableEvent;
-import org.apache.flink.cdc.common.event.Event;
-import org.apache.flink.cdc.common.event.FlushEvent;
-import org.apache.flink.cdc.common.event.SchemaChangeEvent;
-import org.apache.flink.cdc.common.event.SchemaChangeEventType;
-import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
-import org.apache.flink.cdc.common.route.RouteRule;
-import org.apache.flink.cdc.common.schema.Column;
-import org.apache.flink.cdc.common.schema.Schema;
-import org.apache.flink.cdc.common.schema.Selectors;
-import org.apache.flink.cdc.common.types.DataType;
-import org.apache.flink.cdc.common.types.DataTypeFamily;
-import org.apache.flink.cdc.common.types.DataTypeRoot;
-import org.apache.flink.cdc.common.types.DecimalType;
-import org.apache.flink.cdc.common.utils.ChangeEventUtils;
-import org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
-import org.apache.flink.cdc.runtime.operators.schema.event.CoordinationResponseUtils;
-import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeProcessingResponse;
-import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeRequest;
-import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResponse;
-import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultRequest;
-import org.apache.flink.cdc.runtime.operators.schema.event.SchemaChangeResultResponse;
-import org.apache.flink.cdc.runtime.operators.schema.metrics.SchemaOperatorMetrics;
-import org.apache.flink.cdc.runtime.operators.sink.SchemaEvolutionClient;
-import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
-import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
-import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
-import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
-import org.apache.flink.runtime.state.StateSnapshotContext;
-import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
-import org.apache.flink.streaming.api.operators.ChainingStrategy;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
-import org.apache.flink.streaming.api.operators.Output;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTask;
-import org.apache.flink.util.SerializedValue;
-
-import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder;
-import org.apache.flink.shaded.guava31.com.google.common.cache.CacheLoader;
-import org.apache.flink.shaded.guava31.com.google.common.cache.LoadingCache;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.annotation.Nullable;
-
-import java.io.Serializable;
-import java.math.BigDecimal;
-import java.time.Duration;
-import java.time.LocalDateTime;
-import java.time.ZoneId;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-
-import static org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT;
-
-/**
- * The operator will evolve schemas in {@link SchemaRegistry} for incoming {@link
- * SchemaChangeEvent}s and block the stream for tables before their schema changes finish.
- */
-@Internal
-public class SchemaOperator extends AbstractStreamOperator<Event>
-        implements OneInputStreamOperator<Event, Event>, Serializable {
-
-    private static final long serialVersionUID = 1L;
-
-    private static final Logger LOG = LoggerFactory.getLogger(SchemaOperator.class);
-    private static final Duration CACHE_EXPIRE_DURATION = Duration.ofDays(1);
-
-    private final List<RouteRule> routingRules;
-
-    private final String timezone;
-
-    /**
-     * Storing route source table selector, sink table name (before symbol replacement), and replace
-     * symbol in a tuple.
-     */
-    private transient List<Tuple3<Selectors, String, String>> routes;
-
-    private transient TaskOperatorEventGateway toCoordinator;
-    private transient SchemaEvolutionClient schemaEvolutionClient;
-    private transient LoadingCache<TableId, Schema> originalSchema;
-    private transient LoadingCache<TableId, Schema> evolvedSchema;
-    private transient LoadingCache<TableId, Boolean> schemaDivergesMap;
-
-    /**
-     * Storing mapping relations between upstream tableId (source table) mapping to downstream
-     * tableIds (sink tables).
-     */
-    private transient LoadingCache<TableId, List<TableId>> tableIdMappingCache;
-
-    private final long rpcTimeOutInMillis;
-    private final SchemaChangeBehavior schemaChangeBehavior;
-
-    private transient SchemaOperatorMetrics schemaOperatorMetrics;
-    private transient int subTaskId;
-
-    @VisibleForTesting
-    public SchemaOperator(List<RouteRule> routingRules) {
-        this.routingRules = routingRules;
-        this.chainingStrategy = ChainingStrategy.ALWAYS;
-        this.rpcTimeOutInMillis = DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT.toMillis();
-        this.schemaChangeBehavior = SchemaChangeBehavior.EVOLVE;
-        this.timezone = "UTC";
-    }
-
-    @VisibleForTesting
-    public SchemaOperator(List<RouteRule> routingRules, Duration rpcTimeOut) {
-        this.routingRules = routingRules;
-        this.chainingStrategy = ChainingStrategy.ALWAYS;
-        this.rpcTimeOutInMillis = rpcTimeOut.toMillis();
-        this.schemaChangeBehavior = SchemaChangeBehavior.EVOLVE;
-        this.timezone = "UTC";
-    }
-
-    @VisibleForTesting
-    public SchemaOperator(
-            List<RouteRule> routingRules,
-            Duration rpcTimeOut,
-            SchemaChangeBehavior schemaChangeBehavior) {
-        this.routingRules = routingRules;
-        this.chainingStrategy = ChainingStrategy.ALWAYS;
-        this.rpcTimeOutInMillis = rpcTimeOut.toMillis();
-        this.schemaChangeBehavior = schemaChangeBehavior;
-        this.timezone = "UTC";
-    }
-
-    public SchemaOperator(
-            List<RouteRule> routingRules,
-            Duration rpcTimeOut,
-            SchemaChangeBehavior schemaChangeBehavior,
-            String timezone) {
-        this.routingRules = routingRules;
-        this.chainingStrategy = ChainingStrategy.ALWAYS;
-        this.rpcTimeOutInMillis = rpcTimeOut.toMillis();
-        this.schemaChangeBehavior = schemaChangeBehavior;
-        this.timezone = timezone;
-    }
-
-    @Override
-    public void open() throws Exception {
-        super.open();
-        schemaOperatorMetrics =
-                new SchemaOperatorMetrics(
-                        getRuntimeContext().getMetricGroup(), schemaChangeBehavior);
-        subTaskId = getRuntimeContext().getIndexOfThisSubtask();
-    }
-
-    @Override
-    public void setup(
-            StreamTask<?, ?> containingTask,
-            StreamConfig config,
-            Output<StreamRecord<Event>> output) {
-        super.setup(containingTask, config, output);
-        this.toCoordinator = containingTask.getEnvironment().getOperatorCoordinatorEventGateway();
-        routes =
-                routingRules.stream()
-                        .map(
-                                rule -> {
-                                    String tableInclusions = rule.sourceTable;
-                                    Selectors selectors =
-                                            new Selectors.SelectorsBuilder()
-                                                    .includeTables(tableInclusions)
-                                                    .build();
-                                    return new Tuple3<>(
-                                            selectors, rule.sinkTable, rule.replaceSymbol);
-                                })
-                        .collect(Collectors.toList());
-        schemaEvolutionClient = new SchemaEvolutionClient(toCoordinator, getOperatorID());
-        evolvedSchema =
-                CacheBuilder.newBuilder()
-                        .expireAfterAccess(CACHE_EXPIRE_DURATION)
-                        .build(
-                                new CacheLoader<TableId, Schema>() {
-                                    @Override
-                                    public Schema load(TableId tableId) {
-                                        return getLatestEvolvedSchema(tableId);
-                                    }
-                                });
-        originalSchema =
-                CacheBuilder.newBuilder()
-                        .expireAfterAccess(CACHE_EXPIRE_DURATION)
-                        .build(
-                                new CacheLoader<TableId, Schema>() {
-                                    @Override
-                                    public Schema load(TableId tableId) throws Exception {
-                                        return getLatestOriginalSchema(tableId);
-                                    }
-                                });
-        schemaDivergesMap =
-                CacheBuilder.newBuilder()
-                        .expireAfterAccess(CACHE_EXPIRE_DURATION)
-                        .build(
-                                new CacheLoader<TableId, Boolean>() {
-                                    @Override
-                                    public Boolean load(TableId tableId) throws Exception {
-                                        return checkSchemaDiverges(tableId);
-                                    }
-                                });
-        tableIdMappingCache =
-                CacheBuilder.newBuilder()
-                        .expireAfterAccess(CACHE_EXPIRE_DURATION)
-                        .build(
-                                new CacheLoader<TableId, List<TableId>>() {
-                                    @Override
-                                    public List<TableId> load(TableId tableId) {
-                                        return getRoutedTables(tableId);
-                                    }
-                                });
-    }
-
-    /**
-     * This method is guaranteed to not be called concurrently with other methods of the operator.
-     */
-    @Override
-    public void processElement(StreamRecord<Event> streamRecord)
-            throws InterruptedException, TimeoutException, ExecutionException {
-        Event event = streamRecord.getValue();
-        if (event instanceof SchemaChangeEvent) {
-            processSchemaChangeEvents((SchemaChangeEvent) event);
-        } else if (event instanceof DataChangeEvent) {
-            processDataChangeEvents(streamRecord, (DataChangeEvent) event);
-        } else {
-            throw new RuntimeException("Unknown event type in Stream record: " + event);
-        }
-    }
-
-    private void processSchemaChangeEvents(SchemaChangeEvent event)
-            throws InterruptedException, TimeoutException, ExecutionException {
-        TableId tableId = event.tableId();
-        LOG.info(
-                "{}> Table {} received SchemaChangeEvent {} and start to be blocked.",
-                subTaskId,
-                tableId,
-                event);
-        handleSchemaChangeEvent(tableId, event);
-
-        if (event instanceof DropTableEvent) {
-            // Update caches unless event is a Drop table event. In that case, no schema will be
-            // available / necessary.
-            return;
-        }
-
-        originalSchema.put(tableId, getLatestOriginalSchema(tableId));
-        schemaDivergesMap.put(tableId, checkSchemaDiverges(tableId));
-
-        List<TableId> optionalRoutedTable = getRoutedTables(tableId);
-        if (!optionalRoutedTable.isEmpty()) {
-            tableIdMappingCache
-                    .get(tableId)
-                    .forEach(routed -> evolvedSchema.put(routed, getLatestEvolvedSchema(routed)));
-        } else {
-            evolvedSchema.put(tableId, getLatestEvolvedSchema(tableId));
-        }
-    }
-
-    private void processDataChangeEvents(StreamRecord<Event> streamRecord, DataChangeEvent event) {
-        TableId tableId = event.tableId();
-        List<TableId> optionalRoutedTable = getRoutedTables(tableId);
-        if (!optionalRoutedTable.isEmpty()) {
-            optionalRoutedTable.forEach(
-                    evolvedTableId -> {
-                        output.collect(
-                                new StreamRecord<>(
-                                        normalizeSchemaChangeEvents(event, evolvedTableId, false)));
-                    });
-        } else if (Boolean.FALSE.equals(schemaDivergesMap.getIfPresent(tableId))) {
-            output.collect(new StreamRecord<>(normalizeSchemaChangeEvents(event, true)));
-        } else {
-            output.collect(streamRecord);
-        }
-    }
-
-    private DataChangeEvent normalizeSchemaChangeEvents(
-            DataChangeEvent event, boolean tolerantMode) {
-        return normalizeSchemaChangeEvents(event, event.tableId(), tolerantMode);
-    }
-
-    private DataChangeEvent normalizeSchemaChangeEvents(
-            DataChangeEvent event, TableId renamedTableId, boolean tolerantMode) {
-        try {
-            Schema originalSchema = this.originalSchema.get(event.tableId());
-            Schema evolvedTableSchema = evolvedSchema.get(renamedTableId);
-            if (originalSchema.equals(evolvedTableSchema)) {
-                return ChangeEventUtils.recreateDataChangeEvent(event, renamedTableId);
-            }
-            switch (event.op()) {
-                case INSERT:
-                    return DataChangeEvent.insertEvent(
-                            renamedTableId,
-                            regenerateRecordData(
-                                    event.after(),
-                                    originalSchema,
-                                    evolvedTableSchema,
-                                    tolerantMode),
-                            event.meta());
-                case UPDATE:
-                    return DataChangeEvent.updateEvent(
-                            renamedTableId,
-                            regenerateRecordData(
-                                    event.before(),
-                                    originalSchema,
-                                    evolvedTableSchema,
-                                    tolerantMode),
-                            regenerateRecordData(
-                                    event.after(),
-                                    originalSchema,
-                                    evolvedTableSchema,
-                                    tolerantMode),
-                            event.meta());
-                case DELETE:
-                    return DataChangeEvent.deleteEvent(
-                            renamedTableId,
-                            regenerateRecordData(
-                                    event.before(),
-                                    originalSchema,
-                                    evolvedTableSchema,
-                                    tolerantMode),
-                            event.meta());
-                case REPLACE:
-                    return DataChangeEvent.replaceEvent(
-                            renamedTableId,
-                            regenerateRecordData(
-                                    event.after(),
-                                    originalSchema,
-                                    evolvedTableSchema,
-                                    tolerantMode),
-                            event.meta());
-                default:
-                    throw new IllegalArgumentException(
-                            String.format("Unrecognized operation type \"%s\"", event.op()));
-            }
-        } catch (Exception e) {
-            throw new IllegalStateException("Unable to fill null for empty columns", e);
-        }
-    }
-
-    private RecordData regenerateRecordData(
-            RecordData recordData,
-            Schema originalSchema,
-            Schema routedTableSchema,
-            boolean tolerantMode) {
-        // Regenerate record data
-        List<RecordData.FieldGetter> fieldGetters = new ArrayList<>();
-        for (Column column : routedTableSchema.getColumns()) {
-            String columnName = column.getName();
-            int columnIndex = originalSchema.getColumnNames().indexOf(columnName);
-            if (columnIndex == -1) {
-                fieldGetters.add(new NullFieldGetter());
-            } else {
-                RecordData.FieldGetter fieldGetter =
-                        RecordData.createFieldGetter(
-                                originalSchema.getColumn(columnName).get().getType(), columnIndex);
-                // Check type compatibility, ignoring nullability
-                if (originalSchema
-                        .getColumn(columnName)
-                        .get()
-                        .getType()
-                        .nullable()
-                        .equals(column.getType().nullable())) {
-                    fieldGetters.add(fieldGetter);
-                } else {
-                    fieldGetters.add(
-                            new TypeCoercionFieldGetter(
-                                    originalSchema.getColumn(columnName).get().getType(),
-                                    column.getType(),
-                                    fieldGetter,
-                                    tolerantMode,
-                                    timezone));
-                }
-            }
-        }
-        BinaryRecordDataGenerator recordDataGenerator =
-                new BinaryRecordDataGenerator(
-                        routedTableSchema.getColumnDataTypes().toArray(new DataType[0]));
-        return recordDataGenerator.generate(
-                fieldGetters.stream()
-                        .map(fieldGetter -> fieldGetter.getFieldOrNull(recordData))
-                        .toArray());
-    }
-
-    private List<TableId> getRoutedTables(TableId originalTableId) {
-        return routes.stream()
-                .filter(route -> route.f0.isMatch(originalTableId))
-                .map(route -> resolveReplacement(originalTableId, route))
-                .collect(Collectors.toList());
-    }
-
-    private TableId resolveReplacement(
-            TableId originalTable, Tuple3<Selectors, String, String> route) {
-        if (route.f2 != null) {
-            return TableId.parse(route.f1.replace(route.f2, originalTable.getTableName()));
-        }
-        return TableId.parse(route.f1);
-    }
-
-    private void handleSchemaChangeEvent(TableId tableId, SchemaChangeEvent schemaChangeEvent)
-            throws InterruptedException, TimeoutException {
-
-        if (schemaChangeBehavior == SchemaChangeBehavior.EXCEPTION
-                && schemaChangeEvent.getType() != SchemaChangeEventType.CREATE_TABLE) {
-            // CreateTableEvent should be applied even in EXCEPTION mode
-            throw new RuntimeException(
-                    String.format(
-                            "Refused to apply schema change event %s in EXCEPTION mode.",
-                            schemaChangeEvent));
-        }
-
-        // The request will block if another schema change event is being handled
-        SchemaChangeResponse response = requestSchemaChange(tableId, schemaChangeEvent);
-        if (response.isAccepted()) {
-            LOG.info("{}> Sending the FlushEvent for table {}.", subTaskId, tableId);
-            output.collect(new StreamRecord<>(new FlushEvent(tableId)));
-            List<SchemaChangeEvent> expectedSchemaChangeEvents = response.getSchemaChangeEvents();
-            schemaOperatorMetrics.increaseSchemaChangeEvents(expectedSchemaChangeEvents.size());
-
-            // The request will block until flushing finished in each sink writer
-            SchemaChangeResultResponse schemaEvolveResponse = requestSchemaChangeResult();
-            List<SchemaChangeEvent> finishedSchemaChangeEvents =
-                    schemaEvolveResponse.getFinishedSchemaChangeEvents();
-
-            // Update evolved schema changes based on apply results
-            finishedSchemaChangeEvents.forEach(e -> output.collect(new StreamRecord<>(e)));
-        } else if (response.isDuplicate()) {
-            LOG.info(
-                    "{}> Schema change event {} has been handled in another subTask already.",
-                    subTaskId,
-                    schemaChangeEvent);
-        } else if (response.isIgnored()) {
-            LOG.info(
-                    "{}> Schema change event {} has been ignored. No schema evolution needed.",
-                    subTaskId,
-                    schemaChangeEvent);
-        } else {
-            throw new IllegalStateException("Unexpected response status " + response);
-        }
-    }
-
-    private SchemaChangeResponse requestSchemaChange(
-            TableId tableId, SchemaChangeEvent schemaChangeEvent)
-            throws InterruptedException, TimeoutException {
-        long schemaEvolveTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis;
-        while (true) {
-            SchemaChangeResponse response =
-                    sendRequestToCoordinator(
-                            new SchemaChangeRequest(tableId, schemaChangeEvent, subTaskId));
-            if (response.isRegistryBusy()) {
-                if (System.currentTimeMillis() < schemaEvolveTimeOutMillis) {
-                    LOG.info(
-                            "{}> Schema Registry is busy now, waiting for next request...",
-                            subTaskId);
-                    Thread.sleep(1000);
-                } else {
-                    throw new TimeoutException("TimeOut when requesting schema change");
-                }
-            } else {
-                return response;
-            }
-        }
-    }
-
-    private SchemaChangeResultResponse requestSchemaChangeResult()
-            throws InterruptedException, TimeoutException {
-        CoordinationResponse coordinationResponse =
-                sendRequestToCoordinator(new SchemaChangeResultRequest());
-        long nextRpcTimeOutMillis = System.currentTimeMillis() + rpcTimeOutInMillis;
-        while (coordinationResponse instanceof SchemaChangeProcessingResponse) {
-            if (System.currentTimeMillis() < nextRpcTimeOutMillis) {
-                Thread.sleep(1000);
-                coordinationResponse = sendRequestToCoordinator(new SchemaChangeResultRequest());
-            } else {
-                throw new TimeoutException("TimeOut when requesting release upstream");
-            }
-        }
-        return ((SchemaChangeResultResponse) coordinationResponse);
-    }
-
-    private <REQUEST extends CoordinationRequest, RESPONSE extends CoordinationResponse>
-            RESPONSE sendRequestToCoordinator(REQUEST request) {
-        try {
-            CompletableFuture<CoordinationResponse> responseFuture =
-                    toCoordinator.sendRequestToCoordinator(
-                            getOperatorID(), new SerializedValue<>(request));
-            return CoordinationResponseUtils.unwrap(responseFuture.get());
-        } catch (Exception e) {
-            throw new IllegalStateException(
-                    "Failed to send request to coordinator: " + request.toString(), e);
-        }
-    }
-
-    private Schema getLatestEvolvedSchema(TableId tableId) {
-        try {
-            Optional<Schema> optionalSchema = schemaEvolutionClient.getLatestEvolvedSchema(tableId);
-            if (!optionalSchema.isPresent()) {
-                throw new IllegalStateException(
-                        String.format("Schema doesn't exist for table \"%s\"", tableId));
-            }
-            return optionalSchema.get();
-        } catch (Exception e) {
-            throw new IllegalStateException(
-                    String.format("Unable to get latest schema for table \"%s\"", tableId), e);
-        }
-    }
-
-    private Schema getLatestOriginalSchema(TableId tableId) {
-        try {
-            Optional<Schema> optionalSchema =
-                    schemaEvolutionClient.getLatestOriginalSchema(tableId);
-            if (!optionalSchema.isPresent()) {
-                throw new IllegalStateException(
-                        String.format("Schema doesn't exist for table \"%s\"", tableId));
-            }
-            return optionalSchema.get();
-        } catch (Exception e) {
-            throw new IllegalStateException(
-                    String.format("Unable to get latest schema for table \"%s\"", tableId), e);
-        }
-    }
-
-    private Boolean checkSchemaDiverges(TableId tableId) {
-        try {
-            return getLatestEvolvedSchema(tableId).equals(getLatestOriginalSchema(tableId));
-        } catch (IllegalStateException e) {
-            // schema fetch failed, regard it as diverged
-            return true;
-        }
-    }
-
-    private static class NullFieldGetter implements RecordData.FieldGetter {
-        @Nullable
-        @Override
-        public Object getFieldOrNull(RecordData recordData) {
-            return null;
-        }
-    }
-
-    private static class TypeCoercionFieldGetter implements RecordData.FieldGetter {
-        private final DataType originalType;
-        private final DataType destinationType;
-        private final RecordData.FieldGetter originalFieldGetter;
-        private final boolean tolerantMode;
-        private final String timezone;
-
-        public TypeCoercionFieldGetter(
-                DataType originalType,
-                DataType destinationType,
-                RecordData.FieldGetter originalFieldGetter,
-                boolean tolerantMode,
-                String timezone) {
-            this.originalType = originalType;
-            this.destinationType = destinationType;
-            this.originalFieldGetter = originalFieldGetter;
-            this.tolerantMode = tolerantMode;
-            this.timezone = timezone;
-        }
-
-        private Object fail(IllegalArgumentException e) throws IllegalArgumentException {
-            if (tolerantMode) {
-                return null;
-            }
-            throw e;
-        }
-
-        @Nullable
-        @Override
-        public Object getFieldOrNull(RecordData recordData) {
-            Object originalField = originalFieldGetter.getFieldOrNull(recordData);
-            if (originalField == null) {
-                return null;
-            }
-            if (destinationType.is(DataTypeRoot.BIGINT)) {
-                if (originalField instanceof Byte) {
-                    // TINYINT
-                    return ((Byte) originalField).longValue();
-                } else if (originalField instanceof Short) {
-                    // SMALLINT
-                    return ((Short) originalField).longValue();
-                } else if (originalField instanceof Integer) {
-                    // INT
-                    return ((Integer) originalField).longValue();
-                } else if (originalField instanceof Long) {
-                    // BIGINT
-                    return originalField;
-                } else {
-                    return fail(
-                            new IllegalArgumentException(
-                                    String.format(
-                                            "Cannot fit type \"%s\" into a BIGINT column. "
-                                                    + "Currently only TINYINT / SMALLINT / INT / LONG can be accepted by a BIGINT column",
-                                            originalField.getClass())));
-                }
-            } else if (destinationType instanceof DecimalType) {
-                DecimalType decimalType = (DecimalType) destinationType;
-                BigDecimal decimalValue;
-                if (originalField instanceof Byte) {
-                    decimalValue = BigDecimal.valueOf(((Byte) originalField).longValue(), 0);
-                } else if (originalField instanceof Short) {
-                    decimalValue = BigDecimal.valueOf(((Short) originalField).longValue(), 0);
-                } else if (originalField instanceof Integer) {
-                    decimalValue = BigDecimal.valueOf(((Integer) originalField).longValue(), 0);
-                } else if (originalField instanceof Long) {
-                    decimalValue = BigDecimal.valueOf((Long) originalField, 0);
-                } else if (originalField instanceof DecimalData) {
-                    decimalValue = ((DecimalData) originalField).toBigDecimal();
-                } else {
-                    return fail(
-                            new IllegalArgumentException(
-                                    String.format(
-                                            "Cannot fit type \"%s\" into a DECIMAL column. "
-                                                    + "Currently only BYTE / SHORT / INT / LONG / DECIMAL can be accepted by a DECIMAL column",
-                                            originalField.getClass())));
-                }
-                return decimalValue != null
-                        ? DecimalData.fromBigDecimal(
-                                decimalValue, decimalType.getPrecision(), decimalType.getScale())
-                        : null;
-            } else if (destinationType.is(DataTypeFamily.APPROXIMATE_NUMERIC)) {
-                if (originalField instanceof Float) {
-                    // FLOAT
-                    return ((Float) originalField).doubleValue();
-                } else {
-                    return fail(
-                            new IllegalArgumentException(
-                                    String.format(
-                                            "Cannot fit type \"%s\" into a DOUBLE column. "
-                                                    + "Currently only FLOAT can be accepted by a DOUBLE column",
-                                            originalField.getClass())));
-                }
-            } else if (destinationType.is(DataTypeRoot.VARCHAR)) {
-                if (originalField instanceof StringData) {
-                    return originalField;
-                } else {
-                    return fail(
-                            new IllegalArgumentException(
-                                    String.format(
-                                            "Cannot fit type \"%s\" into a STRING column. "
-                                                    + "Currently only CHAR / VARCHAR can be accepted by a STRING column",
-                                            originalField.getClass())));
-                }
-            } else if (destinationType.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)
-                    && originalType.is(DataTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE)) {
-                // For now, TimestampData / ZonedTimestampData / LocalZonedTimestampData has no
-                // difference in its internal representation, so there's no need to do any precision
-                // conversion.
-                return originalField;
-            } else if (destinationType.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE)
-                    && originalType.is(DataTypeRoot.TIMESTAMP_WITH_TIME_ZONE)) {
-                return originalField;
-            } else if (destinationType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)
-                    && originalType.is(DataTypeRoot.TIMESTAMP_WITH_LOCAL_TIME_ZONE)) {
-                return originalField;
-            } else if (destinationType.is(DataTypeFamily.TIMESTAMP)
-                    && originalType.is(DataTypeFamily.TIMESTAMP)) {
-                return castToTimestamp(originalField, timezone);
-            } else {
-                return fail(
-                        new IllegalArgumentException(
-                                String.format(
-                                        "Column type \"%s\" doesn't support type coercion",
-                                        destinationType)));
-            }
-        }
-    }
-
-    @Override
-    public void snapshotState(StateSnapshotContext context) throws Exception {
-        // Needless to do anything, since AbstractStreamOperator#snapshotState and #processElement
-        // is guaranteed not to be mixed together.
-    }
-
-    private static TimestampData castToTimestamp(Object object, String timezone) {
-        if (object == null) {
-            return null;
-        }
-        if (object instanceof LocalZonedTimestampData) {
-            return TimestampData.fromLocalDateTime(
-                    LocalDateTime.ofInstant(
-                            ((LocalZonedTimestampData) object).toInstant(), ZoneId.of(timezone)));
-        } else if (object instanceof ZonedTimestampData) {
-            return TimestampData.fromLocalDateTime(
-                    LocalDateTime.ofInstant(
-                            ((ZonedTimestampData) object).toInstant(), ZoneId.of(timezone)));
-        } else {
-            throw new IllegalArgumentException(
-                    String.format(
-                            "Unable to implicitly coerce object `%s` as a TIMESTAMP.", object));
-        }
-    }
-}
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
new file mode 100644
index 000000000..23edeccb4
--- /dev/null
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinator.java
@@ -0,0 +1,499 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.schema.regular;
+
+import org.apache.flink.cdc.common.event.CreateTableEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
+import org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.route.RouteRule;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.sink.MetadataApplier;
+import org.apache.flink.cdc.common.utils.Preconditions;
+import org.apache.flink.cdc.common.utils.SchemaMergingUtils;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
+import org.apache.flink.cdc.runtime.operators.schema.common.SchemaManager;
+import org.apache.flink.cdc.runtime.operators.schema.common.SchemaRegistry;
+import org.apache.flink.cdc.runtime.operators.schema.common.event.FlushSuccessEvent;
+import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeRequest;
+import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse;
+import org.apache.flink.cdc.runtime.serializer.TableIdSerializer;
+import org.apache.flink.core.memory.DataInputViewStreamWrapper;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils.wrap;
+
+/** Coordinator node for {@link SchemaOperator}. Registry actor in regular topology. */
+public class SchemaCoordinator extends SchemaRegistry {
+    private static final Logger LOG = LoggerFactory.getLogger(SchemaCoordinator.class);
+
+    /** Executor service to execute schema change. */
+    private final ExecutorService schemaChangeThreadPool;
+
+    /**
+     * Atomic flag indicating if current RequestHandler could accept more schema changes for now.
+     */
+    private transient RequestStatus schemaChangeStatus;
+
+    /** Sink writers which have sent flush success events for the request. */
+    private transient ConcurrentHashMap<Integer, Set<Integer>> flushedSinkWriters;
+
+    /** Currently handling request's completable future. */
+    private transient CompletableFuture<CoordinationResponse> pendingResponseFuture;
+
+    // Static fields
+    public SchemaCoordinator(
+            String operatorName,
+            OperatorCoordinator.Context context,
+            ExecutorService coordinatorExecutor,
+            MetadataApplier metadataApplier,
+            List<RouteRule> routes,
+            SchemaChangeBehavior schemaChangeBehavior,
+            Duration rpcTimeout) {
+        super(
+                context,
+                operatorName,
+                coordinatorExecutor,
+                metadataApplier,
+                routes,
+                schemaChangeBehavior,
+                rpcTimeout);
+        this.schemaChangeThreadPool = Executors.newSingleThreadExecutor();
+    }
+
+    @Override
+    public void start() throws Exception {
+        super.start();
+        this.flushedSinkWriters = new ConcurrentHashMap<>();
+        this.schemaChangeStatus = RequestStatus.IDLE;
+    }
+
+    @Override
+    public void close() throws Exception {
+        super.close();
+        if (schemaChangeThreadPool != null && !schemaChangeThreadPool.isShutdown()) {
+            schemaChangeThreadPool.shutdownNow();
+        }
+    }
+
+    @Override
+    protected void snapshot(CompletableFuture<byte[]> resultFuture) throws Exception {
+        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+                DataOutputStream out = new DataOutputStream(baos)) {
+            // Serialize SchemaManager
+            int schemaManagerSerializerVersion = SchemaManager.SERIALIZER.getVersion();
+            out.writeInt(schemaManagerSerializerVersion);
+            byte[] serializedSchemaManager;
+            serializedSchemaManager = SchemaManager.SERIALIZER.serialize(schemaManager);
+            out.writeInt(serializedSchemaManager.length);
+            out.write(serializedSchemaManager);
+
+            // Length-bit for SchemaDerivation, which is no longer necessary.
+            out.writeInt(0);
+            resultFuture.complete(baos.toByteArray());
+        }
+    }
+
+    @Override
+    protected void restore(byte[] checkpointData) throws Exception {
+        try (ByteArrayInputStream bais = new ByteArrayInputStream(checkpointData);
+                DataInputStream in = new DataInputStream(bais)) {
+            int schemaManagerSerializerVersion = in.readInt();
+
+            switch (schemaManagerSerializerVersion) {
+                case 0:
+                    {
+                        int length = in.readInt();
+                        byte[] serializedSchemaManager = new byte[length];
+                        in.readFully(serializedSchemaManager);
+                        schemaManager =
+                                SchemaManager.SERIALIZER.deserialize(
+                                        schemaManagerSerializerVersion, serializedSchemaManager);
+                        break;
+                    }
+                case 1:
+                case 2:
+                    {
+                        int length = in.readInt();
+                        byte[] serializedSchemaManager = new byte[length];
+                        in.readFully(serializedSchemaManager);
+                        schemaManager =
+                                SchemaManager.SERIALIZER.deserialize(
+                                        schemaManagerSerializerVersion, serializedSchemaManager);
+                        consumeUnusedSchemaDerivationBytes(in);
+                        break;
+                    }
+                default:
+                    throw new IOException(
+                            "Unrecognized serialization version " + schemaManagerSerializerVersion);
+            }
+        }
+    }
+
+    @Override
+    protected void handleCustomCoordinationRequest(
+            CoordinationRequest request, CompletableFuture<CoordinationResponse> responseFuture) {
+        if (request instanceof SchemaChangeRequest) {
+            handleSchemaChangeRequest((SchemaChangeRequest) request, responseFuture);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Unknown coordination request type: " + request);
+        }
+    }
+
+    @Override
+    protected void handleFlushSuccessEvent(FlushSuccessEvent event) {
+        int sinkSubtask = event.getSinkSubTaskId();
+        int sourceSubtask = event.getSourceSubTaskId();
+        LOG.info(
+                "Sink subtask {} succeed flushing from source subTask {}.",
+                sinkSubtask,
+                sourceSubtask);
+        if (!flushedSinkWriters.containsKey(sourceSubtask)) {
+            flushedSinkWriters.put(sourceSubtask, ConcurrentHashMap.newKeySet());
+        }
+        flushedSinkWriters.get(sourceSubtask).add(sinkSubtask);
+        LOG.info(
+                "Currently flushed sink writers for source task {} are: {}",
+                sourceSubtask,
+                flushedSinkWriters.get(sourceSubtask));
+    }
+
+    @Override
+    protected void handleUnrecoverableError(String taskDescription, Throwable t) {
+        super.handleUnrecoverableError(taskDescription, t);
+
+        // There's a pending future, release it exceptionally before quitting
+        if (pendingResponseFuture != null) {
+            pendingResponseFuture.completeExceptionally(t);
+        }
+    }
+
+    /**
+     * Handle the {@link SchemaChangeRequest} and wait for all sink subtasks flushing.
+     *
+     * @param request the received SchemaChangeRequest
+     */
+    public void handleSchemaChangeRequest(
+            SchemaChangeRequest request, CompletableFuture<CoordinationResponse> responseFuture) {
+
+        // We use subTaskId to identify each schema change request
+        int subTaskId = request.getSubTaskId();
+
+        if (schemaChangeStatus == RequestStatus.IDLE) {
+            if (activeSinkWriters.size() < currentParallelism) {
+                LOG.info(
+                        "Not all active sink writers have been registered. Current {}, expected {}.",
+                        activeSinkWriters.size(),
+                        currentParallelism);
+                responseFuture.complete(wrap(SchemaChangeResponse.waitingForFlush()));
+                return;
+            }
+
+            if (!activeSinkWriters.equals(flushedSinkWriters.get(subTaskId))) {
+                LOG.info(
+                        "Not all active sink writers have completed flush. Flushed writers: {}, expected: {}.",
+                        flushedSinkWriters.get(subTaskId),
+                        activeSinkWriters);
+                responseFuture.complete(wrap(SchemaChangeResponse.waitingForFlush()));
+                return;
+            }
+
+            LOG.info(
+                    "All sink writers have flushed for subTaskId {}. Switching to APPLYING state and starting schema evolution...",
+                    subTaskId);
+            flushedSinkWriters.remove(subTaskId);
+            schemaChangeStatus = RequestStatus.APPLYING;
+            pendingResponseFuture = responseFuture;
+            startSchemaChangesEvolve(request, responseFuture);
+        } else {
+            responseFuture.complete(wrap(SchemaChangeResponse.busy()));
+        }
+    }
+
+    private void startSchemaChangesEvolve(
+            SchemaChangeRequest request, CompletableFuture<CoordinationResponse> responseFuture) {
+        SchemaChangeEvent originalEvent = request.getSchemaChangeEvent();
+        TableId originalTableId = originalEvent.tableId();
+        Schema currentUpstreamSchema =
+                schemaManager.getLatestOriginalSchema(originalTableId).orElse(null);
+
+        List<SchemaChangeEvent> deducedSchemaChangeEvents = new ArrayList<>();
+
+        // For redundant schema change events (possibly coming from duplicate emitted
+        // CreateTableEvents in snapshot stage), we just skip them.
+        if (!SchemaUtils.isSchemaChangeEventRedundant(currentUpstreamSchema, originalEvent)) {
+            schemaManager.applyOriginalSchemaChange(originalEvent);
+            deducedSchemaChangeEvents.addAll(deduceEvolvedSchemaChanges(originalEvent));
+        } else {
+            LOG.info(
+                    "Schema change event {} is redundant for current schema {}, just skip it.",
+                    originalEvent,
+                    currentUpstreamSchema);
+        }
+
+        LOG.info(
+                "All sink subtask have flushed for table {}. Start to apply schema change request: \n\t{}\nthat extracts to:\n\t{}",
+                request.getTableId().toString(),
+                request,
+                deducedSchemaChangeEvents.stream()
+                        .map(SchemaChangeEvent::toString)
+                        .collect(Collectors.joining("\n\t")));
+        schemaChangeThreadPool.submit(
+                () -> {
+                    try {
+                        applySchemaChange(originalEvent, deducedSchemaChangeEvents);
+                    } catch (Throwable t) {
+                        failJob(
+                                "Schema change applying task",
+                                new FlinkRuntimeException(
+                                        "Failed to apply schema change event.", t));
+                        throw t;
+                    }
+                });
+    }
+
+    private List<SchemaChangeEvent> deduceEvolvedSchemaChanges(SchemaChangeEvent event) {
+        LOG.info("Step 1 - Start deducing evolved schema change for {}", event);
+
+        TableId originalTableId = event.tableId();
+        List<SchemaChangeEvent> deducedSchemaChangeEvents = new ArrayList<>();
+        Set<TableId> originalTables = schemaManager.getAllOriginalTables();
+
+        // First, grab all affected evolved tables.
+        Set<TableId> affectedEvolvedTables =
+                SchemaDerivator.getAffectedEvolvedTables(
+                        router, Collections.singleton(originalTableId));
+        LOG.info("Step 2 - Affected downstream tables are: {}", affectedEvolvedTables);
+
+        // For each affected table, we need to...
+        for (TableId evolvedTableId : affectedEvolvedTables) {
+            Schema currentEvolvedSchema =
+                    schemaManager.getLatestEvolvedSchema(evolvedTableId).orElse(null);
+            LOG.info(
+                    "Step 3.1 - For to-be-evolved table {} with schema {}...",
+                    evolvedTableId,
+                    currentEvolvedSchema);
+
+            // ... reversely look up this affected sink table's upstream dependency
+            Set<TableId> upstreamDependencies =
+                    SchemaDerivator.reverseLookupDependingUpstreamTables(
+                            router, evolvedTableId, originalTables);
+            Preconditions.checkArgument(
+                    !upstreamDependencies.isEmpty(),
+                    "An affected sink table's upstream dependency cannot be empty.");
+            LOG.info("Step 3.2 - upstream dependency tables are: {}", upstreamDependencies);
+
+            List<SchemaChangeEvent> rawSchemaChangeEvents = new ArrayList<>();
+            if (upstreamDependencies.size() == 1) {
+                // If it's a one-by-one routing rule, we can simply forward it to downstream sink.
+                SchemaChangeEvent rawEvent = event.copy(evolvedTableId);
+                rawSchemaChangeEvents.add(rawEvent);
+                LOG.info(
+                        "Step 3.3 - It's an one-by-one routing and could be forwarded as {}.",
+                        rawEvent);
+            } else {
+                Set<Schema> toBeMergedSchemas =
+                        SchemaDerivator.reverseLookupDependingUpstreamSchemas(
+                                router, evolvedTableId, schemaManager);
+                LOG.info("Step 3.3 - Upstream dependency schemas are: {}.", toBeMergedSchemas);
+
+                // We're in a table routing mode now, so we need to infer a widest schema for all
+                // upstream tables.
+                Schema mergedSchema = currentEvolvedSchema;
+                for (Schema toBeMergedSchema : toBeMergedSchemas) {
+                    mergedSchema =
+                            SchemaMergingUtils.getLeastCommonSchema(mergedSchema, toBeMergedSchema);
+                }
+                LOG.info("Step 3.4 - Deduced widest schema is: {}.", mergedSchema);
+
+                // Detect what schema changes we need to apply to get expected sink table.
+                List<SchemaChangeEvent> rawEvents =
+                        SchemaMergingUtils.getSchemaDifference(
+                                evolvedTableId, currentEvolvedSchema, mergedSchema);
+                LOG.info(
+                        "Step 3.5 - It's an many-to-one routing and causes schema changes: {}.",
+                        rawEvents);
+
+                rawSchemaChangeEvents.addAll(rawEvents);
+            }
+
+            // Finally, we normalize schema change events, including rewriting events by current
+            // schema change behavior configuration, dropping explicitly excluded schema change
+            // event types.
+            List<SchemaChangeEvent> normalizedEvents =
+                    SchemaDerivator.normalizeSchemaChangeEvents(
+                            currentEvolvedSchema, rawSchemaChangeEvents, behavior, metadataApplier);
+            LOG.info(
+                    "Step 4 - After being normalized with {} behavior, final schema change events are: {}",
+                    behavior,
+                    normalizedEvents);
+
+            deducedSchemaChangeEvents.addAll(normalizedEvents);
+        }
+
+        return deducedSchemaChangeEvents;
+    }
+
+    /** Applies the schema change to the external system. */
+    private void applySchemaChange(
+            SchemaChangeEvent originalEvent, List<SchemaChangeEvent> deducedSchemaChangeEvents) {
+        if (SchemaChangeBehavior.EXCEPTION.equals(behavior)) {
+            if (deducedSchemaChangeEvents.stream()
+                    .anyMatch(evt -> !(evt instanceof CreateTableEvent))) {
+                SchemaChangeEvent unacceptableSchemaChangeEvent =
+                        deducedSchemaChangeEvents.stream()
+                                .filter(evt -> !(evt instanceof CreateTableEvent))
+                                .findAny()
+                                .get();
+                throw new SchemaEvolveException(
+                        unacceptableSchemaChangeEvent,
+                        "Unexpected schema change events occurred in EXCEPTION mode. Job will fail now.");
+            }
+        }
+
+        // Tries to apply it to external system
+        List<SchemaChangeEvent> appliedSchemaChangeEvents = new ArrayList<>();
+        for (SchemaChangeEvent event : deducedSchemaChangeEvents) {
+            if (applyAndUpdateEvolvedSchemaChange(event)) {
+                appliedSchemaChangeEvents.add(event);
+            }
+        }
+
+        Map<TableId, Schema> refreshedEvolvedSchemas = new HashMap<>();
+
+        // We need to retrieve all possibly modified evolved schemas and refresh SchemaOperator's
+        // local cache since it might have been altered by another SchemaOperator instance.
+        // SchemaChangeEvents doesn't need to be emitted to downstream (since it might be broadcast
+        // from other SchemaOperators) though.
+        for (TableId tableId : router.route(originalEvent.tableId())) {
+            refreshedEvolvedSchemas.put(
+                    tableId, schemaManager.getLatestEvolvedSchema(tableId).orElse(null));
+        }
+
+        // And returns all successfully applied schema change events to SchemaOperator.
+        pendingResponseFuture.complete(
+                wrap(
+                        SchemaChangeResponse.success(
+                                appliedSchemaChangeEvents, 
refreshedEvolvedSchemas)));
+        pendingResponseFuture = null;
+
+        Preconditions.checkState(
+                schemaChangeStatus == RequestStatus.APPLYING,
+                "Illegal schemaChangeStatus state: should be APPLYING before 
applySchemaChange finishes, not "
+                        + schemaChangeStatus);
+        schemaChangeStatus = RequestStatus.IDLE;
+        LOG.info("SchemaChangeStatus switched from APPLYING to IDLE.");
+    }
+
+    private boolean applyAndUpdateEvolvedSchemaChange(SchemaChangeEvent 
schemaChangeEvent) {
+        try {
+            metadataApplier.applySchemaChange(schemaChangeEvent);
+            schemaManager.applyEvolvedSchemaChange(schemaChangeEvent);
+            LOG.info(
+                    "Successfully applied schema change event {} to external 
system.",
+                    schemaChangeEvent);
+            return true;
+        } catch (Throwable t) {
+            if (shouldIgnoreException(t)) {
+                LOG.warn(
+                        "Failed to apply schema change {}, but the job keeps 
running in tolerant mode. Caused by: {}",
+                        schemaChangeEvent,
+                        t);
+                return false;
+            } else {
+                throw t;
+            }
+        }
+    }
+
+    // -------------------------
+    // Utilities
+    // -------------------------
+
+    private boolean shouldIgnoreException(Throwable throwable) {
+        // In IGNORE mode, we never try to apply schema change events
+        // In EVOLVE and LENIENT modes, such failures will not be tolerated
+        // In EXCEPTION mode, an exception will be thrown once captured
+        // Only TRY_EVOLVE mode tolerates unsupported schema change failures 
and skips them
+        return (throwable instanceof UnsupportedSchemaChangeEventException)
+                && (SchemaChangeBehavior.TRY_EVOLVE.equals(behavior));
+    }
+
+    /**
+     * {@code IDLE}: Initial idling state, ready for requests. <br>
+     * {@code APPLYING}: A schema change request is being applied; switches 
back to IDLE once
+     * application finishes (successfully or with exceptions).
+     */
+    private enum RequestStatus {
+        IDLE,
+        APPLYING
+    }
+
+    /**
+     * Before Flink CDC 3.3, routing rules were stored in {@link 
SchemaCoordinator}'s state. That
+     * turned out to be unnecessary: the data stream topology might change 
after a stateful
+     * restart, so restored routing status would be stale and erroneous. This 
function consumes
+     * these bytes from the state but never uses them.
+     */
+    private void consumeUnusedSchemaDerivationBytes(DataInputStream in) throws 
IOException {
+        TableIdSerializer tableIdSerializer = TableIdSerializer.INSTANCE;
+        int derivationMappingSize = in.readInt();
+        Map<TableId, Set<TableId>> derivationMapping = new 
HashMap<>(derivationMappingSize);
+        for (int i = 0; i < derivationMappingSize; i++) {
+            // Routed table ID
+            TableId routedTableId =
+                    tableIdSerializer.deserialize(new 
DataInputViewStreamWrapper(in));
+            // Original table IDs
+            int numOriginalTables = in.readInt();
+            Set<TableId> originalTableIds = new HashSet<>(numOriginalTables);
+            for (int j = 0; j < numOriginalTables; j++) {
+                TableId originalTableId =
+                        tableIdSerializer.deserialize(new 
DataInputViewStreamWrapper(in));
+                originalTableIds.add(originalTableId);
+            }
+            derivationMapping.put(routedTableId, originalTableIds);
+        }
+    }
+}
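
The IDLE/APPLYING enum documented above acts as a single-slot gate: the coordinator
rejects new schema change requests while one is being applied, and flips back to IDLE
once application finishes. A minimal, self-contained sketch of that idea (hypothetical
names, not the coordinator's actual code):

    // Illustrative only: a stand-alone model of the IDLE/APPLYING gate.
    final class RequestGateSketch {
        enum RequestStatus {
            IDLE,
            APPLYING
        }

        private RequestStatus status = RequestStatus.IDLE;

        /** Returns true if the caller may start applying; false means "busy, retry later". */
        synchronized boolean tryStartApplying() {
            if (status == RequestStatus.IDLE) {
                status = RequestStatus.APPLYING;
                return true;
            }
            return false;
        }

        /** Called once schema change application finishes, successfully or exceptionally. */
        synchronized void finishApplying() {
            status = RequestStatus.IDLE;
        }
    }
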
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinatorProvider.java
similarity index 53%
copy from 
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
copy to 
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinatorProvider.java
index 367f65597..253b52cac 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaCoordinatorProvider.java
@@ -15,50 +15,67 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cdc.runtime.operators.schema;
+package org.apache.flink.cdc.runtime.operators.schema.regular;
 
 import org.apache.flink.cdc.common.annotation.Internal;
-import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
-import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryProvider;
+import 
org.apache.flink.cdc.runtime.operators.schema.common.CoordinatorExecutorThreadFactory;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
-import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
-import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
-import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 
 import java.time.Duration;
 import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 
-/** Factory to create {@link SchemaOperator}. */
+/** Provider of {@link SchemaCoordinator}. */
 @Internal
-public class SchemaOperatorFactory extends SimpleOperatorFactory<Event>
-        implements CoordinatedOperatorFactory<Event>, 
OneInputStreamOperatorFactory<Event, Event> {
-
+public class SchemaCoordinatorProvider implements OperatorCoordinator.Provider 
{
     private static final long serialVersionUID = 1L;
 
+    private final OperatorID operatorID;
+    private final String operatorName;
     private final MetadataApplier metadataApplier;
     private final List<RouteRule> routingRules;
     private final SchemaChangeBehavior schemaChangeBehavior;
+    private final Duration rpcTimeout;
 
-    public SchemaOperatorFactory(
+    public SchemaCoordinatorProvider(
+            OperatorID operatorID,
+            String operatorName,
             MetadataApplier metadataApplier,
             List<RouteRule> routingRules,
-            Duration rpcTimeOut,
             SchemaChangeBehavior schemaChangeBehavior,
-            String timezone) {
-        super(new SchemaOperator(routingRules, rpcTimeOut, 
schemaChangeBehavior, timezone));
+            Duration rpcTimeout) {
+        this.operatorID = operatorID;
+        this.operatorName = operatorName;
         this.metadataApplier = metadataApplier;
         this.routingRules = routingRules;
         this.schemaChangeBehavior = schemaChangeBehavior;
+        this.rpcTimeout = rpcTimeout;
+    }
+
+    @Override
+    public OperatorID getOperatorId() {
+        return operatorID;
     }
 
     @Override
-    public OperatorCoordinator.Provider getCoordinatorProvider(
-            String operatorName, OperatorID operatorID) {
-        return new SchemaRegistryProvider(
-                operatorID, operatorName, metadataApplier, routingRules, 
schemaChangeBehavior);
+    public OperatorCoordinator create(OperatorCoordinator.Context context) 
throws Exception {
+        CoordinatorExecutorThreadFactory coordinatorThreadFactory =
+                new CoordinatorExecutorThreadFactory(
+                        "schema-evolution-coordinator", 
context.getUserCodeClassloader());
+        ExecutorService coordinatorExecutor =
+                Executors.newSingleThreadExecutor(coordinatorThreadFactory);
+        return new SchemaCoordinator(
+                operatorName,
+                context,
+                coordinatorExecutor,
+                metadataApplier,
+                routingRules,
+                schemaChangeBehavior,
+                rpcTimeout);
     }
 }
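
Each coordinator above gets its own named, single-threaded executor, so all coordinator
work is serialized on one thread. A rough sketch of that pattern using plain
java.util.concurrent (the daemon flag and class name below are assumptions of this
sketch, not the commit's CoordinatorExecutorThreadFactory):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadFactory;

    final class SingleThreadCoordinatorExecutorSketch {
        // Builds a single-threaded executor whose worker thread carries a readable name.
        static ExecutorService newCoordinatorExecutor(String threadName) {
            ThreadFactory factory =
                    runnable -> {
                        Thread thread = new Thread(runnable, threadName);
                        thread.setDaemon(true);
                        return thread;
                    };
            return Executors.newSingleThreadExecutor(factory);
        }
    }
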
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
new file mode 100644
index 000000000..9ea79d729
--- /dev/null
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperator.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cdc.runtime.operators.schema.regular;
+
+import org.apache.flink.cdc.common.annotation.Internal;
+import org.apache.flink.cdc.common.annotation.VisibleForTesting;
+import org.apache.flink.cdc.common.event.DataChangeEvent;
+import org.apache.flink.cdc.common.event.Event;
+import org.apache.flink.cdc.common.event.FlushEvent;
+import org.apache.flink.cdc.common.event.SchemaChangeEvent;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
+import org.apache.flink.cdc.common.route.RouteRule;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.common.utils.SchemaUtils;
+import 
org.apache.flink.cdc.runtime.operators.schema.common.CoordinationResponseUtils;
+import org.apache.flink.cdc.runtime.operators.schema.common.SchemaDerivator;
+import org.apache.flink.cdc.runtime.operators.schema.common.TableIdRouter;
+import 
org.apache.flink.cdc.runtime.operators.schema.common.metrics.SchemaOperatorMetrics;
+import 
org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeRequest;
+import 
org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse;
+import org.apache.flink.runtime.jobgraph.tasks.TaskOperatorEventGateway;
+import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
+import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.Output;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.StreamTask;
+import org.apache.flink.util.SerializedValue;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static 
org.apache.flink.cdc.common.pipeline.PipelineOptions.DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT;
+
+/**
+ * The operator requests schema evolution from {@link
+ * org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator} 
for incoming {@link
+ * SchemaChangeEvent}s and blocks the stream for affected tables until their 
schema changes finish.
+ */
+@Internal
+public class SchemaOperator extends AbstractStreamOperator<Event>
+        implements OneInputStreamOperator<Event, Event>, Serializable {
+
+    private static final long serialVersionUID = 1L;
+    private static final Logger LOG = 
LoggerFactory.getLogger(SchemaOperator.class);
+
+    // Final fields that are set in constructor
+    private final String timezone;
+    private final Duration rpcTimeout;
+    private final SchemaChangeBehavior schemaChangeBehavior;
+    private final List<RouteRule> routingRules;
+
+    // Transient fields that are set during open()
+    private transient int subTaskId;
+    private transient TaskOperatorEventGateway toCoordinator;
+    private transient SchemaOperatorMetrics schemaOperatorMetrics;
+    private transient volatile Map<TableId, Schema> originalSchemaMap;
+    private transient volatile Map<TableId, Schema> evolvedSchemaMap;
+    private transient TableIdRouter router;
+    private transient SchemaDerivator derivator;
+
+    @VisibleForTesting
+    public SchemaOperator(List<RouteRule> routingRules) {
+        this(routingRules, DEFAULT_SCHEMA_OPERATOR_RPC_TIMEOUT);
+    }
+
+    @VisibleForTesting
+    public SchemaOperator(List<RouteRule> routingRules, Duration rpcTimeOut) {
+        this(routingRules, rpcTimeOut, SchemaChangeBehavior.EVOLVE);
+    }
+
+    @VisibleForTesting
+    public SchemaOperator(
+            List<RouteRule> routingRules,
+            Duration rpcTimeOut,
+            SchemaChangeBehavior schemaChangeBehavior) {
+        this(routingRules, rpcTimeOut, schemaChangeBehavior, "UTC");
+    }
+
+    public SchemaOperator(
+            List<RouteRule> routingRules,
+            Duration rpcTimeOut,
+            SchemaChangeBehavior schemaChangeBehavior,
+            String timezone) {
+        this.chainingStrategy = ChainingStrategy.ALWAYS;
+        this.rpcTimeout = rpcTimeOut;
+        this.schemaChangeBehavior = schemaChangeBehavior;
+        this.timezone = timezone;
+        this.routingRules = routingRules;
+    }
+
+    @Override
+    public void setup(
+            StreamTask<?, ?> containingTask,
+            StreamConfig config,
+            Output<StreamRecord<Event>> output) {
+        super.setup(containingTask, config, output);
+        this.toCoordinator = 
containingTask.getEnvironment().getOperatorCoordinatorEventGateway();
+    }
+
+    @Override
+    public void open() throws Exception {
+        super.open();
+        this.schemaOperatorMetrics =
+                new SchemaOperatorMetrics(
+                        getRuntimeContext().getMetricGroup(), 
schemaChangeBehavior);
+        this.subTaskId = 
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask();
+        this.originalSchemaMap = new HashMap<>();
+        this.evolvedSchemaMap = new HashMap<>();
+        this.router = new TableIdRouter(routingRules);
+        this.derivator = new SchemaDerivator();
+    }
+
+    /**
+     * This method is guaranteed to not be called concurrently with other 
methods of the operator.
+     */
+    @Override
+    public void processElement(StreamRecord<Event> streamRecord) throws 
Exception {
+        Event event = streamRecord.getValue();
+        if (event instanceof SchemaChangeEvent) {
+            handleSchemaChangeEvent((SchemaChangeEvent) event);
+        } else if (event instanceof DataChangeEvent) {
+            handleDataChangeEvent((DataChangeEvent) event);
+        } else {
+            throw new RuntimeException("Unknown event type in Stream record: " 
+ event);
+        }
+    }
+
+    private void handleSchemaChangeEvent(SchemaChangeEvent originalEvent) 
throws Exception {
+        // First, update the original schema map unconditionally; this never 
fails
+        TableId tableId = originalEvent.tableId();
+        originalSchemaMap.compute(
+                tableId,
+                (tId, schema) -> SchemaUtils.applySchemaChangeEvent(schema, 
originalEvent));
+        schemaOperatorMetrics.increaseSchemaChangeEvents(1);
+
+        // Next, send a FlushEvent, or the stream might be blocked later
+        LOG.info("{}> Sending the FlushEvent.", subTaskId);
+        output.collect(new StreamRecord<>(new FlushEvent(subTaskId)));
+
+        // Then, request the schema change from the SchemaCoordinator.
+        SchemaChangeResponse response = requestSchemaChange(tableId, 
originalEvent);
+
+        if (response.isSuccess()) {
+            LOG.info("{}> Successfully requested schema change.", subTaskId);
+            LOG.info(
+                    "{}> Finished schema change events: {}",
+                    subTaskId,
+                    response.getAppliedSchemaChangeEvents());
+            LOG.info("{}> Refreshed evolved schemas: {}", subTaskId, 
response.getEvolvedSchemas());
+
+            // After this request has been successfully applied to the 
external system, we can...
+            List<SchemaChangeEvent> finishedSchemaChangeEvents =
+                    response.getAppliedSchemaChangeEvents();
+
+            // Update local evolved schema map's cache
+            evolvedSchemaMap.putAll(response.getEvolvedSchemas());
+
+            // and emit the finished event to downstream
+            for (SchemaChangeEvent finishedEvent : finishedSchemaChangeEvents) 
{
+                output.collect(new StreamRecord<>(finishedEvent));
+            }
+
+            schemaOperatorMetrics.increaseFinishedSchemaChangeEvents(
+                    finishedSchemaChangeEvents.size());
+        } else if (response.isDuplicate()) {
+            LOG.info(
+                    "{}> Schema change event {} has been handled in another 
subTask already.",
+                    subTaskId,
+                    originalEvent);
+
+            schemaOperatorMetrics.increaseIgnoredSchemaChangeEvents(1);
+        } else if (response.isIgnored()) {
+            LOG.info(
+                    "{}> Schema change event {} has been ignored. No schema 
evolution needed.",
+                    subTaskId,
+                    originalEvent);
+
+            schemaOperatorMetrics.increaseIgnoredSchemaChangeEvents(1);
+        } else {
+            throw new IllegalStateException("Unexpected response status: " + 
response);
+        }
+    }
+
+    private void handleDataChangeEvent(DataChangeEvent dataChangeEvent) {
+        TableId tableId = dataChangeEvent.tableId();
+
+        // First, we obtain the original schema corresponding to this data 
change event
+        Schema originalSchema = 
originalSchemaMap.get(dataChangeEvent.tableId());
+
+        // Then, for each routing terminus, coerce data records to the 
expected schema
+        for (TableId sinkTableId : router.route(tableId)) {
+            Schema evolvedSchema = evolvedSchemaMap.get(sinkTableId);
+            DataChangeEvent coercedDataRecord =
+                    derivator
+                            .coerceDataRecord(
+                                    timezone,
+                                    DataChangeEvent.route(dataChangeEvent, 
sinkTableId),
+                                    originalSchema,
+                                    evolvedSchema)
+                            .orElseThrow(
+                                    () ->
+                                            new IllegalStateException(
+                                                    String.format(
+                                                            "Unable to coerce 
data record from %s (schema: %s) to %s (schema: %s)",
+                                                            tableId,
+                                                            originalSchema,
+                                                            sinkTableId,
+                                                            evolvedSchema)));
+            output.collect(new StreamRecord<>(coercedDataRecord));
+        }
+    }
+
+    private SchemaChangeResponse requestSchemaChange(
+            TableId tableId, SchemaChangeEvent schemaChangeEvent)
+            throws InterruptedException, TimeoutException {
+        long deadline = System.currentTimeMillis() + rpcTimeout.toMillis();
+        while (true) {
+            SchemaChangeResponse response =
+                    sendRequestToCoordinator(
+                            new SchemaChangeRequest(tableId, 
schemaChangeEvent, subTaskId));
+            if (System.currentTimeMillis() < deadline) {
+                if (response.isRegistryBusy()) {
+                    LOG.info(
+                            "{}> Schema Registry is busy now, waiting before 
sending the next request...",
+                            subTaskId);
+                    Thread.sleep(1000);
+                } else if (response.isWaitingForFlush()) {
+                    LOG.info(
+                            "{}> Schema change request has not collected 
enough flush success events from sink writers yet, waiting...",
+                            subTaskId);
+                    Thread.sleep(1000);
+                } else {
+                    return response;
+                }
+            } else {
+                throw new TimeoutException("Timeout when requesting schema 
change.");
+            }
+        }
+    }
+
+    private <REQUEST extends CoordinationRequest, RESPONSE extends 
CoordinationResponse>
+            RESPONSE sendRequestToCoordinator(REQUEST request) {
+        try {
+            CompletableFuture<CoordinationResponse> responseFuture =
+                    toCoordinator.sendRequestToCoordinator(
+                            getOperatorID(), new SerializedValue<>(request));
+            return CoordinationResponseUtils.unwrap(
+                    responseFuture.get(rpcTimeout.toMillis(), 
TimeUnit.MILLISECONDS));
+        } catch (Exception e) {
+            throw new IllegalStateException(
+                    "Failed to send request to coordinator: " + 
request.toString(), e);
+        }
+    }
+
+    /** Visible for mocking in test cases. */
+    @VisibleForTesting
+    protected int getCurrentTimestamp() {
+        return (int) Instant.now().getEpochSecond();
+    }
+
+    @VisibleForTesting
+    public void registerInitialSchema(TableId tableId, Schema schema) {
+        originalSchemaMap.put(tableId, schema);
+        evolvedSchemaMap.put(tableId, schema);
+    }
+}
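
The requestSchemaChange() loop above keeps re-sending the request while the coordinator
answers BUSY or WAITING_FOR_FLUSH, and gives up once the RPC timeout elapses. The same
deadline-bounded retry pattern, extracted into a generic helper (the helper name, the
Supplier/Predicate shape and the fixed 1-second back-off are assumptions of this sketch,
not part of the commit):

    import java.time.Duration;
    import java.util.concurrent.TimeoutException;
    import java.util.function.Predicate;
    import java.util.function.Supplier;

    final class RetryUntilDeadlineSketch {
        static <T> T retry(Supplier<T> call, Predicate<T> shouldRetry, Duration timeout)
                throws InterruptedException, TimeoutException {
            long deadline = System.currentTimeMillis() + timeout.toMillis();
            while (System.currentTimeMillis() < deadline) {
                T response = call.get();
                if (!shouldRetry.test(response)) {
                    return response; // terminal answer: success, duplicate, ignored, ...
                }
                Thread.sleep(1000); // back off before asking the coordinator again
            }
            throw new TimeoutException("Timeout when requesting schema change.");
        }
    }
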
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java
similarity index 84%
rename from 
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
rename to 
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java
index 367f65597..630acc904 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorFactory.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorFactory.java
@@ -15,14 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cdc.runtime.operators.schema;
+package org.apache.flink.cdc.runtime.operators.schema.regular;
 
 import org.apache.flink.cdc.common.annotation.Internal;
 import org.apache.flink.cdc.common.event.Event;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.route.RouteRule;
 import org.apache.flink.cdc.common.sink.MetadataApplier;
-import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryProvider;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
 import org.apache.flink.streaming.api.operators.CoordinatedOperatorFactory;
@@ -42,23 +41,30 @@ public class SchemaOperatorFactory extends 
SimpleOperatorFactory<Event>
     private final MetadataApplier metadataApplier;
     private final List<RouteRule> routingRules;
     private final SchemaChangeBehavior schemaChangeBehavior;
+    private final Duration rpcTimeout;
 
     public SchemaOperatorFactory(
             MetadataApplier metadataApplier,
             List<RouteRule> routingRules,
-            Duration rpcTimeOut,
+            Duration rpcTimeout,
             SchemaChangeBehavior schemaChangeBehavior,
             String timezone) {
-        super(new SchemaOperator(routingRules, rpcTimeOut, 
schemaChangeBehavior, timezone));
+        super(new SchemaOperator(routingRules, rpcTimeout, 
schemaChangeBehavior, timezone));
         this.metadataApplier = metadataApplier;
         this.routingRules = routingRules;
         this.schemaChangeBehavior = schemaChangeBehavior;
+        this.rpcTimeout = rpcTimeout;
     }
 
     @Override
     public OperatorCoordinator.Provider getCoordinatorProvider(
             String operatorName, OperatorID operatorID) {
-        return new SchemaRegistryProvider(
-                operatorID, operatorName, metadataApplier, routingRules, 
schemaChangeBehavior);
+        return new SchemaCoordinatorProvider(
+                operatorID,
+                operatorName,
+                metadataApplier,
+                routingRules,
+                schemaChangeBehavior,
+                rpcTimeout);
     }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/SchemaChangeRequest.java
similarity index 79%
rename from 
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java
rename to 
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/SchemaChangeRequest.java
index fbc5e9c40..b2b3336b0 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeRequest.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/SchemaChangeRequest.java
@@ -15,18 +15,19 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cdc.runtime.operators.schema.event;
+package org.apache.flink.cdc.runtime.operators.schema.regular.event;
 
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
 import org.apache.flink.cdc.common.event.TableId;
-import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
-import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator;
 import org.apache.flink.runtime.operators.coordination.CoordinationRequest;
 
 import java.util.Objects;
 
 /**
- * The request from {@link SchemaOperator} to {@link SchemaRegistry} to 
request to change schemas.
+ * The request from {@link SchemaOperator} to {@link SchemaCoordinator} to 
request to change
+ * schemas.
  */
 public class SchemaChangeRequest implements CoordinationRequest {
 
@@ -34,8 +35,10 @@ public class SchemaChangeRequest implements 
CoordinationRequest {
 
     /** The sender of the request. */
     private final TableId tableId;
+
     /** The schema changes. */
     private final SchemaChangeEvent schemaChangeEvent;
+
     /** The ID of subTask that initiated the request. */
     private final int subTaskId;
 
@@ -76,4 +79,16 @@ public class SchemaChangeRequest implements 
CoordinationRequest {
     public int hashCode() {
         return Objects.hash(tableId, schemaChangeEvent, subTaskId);
     }
+
+    @Override
+    public String toString() {
+        return "SchemaChangeRequest{"
+                + "tableId="
+                + tableId
+                + ", schemaChangeEvent="
+                + schemaChangeEvent
+                + ", subTaskId="
+                + subTaskId
+                + '}';
+    }
 }
diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/SchemaChangeResponse.java
similarity index 58%
rename from 
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java
rename to 
flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/SchemaChangeResponse.java
index 63d57139b..282557552 100644
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/event/SchemaChangeResponse.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/regular/event/SchemaChangeResponse.java
@@ -15,56 +15,73 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cdc.runtime.operators.schema.event;
+package org.apache.flink.cdc.runtime.operators.schema.regular.event;
 
 import org.apache.flink.cdc.common.event.SchemaChangeEvent;
-import org.apache.flink.cdc.runtime.operators.schema.SchemaOperator;
-import 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry;
+import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.schema.Schema;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaCoordinator;
+import org.apache.flink.cdc.runtime.operators.schema.regular.SchemaOperator;
 import org.apache.flink.runtime.operators.coordination.CoordinationResponse;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
 /**
- * The response for {@link SchemaChangeRequest} from {@link SchemaRegistry} to 
{@link
+ * The response for {@link SchemaChangeRequest} from {@link SchemaCoordinator} 
to {@link
  * SchemaOperator}.
  */
 public class SchemaChangeResponse implements CoordinationResponse {
     private static final long serialVersionUID = 1L;
 
     /**
-     * Whether the SchemaOperator need to buffer data and the 
SchemaOperatorCoordinator need to wait
-     * for flushing.
+     * Schema change events that were actually applied. This is only 
meaningful if the response
+     * status is {@code SUCCESS}.
      */
-    private final List<SchemaChangeEvent> schemaChangeEvents;
+    private final List<SchemaChangeEvent> appliedSchemaChangeEvents;
+
+    private final Map<TableId, Schema> evolvedSchemas;
 
     private final ResponseCode responseCode;
 
-    public static SchemaChangeResponse accepted(List<SchemaChangeEvent> 
schemaChangeEvents) {
-        return new SchemaChangeResponse(schemaChangeEvents, 
ResponseCode.ACCEPTED);
+    public static SchemaChangeResponse success(
+            List<SchemaChangeEvent> schemaChangeEvents, Map<TableId, Schema> 
evolvedSchemas) {
+        return new SchemaChangeResponse(ResponseCode.SUCCESS, 
schemaChangeEvents, evolvedSchemas);
     }
 
     public static SchemaChangeResponse busy() {
-        return new SchemaChangeResponse(Collections.emptyList(), 
ResponseCode.BUSY);
+        return new SchemaChangeResponse(ResponseCode.BUSY);
     }
 
     public static SchemaChangeResponse duplicate() {
-        return new SchemaChangeResponse(Collections.emptyList(), 
ResponseCode.DUPLICATE);
+        return new SchemaChangeResponse(ResponseCode.DUPLICATE);
     }
 
     public static SchemaChangeResponse ignored() {
-        return new SchemaChangeResponse(Collections.emptyList(), 
ResponseCode.IGNORED);
+        return new SchemaChangeResponse(ResponseCode.IGNORED);
+    }
+
+    public static SchemaChangeResponse waitingForFlush() {
+        return new SchemaChangeResponse(ResponseCode.WAITING_FOR_FLUSH);
+    }
+
+    private SchemaChangeResponse(ResponseCode responseCode) {
+        this(responseCode, Collections.emptyList(), Collections.emptyMap());
     }
 
     private SchemaChangeResponse(
-            List<SchemaChangeEvent> schemaChangeEvents, ResponseCode 
responseCode) {
-        this.schemaChangeEvents = schemaChangeEvents;
+            ResponseCode responseCode,
+            List<SchemaChangeEvent> appliedSchemaChangeEvents,
+            Map<TableId, Schema> evolvedSchemas) {
         this.responseCode = responseCode;
+        this.appliedSchemaChangeEvents = appliedSchemaChangeEvents;
+        this.evolvedSchemas = evolvedSchemas;
     }
 
-    public boolean isAccepted() {
-        return ResponseCode.ACCEPTED.equals(responseCode);
+    public boolean isSuccess() {
+        return ResponseCode.SUCCESS.equals(responseCode);
     }
 
     public boolean isRegistryBusy() {
@@ -79,8 +96,16 @@ public class SchemaChangeResponse implements 
CoordinationResponse {
         return ResponseCode.IGNORED.equals(responseCode);
     }
 
-    public List<SchemaChangeEvent> getSchemaChangeEvents() {
-        return schemaChangeEvents;
+    public boolean isWaitingForFlush() {
+        return ResponseCode.WAITING_FOR_FLUSH.equals(responseCode);
+    }
+
+    public List<SchemaChangeEvent> getAppliedSchemaChangeEvents() {
+        return appliedSchemaChangeEvents;
+    }
+
+    public Map<TableId, Schema> getEvolvedSchemas() {
+        return evolvedSchemas;
     }
 
     @Override
@@ -92,20 +117,20 @@ public class SchemaChangeResponse implements 
CoordinationResponse {
             return false;
         }
         SchemaChangeResponse response = (SchemaChangeResponse) o;
-        return Objects.equals(schemaChangeEvents, response.schemaChangeEvents)
+        return Objects.equals(appliedSchemaChangeEvents, 
response.appliedSchemaChangeEvents)
                 && responseCode == response.responseCode;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(schemaChangeEvents, responseCode);
+        return Objects.hash(appliedSchemaChangeEvents, responseCode);
     }
 
     @Override
     public String toString() {
         return "SchemaChangeResponse{"
                 + "schemaChangeEvents="
-                + schemaChangeEvents
+                + appliedSchemaChangeEvents
                 + ", responseCode="
                 + responseCode
                 + '}';
@@ -126,9 +151,10 @@ public class SchemaChangeResponse implements 
CoordinationResponse {
      * required. Possibly caused by LENIENT mode or merging table strategies.
      */
     public enum ResponseCode {
-        ACCEPTED,
+        SUCCESS,
         BUSY,
         DUPLICATE,
-        IGNORED
+        IGNORED,
+        WAITING_FOR_FLUSH
     }
 }
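
The renamed response codes above can be exercised directly through the factory methods
visible in this diff. A small usage sketch (assumes the SchemaChangeResponse class from
this commit is on the classpath; printed values simply restate the factory-method
semantics):

    import org.apache.flink.cdc.runtime.operators.schema.regular.event.SchemaChangeResponse;

    import java.util.Collections;

    public class SchemaChangeResponseSketch {
        public static void main(String[] args) {
            SchemaChangeResponse success =
                    SchemaChangeResponse.success(Collections.emptyList(), Collections.emptyMap());
            SchemaChangeResponse busy = SchemaChangeResponse.busy();
            SchemaChangeResponse waiting = SchemaChangeResponse.waitingForFlush();

            System.out.println(success.isSuccess()); // true
            System.out.println(success.getAppliedSchemaChangeEvents()); // []
            System.out.println(busy.isRegistryBusy()); // true
            System.out.println(waiting.isWaitingForFlush()); // true
        }
    }
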
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java
similarity index 95%
rename from 
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
rename to 
flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java
index e1f6a94c9..53895d56c 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaEvolveTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaEvolveTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cdc.runtime.operators.schema;
+package org.apache.flink.cdc.runtime.operators.schema.regular;
 
 import org.apache.flink.cdc.common.data.RecordData;
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
@@ -30,6 +30,7 @@ import org.apache.flink.cdc.common.event.RenameColumnEvent;
 import org.apache.flink.cdc.common.event.SchemaChangeEventType;
 import org.apache.flink.cdc.common.event.SchemaChangeEventTypeFamily;
 import org.apache.flink.cdc.common.event.TableId;
+import org.apache.flink.cdc.common.exceptions.SchemaEvolveException;
 import 
org.apache.flink.cdc.common.exceptions.UnsupportedSchemaChangeEventException;
 import org.apache.flink.cdc.common.pipeline.SchemaChangeBehavior;
 import org.apache.flink.cdc.common.schema.Column;
@@ -37,9 +38,10 @@ import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataType;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.RowType;
-import 
org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness;
+import 
org.apache.flink.cdc.runtime.testutils.operators.RegularEventOperatorTestHarness;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.FlinkRuntimeException;
 
 import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableMap;
 import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
@@ -53,6 +55,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.ExecutionException;
 import java.util.stream.Collectors;
 
 /** Unit tests for the {@link SchemaOperator} to handle evolved schema. */
@@ -85,8 +88,9 @@ public class SchemaEvolveTest {
 
         SchemaOperator schemaOperator =
                 new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), 
behavior);
-        EventOperatorTestHarness<SchemaOperator, Event> harness =
-                new EventOperatorTestHarness<>(schemaOperator, 17, 
Duration.ofSeconds(3), behavior);
+        RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
+                RegularEventOperatorTestHarness.withDurationAndBehavior(
+                        schemaOperator, 17, Duration.ofSeconds(3), behavior);
         harness.open();
 
         // Test CreateTableEvent
@@ -108,7 +112,7 @@ public class SchemaEvolveTest {
 
             Assertions.assertThat(
                             ListUtils.union(
-                                    Collections.singletonList(new 
FlushEvent(tableId)),
+                                    Collections.singletonList(new 
FlushEvent(0)),
                                     createAndInsertDataEvents))
                     .isEqualTo(
                             harness.getOutputRecords().stream()
@@ -168,8 +172,7 @@ public class SchemaEvolveTest {
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new 
FlushEvent(tableId)),
-                                    addColumnEvents));
+                                    Collections.singletonList(new 
FlushEvent(0)), addColumnEvents));
 
             Schema schemaV2 =
                     Schema.newBuilder()
@@ -227,7 +230,7 @@ public class SchemaEvolveTest {
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new 
FlushEvent(tableId)),
+                                    Collections.singletonList(new 
FlushEvent(0)),
                                     renameColumnEvents));
 
             Schema schemaV3 =
@@ -272,7 +275,7 @@ public class SchemaEvolveTest {
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new 
FlushEvent(tableId)),
+                                    Collections.singletonList(new 
FlushEvent(0)),
                                     alterColumnTypeEvents));
 
             Schema schemaV4 =
@@ -308,7 +311,7 @@ public class SchemaEvolveTest {
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new 
FlushEvent(tableId)),
+                                    Collections.singletonList(new 
FlushEvent(0)),
                                     dropColumnEvents));
 
             Schema schemaV5 =
@@ -342,8 +345,9 @@ public class SchemaEvolveTest {
 
         SchemaOperator schemaOperator =
                 new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), 
behavior);
-        EventOperatorTestHarness<SchemaOperator, Event> harness =
-                new EventOperatorTestHarness<>(schemaOperator, 17, 
Duration.ofSeconds(3), behavior);
+        RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
+                RegularEventOperatorTestHarness.withDurationAndBehavior(
+                        schemaOperator, 17, Duration.ofSeconds(3), behavior);
         harness.open();
 
         // Test CreateTableEvent
@@ -369,7 +373,7 @@ public class SchemaEvolveTest {
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new 
FlushEvent(tableId)),
+                                    Collections.singletonList(new 
FlushEvent(0)),
                                     createAndInsertDataEvents));
 
             
Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -425,8 +429,7 @@ public class SchemaEvolveTest {
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new 
FlushEvent(tableId)),
-                                    addColumnEvents));
+                                    Collections.singletonList(new 
FlushEvent(0)), addColumnEvents));
 
             Schema schemaV2 =
                     Schema.newBuilder()
@@ -484,7 +487,7 @@ public class SchemaEvolveTest {
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new 
FlushEvent(tableId)),
+                                    Collections.singletonList(new 
FlushEvent(0)),
                                     renameColumnEvents));
 
             Schema schemaV3 =
@@ -529,7 +532,7 @@ public class SchemaEvolveTest {
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new 
FlushEvent(tableId)),
+                                    Collections.singletonList(new 
FlushEvent(0)),
                                     alterColumnTypeEvents));
 
             Schema schemaV4 =
@@ -565,7 +568,7 @@ public class SchemaEvolveTest {
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new 
FlushEvent(tableId)),
+                                    Collections.singletonList(new 
FlushEvent(0)),
                                     dropColumnEvents));
 
             Schema schemaV5 =
@@ -599,8 +602,9 @@ public class SchemaEvolveTest {
 
         SchemaOperator schemaOperator =
                 new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), 
behavior);
-        EventOperatorTestHarness<SchemaOperator, Event> harness =
-                new EventOperatorTestHarness<>(schemaOperator, 17, 
Duration.ofSeconds(3), behavior);
+        RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
+                RegularEventOperatorTestHarness.withDurationAndBehavior(
+                        schemaOperator, 17, Duration.ofSeconds(3), behavior);
         harness.open();
 
         // Test CreateTableEvent
@@ -626,7 +630,7 @@ public class SchemaEvolveTest {
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new 
FlushEvent(tableId)),
+                                    Collections.singletonList(new 
FlushEvent(0)),
                                     createAndInsertDataEvents));
 
             
Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -648,45 +652,25 @@ public class SchemaEvolveTest {
                                             new 
AddColumnEvent.ColumnWithPosition(
                                                     Column.physicalColumn(
                                                             "height", DOUBLE, 
"Height data")))));
-            Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, 
addColumnEvents));
+            Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, 
addColumnEvents))
+                    .isExactlyInstanceOf(IllegalStateException.class)
+                    .cause()
+                    .isExactlyInstanceOf(ExecutionException.class)
+                    .cause()
+                    .isExactlyInstanceOf(FlinkRuntimeException.class)
+                    .hasMessage("Failed to apply schema change event.")
+                    .cause()
+                    .isExactlyInstanceOf(SchemaEvolveException.class)
+                    .extracting("applyingEvent", "exceptionMessage")
+                    .containsExactly(
+                            addColumnEvents.get(0),
+                            "Unexpected schema change events occurred in 
EXCEPTION mode. Job will fail now.");
 
             // No schema change events should be sent to downstream
-            Assertions.assertThat(harness.getOutputRecords()).isEmpty();
-        }
-
-        // Test RenameColumnEvent (expected to fail)
-        {
-            List<Event> addColumnEvents =
-                    Collections.singletonList(
-                            new RenameColumnEvent(
-                                    tableId, ImmutableMap.of("name", "namae", 
"age", "toshi")));
-            Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, 
addColumnEvents));
-
-            // No schema change events should be sent to downstream
-            Assertions.assertThat(harness.getOutputRecords()).isEmpty();
-        }
-
-        // Test AlterColumnTypeEvent (expected to fail)
-        {
-            List<Event> addColumnEvents =
-                    Collections.singletonList(
-                            new AlterColumnTypeEvent(
-                                    tableId, ImmutableMap.of("score", BIGINT, 
"toshi", FLOAT)));
-            Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, 
addColumnEvents));
-
-            // No schema change events should be sent to downstream
-            Assertions.assertThat(harness.getOutputRecords()).isEmpty();
-        }
-
-        // Test DropColumnEvent (expected to fail)
-        {
-            List<Event> addColumnEvents =
-                    Collections.singletonList(
-                            new DropColumnEvent(tableId, 
Arrays.asList("score", "height")));
-            Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, 
addColumnEvents));
-
-            // No schema change events should be sent to downstream
-            Assertions.assertThat(harness.getOutputRecords()).isEmpty();
+            Assertions.assertThat(harness.getOutputRecords())
+                    .map(StreamRecord::getValue)
+                    .map(e -> e.getClass().getName())
+                    
.containsExactly("org.apache.flink.cdc.common.event.FlushEvent");
         }
 
         harness.close();
@@ -708,8 +692,9 @@ public class SchemaEvolveTest {
 
         SchemaOperator schemaOperator =
                 new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), 
behavior);
-        EventOperatorTestHarness<SchemaOperator, Event> harness =
-                new EventOperatorTestHarness<>(schemaOperator, 17, 
Duration.ofSeconds(3), behavior);
+        RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
+                RegularEventOperatorTestHarness.withDurationAndBehavior(
+                        schemaOperator, 17, Duration.ofSeconds(3), behavior);
         harness.open();
 
         // Test CreateTableEvent
@@ -735,7 +720,7 @@ public class SchemaEvolveTest {
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new 
FlushEvent(tableId)),
+                                    Collections.singletonList(new 
FlushEvent(0)),
                                     createAndInsertDataEvents));
 
             
Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -787,7 +772,7 @@ public class SchemaEvolveTest {
 
             List<Event> expectedEvents =
                     Arrays.asList(
-                            new FlushEvent(tableId),
+                            new FlushEvent(0),
                             DataChangeEvent.insertEvent(
                                     tableId,
                                     buildRecord(INT, 4, STRING, "Derrida", 
SMALLINT, (short) 20)),
@@ -855,7 +840,7 @@ public class SchemaEvolveTest {
 
             List<Event> expectedEvents =
                     Arrays.asList(
-                            new FlushEvent(tableId),
+                            new FlushEvent(0),
                             DataChangeEvent.insertEvent(
                                     tableId,
                                     buildRecord(INT, 6, STRING, null, 
SMALLINT, (short) 22)),
@@ -904,7 +889,7 @@ public class SchemaEvolveTest {
 
             List<Event> expectedEvents =
                     Arrays.asList(
-                            new FlushEvent(tableId),
+                            new FlushEvent(0),
                             DataChangeEvent.insertEvent(
                                     tableId, buildRecord(INT, 8, STRING, null, 
SMALLINT, null)),
                             DataChangeEvent.insertEvent(
@@ -945,7 +930,7 @@ public class SchemaEvolveTest {
 
             List<Event> expectedEvents =
                     Arrays.asList(
-                            new FlushEvent(tableId),
+                            new FlushEvent(0),
                             DataChangeEvent.insertEvent(
                                     tableId, buildRecord(INT, 12, STRING, 
null, DOUBLE, null)),
                             DataChangeEvent.insertEvent(
@@ -987,8 +972,8 @@ public class SchemaEvolveTest {
 
         SchemaOperator schemaOperator =
                 new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), 
behavior);
-        EventOperatorTestHarness<SchemaOperator, Event> harness =
-                new EventOperatorTestHarness<>(
+        RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
+                
RegularEventOperatorTestHarness.withDurationAndFineGrainedBehaviorWithError(
                         schemaOperator,
                         17,
                         Duration.ofSeconds(3),
@@ -1021,7 +1006,7 @@ public class SchemaEvolveTest {
                                 .collect(Collectors.toList()))
                 .isEqualTo(
                         ListUtils.union(
-                                Collections.singletonList(new 
FlushEvent(tableId)),
+                                Collections.singletonList(new FlushEvent(0)),
                                 createAndInsertDataEvents));
 
         
Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -1040,7 +1025,19 @@ public class SchemaEvolveTest {
                                         new AddColumnEvent.ColumnWithPosition(
                                                 Column.physicalColumn(
                                                         "height", DOUBLE, 
"Height data")))));
-        processEvent(schemaOperator, addColumnEvents);
+        Assertions.assertThatThrownBy(() -> processEvent(schemaOperator, 
addColumnEvents))
+                .isExactlyInstanceOf(IllegalStateException.class)
+                .cause()
+                .isExactlyInstanceOf(ExecutionException.class)
+                .cause()
+                .isExactlyInstanceOf(FlinkRuntimeException.class)
+                .hasMessage("Failed to apply schema change event.")
+                .cause()
+                
.isExactlyInstanceOf(UnsupportedSchemaChangeEventException.class)
+                .extracting("applyingEvent", "exceptionMessage")
+                .containsExactly(
+                        addColumnEvents.get(0), "Sink doesn't support such 
schema change event.");
+
         Assertions.assertThat(harness.isJobFailed()).isEqualTo(true);
         Assertions.assertThat(harness.getJobFailureCause())
                 .cause()
@@ -1072,8 +1069,8 @@ public class SchemaEvolveTest {
 
         // All types of schema change events will be sent to the sink
         // AddColumn and RenameColumn events will always fail
-        EventOperatorTestHarness<SchemaOperator, Event> harness =
-                new EventOperatorTestHarness<>(
+        RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
+                
RegularEventOperatorTestHarness.withDurationAndFineGrainedBehaviorWithError(
                         schemaOperator,
                         17,
                         Duration.ofSeconds(3),
@@ -1108,7 +1105,7 @@ public class SchemaEvolveTest {
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new 
FlushEvent(tableId)),
+                                    Collections.singletonList(new 
FlushEvent(0)),
                                     createAndInsertDataEvents));
 
             
Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -1159,7 +1156,7 @@ public class SchemaEvolveTest {
             processEvent(schemaOperator, addColumnEvents);
 
             List<Event> expectedEvents = new ArrayList<>();
-            expectedEvents.add(new FlushEvent(tableId));
+            expectedEvents.add(new FlushEvent(0));
             expectedEvents.addAll(addColumnEvents);
 
             Assertions.assertThat(
@@ -1221,7 +1218,7 @@ public class SchemaEvolveTest {
             processEvent(schemaOperator, renameColumnEvents);
 
             List<Event> expectedEvents = new ArrayList<>();
-            expectedEvents.add(new FlushEvent(tableId));
+            expectedEvents.add(new FlushEvent(0));
             expectedEvents.addAll(renameColumnEvents);
 
             Assertions.assertThat(
@@ -1266,7 +1263,7 @@ public class SchemaEvolveTest {
 
             List<Event> expectedEvents =
                     Arrays.asList(
-                            new FlushEvent(tableId),
+                            new FlushEvent(0),
                             DataChangeEvent.insertEvent(
                                     tableId,
                                     buildRecord(
@@ -1324,7 +1321,7 @@ public class SchemaEvolveTest {
 
             List<Event> expectedEvents =
                     Arrays.asList(
-                            new FlushEvent(tableId),
+                            new FlushEvent(0),
                             DataChangeEvent.insertEvent(
                                     tableId,
                                     buildRecord(
@@ -1384,8 +1381,8 @@ public class SchemaEvolveTest {
 
         // All types of schema change events will be sent to the sink
         // AddColumn and RenameColumn events will always fail
-        EventOperatorTestHarness<SchemaOperator, Event> harness =
-                new EventOperatorTestHarness<>(
+        RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
+                
RegularEventOperatorTestHarness.withDurationAndFineGrainedBehavior(
                         schemaOperator,
                         17,
                         Duration.ofSeconds(3),
@@ -1420,7 +1417,7 @@ public class SchemaEvolveTest {
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new 
FlushEvent(tableId)),
+                                    Collections.singletonList(new 
FlushEvent(0)),
                                     createAndInsertDataEvents));
 
             
Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
@@ -1471,7 +1468,8 @@ public class SchemaEvolveTest {
             processEvent(schemaOperator, addColumnEvents);
 
             List<Event> expectedEvents = new ArrayList<>();
-            expectedEvents.add(new FlushEvent(tableId));
+            expectedEvents.add(new FlushEvent(0));
+
             expectedEvents.addAll(addColumnEvents);
 
             Assertions.assertThat(
@@ -1533,7 +1531,7 @@ public class SchemaEvolveTest {
             processEvent(schemaOperator, renameColumnEvents);
 
             List<Event> expectedEvents = new ArrayList<>();
-            expectedEvents.add(new FlushEvent(tableId));
+            expectedEvents.add(new FlushEvent(0));
             expectedEvents.addAll(renameColumnEvents);
 
             Assertions.assertThat(
@@ -1578,7 +1576,7 @@ public class SchemaEvolveTest {
 
             List<Event> expectedEvents =
                     Arrays.asList(
-                            new FlushEvent(tableId),
+                            new FlushEvent(0),
                             DataChangeEvent.insertEvent(
                                     tableId,
                                     buildRecord(
@@ -1634,9 +1632,11 @@ public class SchemaEvolveTest {
 
             processEvent(schemaOperator, dropColumnEvents);
 
+            FlushEvent result;
+            result = new FlushEvent(0);
             List<Event> expectedEvents =
                     Arrays.asList(
-                            new FlushEvent(tableId),
+                            result,
                             DataChangeEvent.insertEvent(
                                     tableId,
                                     buildRecord(
@@ -1693,8 +1693,9 @@ public class SchemaEvolveTest {
 
         SchemaOperator schemaOperator =
                 new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
-        EventOperatorTestHarness<SchemaOperator, Event> harness =
-                new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior);
+        RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
+                RegularEventOperatorTestHarness.withDurationAndBehavior(
+                        schemaOperator, 17, Duration.ofSeconds(3), behavior);
         harness.open();
 
         // Test CreateTableEvent
@@ -1714,14 +1715,17 @@ public class SchemaEvolveTest {
 
             processEvent(schemaOperator, createAndInsertDataEvents);
 
+            FlushEvent result;
+            TableId tableId1 = tableId;
+            Event schemaChangeEvent = createAndInsertDataEvents.get(0);
+            result = new FlushEvent(0);
             Assertions.assertThat(
                             harness.getOutputRecords().stream()
                                     .map(StreamRecord::getValue)
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new FlushEvent(tableId)),
-                                    createAndInsertDataEvents));
+                                    Collections.singletonList(result), createAndInsertDataEvents));
 
             Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
             Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV1);
@@ -1776,8 +1780,7 @@ public class SchemaEvolveTest {
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new FlushEvent(tableId)),
-                                    addColumnEvents));
+                                    Collections.singletonList(new FlushEvent(0)), addColumnEvents));
 
             Schema schemaV2 =
                     Schema.newBuilder()
@@ -1878,14 +1881,18 @@ public class SchemaEvolveTest {
                                             SMALLINT,
                                             (short) 23)));
 
+            FlushEvent result;
+            int subTaskId = 0;
+            TableId tableId1 = tableId;
+            Event schemaChangeEvent = renameColumnEvents.get(0);
+            result = new FlushEvent(subTaskId);
             Assertions.assertThat(
                             harness.getOutputRecords().stream()
                                     .map(StreamRecord::getValue)
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new FlushEvent(tableId)),
-                                    lenientRenameColumnEvents));
+                                    Collections.singletonList(result), lenientRenameColumnEvents));
 
             Schema schemaV3 =
                     Schema.newBuilder()
@@ -1959,7 +1966,7 @@ public class SchemaEvolveTest {
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new FlushEvent(tableId)),
+                                    Collections.singletonList(new FlushEvent(0)),
                                     lenientAlterColumnTypeEvents));
 
             Schema schemaV4 =
@@ -2018,7 +2025,10 @@ public class SchemaEvolveTest {
                             harness.getOutputRecords().stream()
                                     .map(StreamRecord::getValue)
                                     .collect(Collectors.toList()))
-                    .isEqualTo(lenientDropColumnEvents);
+                    .isEqualTo(
+                            ListUtils.union(
+                                    Collections.singletonList(new FlushEvent(0)),
+                                    lenientDropColumnEvents));
 
             Schema schemaV5 =
                     Schema.newBuilder()
@@ -2063,8 +2073,9 @@ public class SchemaEvolveTest {
 
         SchemaOperator schemaOperator =
                 new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30), behavior);
-        EventOperatorTestHarness<SchemaOperator, Event> harness =
-                new EventOperatorTestHarness<>(schemaOperator, 17, Duration.ofSeconds(3), behavior);
+        RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
+                RegularEventOperatorTestHarness.withDurationAndBehavior(
+                        schemaOperator, 17, Duration.ofSeconds(3), behavior);
         harness.open();
 
         // Test CreateTableEvent
@@ -2094,14 +2105,17 @@ public class SchemaEvolveTest {
 
             processEvent(schemaOperator, createAndInsertDataEvents);
 
+            FlushEvent result;
+            TableId tableId1 = tableId;
+            Event schemaChangeEvent = createAndInsertDataEvents.get(0);
+            result = new FlushEvent(0);
             Assertions.assertThat(
                             harness.getOutputRecords().stream()
                                     .map(StreamRecord::getValue)
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new FlushEvent(tableId)),
-                                    createAndInsertDataEvents));
+                                    Collections.singletonList(result), createAndInsertDataEvents));
 
             Assertions.assertThat(harness.getLatestOriginalSchema(tableId)).isEqualTo(schemaV1);
             Assertions.assertThat(harness.getLatestEvolvedSchema(tableId)).isEqualTo(schemaV1);
@@ -2142,7 +2156,7 @@ public class SchemaEvolveTest {
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new FlushEvent(tableId)),
+                                    Collections.singletonList(new FlushEvent(0)),
                                     lenientDropColumnEvents));
 
             Schema schemaV2 =
@@ -2257,7 +2271,7 @@ public class SchemaEvolveTest {
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new FlushEvent(tableId)),
+                                    Collections.singletonList(new FlushEvent(0)),
                                     lenientAddColumnEvents));
 
             Schema schemaV3 =
@@ -2375,7 +2389,7 @@ public class SchemaEvolveTest {
                                     .collect(Collectors.toList()))
                     .isEqualTo(
                             ListUtils.union(
-                                    Collections.singletonList(new FlushEvent(tableId)),
+                                    Collections.singletonList(new FlushEvent(0)),
                                     lenientRenameColumnEvents));
 
             Schema schemaV4 =
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorTest.java
similarity index 73%
rename from flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorTest.java
rename to flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorTest.java
index eb45a2531..9feaf29d3 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/SchemaOperatorTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/regular/SchemaOperatorTest.java
@@ -15,7 +15,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.cdc.runtime.operators.schema;
+package org.apache.flink.cdc.runtime.operators.schema.regular;
 
 import org.apache.flink.cdc.common.data.binary.BinaryStringData;
 import org.apache.flink.cdc.common.event.CreateTableEvent;
@@ -26,13 +26,13 @@ import org.apache.flink.cdc.common.schema.Schema;
 import org.apache.flink.cdc.common.types.DataTypes;
 import org.apache.flink.cdc.common.types.RowType;
 import org.apache.flink.cdc.runtime.serializer.event.EventSerializer;
-import org.apache.flink.cdc.runtime.testutils.operators.EventOperatorTestHarness;
+import org.apache.flink.cdc.runtime.testutils.operators.RegularEventOperatorTestHarness;
 import org.apache.flink.cdc.runtime.typeutils.BinaryRecordDataGenerator;
 import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 
-import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.time.Duration;
@@ -46,6 +46,8 @@ import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatCode;
 
 /** Unit tests for the {@link SchemaOperator}. */
 public class SchemaOperatorTest {
@@ -67,14 +69,21 @@ public class SchemaOperatorTest {
         final OperatorID opID = new OperatorID();
         final TableId tableId = TableId.tableId("testProcessElement");
         final RowType rowType = DataTypes.ROW(DataTypes.BIGINT(), DataTypes.STRING());
+        final Schema schema =
+                Schema.newBuilder()
+                        .physicalColumn("id", DataTypes.BIGINT())
+                        .physicalColumn("name", DataTypes.STRING())
+                        .build();
 
         List<OneInputStreamOperatorTestHarness<Event, Event>> testHarnesses = new ArrayList<>();
         for (int subtaskIndex = 0; subtaskIndex < parallelism; subtaskIndex++) {
+            SchemaOperator operator = new SchemaOperator(new ArrayList<>());
             OneInputStreamOperatorTestHarness<Event, Event> testHarness =
-                    createTestHarness(maxParallelism, parallelism, subtaskIndex, opID);
+                    createTestHarness(maxParallelism, parallelism, subtaskIndex, opID, operator);
             testHarnesses.add(testHarness);
             testHarness.setup(EventSerializer.INSTANCE);
             testHarness.open();
+            operator.registerInitialSchema(tableId, schema);
 
             Map<String, String> meta = new HashMap<>();
             meta.put("subtask", String.valueOf(subtaskIndex));
@@ -114,15 +123,18 @@ public class SchemaOperatorTest {
     void testProcessSchemaChangeEventWithTimeOut() throws Exception {
         SchemaOperator schemaOperator =
                 new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(1));
-        EventOperatorTestHarness<SchemaOperator, Event> harness =
-                new EventOperatorTestHarness<>(schemaOperator, 1, Duration.ofSeconds(3));
+        RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
+                RegularEventOperatorTestHarness.withDuration(
+                        schemaOperator, 1, Duration.ofSeconds(3));
         harness.open();
-        Assertions.assertThrowsExactly(
-                TimeoutException.class,
-                () ->
-                        schemaOperator.processElement(
-                                new StreamRecord<>(
-                                        new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA))));
+        assertThatThrownBy(
+                        () ->
+                                schemaOperator.processElement(
+                                        new StreamRecord<>(
+                                                new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA))))
+                .isExactlyInstanceOf(IllegalStateException.class)
+                .cause()
+                .isExactlyInstanceOf(TimeoutException.class);
         harness.close();
     }
 
@@ -130,22 +142,28 @@ public class SchemaOperatorTest {
     void testProcessSchemaChangeEventWithOutTimeOut() throws Exception {
         SchemaOperator schemaOperator =
                 new SchemaOperator(new ArrayList<>(), Duration.ofSeconds(30));
-        EventOperatorTestHarness<SchemaOperator, Event> harness =
-                new EventOperatorTestHarness<>(schemaOperator, 1, Duration.ofSeconds(3));
+        RegularEventOperatorTestHarness<SchemaOperator, Event> harness =
+                RegularEventOperatorTestHarness.withDuration(
+                        schemaOperator, 1, Duration.ofSeconds(3));
         harness.open();
-        Assertions.assertDoesNotThrow(
-                () ->
-                        schemaOperator.processElement(
-                                new StreamRecord<>(
-                                        new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA))));
+        assertThatCode(
+                        () ->
+                                schemaOperator.processElement(
+                                        new StreamRecord<>(
+                                                new CreateTableEvent(CUSTOMERS, CUSTOMERS_SCHEMA))))
+                .doesNotThrowAnyException();
         harness.close();
     }
 
     private OneInputStreamOperatorTestHarness<Event, Event> createTestHarness(
-            int maxParallelism, int parallelism, int subtaskIndex, OperatorID opID)
+            int maxParallelism,
+            int parallelism,
+            int subtaskIndex,
+            OperatorID opID,
+            OneInputStreamOperator<Event, Event> operator)
             throws Exception {
         return new OneInputStreamOperatorTestHarness<>(
-                new SchemaOperator(new ArrayList<>()),
+                operator,
                 maxParallelism,
                 parallelism,
                 subtaskIndex,
