[
https://issues.apache.org/jira/browse/FLINK-38218?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Sergei Morozov updated FLINK-38218:
-----------------------------------
Description:
{*}Steps to reproduce{*}:
# Prepare tables {{A}}, {{B}} and {{C}} to be captured by the source. The
number of rows in {{B}} and {{C}} is irrelevant; table {{A}} needs enough rows
for its snapshot to take a couple of minutes (100 rows should be enough given
the configuration below).
# Make sure the source has the following configuration parameters:
## {{chunk-meta.group.size: 2}}. This is a small enough value to force the
binlog split metadata to be transmitted in multiple groups. {{1}} might work as
well, but I didn't test with it.
## {{scan.snapshot.fetch.size: 2}}. This guarantees that the snapshot of
table {{A}} takes long enough ({{2}} is the minimum allowed value). A
configuration sketch follows this list.
# Include tables {{B}} and {{C}} into the source's configuration.
# Start the job and wait until it transitions to the binlog phase.
# Stop the job.
# Include table {{A}} into the configuration.
# Start the job and wait until the source reader starts reporting finished
offsets back to the enumerator.
# Stop the job before the snapshot is completed.
# Restart the job.
# Wait until the snapshot of table {{A}} is finished and the source transitions to
the binlog phase.
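For reference, here is a minimal sketch of the source configuration for the second run (with table {{A}} added), written against the DataStream API. The connection details and table names are placeholders, and the builder methods {{splitMetaGroupSize}}, {{fetchSize}} and {{scanNewlyAddedTableEnabled}} are my assumption of the DataStream counterparts of the option keys above; the reproduction itself only fixes the option values, not a specific API.
{code:java}
import org.apache.flink.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.cdc.debezium.JsonDebeziumDeserializationSchema;

class ReproSourceConfig {

    // Sketch of the second-run source; hostname, credentials and
    // database/table names are placeholders.
    static MySqlSource<String> secondRunSource() {
        return MySqlSource.<String>builder()
                .hostname("localhost")
                .port(3306)
                .username("user")
                .password("password")
                .databaseList("test")
                // First run captured only test.B and test.C; the second run adds test.A.
                .tableList("test.A", "test.B", "test.C")
                // Needed so a table added to the table list is picked up after a restart.
                .scanNewlyAddedTableEnabled(true)
                // chunk-meta.group.size: 2 (binlog split meta is transmitted in groups)
                .splitMetaGroupSize(2)
                // scan.snapshot.fetch.size: 2 (keeps the snapshot of table A slow)
                .fetchSize(2)
                .deserializer(new JsonDebeziumDeserializationSchema())
                .build();
    }
}
{code}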
{*}Expected behavior{*}: changes in table {{A}} are read from the binlog.
{*}Actual behavior{*}: changes in table {{A}} in the binlog are ignored.
h3. What's happening internally
{*}Note{*}: for readability, I will use a simplified format for snapshot split
IDs (instead of {{{}A:0{}}}, I will use just {{{}A{}}}).
# The source reader snapshots tables {{B}} and {{C}} and reports their
finished snapshot split infos to the enumerator.
# Enumerator creates the binlog split with infos {{B}} and {{C}} and assigns
it to the reader.
# The job stops, gets table {{A}} included into the configuration and restarts.
# The enumerator assigns split {{A}} and puts it into {{assignedSplits}} so
the order of its keys becomes {{{}B{}}}, {{{}C{}}}, {{{}A{}}}.
# The source reader snapshots {{A}} and reports its finished snapshot split
info(s) to the enumerator.
# If the job weren't stopped and restarted at this point, there would be no
issue. But per the reproduction steps above, it is stopped and restarted.
# {{MySqlSnapshotSplitAssigner}} constructor sorts {{assignedSplits}} so the
order of its keys becomes {{{}A{}}}, {{{}B{}}}, {{{}C{}}}.
# The source reader knows that it has 2 split infos ({{{}B{}}} and {{{}C{}}})
but the new total number is 3, so it requests split meta group 1.
# It receives the group with the following elements: {{{}C{}}}.
# It calculates {{existedSplitsOfLastGroup}} (an empty set, because the two
already received infos form a complete group, so there is nothing to deduplicate).
# It doesn't deduplicate {{C}} and appends it to the finished split infos.
# As a result, the finished split infos are {{B}}, {{C}}, {{C}}. The reader now
has 3 infos, which matches the new total, so the transmission is considered
complete, but the info for table {{A}} is lost (see the model sketch after this list).
# Subsequently, for all changes from table {{A}},
{{BinlogSplitReader#shouldEmit()}} returns {{false}} because there's no
finished snapshot split info for this table (a simplified sketch of this check
closes this description).
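To make the group arithmetic in the last few steps concrete, here is a small self-contained model of the meta-group bookkeeping described above. It is not the actual reader code, only an illustration assuming the group size of 2 and the split orderings from this walkthrough:
{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class MetaGroupLossDemo {
    public static void main(String[] args) {
        int groupSize = 2;                       // chunk-meta.group.size
        // Infos the reader restored from state (order of the first run).
        List<String> received = new ArrayList<>(Arrays.asList("B", "C"));
        // Order of assignedSplits after the restart (sorted by the assigner constructor).
        List<String> assigned = Arrays.asList("A", "B", "C");
        int total = assigned.size();             // new total number of finished splits: 3

        while (received.size() < total) {
            // The reader requests the group following what it already has.
            int requestedGroup = received.size() / groupSize;               // 2 / 2 = 1
            List<String> group = assigned.subList(
                    requestedGroup * groupSize,
                    Math.min((requestedGroup + 1) * groupSize, total));     // group 1 = [C]

            // Deduplication only applies when the last received group was incomplete;
            // 2 infos with group size 2 form a complete group, so nothing is deduplicated.
            boolean lastGroupIncomplete = received.size() % groupSize != 0;
            for (String split : group) {
                if (!lastGroupIncomplete || !received.contains(split)) {
                    received.add(split);
                }
            }
        }

        // Prints [B, C, C]: the transmission looks complete, but A's info was never sent.
        System.out.println(received);
    }
}
{code}
Running it prints {{[B, C, C]}}: the reader believes all three infos have arrived, yet the info for {{A}} was never transmitted.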
Note that other connectors that use {{SnapshotSplitAssigner}} from the
{{flink-cdc-base}} package may be prone to this issue as well.
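For completeness, a simplified sketch of the check that ends up dropping the changes. This is not the real {{BinlogSplitReader#shouldEmit()}} implementation (which also compares binlog offsets against per-split high watermarks); it only illustrates the part relevant here: a record whose table has no finished snapshot split info is never emitted.
{code:java}
import java.util.List;
import java.util.Map;

// Simplified stand-in for the reader-side filter; the real code works with
// TableId, FinishedSnapshotSplitInfo and binlog offsets.
class ShouldEmitSketch {
    private final Map<String, List<String>> finishedSplitInfosByTable;

    ShouldEmitSketch(Map<String, List<String>> finishedSplitInfosByTable) {
        this.finishedSplitInfosByTable = finishedSplitInfosByTable;
    }

    boolean shouldEmit(String table) {
        List<String> infos = finishedSplitInfosByTable.get(table);
        if (infos == null || infos.isEmpty()) {
            return false; // table A ends up here: its changes are silently dropped
        }
        // The real implementation additionally checks the record's binlog position
        // against the high watermark of the matching snapshot split before emitting.
        return true;
    }

    public static void main(String[] args) {
        // In the buggy state the map has entries for B and C but none for A.
        ShouldEmitSketch sketch = new ShouldEmitSketch(
                Map.of("B", List.of("B:0"), "C", List.of("C:0")));
        System.out.println(sketch.shouldEmit("A")); // false
        System.out.println(sketch.shouldEmit("B")); // true
    }
}
{code}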
> MySQL CDC source may ignore newly added tables while reading the binlog
> (scenario 1)
> ------------------------------------------------------------------------------------
>
> Key: FLINK-38218
> URL: https://issues.apache.org/jira/browse/FLINK-38218
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.2.1
> Reporter: Sergei Morozov
> Priority: Major
> Labels: pull-request-available
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)