ThorneANN opened a new pull request, #4406:
URL: https://github.com/apache/flink-cdc/pull/4406
Problem
When using a distributed pipeline source (e.g., Kafka, or any source where
isParallelMetadataSource() returns true) with a Paimon sink, the job fails with
TableNotExistException if the target table does not already exist in the
catalog.
Root cause: In the distributed pipeline topology,
DistributedPrePartitionOperator runs before SchemaOperator. When it receives a
CreateTableEvent, it immediately calls
PaimonHashFunctionProvider.getHashFunction(), which constructs a
PaimonHashFunction. The PaimonHashFunction constructor called
catalog.getTable() to determine whether the table is append-only, but the
table hasn't been created yet because MetadataApplier hasn't run.
This does not affect MySQL-CDC pipelines because the regular topology
routes events through SchemaOperator first, so the table always exists by the
time RegularPrePartitionOperator creates the hash function.
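The ordering difference can be illustrated with a toy model. This is only a
sketch: ToyCatalog and both methods are hypothetical stand-ins, not the real
Flink CDC operators or the Paimon catalog API, and IllegalStateException
stands in for TableNotExistException.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of the ordering problem; no real Flink CDC or Paimon types are used. */
class OrderingSketch {

    /** Hypothetical catalog that only tracks which tables exist. */
    static final class ToyCatalog {
        private final Set<String> tables = new HashSet<>();

        void createTable(String name) {
            tables.add(name);
        }

        void getTable(String name) {
            if (!tables.contains(name)) {
                // Stand-in for TableNotExistException.
                throw new IllegalStateException("Table does not exist: " + name);
            }
        }
    }

    /** Regular topology: the schema step creates the table before partitioning. */
    static boolean regularTopologySucceeds(String table) {
        ToyCatalog catalog = new ToyCatalog();
        catalog.createTable(table);            // schema step runs first
        return lookupSucceeds(catalog, table); // pre-partition step runs second
    }

    /** Distributed topology: partitioning sees the create-table event first. */
    static boolean distributedTopologySucceeds(String table) {
        ToyCatalog catalog = new ToyCatalog();
        boolean ok = lookupSucceeds(catalog, table); // pre-partition step runs first
        catalog.createTable(table);                  // schema step runs too late
        return ok;
    }

    private static boolean lookupSucceeds(ToyCatalog catalog, String table) {
        try {
            catalog.getTable(table);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("regular: " + regularTopologySucceeds("db.orders"));
        System.out.println("distributed: " + distributedTopologySucceeds("db.orders"));
    }
}
```

In this model the catalog lookup succeeds only when table creation precedes
it, which mirrors why the regular topology is unaffected.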
Fix
PaimonHashFunction queried the catalog for exactly one reason: to
distinguish append-only tables (no primary key) from primary-key tables. This
information is already available in the CDC Schema object passed to
getHashFunction() via schema.primaryKeys().
Changes:
- PaimonHashFunction: Replaced the catalog.getTable() call with a check on
schema.primaryKeys().isEmpty(). For primary-key tables, a minimal TableSchema
is constructed directly from the CDC Schema (column types, partition keys,
primary keys) to initialize RowAssignerChannelComputer. No catalog access at
any stage.
- PaimonHashFunctionProvider: Removed the Options parameter since it was
only used to create a catalog.
- PaimonDataSink: Updated the provider instantiation accordingly.
- PaimonHashFunctionTest: Removed all catalog setup/teardown; tests now
run fully offline.
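
The core idea of the fix, deciding append-only versus primary-key from the
schema alone, can be sketched as follows. This is a simplified illustration
under stated assumptions, not the code in this PR: ToySchema is a hypothetical
stand-in for the CDC Schema, and plain hashing replaces the Paimon channel
computers. In particular, hashing the whole row in the append-only branch is
an illustrative choice only.

```java
import java.util.Arrays;
import java.util.List;

/** Sketch of schema-only channel selection; no catalog is consulted anywhere. */
class SchemaBasedHashSketch {

    /** Hypothetical stand-in for the CDC Schema: column names and primary keys only. */
    static final class ToySchema {
        final List<String> columns;
        final List<String> primaryKeys;

        ToySchema(List<String> columns, List<String> primaryKeys) {
            this.columns = columns;
            this.primaryKeys = primaryKeys;
        }
    }

    /** Append-only is inferred from the schema: no primary keys means append-only. */
    static boolean isAppendOnly(ToySchema schema) {
        return schema.primaryKeys.isEmpty();
    }

    /** Picks a channel in [0, parallelism) for one row. */
    static int channelFor(ToySchema schema, List<Object> row, int parallelism) {
        Object[] hashed;
        if (isAppendOnly(schema)) {
            // Assumption for illustration: hash every column of the row.
            hashed = row.toArray();
        } else {
            // Hash only the primary-key values so equal keys share a channel.
            hashed = new Object[schema.primaryKeys.size()];
            for (int i = 0; i < hashed.length; i++) {
                int columnIndex = schema.columns.indexOf(schema.primaryKeys.get(i));
                hashed[i] = row.get(columnIndex);
            }
        }
        return Math.floorMod(Arrays.hashCode(hashed), parallelism);
    }

    public static void main(String[] args) {
        ToySchema orders = new ToySchema(Arrays.asList("id", "name"), Arrays.asList("id"));
        System.out.println(channelFor(orders, Arrays.<Object>asList(1, "a"), 4));
    }
}
```

Because the decision depends only on the schema carried by the event, this
style of hash function can be built before the target table exists.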