This is an automated email from the ASF dual-hosted git repository.

yuxiqian pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git


The following commit(s) were added to refs/heads/master by this push:
     new 3e1211e27 [FLINK-39696][runtime] Use source partition for distributed 
flush events (#4400)
3e1211e27 is described below

commit 3e1211e271ddb0335eb45a0dc16b1d83616b5e90
Author: Ran Tao <[email protected]>
AuthorDate: Mon May 18 15:00:37 2026 +0800

    [FLINK-39696][runtime] Use source partition for distributed flush events 
(#4400)
---
 .../schema/distributed/SchemaOperator.java         |  9 +++++--
 .../schema/distributed/SchemaEvolveTest.java       | 31 ++++++++++++++++++++++
 .../DistributedEventOperatorTestHarness.java       | 23 ++++++++++++----
 3 files changed, 56 insertions(+), 7 deletions(-)

diff --git 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
index 7964efd3f..114629e2d 100755
--- 
a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
+++ 
b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaOperator.java
@@ -195,11 +195,16 @@ public class SchemaOperator extends 
AbstractStreamOperatorAdapter<Event>
 
     private void requestSchemaChange(
             TableId sourceTableId, SchemaChangeRequest schemaChangeRequest) {
-        LOG.info("{}> Sent FlushEvent to downstream...", subTaskId);
+        final int sourcePartition = schemaChangeRequest.getSourceSubTaskId();
+
+        LOG.info(
+                "{}> Sent FlushEvent to downstream for source partition {}...",
+                subTaskId,
+                sourcePartition);
         output.collect(
                 new StreamRecord<>(
                         new FlushEvent(
-                                subTaskId,
+                                sourcePartition,
                                 tableIdRouter.route(sourceTableId),
                                 
schemaChangeRequest.getSchemaChangeEvent().getType())));
 
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java
index 4245887f2..b7424cb5d 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/operators/schema/distributed/SchemaEvolveTest.java
@@ -382,6 +382,37 @@ public class SchemaEvolveTest extends SchemaTestBase {
                         "Unexpected schema change events occurred in EXCEPTION 
mode. Job will fail now.");
     }
 
+    @Test
+    void testFlushEventUsesSourcePartitionInsteadOfSchemaOperatorSubtask() 
throws Exception {
+        CreateTableEvent createTableEvent = new CreateTableEvent(TABLE_ID, 
INITIAL_SCHEMA);
+
+        Assertions.assertThat(
+                        runInHarness(
+                                () ->
+                                        new SchemaOperator(
+                                                ROUTING_RULES,
+                                                RouteMode.ALL_MATCH,
+                                                Duration.ofMinutes(3),
+                                                SchemaChangeBehavior.LENIENT,
+                                                "UTC"),
+                                (op) ->
+                                        new 
DistributedEventOperatorTestHarness<>(
+                                                op,
+                                                20,
+                                                1,
+                                                Duration.ofSeconds(3),
+                                                Duration.ofMinutes(3)),
+                                (operator, harness) ->
+                                        
operator.processElement(wrap(createTableEvent, 0, 1))))
+                .map(StreamRecord::getValue)
+                .first()
+                .isEqualTo(
+                        new FlushEvent(
+                                0,
+                                Collections.singletonList(TABLE_ID),
+                                createTableEvent.getType()));
+    }
+
     protected static <
                     OP extends AbstractStreamOperatorAdapter<E>,
                     E extends Event,
diff --git 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java
 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java
index 3e81ec82b..8277dc9f1 100644
--- 
a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java
+++ 
b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/testutils/operators/DistributedEventOperatorTestHarness.java
@@ -81,15 +81,26 @@ public class DistributedEventOperatorTestHarness<
     private final TestingSchemaRegistryGateway schemaRegistryGateway;
     private final LinkedList<StreamRecord<E>> outputRecords = new 
LinkedList<>();
     private final MockedOperatorCoordinatorContext mockedContext;
+    private final int subtaskIndex;
 
     public DistributedEventOperatorTestHarness(OP operator, int numOutputs) {
-        this(operator, numOutputs, Duration.ofSeconds(3), 
Duration.ofMinutes(3));
+        this(operator, numOutputs, 0, Duration.ofSeconds(3), 
Duration.ofMinutes(3));
     }
 
     public DistributedEventOperatorTestHarness(
             OP operator, int numOutputs, Duration applyDuration, Duration 
rpcTimeout) {
+        this(operator, numOutputs, 0, applyDuration, rpcTimeout);
+    }
+
+    public DistributedEventOperatorTestHarness(
+            OP operator,
+            int numOutputs,
+            int subtaskIndex,
+            Duration applyDuration,
+            Duration rpcTimeout) {
         this.operator = operator;
         this.numOutputs = numOutputs;
+        this.subtaskIndex = subtaskIndex;
         this.mockedContext =
                 new MockedOperatorCoordinatorContext(
                         SCHEMA_OPERATOR_ID, 
Thread.currentThread().getContextClassLoader());
@@ -160,7 +171,7 @@ public class DistributedEventOperatorTestHarness<
 
     private void initializeOperator() throws Exception {
         operator.setup(
-                new MockStreamTask(schemaRegistryGateway),
+                new MockStreamTask(schemaRegistryGateway, subtaskIndex),
                 new MockStreamConfig(new Configuration(), numOutputs),
                 new EventCollectingOutput<>(outputRecords, 
schemaRegistryGateway));
         schemaRegistryGateway.sendOperatorEventToCoordinator(
@@ -227,9 +238,10 @@ public class DistributedEventOperatorTestHarness<
     }
 
     private static class MockStreamTask extends StreamTask<Event, 
AbstractStreamOperator<Event>> {
-        protected MockStreamTask(TestingSchemaRegistryGateway 
schemaRegistryGateway)
+        protected MockStreamTask(
+                TestingSchemaRegistryGateway schemaRegistryGateway, int 
subtaskIndex)
                 throws Exception {
-            super(new 
SchemaRegistryCoordinatingEnvironment(schemaRegistryGateway));
+            super(new 
SchemaRegistryCoordinatingEnvironment(schemaRegistryGateway, subtaskIndex));
         }
 
         @Override
@@ -240,7 +252,8 @@ public class DistributedEventOperatorTestHarness<
         private final TestingSchemaRegistryGateway schemaRegistryGateway;
 
         public SchemaRegistryCoordinatingEnvironment(
-                TestingSchemaRegistryGateway schemaRegistryGateway) {
+                TestingSchemaRegistryGateway schemaRegistryGateway, int 
subtaskIndex) {
+            super("test-task", 2, subtaskIndex, 2);
             this.schemaRegistryGateway = schemaRegistryGateway;
         }
 

Reply via email to