[ https://issues.apache.org/jira/browse/FLINK-35859?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17900028#comment-17900028 ]
Xin Gong commented on FLINK-35859:
----------------------------------
[~loserwang1024] With the current fix, users cannot immediately notice that the task has a problem. Maybe we can make the fix more complete. I added a flag that triggers a restart when the status is NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED, so that the newly added table will be synchronized.
{code:java}
import org.apache.flink.util.FlinkRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Assigner for snapshot split. */
public class SnapshotSplitAssigner<C extends SourceConfig> implements SplitAssigner {
    private static final Logger LOG = LoggerFactory.getLogger(SnapshotSplitAssigner.class);

    // Set when the assigner is found in NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED;
    // used to trigger a restart once the assigner is back to a normal status.
    private boolean flagExceptionAssignerStatusWhenCheckpoint;

    // Other fields and methods (sourceConfig, assignerStatus, ...) are omitted.

    private void captureNewlyAddedTables() {
        if (sourceConfig.isScanNewlyAddedTableEnabled()
                && AssignerStatus.isAssigningFinished(assignerStatus)) {
            // ......
        } else if (AssignerStatus.isNewlyAddedAssigningSnapshotFinished(assignerStatus)) {
            flagExceptionAssignerStatusWhenCheckpoint = true;
            LOG.info("flagExceptionAssignerStatusWhenCheckpoint set to true");
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // The assigner has reached a normal status again: fail the job so that it
        // restarts and the newly added table gets synchronized.
        if (AssignerStatus.isNewlyAddedAssigningFinished(assignerStatus)
                && flagExceptionAssignerStatusWhenCheckpoint) {
            throw new FlinkRuntimeException(
                    "Previous assigner status was NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED, and "
                            + "the newly added table would keep causing exceptions on checkpoint, so we "
                            + "trigger a restart for the newly added table after the assigner returns to normal status");
        }
    }
}
{code}
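The proposed flag can be exercised in isolation. Below is a minimal, dependency-free sketch of the same idea, assuming a simplified two-value status enum and a hypothetical `ToyRestartingAssigner` with a hypothetical `onBinlogSplitUpdated()` transition; none of these are flink-cdc classes or methods. It only mirrors the shape of the snippet above: set a flag while the status is NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED, then throw on the next completed checkpoint once the status has reached NEWLY_ADDED_ASSIGNING_FINISHED. `IllegalStateException` stands in for `FlinkRuntimeException` to keep the sketch self-contained.

```java
/**
 * Hypothetical, dependency-free model of the restart flag proposed above.
 * Status, ToyRestartingAssigner and onBinlogSplitUpdated() are illustrative
 * stand-ins, not flink-cdc classes or methods.
 */
public class FlaggedRestartSketch {

    // Simplified subset of the assigner statuses named in this issue.
    enum Status {
        NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED,
        NEWLY_ADDED_ASSIGNING_FINISHED
    }

    static class ToyRestartingAssigner {
        Status status;

        // Not part of any persisted state in this sketch.
        private boolean restartOnNextCheckpoint;

        ToyRestartingAssigner(Status restoredStatus) {
            this.status = restoredStatus;
        }

        // Mirrors the else-if branch of captureNewlyAddedTables().
        void captureNewlyAddedTables() {
            if (status == Status.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED) {
                restartOnNextCheckpoint = true;
            }
        }

        // Hypothetical transition: the binlog split has been updated and the
        // assigner is back to a normal status.
        void onBinlogSplitUpdated() {
            status = Status.NEWLY_ADDED_ASSIGNING_FINISHED;
        }

        // Mirrors notifyCheckpointComplete(): throwing here stands in for
        // failing the job so that it restarts with the assigner in a normal status.
        void notifyCheckpointComplete(long checkpointId) {
            if (status == Status.NEWLY_ADDED_ASSIGNING_FINISHED && restartOnNextCheckpoint) {
                throw new IllegalStateException(
                        "Restart requested after checkpoint " + checkpointId);
            }
        }
    }

    public static void main(String[] args) {
        ToyRestartingAssigner assigner =
                new ToyRestartingAssigner(Status.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED);
        assigner.captureNewlyAddedTables();
        // Still in the intermediate status: the checkpoint completes normally.
        assigner.notifyCheckpointComplete(1L);
        assigner.onBinlogSplitUpdated();
        try {
            assigner.notifyCheckpointComplete(2L);
        } catch (IllegalStateException e) {
            System.out.println("Job restart triggered: " + e.getMessage());
        }
    }
}
```

In this sketch the flag lives only in memory, so an assigner constructed again after the restart (now restored in NEWLY_ADDED_ASSIGNING_FINISHED) starts with the flag cleared and does not throw again; the snippet above likewise does not show the flag being added to the checkpointed assigner state.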
> [flink-cdc] Fix: The assigner is not ready to offer finished split information, this should not be called
> ---------------------------------------------------------------------------------------------------------
>
> Key: FLINK-35859
> URL: https://issues.apache.org/jira/browse/FLINK-35859
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.1.1
> Reporter: Hongshun Wang
> Assignee: Hongshun Wang
> Priority: Minor
> Fix For: cdc-3.2.0
>
>
> When using CDC with a newly added table, the following error occurs:
> {code:java}
> The assigner is not ready to offer finished split information, this should not be called.
> {code}
> It happens because:
> 1. When the job is stopped and then restarted, the assigner status is NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED.
> 2. The enumerator then sends a BinlogSplitUpdateRequestEvent to each reader to update the binlog split (see org.apache.flink.cdc.connectors.mysql.source.enumerator.MySqlSourceEnumerator#syncWithReaders).
> 3. The reader suspends its binlog reader and then sends a BinlogSplitMetaRequestEvent to the enumerator.
> 4. The enumerator finds that some tables have not been sent, and an error occurs:
> {code:java}
> private void sendBinlogMeta(int subTask, BinlogSplitMetaRequestEvent requestEvent) {
>     // initialize once
>     if (binlogSplitMeta == null) {
>         final List<FinishedSnapshotSplitInfo> finishedSnapshotSplitInfos =
>                 splitAssigner.getFinishedSplitInfos();
>         if (finishedSnapshotSplitInfos.isEmpty()) {
>             LOG.error(
>                     "The assigner offers empty finished split information, this should not happen");
>             throw new FlinkRuntimeException(
>                     "The assigner offers empty finished split information, this should not happen");
>         }
>         binlogSplitMeta =
>                 Lists.partition(
>                         finishedSnapshotSplitInfos, sourceConfig.getSplitMetaGroupSize());
>     }
> }
> {code}
>
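The four steps in the issue description can be replayed as a small, self-contained simulation. Everything in it is hypothetical: `Status`, `ToyAssigner` and `ToyEnumerator` are simplified stand-ins rather than the flink-cdc classes, the events are only recorded as log strings, and the guard in `getFinishedSplitInfos()` is an assumption modeled on the error message reported above (finished split information is only offered once assigning has finished).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical replay of steps 1-4; not the flink-cdc implementation. */
public class RestoreSequenceSketch {

    // Simplified stand-in for the assigner statuses named in this issue.
    enum Status {
        INITIAL_ASSIGNING_FINISHED,
        NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED,
        NEWLY_ADDED_ASSIGNING_FINISHED
    }

    // Assumed guard: finished split infos are only offered in a finished status.
    static class ToyAssigner {
        final Status status;
        final List<String> finishedSplitInfos = new ArrayList<>();

        ToyAssigner(Status restoredStatus) {
            this.status = restoredStatus;
        }

        List<String> getFinishedSplitInfos() {
            if (status != Status.INITIAL_ASSIGNING_FINISHED
                    && status != Status.NEWLY_ADDED_ASSIGNING_FINISHED) {
                throw new IllegalStateException(
                        "The assigner is not ready to offer finished split information, "
                                + "this should not be called");
            }
            return finishedSplitInfos;
        }
    }

    static class ToyEnumerator {
        final ToyAssigner assigner;
        final List<String> log = new ArrayList<>();

        ToyEnumerator(ToyAssigner assigner) {
            this.assigner = assigner;
        }

        // Step 2: after the restore, ask every reader to update its binlog split.
        void syncWithReaders(int parallelism) {
            if (assigner.status == Status.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED) {
                for (int subtask = 0; subtask < parallelism; subtask++) {
                    log.add("BinlogSplitUpdateRequestEvent -> reader " + subtask);
                    // Step 3: the reader suspends its binlog reader and requests metadata.
                    onBinlogSplitMetaRequest(subtask);
                }
            }
        }

        // Step 4: the enumerator asks the assigner for the finished split infos.
        void onBinlogSplitMetaRequest(int subtask) {
            log.add("BinlogSplitMetaRequestEvent <- reader " + subtask);
            assigner.getFinishedSplitInfos();
        }
    }

    public static void main(String[] args) {
        // Step 1: the job is restored with NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED.
        ToyEnumerator enumerator = new ToyEnumerator(
                new ToyAssigner(Status.NEWLY_ADDED_ASSIGNING_SNAPSHOT_FINISHED));
        try {
            enumerator.syncWithReaders(2);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        System.out.println(enumerator.log);
    }
}
```

The simulation fails on the first reader's metadata request, which is the point where the reported error surfaces: the restored intermediate status triggers the binlog update round-trip before the assigner is able to answer it.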
--
This message was sent by Atlassian Jira
(v8.20.10#820010)