[ https://issues.apache.org/jira/browse/FLINK-39696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-39696:
-----------------------------------
Labels: pull-request-available (was: )
> Distributed SchemaOperator emits FlushEvent with schema operator subtaskId
> instead of upstream sourcePartition
> --------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-39696
> URL: https://issues.apache.org/jira/browse/FLINK-39696
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Reporter: Ran Tao
> Priority: Major
> Labels: pull-request-available
>
> In the distributed schema evolution topology, the source identity used by the
> schema-change path and the flush path is inconsistent.
> {*}Current behavior{*}:
> 1. DistributedPrePartitionOperator preserves the original upstream source
> partition when it broadcasts a schema change event.
> 2. Distributed SchemaOperator forwards that identity to the coordinator via
> {*}SchemaChangeRequest(sourcePartition, sinkSubTaskId, schemaChangeEvent){*}.
> 3. SchemaCoordinator also deduplicates distributed schema change events by
> (sourcePartition, schemaChangeEvent).
> 4. However, when SchemaOperator emits the downstream FlushEvent, it currently
> uses the SchemaOperator subtask id instead of the original source partition.
> As a result, the same distributed schema change is identified by different
> keys in different parts of the pipeline:
> * SchemaChangeRequest / coordinator deduplication use the original source
> partition
> * FlushEvent / FlushSuccessEvent use the SchemaOperator subtask id
> This makes the flush lineage inconsistent with the schema-change lineage in
> the distributed topology, so the duplicated broadcast branches of a single
> upstream schema change cannot be consistently aligned by the original source
> partition.
> {*}Expected behavior{*}:
> FlushEvent in distributed SchemaOperator should carry the original upstream
> source partition, i.e. the same source identity already used in
> SchemaChangeRequest.
> {*}Suggested fix{*}:
> In {*}org.apache.flink.cdc.runtime.operators.schema.distributed.SchemaOperator#requestSchemaChange{*},
> construct the FlushEvent with schemaChangeRequest.getSourceSubTaskId() instead
> of the current SchemaOperator subtask id.
> A simple regression test can reproduce the issue:
> * source partition = 0
> * distributed SchemaOperator subtask = 1
> * process a CreateTableEvent
> * the first emitted FlushEvent should use 0, not 1
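
The suggested fix and the regression scenario above can be sketched as follows. This is a minimal, self-contained illustration and not the actual Flink CDC code: the nested SchemaChangeRequest and FlushEvent classes are simplified, hypothetical stand-ins, the helper methods flushEventBeforeFix and flushEventAfterFix are invented for this sketch, and the schema change event is modeled as a plain string. Only the getter name getSourceSubTaskId() and the constructor argument order are taken from the description above. The sketch also assumes that the sinkSubTaskId argument is the SchemaOperator subtask id.

```java
// Hypothetical, simplified stand-ins for the classes named in the issue.
// They only model which id ends up inside the FlushEvent.
final class FlushLineageSketch {

    /** Stand-in for SchemaChangeRequest(sourcePartition, sinkSubTaskId, schemaChangeEvent). */
    static final class SchemaChangeRequest {
        private final int sourcePartition;
        private final int sinkSubTaskId;
        private final String schemaChangeEvent; // assumption: event modeled as a string

        SchemaChangeRequest(int sourcePartition, int sinkSubTaskId, String schemaChangeEvent) {
            this.sourcePartition = sourcePartition;
            this.sinkSubTaskId = sinkSubTaskId;
            this.schemaChangeEvent = schemaChangeEvent;
        }

        /** Returns the original upstream source partition. */
        int getSourceSubTaskId() {
            return sourcePartition;
        }
    }

    /** Stand-in for FlushEvent; holds the source identity that downstream operators see. */
    static final class FlushEvent {
        final int sourceSubTaskId;

        FlushEvent(int sourceSubTaskId) {
            this.sourceSubTaskId = sourceSubTaskId;
        }
    }

    /** Reported behavior: the flush is tagged with the SchemaOperator's own subtask id. */
    static FlushEvent flushEventBeforeFix(int schemaOperatorSubTaskId) {
        return new FlushEvent(schemaOperatorSubTaskId);
    }

    /** Suggested behavior: the flush reuses the source identity carried by the request. */
    static FlushEvent flushEventAfterFix(SchemaChangeRequest request) {
        return new FlushEvent(request.getSourceSubTaskId());
    }

    public static void main(String[] args) {
        // Regression scenario: source partition 0, SchemaOperator subtask 1, CreateTableEvent.
        int schemaOperatorSubTaskId = 1;
        SchemaChangeRequest request =
                new SchemaChangeRequest(0, schemaOperatorSubTaskId, "CreateTableEvent");

        FlushEvent before = flushEventBeforeFix(schemaOperatorSubTaskId);
        FlushEvent after = flushEventAfterFix(request);

        System.out.println("before fix: " + before.sourceSubTaskId); // prints "before fix: 1"
        System.out.println("after fix: " + after.sourceSubTaskId);   // prints "after fix: 0"
    }
}
```

With the fix applied, the id carried by the FlushEvent matches the key that the coordinator already uses for deduplication, so both paths identify the schema change by source partition 0. A real regression test would presumably drive the actual distributed SchemaOperator through an operator test harness instead of these stand-ins.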