[jira] [Created] (FLINK-36565) Pipeline YAML should allow merging decimal with different precisions
yux created FLINK-36565: --- Summary: Pipeline YAML should allow merging decimal with different precisions Key: FLINK-36565 URL: https://issues.apache.org/jira/browse/FLINK-36565 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux Currently, it's not possible to merge two Decimal-typed fields with different precision or scale. Since DECIMAL(p1, s1) and DECIMAL(p2, s2) can be converted to DECIMAL(MAX(p1 - s1, p2 - s2) + MAX(s1, s2), MAX(s1, s2)) without any loss, this conversion path seems reasonable and worth adding. -- This message was sent by Atlassian Jira (v8.20.10#820010)
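For illustration, a minimal Java sketch of the lossless merging rule above (class and method names are hypothetical, not actual Flink CDC code): the merged type keeps the wider integer part and the wider fractional part, so both inputs fit without truncation.

```java
// Hypothetical helper, not the actual Flink CDC implementation.
public final class DecimalTypeMerger {

    /** Returns {precision, scale} of the merged DECIMAL type. */
    static int[] mergeDecimal(int p1, int s1, int p2, int s2) {
        int scale = Math.max(s1, s2);                   // widest fractional part
        int integerDigits = Math.max(p1 - s1, p2 - s2); // widest integer part
        return new int[] {integerDigits + scale, scale};
    }

    public static void main(String[] args) {
        // DECIMAL(10, 2) + DECIMAL(8, 4) -> DECIMAL(8 + 4, 4) = DECIMAL(12, 4)
        int[] merged = mergeDecimal(10, 2, 8, 4);
        System.out.println("DECIMAL(" + merged[0] + ", " + merged[1] + ")");
    }
}
```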
[jira] [Created] (FLINK-36558) MySQL CDC Fails to parse array-typed key index binlog created between 8.0.17 and 8.0.18
yux created FLINK-36558: --- Summary: MySQL CDC Fails to parse array-typed key index binlog created between 8.0.17 and 8.0.18 Key: FLINK-36558 URL: https://issues.apache.org/jira/browse/FLINK-36558 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux MySQL 8.0.17 and 8.0.18 write array-typed key indexes differently into the binlog: * MySQL 8.0.17 used a different internal enum value for the TYPED_ARRAY column type * MySQL 8.0.17 and 8.0.18 write extraneous bytes after the expected column metadata This has been fixed since MySQL 8.0.19. Since MySQL treats the binlog format as an internal implementation detail [1] and doesn't guarantee backwards compatibility, MySQL CDC has to deal with the versioning problem itself. [1] https://bugs.mysql.com/bug.php?id=105545 -- This message was sent by Atlassian Jira (v8.20.10#820010)
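A hedged sketch of how a reader could gate the workaround on the reported server version; this is illustrative, not the actual MySQL CDC binlog parser:

```java
// Hypothetical version gate, not actual MySQL CDC code: only versions in
// [8.0.17, 8.0.19) wrote the TYPED_ARRAY metadata in the quirky format.
public final class BinlogVersionGate {

    /** True for versions in [8.0.17, 8.0.19), which need the workaround. */
    static boolean needsTypedArrayWorkaround(int major, int minor, int patch) {
        return major == 8 && minor == 0 && patch >= 17 && patch < 19;
    }

    public static void main(String[] args) {
        System.out.println(needsTypedArrayWorkaround(8, 0, 17)); // true
        System.out.println(needsTypedArrayWorkaround(8, 0, 19)); // false (fixed upstream)
    }
}
```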
[jira] [Commented] (FLINK-36075) Add paginate partition strategy for mongodb connector
[ https://issues.apache.org/jira/browse/FLINK-36075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889926#comment-17889926 ] yux commented on FLINK-36075: - Sure! Please assign it to me. > Add paginate partition strategy for mongodb connector > - > > Key: FLINK-36075 > URL: https://issues.apache.org/jira/browse/FLINK-36075 > Project: Flink > Issue Type: Improvement > Components: Connectors / MongoDB >Affects Versions: mongodb-1.2.0 >Reporter: Jiabao Sun >Assignee: Jiabao Sun >Priority: Major > Fix For: mongodb-1.3.0 > > > Add paginate partition strategy for mongodb connector -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36515) Adds high-precision TIME type support in YAML pipeline
yux created FLINK-36515: --- Summary: Adds high-precision TIME type support in YAML pipeline Key: FLINK-36515 URL: https://issues.apache.org/jira/browse/FLINK-36515 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux Currently, CDC treats `TimeType` the same way as the Flink Table API and supports time types with zero precision, but upstream MySQL may generate time data with precision up to 9. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36514) Unable to override include/exclude schema types in lenient mode
yux created FLINK-36514: --- Summary: Unable to override include/exclude schema types in lenient mode Key: FLINK-36514 URL: https://issues.apache.org/jira/browse/FLINK-36514 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux If schema evolution behavior is set to LENIENT, Truncate / Drop table events will be ignored by default. However, there's currently no way for users to override this behavior due to the following code: ```java if (excludedSETypes.isEmpty() && SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) { // In lenient mode, we exclude DROP_TABLE and TRUNCATE_TABLE by default. This could be // overridden by manually specifying excluded types. Stream.of(SchemaChangeEventType.DROP_TABLE, SchemaChangeEventType.TRUNCATE_TABLE) .map(SchemaChangeEventType::getTag) .forEach(excludedSETypes::add); } ``` If one wants to exclude no types, it's actually not possible since passing `[]` is equivalent to passing nothing, and `DROP` and `TRUNCATE` events will still be ignored. -- This message was sent by Atlassian Jira (v8.20.10#820010)
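One possible fix direction, sketched below with hypothetical names (not the actual patch): model "option not set" as null rather than an empty list, so an explicitly configured `[]` really means "exclude nothing":

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: distinguish "option not set" (null) from an explicit
// empty list, so passing `[]` disables the lenient-mode defaults.
public final class LenientExclusionDefaults {

    static List<String> resolveExcludedTypes(List<String> userConfigured, boolean lenient) {
        if (userConfigured != null) {
            return userConfigured; // explicit setting wins, even if empty
        }
        List<String> defaults = new ArrayList<>();
        if (lenient) {
            // Defaults apply only when the user left the option unset.
            defaults.add("drop.table");
            defaults.add("truncate.table");
        }
        return defaults;
    }

    public static void main(String[] args) {
        System.out.println(resolveExcludedTypes(null, true));              // defaults apply
        System.out.println(resolveExcludedTypes(new ArrayList<>(), true)); // [] -> exclude nothing
    }
}
```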
[jira] [Created] (FLINK-36475) YAML Transform supports case-insensitive column names conversion
yux created FLINK-36475: --- Summary: YAML Transform supports case-insensitive column names conversion Key: FLINK-36475 URL: https://issues.apache.org/jira/browse/FLINK-36475 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux Some YAML pipeline sinks have case-insensitive semantics (Paimon, Hive, etc.), which means all given column names should be converted to lowercase. It's a chore for users to write individual transform rules just for changing column names, and a meta-option to automatically downcase column names would be nice. > Originally discussed in > https://github.com/apache/flink-cdc/discussions/3612#discussioncomment-10910326 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36474) YAML Table-merging route should accept more type widening cases
yux created FLINK-36474: --- Summary: YAML Table-merging route should accept more type widening cases Key: FLINK-36474 URL: https://issues.apache.org/jira/browse/FLINK-36474 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux Currently, only three specific type widening cases are accepted when performing table-merging routes: * Integer Type Promotions * Float to Double Promotion * Char-like Type to String Any other combination (including some reasonable ones, like temporal types) will be rejected during wider type inference, which is a little surprising, since table-merging routes already tolerate columns with varying names and counts. -- This message was sent by Atlassian Jira (v8.20.10#820010)
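For reference, a simplified sketch of the three currently accepted widening paths (illustrative, not the actual SchemaUtils logic); the final branch is the one this ticket proposes to relax:

```java
import java.util.List;
import java.util.Set;

// Illustrative sketch of the current widening rules, not real Flink CDC code.
public final class WiderTypeSketch {

    private static final List<String> INTS = List.of("TINYINT", "SMALLINT", "INT", "BIGINT");
    private static final Set<String> FLOATS = Set.of("FLOAT", "DOUBLE");
    private static final Set<String> CHAR_LIKE = Set.of("CHAR", "VARCHAR", "STRING");

    static String widerType(String a, String b) {
        if (a.equals(b)) {
            return a;
        }
        if (INTS.contains(a) && INTS.contains(b)) {           // integer type promotion
            return INTS.indexOf(a) > INTS.indexOf(b) ? a : b;
        }
        if (FLOATS.contains(a) && FLOATS.contains(b)) {       // float -> double promotion
            return "DOUBLE";
        }
        if (CHAR_LIKE.contains(a) && CHAR_LIKE.contains(b)) { // char-like -> string
            return "STRING";
        }
        // The branch FLINK-36474 proposes to relax (e.g. DATE -> TIMESTAMP).
        throw new IllegalStateException("Incompatible types: " + a + " and " + b);
    }

    public static void main(String[] args) {
        System.out.println(widerType("INT", "BIGINT"));     // BIGINT
        System.out.println(widerType("DATE", "TIMESTAMP")); // currently throws
    }
}
```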
[jira] [Created] (FLINK-36461) YAML job failed to schema evolve with unmatched transform tables
yux created FLINK-36461: --- Summary: YAML job failed to schema evolve with unmatched transform tables Key: FLINK-36461 URL: https://issues.apache.org/jira/browse/FLINK-36461 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux Currently, a transform configuration like the following will fail on schema change events from any table other than `foo.bar.baz`: ```yaml transform: - source-table: foo.bar.baz projection: \* ``` The exception message is as follows: {{2024-10-10 11:04:43: java.lang.ArrayIndexOutOfBoundsException: -1 at java.util.ArrayList.elementData(ArrayList.java:424) at java.util.ArrayList.get(ArrayList.java:437) at org.apache.flink.cdc.common.utils.SchemaUtils.lambda$transformSchemaChangeEvent$11(SchemaUtils.java:371) at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384) at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) at org.apache.flink.cdc.common.utils.SchemaUtils.transformSchemaChangeEvent(SchemaUtils.java:386) at org.apache.flink.cdc.runtime.operators.transform.PreTransformOperator.cacheChangeSchema(PreTransformOperator.java:272) at org.apache.flink.cdc.runtime.operators.transform.PreTransformOperator.processElement(PreTransformOperator.java:248) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310) at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) at org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase.pollNext(IteratorSourceReaderBase.java:103) at org.apache.flink.cdc.connectors.values.source.ValuesDataSource$EventIteratorReader.pollNext(ValuesDataSource.java:294) at org.apache.flink.streaming.api.operators.SourceOperator.pollNext(SourceOperator.java:779) at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:457) at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:70) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:616) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:1071) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:1020) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:938) at
org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567) at java.lang.Thread.run(Thread.java:879)}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
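A hedged sketch of the kind of guard that would avoid the crash (hypothetical names, not the actual SchemaUtils patch): never index into the column list with the -1 returned for unmatched columns:

```java
import java.util.List;
import java.util.Optional;

// Hypothetical guard: only index into the column list when the changed column
// actually exists, instead of blindly calling get(indexOf(...)) with -1.
public final class SafeColumnLookup {

    static Optional<String> lookup(List<String> transformedColumns, String changedColumn) {
        int idx = transformedColumns.indexOf(changedColumn);
        return idx < 0 ? Optional.empty() : Optional.of(transformedColumns.get(idx));
    }

    public static void main(String[] args) {
        List<String> cols = List.of("id", "name");
        System.out.println(lookup(cols, "name"));    // Optional[name]
        System.out.println(lookup(cols, "missing")); // Optional.empty, no crash
    }
}
```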
[jira] [Commented] (FLINK-36453) Backport DBZ-6502 patch to fix Oracle 23.x JDBC driver compatibility
[ https://issues.apache.org/jira/browse/FLINK-36453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17887877#comment-17887877 ] yux commented on FLINK-36453: - I'd love to take this ticket. [~leonard] > Backport DBZ-6502 patch to fix Oracle 23.x JDBC driver compatibility > --- > > Key: FLINK-36453 > URL: https://issues.apache.org/jira/browse/FLINK-36453 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: yux >Priority: Major > > DBZ-6502 fixes Oracle JDBC 23.x compatibility, but it wasn't shipped in > versions prior to 2.2. > We may backport this bugfix for now before updating to Debezium 2.x. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36453) Backport DBZ-6502 patch to fix Oracle 23.x JDBC driver compatibility
[ https://issues.apache.org/jira/browse/FLINK-36453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yux updated FLINK-36453: Description: DBZ-6502 fixes Oracle JDBC 23.x compatibility, but it wasn't shipped in versions prior to 2.2. We may backport this bugfix for now before updating to Debezium 2.x. > Backport DBZ-6502 patch to fix Oracle 23.x JDBC driver compatibility > --- > > Key: FLINK-36453 > URL: https://issues.apache.org/jira/browse/FLINK-36453 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: yux >Priority: Major > > DBZ-6502 fixes Oracle JDBC 23.x compatibility, but it wasn't shipped in > versions prior to 2.2. > We may backport this bugfix for now before updating to Debezium 2.x. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36453) Backport DBZ-6502 patch to fix Oracle 23.x JDBC driver compatibility
yux created FLINK-36453: --- Summary: Backport DBZ-6502 patch to fix Oracle 23.x JDBC driver compatibility Key: FLINK-36453 URL: https://issues.apache.org/jira/browse/FLINK-36453 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36408) MySQL pipeline connector could not work with FLOAT type with precision
yux created FLINK-36408: --- Summary: MySQL pipeline connector could not work with FLOAT type with precision Key: FLINK-36408 URL: https://issues.apache.org/jira/browse/FLINK-36408 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.2.0 Reporter: yux Currently, the MySQL SourceRecordEmitter treats FLOAT(x) (float with explicit precision) as DOUBLE, but MetadataAccessor reports a plain "FLOAT", which causes a discrepancy and corrupts deserialized data. -- This message was sent by Atlassian Jira (v8.20.10#820010)
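One consistent mapping both code paths could share, sketched below under MySQL's documented rule that FLOAT(p) is stored as a 4-byte float for p <= 24 and an 8-byte double for 25 <= p <= 53 (illustrative code, not the actual connector):

```java
// Hypothetical shared mapping, not actual Flink CDC code: emitting and
// reporting the same logical type for FLOAT(p) removes the discrepancy.
public final class MySqlFloatMapping {

    static String mapFloatWithPrecision(int p) {
        if (p < 0 || p > 53) {
            throw new IllegalArgumentException("FLOAT precision out of range: " + p);
        }
        return p <= 24 ? "FLOAT" : "DOUBLE"; // MySQL's documented storage rule
    }

    public static void main(String[] args) {
        System.out.println(mapFloatWithPrecision(10)); // FLOAT
        System.out.println(mapFloatWithPrecision(30)); // DOUBLE
    }
}
```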
[jira] [Commented] (FLINK-36403) K8s native mode Use ConfigMap to get a better experience
[ https://issues.apache.org/jira/browse/FLINK-36403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17885758#comment-17885758 ] yux commented on FLINK-36403: - +1, repackaging images is neither convenient nor necessary considering most parts are unchanged, and we may be able to publish generic Docker images without requiring users to build their own. Just two minor questions: * How will this approach handle dependencies (including JDBC drivers, UDF jars, etc.)? * Currently, the deploy target is configured via CLI arguments. Will there be discrepancies if we have a `deploy:` configuration in YAML files? P.S. I guess the design intent is that the pipeline description YAML should be a target-neutral abstract representation, so one may deploy a job anywhere (for example, on a local cluster when testing, and remotely when deploying) without changing the YAML. cc [~renqs] [~ConradJam] > K8s native mode Use ConfigMap to get a better experience > > > Key: FLINK-36403 > URL: https://issues.apache.org/jira/browse/FLINK-36403 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: gaoyan >Priority: Major > > Currently, the operation of CDC's k8s native mode requires users to repackage > the YAML into the Docker image each time to update the task, which is a poor > experience. However, this issue can be resolved with a very simple method. > When submitting a task, create a ConfigMap and write the YAML content into > it, then mount it to the container. At the same time, there is no need to > worry about resource release and deletion. The ConfigMap can be strongly > bound to the container, and k8s will automatically handle these issues for > us. If there are security concerns, we can also use a Secret. > This method is simple and convenient, and does not require additional tools > such as a k8s Operator. > Regarding deployment, I also have some suggestions. There are many > customizable parameters that can be configured for k8s native submissions. > Currently, doing these things only through CLI args is very difficult for > users. Writing them into the YAML for configuration will provide a better > experience, as shown below: > {code:java} > source: > x > sink: > xx > pipeline: > > deploy: > target: k8s-native > jar: "/opt/cdc/cdc.jar" > namespace: > account: > {code} > Would this approach be more user-friendly? > I have done similar things in other communities. If the CDC community thinks > this idea is feasible, I am willing to work on this issue and submit a PR. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36221) Add specification about CAST ... AS ... built-in functions
yux created FLINK-36221: --- Summary: Add specification about CAST ... AS ... built-in functions Key: FLINK-36221 URL: https://issues.apache.org/jira/browse/FLINK-36221 Project: Flink Issue Type: Sub-task Reporter: yux Fix For: cdc-3.2.0 FLINK-34877 adds CAST ... AS ... syntax in transform expressions, but there's no corresponding documentation yet. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36219) Add Flink compatibility matrix of CDC releases
yux created FLINK-36219: --- Summary: Add Flink compatibility matrix of CDC releases Key: FLINK-36219 URL: https://issues.apache.org/jira/browse/FLINK-36219 Project: Flink Issue Type: Sub-task Reporter: yux Fix For: cdc-3.2.0 Currently, CDC releases have their own preferences over Flink versions. For example, CDC 3.1.0 and earlier doesn't work with Flink 1.19. Adding a compatibility table would make this much clearer. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36214) Error log when building flink-cdc-pipeline-udf-examples from source code
[ https://issues.apache.org/jira/browse/FLINK-36214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17879401#comment-17879401 ] yux commented on FLINK-36214: - Seems it was brought in by FLINK-34876, will investigate this. > Error log when building flink-cdc-pipeline-udf-examples from source code > > > Key: FLINK-36214 > URL: https://issues.apache.org/jira/browse/FLINK-36214 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: lincoln lee >Priority: Minor > > There's an error log when building from source code(encountered on 3.2.0 rc & > master branch), but not fail the build. > {code} > [INFO] --< org.apache.flink:flink-cdc-pipeline-udf-examples > >-- > [INFO] Building flink-cdc-pipeline-udf-examples 3.2.0 > [3/42] > [INFO] [ jar > ]- > [INFO] > [INFO] --- maven-clean-plugin:3.1.0:clean (default-clean) @ > flink-cdc-pipeline-udf-examples --- > [INFO] Deleting > /Users/lilin/Downloads/veri-cdc/flink-cdc-3.2.0/flink-cdc-pipeline-udf-examples/target > [INFO] > [INFO] --- flatten-maven-plugin:1.5.0:clean (flatten.clean) @ > flink-cdc-pipeline-udf-examples --- > [INFO] Deleting > /Users/lilin/Downloads/veri-cdc/flink-cdc-3.2.0/flink-cdc-pipeline-udf-examples/.flattened-pom.xml > [INFO] > [INFO] --- spotless-maven-plugin:2.4.2:check (spotless-check) @ > flink-cdc-pipeline-udf-examples --- > [INFO] > [INFO] --- maven-checkstyle-plugin:2.17:check (validate) @ > flink-cdc-pipeline-udf-examples --- > [INFO] > [INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-maven-version) @ > flink-cdc-pipeline-udf-examples --- > [INFO] > [INFO] --- maven-remote-resources-plugin:1.5:process > (process-resource-bundles) @ flink-cdc-pipeline-udf-examples --- > [INFO] > [INFO] --- maven-resources-plugin:3.1.0:resources (default-resources) @ > flink-cdc-pipeline-udf-examples --- > [INFO] Using 'UTF-8' encoding to copy filtered resources. > [INFO] skip non existing resourceDirectory > /Users/lilin/Downloads/veri-cdc/flink-cdc-3.2.0/flink-cdc-pipeline-udf-examples/src/main/resources > [INFO] Copying 3 resources > [INFO] > [INFO] --- flatten-maven-plugin:1.5.0:flatten (flatten) @ > flink-cdc-pipeline-udf-examples --- > [INFO] Generating flattened POM of project > org.apache.flink:flink-cdc-pipeline-udf-examples:jar:3.2.0... > [INFO] > [INFO] --- scala-maven-plugin:4.9.2:add-source (scala-compile-first) @ > flink-cdc-pipeline-udf-examples --- > [INFO] Add Source directory: > /Users/lilin/Downloads/veri-cdc/flink-cdc-3.2.0/flink-cdc-pipeline-udf-examples/src/main/scala > [INFO] Add Test Source directory: > /Users/lilin/Downloads/veri-cdc/flink-cdc-3.2.0/flink-cdc-pipeline-udf-examples/src/test/scala > [INFO] > [INFO] --- scala-maven-plugin:4.9.2:compile (scala-compile-first) @ > flink-cdc-pipeline-udf-examples --- > [INFO] Compiler bridge file: > /Users/lilin/.sbt/1.0/zinc/org.scala-sbt/org.scala-sbt-compiler-bridge_2.12-1.10.0-bin_2.12.16__52.0-1.10.0_20240505T232140.jar > [INFO] compiling 8 Scala sources and 8 Java sources to > /Users/lilin/Downloads/veri-cdc/flink-cdc-3.2.0/flink-cdc-pipeline-udf-examples/target/classes > ... > [ERROR] -release is only supported on Java 9 and higher > [INFO] done compiling > [INFO] compile in 8.2 s > {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36193) Allow applying new SchemaChangeEvents to sink connectors
yux created FLINK-36193: --- Summary: Allow applying new SchemaChangeEvents to sink connectors Key: FLINK-36193 URL: https://issues.apache.org/jira/browse/FLINK-36193 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux We added `DropTableEvent` and `TruncateTableEvent` support and provided the pre-schema backfill feature in FLINK-35243. However, no sink connectors support them yet. This could be done for sink connectors including Doris, StarRocks, and Paimon. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36184) Transform Operator swallows schema changes from tables not present in transform rules
yux created FLINK-36184: --- Summary: Transform Operator swallows schema changes from tables not present in transform rules Key: FLINK-36184 URL: https://issues.apache.org/jira/browse/FLINK-36184 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.2.0, cdc-3.3.0 Reporter: yux Currently, tables that are not present in transform blocks should be treated as if there were a dummy fallback block: transform: - source-table: "\.*.\.*" # capture all unmentioned tables projection: "*" # keep all columns # without filtering any rows There's a bug in #3557's implementation: schema change events should be filtered out only if there's no wildcard (*) in the transform rules. However, it also filters out tables that are not defined in any transform rule, which causes schema change events to be lost, as in the following example: transform: - source-table: foo.bar.baz # Another table that doesn't really matter projection: ... Here, since there's one transform block, TransformOperator will be added into the operator chain. Now, if we perform some schema changes on another table (like db.table), they will be filtered out, since TransformOperator regards it as an asterisk-less table that does not require schema change events. By checking whether a table is transformed or not, we can set the hasAsterisk flag map correctly and resolve this problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
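A hedged sketch of the fix described above (hypothetical names, not the actual patch): a table that matches no transform rule must behave like `projection: *`, so its hasAsterisk flag should be true:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical resolver: unmatched tables get implicit keep-everything behavior.
public final class HasAsteriskResolver {

    record Rule(String tableMatcher, String projection) {}

    static Map<String, Boolean> resolve(List<String> tables, List<Rule> rules) {
        Map<String, Boolean> hasAsterisk = new HashMap<>();
        for (String table : tables) {
            Rule matched = rules.stream()
                    .filter(r -> table.matches(r.tableMatcher()))
                    .findFirst()
                    .orElse(null);
            // Unmatched tables behave like `projection: *`: schema changes pass through.
            boolean flag = matched == null || matched.projection().contains("*");
            hasAsterisk.put(table, flag);
        }
        return hasAsterisk;
    }

    public static void main(String[] args) {
        List<Rule> rules = List.of(new Rule("foo\\.bar\\.baz", "id, name"));
        System.out.println(resolve(List.of("foo.bar.baz", "db.table"), rules));
        // foo.bar.baz=false, db.table=true (map order may vary)
    }
}
```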
[jira] [Created] (FLINK-36183) Lenient mode doesn't work with route blocks
yux created FLINK-36183: --- Summary: Lenient mode doesn't work with route blocks Key: FLINK-36183 URL: https://issues.apache.org/jira/browse/FLINK-36183 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.2.0, cdc-3.3.0 Reporter: yux We should applySchemaChange (where the route rules take effect) first and lenientize its result afterwards; otherwise we may not be able to get the evolved schema, since the tableId isn't routed: Caused by: java.lang.IllegalStateException: Evolved schema does not exist, not ready for schema change event AddColumnEvent{tableId=kunni_test.customers, addedColumns=[ColumnWithPosition{column=`newCol2` VARCHAR(100), position=AFTER, existedColumnName=newCol}]} at org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.lambda$lenientizeSchemaChangeEvent$3(SchemaRegistryRequestHandler.java:378) at java.util.Optional.orElseThrow(Optional.java:290) at org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.lenientizeSchemaChangeEvent(SchemaRegistryRequestHandler.java:376) at org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.calculateDerivedSchemaChangeEvents(SchemaRegistryRequestHandler.java:360) at org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.handleSchemaChangeRequest(SchemaRegistryRequestHandler.java:184) at org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry.lambda$handleCoordinationRequest$3(SchemaRegistry.java:273) ... 4 more -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36153) MySQL fails to handle schema change events in Timestamp or Earliest Offset startup mode
[ https://issues.apache.org/jira/browse/FLINK-36153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yux updated FLINK-36153: Issue Type: Bug (was: Improvement) > MySQL fails to handle schema change events in Timestamp or Earliest Offset > startup mode > --- > > Key: FLINK-36153 > URL: https://issues.apache.org/jira/browse/FLINK-36153 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: yux >Priority: Major > > Currently, if the MySQL source starts up from a binlog position with > schema changes within range, the job will fail due to non-replayable > schema change events. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36153) MySQL fails to handle schema change events in Timestamp or Earliest Offset startup mode
yux created FLINK-36153: --- Summary: MySQL fails to handle schema change events in Timestamp or Earliest Offset startup mode Key: FLINK-36153 URL: https://issues.apache.org/jira/browse/FLINK-36153 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux Currently, if the MySQL source starts up from a binlog position with schema changes within range, the job will fail due to non-replayable schema change events. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36153) MySQL fails to handle schema change events in Timestamp or Earliest Offset startup mode
[ https://issues.apache.org/jira/browse/FLINK-36153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876559#comment-17876559 ] yux commented on FLINK-36153: - [~Leonard] Please assign this to me. > MySQL fails to handle schema change events in Timestamp or Earliest Offset > startup mode > --- > > Key: FLINK-36153 > URL: https://issues.apache.org/jira/browse/FLINK-36153 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: yux >Priority: Major > > Currently, if the MySQL source starts up from a binlog position with > schema changes within range, the job will fail due to non-replayable > schema change events. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36151) Add documentation for Schema Evolution related options
yux created FLINK-36151: --- Summary: Add documentation for Schema Evolution related options Key: FLINK-36151 URL: https://issues.apache.org/jira/browse/FLINK-36151 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux The CDC documentation should be updated to reflect recent changes to schema evolution features, like the new TRY_EVOLVE and LENIENT modes and the `include.schema.change` and `exclude.schema.change` options. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36114) Schema registry should block when handling existing requests
[ https://issues.apache.org/jira/browse/FLINK-36114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yux updated FLINK-36114: Issue Type: Bug (was: Improvement) > Schema registry should block when handling existing requests > > > Key: FLINK-36114 > URL: https://issues.apache.org/jira/browse/FLINK-36114 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: yux >Priority: Major > Labels: pull-request-available > > Currently, SchemaRegistry asynchronously receives schema change requests from > SchemaOperator, and the results of multiple requests might get mixed up, > causing incorrect logic flow in multi-parallelism cases. > Changing SchemaRegistry's behavior to accept requests serially should > resolve this problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36105) CDC pipeline job could not restore from state in Flink 1.20
[ https://issues.apache.org/jira/browse/FLINK-36105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yux updated FLINK-36105: Priority: Blocker (was: Major) > CDC pipeline job could not restore from state in Flink 1.20 > --- > > Key: FLINK-36105 > URL: https://issues.apache.org/jira/browse/FLINK-36105 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: yux >Assignee: yux >Priority: Blocker > > Currently, it is not possible to restore pipeline job with Flink 1.20 with > the following exception: > Exception in thread "main" java.lang.NoClassDefFoundError: > org/apache/flink/runtime/jobgraph/RestoreMode > at > org.apache.flink.cdc.cli.CliFrontend.createSavepointRestoreSettings(CliFrontend.java:130) > at > org.apache.flink.cdc.cli.CliFrontend.createExecutor(CliFrontend.java:95) > at org.apache.flink.cdc.cli.CliFrontend.main(CliFrontend.java:69) > Caused by: java.lang.ClassNotFoundException: > org.apache.flink.runtime.jobgraph.RestoreMode > at > java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) > at > java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) > at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527) > ... 3 more > Seems the Flink API mentioned above has been changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36114) Schema registry should block when handling existing requests
[ https://issues.apache.org/jira/browse/FLINK-36114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yux updated FLINK-36114: Priority: Blocker (was: Major) > Schema registry should block when handling existing requests > > > Key: FLINK-36114 > URL: https://issues.apache.org/jira/browse/FLINK-36114 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: yux >Priority: Blocker > Labels: pull-request-available > > Currently, SchemaRegistry asynchronously receives schema change requests from > SchemaOperator, and the results of multiple requests might get mixed up, > causing incorrect logic flow in multi-parallelism cases. > Changing SchemaRegistry's behavior to accept requests serially should > resolve this problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36128) Promote LENIENT mode as the default schema evolution behavior
[ https://issues.apache.org/jira/browse/FLINK-36128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yux updated FLINK-36128: Issue Type: Bug (was: Improvement) > Promote LENIENT mode as the default schema evolution behavior > - > > Key: FLINK-36128 > URL: https://issues.apache.org/jira/browse/FLINK-36128 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: yux >Priority: Major > > Currently, default schema evolution mode "EVOLVE" could not handle > exceptions, and might not be able to restore from existing state correctly > after failover. Before we can "manually trigger checkpoint" that was > introduced in Flink 1.19, making "LENIENT" a default option might be more > suitable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-36128) Promote LENIENT mode as the default schema evolution behavior
[ https://issues.apache.org/jira/browse/FLINK-36128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yux updated FLINK-36128: Priority: Blocker (was: Major) > Promote LENIENT mode as the default schema evolution behavior > - > > Key: FLINK-36128 > URL: https://issues.apache.org/jira/browse/FLINK-36128 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: yux >Priority: Blocker > > Currently, default schema evolution mode "EVOLVE" could not handle > exceptions, and might not be able to restore from existing state correctly > after failover. Before we can "manually trigger checkpoint" that was > introduced in Flink 1.19, making "LENIENT" a default option might be more > suitable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36128) Promote LENIENT mode as the default schema evolution behavior
yux created FLINK-36128: --- Summary: Promote LENIENT mode as the default schema evolution behavior Key: FLINK-36128 URL: https://issues.apache.org/jira/browse/FLINK-36128 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux Currently, the default schema evolution mode "EVOLVE" could not handle exceptions, and might not be able to restore from existing state correctly after failover. Until we can use the "manually triggered checkpoint" feature introduced in Flink 1.19, making "LENIENT" the default option might be more suitable. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36114) Schema registry should block when handling existing requests
yux created FLINK-36114: --- Summary: Schema registry should block when handling existing requests Key: FLINK-36114 URL: https://issues.apache.org/jira/browse/FLINK-36114 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux Currently, SchemaRegistry asynchronously receives schema change requests from SchemaOperator, and the results of multiple requests might get mixed up, causing incorrect logic flow in multi-parallelism cases. Changing SchemaRegistry's behavior to accept requests serially should resolve this problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
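A minimal sketch of the proposed serial behavior (not the actual SchemaRegistry code): funneling all requests through a single-threaded executor guarantees one request is fully handled before the next starts:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical handler: requests stay asynchronous for callers, but the
// processing itself is strictly serialized, so responses never interleave.
public final class SerialRequestHandler {

    private final ExecutorService serialExecutor = Executors.newSingleThreadExecutor();

    CompletableFuture<String> handleRequest(String request) {
        // Each request is fully processed before the next one starts.
        return CompletableFuture.supplyAsync(() -> process(request), serialExecutor);
    }

    private String process(String request) {
        return "applied: " + request; // placeholder for the real schema change logic
    }

    public static void main(String[] args) {
        SerialRequestHandler handler = new SerialRequestHandler();
        handler.handleRequest("add column a").thenAccept(System.out::println);
        handler.handleRequest("drop column b").thenAccept(System.out::println).join();
        handler.serialExecutor.shutdown();
    }
}
```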
[jira] [Created] (FLINK-36105) CDC pipeline job could not restore from state in Flink 1.20
yux created FLINK-36105: --- Summary: CDC pipeline job could not restore from state in Flink 1.20 Key: FLINK-36105 URL: https://issues.apache.org/jira/browse/FLINK-36105 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux Currently, it is not possible to restore a pipeline job with Flink 1.20; it fails with the following exception: Exception in thread "main" java.lang.NoClassDefFoundError: org/apache/flink/runtime/jobgraph/RestoreMode at org.apache.flink.cdc.cli.CliFrontend.createSavepointRestoreSettings(CliFrontend.java:130) at org.apache.flink.cdc.cli.CliFrontend.createExecutor(CliFrontend.java:95) at org.apache.flink.cdc.cli.CliFrontend.main(CliFrontend.java:69) Caused by: java.lang.ClassNotFoundException: org.apache.flink.runtime.jobgraph.RestoreMode at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581) at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178) at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527) ... 3 more It seems the Flink API mentioned above has changed. -- This message was sent by Atlassian Jira (v8.20.10#820010)
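A hedged sketch of a reflective compatibility shim; it assumes (unverified here) that RestoreMode moved out of org.apache.flink.runtime.jobgraph into org.apache.flink.core.execution in newer Flink releases, and it only succeeds with Flink on the classpath:

```java
// Hypothetical shim: try the old class location first, then the assumed new one.
public final class RestoreModeCompat {

    static Class<?> loadRestoreModeClass() throws ClassNotFoundException {
        try {
            return Class.forName("org.apache.flink.runtime.jobgraph.RestoreMode");
        } catch (ClassNotFoundException e) {
            // Assumed location for newer Flink versions; verify against the release.
            return Class.forName("org.apache.flink.core.execution.RestoreMode");
        }
    }

    public static void main(String[] args) throws Exception {
        // Throws ClassNotFoundException when no Flink jars are on the classpath.
        System.out.println(loadRestoreModeClass().getName());
    }
}
```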
[jira] [Created] (FLINK-36093) PreTransform operator wrongly filters out columns when multiple transform rules were defined
yux created FLINK-36093: --- Summary: PreTransform operator wrongly filters out columns when multiple transform rules were defined Key: FLINK-36093 URL: https://issues.apache.org/jira/browse/FLINK-36093 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux Currently, a transform rule like this does not work: ```yaml transform: - projection: 'A' as result filter: tag >= 0 - projection: score as result filter: tag < 0 ``` Here, the `score` column will be filtered out in the PreTransform stage, since it isn't referenced in the first transform rule. As a result, the second transform rule will fail, since `score` no longer exists for the PostTransform operator. -- This message was sent by Atlassian Jira (v8.20.10#820010)
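A sketch of the fix direction with hypothetical names (not the actual patch): the PreTransform stage should keep the union of columns referenced by all matching transform rules, not just the first, so `score` survives:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper: the columns to retain are the union across all rules.
public final class ReferencedColumnsUnion {

    static Set<String> columnsToKeep(List<Set<String>> referencedPerRule) {
        Set<String> union = new LinkedHashSet<>();
        referencedPerRule.forEach(union::addAll);
        return union;
    }

    public static void main(String[] args) {
        Set<String> rule1 = Set.of("tag");          // projection 'A' as result, filter tag >= 0
        Set<String> rule2 = Set.of("score", "tag"); // projection score as result, filter tag < 0
        System.out.println(columnsToKeep(List.of(rule1, rule2))); // keeps score too
    }
}
```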
[jira] [Commented] (FLINK-36092) Transform doesn't fully support schema evolution
[ https://issues.apache.org/jira/browse/FLINK-36092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17874799#comment-17874799 ] yux commented on FLINK-36092: - Noticed this bug when running a DataGen -> StarRocks fuzzing test. [~Leonard] Please assign this to me. > Transform doesn't fully support schema evolution > - > > Key: FLINK-36092 > URL: https://issues.apache.org/jira/browse/FLINK-36092 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: yux >Priority: Major > > Currently, transformed tables do not fully support upstream schema > evolution. > We need to add more test cases to verify whether add / alterType / rename / drop column > events work with wildcard and non-wildcard matchers. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36092) Transform doesn't fully support schema evolution
yux created FLINK-36092: --- Summary: Transform doesn't fully support schema evolution Key: FLINK-36092 URL: https://issues.apache.org/jira/browse/FLINK-36092 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux Currently, transformed tables do not fully support upstream schema evolution. We need to add more test cases to verify whether add / alterType / rename / drop column events work with both wildcard and non-wildcard matchers. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36041) Eliminate Calcite dependency during runtime
yux created FLINK-36041: --- Summary: Eliminate Calcite dependency during runtime Key: FLINK-36041 URL: https://issues.apache.org/jira/browse/FLINK-36041 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux Currently, the runtime operators `PreTransformOp` and `PostTransformOp` rely heavily on Calcite to parse expression rules. Calcite is a heavy dependency and quite likely to cause conflicts (since it's a Flink dependency, too). It would be better if we could construct those abstract syntax structures early (in the composer, before submitting jobs). This would improve runtime execution performance and provide better error messages for malformed expressions. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35985) SUBSTRING function not available in transform rules
[ https://issues.apache.org/jira/browse/FLINK-35985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872902#comment-17872902 ] yux commented on FLINK-35985: - Thanks for [~MOBIN]'s suggestion! +1 for providing both functions to keep consistency with Flink SQL. A kind reminder that FLINK-35991 just refactored the built-in function declarations, so make sure you've rebased before working on this : ) > SUBSTRING function not available in transform rules > --- > > Key: FLINK-35985 > URL: https://issues.apache.org/jira/browse/FLINK-35985 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: yux >Priority: Major > > Currently, one cannot write `SUBSTRING(str FROM idx FOR len)` in the way > suggested by the documentation [1], while the function that does work, > `SUBSTR(str, idx, len)`, isn't mentioned anywhere. > Either the code or the docs needs updating to avoid confusing users. > [1] > https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/core-concept/transform/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-36034) Remove dependencies of Flink table planner in Transform module
[ https://issues.apache.org/jira/browse/FLINK-36034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872765#comment-17872765 ] yux commented on FLINK-36034: - [~Leonard] I'm willing to investigate this. > Remove dependencies of Flink table planner in Transform module > -- > > Key: FLINK-36034 > URL: https://issues.apache.org/jira/browse/FLINK-36034 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: yux >Priority: Major > > Currently, transform module `BuiltinScalarFunction` redundantly relies on > `org.apache.flink.table.functions.BuiltInFunctionDefinition`. Getting rid of > it should allow CDC jobs running on Kubernetes clusters where such dependency > isn't provided. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36034) Remove dependencies of Flink table planner in Transform module
yux created FLINK-36034: --- Summary: Remove dependencies of Flink table planner in Transform module Key: FLINK-36034 URL: https://issues.apache.org/jira/browse/FLINK-36034 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux Currently, the transform module's `BuiltinScalarFunction` redundantly relies on `org.apache.flink.table.functions.BuiltInFunctionDefinition`. Getting rid of it should allow CDC jobs to run on Kubernetes clusters where that dependency isn't provided. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35615) CDC3.0, the pipeline of mysql-doris , do not support the MySQL field type timestamp(6)
[ https://issues.apache.org/jira/browse/FLINK-35615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872198#comment-17872198 ] yux commented on FLINK-35615: - This has been resolved in https://github.com/apache/flink-cdc/pull/3511. > CDC3.0, the pipeline of mysql-doris , do not support the MySQL field type > timestamp(6) > --- > > Key: FLINK-35615 > URL: https://issues.apache.org/jira/browse/FLINK-35615 > Project: Flink > Issue Type: Bug >Affects Versions: cdc-3.1.0 >Reporter: ConorZhao >Priority: Major > Labels: pull-request-available > > 2024-06-14 16:45:24 > org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269) > at > org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741) > at > org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83) > at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309) > at > org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222) > at > org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) > at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) > at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) > at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) > at > org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) > at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) > at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) > at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) > at 
org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) > at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) > at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) > at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) > at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) > Caused by: java.lang.ArrayIndexOutOfBoundsException: 8 > at > com.ververica.cdc.common.data.binary.BinarySegmentUtils.getLongMultiSegments(BinarySegmentUtils.java:731) > at > com.ververica.cdc.common.data.binary.BinarySegmentUtils.getLong(BinarySegmentUtils.java:721) > at > com.ververica.cdc.common.data.binary.BinarySegmentUtils.readLocalZonedTimestampData(BinarySegmentUtils.java:1033) > at > com.ververica.cdc.common.data.binary.BinaryRecordData.getLocalZonedTimestampData(BinaryRecordData.java:244) > at > com.ververica.cdc.connectors.doris.sink.DorisRowConverter.lambda$createExternalConverter$cc45f215$1(DorisRowConverter.java:113) > at > com.ververica.cdc.connectors.dori
[jira] [Commented] (FLINK-35774) The cache of transform is not updated after process schema change event
[ https://issues.apache.org/jira/browse/FLINK-35774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872193#comment-17872193 ] yux commented on FLINK-35774: - Resolved by https://github.com/apache/flink-cdc/pull/3285. > The cache of transform is not updated after process schema change event > --- > > Key: FLINK-35774 > URL: https://issues.apache.org/jira/browse/FLINK-35774 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.1 >Reporter: Wenkai Qi >Priority: Major > Labels: pull-request-available > Original Estimate: 1m > Remaining Estimate: 1m > > The cache of transform is not updated after process schema change event. > For example, when add column event, tableInfo is not updated in > TransformSchema and TransformData. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35852) When used through the transform function, the decimal(10,2) type field value in the MySQL source table becomes 100 times the original value after being transferred to
[ https://issues.apache.org/jira/browse/FLINK-35852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872192#comment-17872192 ] yux commented on FLINK-35852: - Resolved in https://github.com/apache/flink-cdc/pull/3285. > When used through the transform function, the decimal(10,2) type field value > in the MySQL source table becomes 100 times the original value after being > transferred to the target table. > > > Key: FLINK-35852 > URL: https://issues.apache.org/jira/browse/FLINK-35852 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.1 > Environment: flink-1.18.0、 > flink-cdc-3.1.1、mysql-5.7.43、Doris-2.0.6、CentOS Linux release 8.5.2111 >Reporter: zheng_shengsheng >Priority: Major > Attachments: image-2024-07-16-14-46-53-982.png, > image-2024-07-16-14-48-07-981.png > > > When I use CDC's yaml mode to collect MySQL data to Doris, I use the latest > supported transform feature. > Finally, I found that the database table in MySQL collected and the > decimal(10,2) type fields in the Mysql source table were automatically > converted into decimal(19,0) in Doris, and then the value size became 100 > times the original size. As follows > > !image-2024-07-16-14-48-07-981.png! > !image-2024-07-16-14-46-53-982.png! > > The core part of yaml is as follows: > {code:java} > // code placeholder > source: > type: mysql > hostname: node2 > port: 3306 > username: > password: > tables: sys_canteen_consume_conf,sys_canteen_consume_rec,sys_order_course, > server-id: 5513 > connectionTimeZone: GMT+8 > scan.startup.mode: initial > sink: > type: doris > fenodes: node3:8030 > username: admin > password: > table.create.properties.light_schema_change: true > table.create.properties.replication_num: 1 > transform: > - source-table: jeecg-boot2.sys_\.* > projection: concat('LYDB2','') as tenant_code, * > primary-keys: tenant_code,id > - source-table: jeecg-boot2.sys_user_depart > projection: concat('LYDB2','') as tenant_code, * > primary-keys: tenant_code,ID > pipeline: > name: test,server-id-5513,flinkCDC-3.1.1 > parallelism: 1 {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-36006) Add documentation for Flink CDC CLI
yux created FLINK-36006: --- Summary: Add documentation for Flink CDC CLI Key: FLINK-36006 URL: https://issues.apache.org/jira/browse/FLINK-36006 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux Currently, there's no specification of CDC CLI usage in the documentation; such information is only available via `./flink-cdc.sh --help`. Adding it should help users look up options more easily. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35991) Resolve conflicting definitions in transform SqlFunctionTable definition
yux created FLINK-35991: --- Summary: Resolve conflicting definitions in transform SqlFunctionTable definition Key: FLINK-35991 URL: https://issues.apache.org/jira/browse/FLINK-35991 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux Currently, the transform operator chains self-defined SqlFunctions with the standard function table, which can be a problem since there are conflicting function declarations. Getting rid of `SqlStdOperatorTable` and declaring only the supported functions should resolve this problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35986) NULL literal is not supported in Transform rules
yux created FLINK-35986: --- Summary: NULL literal is not supported in Transform rules Key: FLINK-35986 URL: https://issues.apache.org/jira/browse/FLINK-35986 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux Sometimes one may want to explicitly write down a NULL value, for example in some CASE - WHEN branches or when calling some UDF functions. However, it is currently not possible to write such an expression; an exception will be thrown: Caused by: java.lang.UnsupportedOperationException: Unsupported type: NULL at org.apache.flink.cdc.runtime.typeutils.DataTypeConverter.convertCalciteRelDataTypeToDataType(DataTypeConverter.java:181) at org.apache.flink.cdc.runtime.parser.TransformParser.generateProjectionColumns(TransformParser.java:189) ... -- This message was sent by Atlassian Jira (v8.20.10#820010)
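A minimal sketch of the missing branch (illustrative enum and type names, not the actual DataTypeConverter patch): map a bare NULL literal to some nullable type instead of throwing:

```java
// Hypothetical mapping: the enum and return strings are illustrative only.
public final class NullLiteralMapping {

    enum SqlTypeName { INTEGER, VARCHAR, NULL }

    static String convert(SqlTypeName t) {
        switch (t) {
            case INTEGER: return "INT";
            case VARCHAR: return "STRING";
            // Treat an untyped NULL literal as a nullable placeholder type
            // rather than rejecting the whole expression.
            case NULL:    return "STRING NULL";
            default:      throw new UnsupportedOperationException("Unsupported type: " + t);
        }
    }

    public static void main(String[] args) {
        System.out.println(convert(SqlTypeName.NULL)); // no longer throws
    }
}
```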
[jira] [Created] (FLINK-35985) SUBSTRING function not available in transform rules
yux created FLINK-35985: --- Summary: SUBSTRING function not available in transform rules Key: FLINK-35985 URL: https://issues.apache.org/jira/browse/FLINK-35985 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux Currently, one cannot write `SUBSTRING(str FROM idx FOR len)` in the way suggested by the documentation [1], while the function that does work, `SUBSTR(str, idx, len)`, isn't mentioned anywhere. Either the code or the docs needs updating to avoid confusing users. [1] https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/core-concept/transform/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35984) Job crashes when metadata column names present in transform rules
yux created FLINK-35984: --- Summary: Job crashes when metadata column names present in transform rules Key: FLINK-35984 URL: https://issues.apache.org/jira/browse/FLINK-35984 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux Given such a transform rule: transform: projection: \*, '__namespace_name__schema_name__table_name__' AS string_literal Obviously this shouldn't insert any metadata columns. However, since the metadata column existence check is done by searching for the identifier string in the statement, without considering any syntax information: ```java // TransformParser.java#L357 if (transformStatement.contains(DEFAULT_NAMESPACE_NAME) ... ``` the transform operator will mistakenly append metadata columns to the Janino argument list and crash the job: Caused by: java.lang.IllegalArgumentException: wrong number of arguments at java.lang.reflect.Method.invoke(Method.java:498) at org.codehaus.janino.ExpressionEvaluator.evaluate(ExpressionEvaluator.java:541) at org.codehaus.janino.ExpressionEvaluator.evaluate(ExpressionEvaluator.java:533) at org.apache.flink.cdc.runtime.operators.transform.ProjectionColumnProcessor.evaluate(ProjectionColumnProcessor.java:64) at org.apache.flink.cdc.runtime.operators.transform.TransformProjectionProcessor.processData(TransformProjectionProcessor.java:153) at org.apache.flink.cdc.runtime.operators.transform.TransformDataOperator.processProjection(TransformDataOperator.java:387) at org.apache.flink.cdc.runtime.operators.transform.TransformDataOperator.processDataChangeEvent(TransformDataOperator.java:328) at org.apache.flink.cdc.runtime.operators.transform.TransformDataOperator.processElement(TransformDataOperator.java:190) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146) at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.lang.Thread.run(Thread.java:748) -- This message was sent by Atlassian Jira (v8.20.10#820010)
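A hedged sketch of a syntax-aware check using Calcite's visitor API (the parser configuration in Flink CDC's TransformParser differs, and the names here are illustrative): collecting real identifiers from the parsed expression means string literals that merely mention a column name no longer match:

```java
import java.util.HashSet;
import java.util.Set;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.util.SqlBasicVisitor;

// Illustrative sketch: walk the parsed select list and collect identifiers,
// so '...' string literals no longer trigger metadata-column injection.
public final class IdentifierScan {

    static boolean referencesIdentifier(String selectList, String name) throws Exception {
        SqlNode query = SqlParser.create("SELECT " + selectList).parseQuery();
        Set<String> identifiers = new HashSet<>();
        query.accept(new SqlBasicVisitor<Void>() {
            @Override
            public Void visit(SqlIdentifier id) {
                identifiers.addAll(id.names);
                return null;
            }
        });
        // Case-insensitive comparison sidesteps the parser's casing rules.
        return identifiers.stream().anyMatch(n -> n.equalsIgnoreCase(name));
    }

    public static void main(String[] args) throws Exception {
        System.out.println(referencesIdentifier("meta_col AS c", "meta_col"));   // true
        System.out.println(referencesIdentifier("'meta_col' AS c", "meta_col")); // false
    }
}
```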
[jira] [Created] (FLINK-35983) Job crashes when using wildcard (*) match with metadata column
yux created FLINK-35983: --- Summary: Job crashes when using wildcard (*) match with metadata column Key: FLINK-35983 URL: https://issues.apache.org/jira/browse/FLINK-35983 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux One may write a projection rule such as: transform: - projection: '*, __namespace_name__, __schema_name__, __table_name__' to append metadata columns after the existing columns. However, this will crash the job once any metadata column is declared, since the wildcard matches the metadata columns, too: Caused by: java.lang.IllegalArgumentException: Field names must be unique. Found duplicates: [__namespace_name__, __schema_name__, __table_name__] at org.apache.flink.cdc.common.types.RowType.validateFields(RowType.java:158) at org.apache.flink.cdc.common.types.RowType.(RowType.java:54) at org.apache.flink.cdc.common.types.RowType.of(RowType.java:183) at org.apache.flink.cdc.common.types.RowType.of(RowType.java:175) at org.apache.flink.cdc.runtime.typeutils.DataTypeConverter.toRowType(DataTypeConverter.java:55) at org.apache.flink.cdc.runtime.operators.transform.TableChangeInfo.of(TableChangeInfo.java:100) at org.apache.flink.cdc.runtime.operators.transform.TransformSchemaOperator.cacheCreateTable(TransformSchemaOperator.java:183) at org.apache.flink.cdc.runtime.operators.transform.TransformSchemaOperator.processElement(TransformSchemaOperator.java:168) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50) at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29) at org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309) at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110) at org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101) at org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase.pollNext(IteratorSourceReaderBase.java:111) at org.apache.flink.cdc.connectors.values.source.ValuesDataSource$EventIteratorReader.pollNext(ValuesDataSource.java:294) at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419) at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68) at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65) at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562) at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231) at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807) at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953) at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932) at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562) at java.lang.Thread.run(Thread.java:748) -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35982) Transform metadata config doesn't work if no projection block was provided
yux created FLINK-35982: --- Summary: Transform metadata config doesn't work if no projection block was provided Key: FLINK-35982 URL: https://issues.apache.org/jira/browse/FLINK-35982 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux

One may readjust source table primary keys, partition keys, and table options by specifying them in a transform block like this:

transform:
  - projection: '*'
    primary-keys: order_id, product_name
    partition-keys: order_id
    table-options: bucket=1

However, if the projection field is omitted (which falls back to the default behavior of not changing any source table columns), such configuration will not take effect:

transform:
  - primary-keys: order_id, product_name
    partition-keys: order_id
    table-options: bucket=1   # These options will not apply

-- This message was sent by Atlassian Jira (v8.20.10#820010)
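Until this is fixed, a plausible workaround (untested here) is to keep the transform block active by spelling out the identity projection explicitly, as in the first snippet above:

```yaml
transform:
  - projection: '*'   # explicit identity projection; without it the block is skipped
    primary-keys: order_id, product_name
    partition-keys: order_id
    table-options: bucket=1
```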
[jira] [Created] (FLINK-35981) Transform rule doesn't support referencing one column more than once
yux created FLINK-35981: --- Summary: Transform rule doesn't support referencing one column more than once Key: FLINK-35981 URL: https://issues.apache.org/jira/browse/FLINK-35981 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux

Currently, a transform rule (projection / filtering) doesn't support referencing one column more than once, which means that if we write such a projection rule:

transform:
  - projection: \*, age * age AS age_square
    filter: age < 18 OR age > 60

the Janino compiler will crash with the following exception stack. It seems duplicated columns are added to the Janino arguments list:

Caused by: org.apache.flink.shaded.guava31.com.google.common.util.concurrent.UncheckedExecutionException: org.apache.flink.api.common.InvalidProgramException: Expression cannot be compiled. This is a bug. Please file an issue.
Expression: import static org.apache.flink.cdc.runtime.functions.SystemFunctionUtils.*;age * age
at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2055)
at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache.get(LocalCache.java:3966)
at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4863)
at org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler.compileExpression(TransformExpressionCompiler.java:46)
... 18 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Expression cannot be compiled. This is a bug. Please file an issue.
Expression: import static org.apache.flink.cdc.runtime.functions.SystemFunctionUtils.*;age * age
at org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler.lambda$compileExpression$0(TransformExpressionCompiler.java:62)
at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4868)
at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3533)
at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2282)
at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2159)
at org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2049)
... 21 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 1, Column 82: Redefinition of parameter "age"

-- This message was sent by Atlassian Jira (v8.20.10#820010)
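The root cause can be reproduced with Janino alone. Below is a standalone sketch (assuming the org.codehaus.janino dependency on the classpath; this is not the transform operator's actual code) that triggers the same "Redefinition of parameter" error and shows that de-duplicating the parameter list avoids it:

```java
import org.codehaus.commons.compiler.CompileException;
import org.codehaus.janino.ExpressionEvaluator;

public class DuplicateParameterDemo {
    public static void main(String[] args) throws Exception {
        // Registering "age" twice mirrors a column being referenced twice.
        ExpressionEvaluator broken = new ExpressionEvaluator();
        broken.setParameters(new String[] {"age", "age"}, new Class[] {int.class, int.class});
        try {
            broken.cook("age * age");
        } catch (CompileException e) {
            System.out.println(e.getMessage()); // ... Redefinition of parameter "age"
        }

        // De-duplicated parameter list: the expression may still use "age" many times.
        ExpressionEvaluator fixed = new ExpressionEvaluator();
        fixed.setParameters(new String[] {"age"}, new Class[] {int.class});
        fixed.cook("age * age");
        System.out.println(fixed.evaluate(new Object[] {7})); // 49
    }
}
```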
[jira] [Created] (FLINK-35980) Add transform test coverage in Integrated / E2e tests
yux created FLINK-35980: --- Summary: Add transform test coverage in Integrated / E2e tests Key: FLINK-35980 URL: https://issues.apache.org/jira/browse/FLINK-35980 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux Currently, there are not enough UT / E2e test cases to cover transform features like built-in scalar functions, temporary functions, complex expressions, etc. Adding them should cover more use cases and reduce the possibility of accidentally introduced mistakes. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35912) SqlServer CDC doesn't chunk UUID-typed columns correctly
yux created FLINK-35912: --- Summary: SqlServer CDC doesn't chunk UUID-typed columns correctly Key: FLINK-35912 URL: https://issues.apache.org/jira/browse/FLINK-35912 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux As reported by GitHub user @LiPL2017, SqlServer CDC doesn't chunk UUID-typed columns correctly, since UUID comparison isn't implemented according to SQL Server's uniqueidentifier ordering rules[1]. [1] https://learn.microsoft.com/en-us/sql/connect/ado-net/sql/compare-guid-uniqueidentifier-values?view=sql-server-ver16 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Comment Edited] (FLINK-35889) mongo cdc restore from expired resume token and job status still running but expect failed
[ https://issues.apache.org/jira/browse/FLINK-35889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868347#comment-17868347 ] yux edited comment on FLINK-35889 at 7/24/24 12:08 PM: --- Sure, I noticed that such behaviour could be controlled by `MongoSourceTask#sourceConfig#tolerateErrors` configuration. Could you please share more information about your configuration, like are you setting `errors.tolerance` to `none`? was (Author: JIRAUSER300787): Sure, I noticed that such behaviour could be controlled by `MongoSourceTask#sourceConfig#tolerateErrors` configuration. Should we add a new option for this? > mongo cdc restore from expired resume token and job status still running but > expect failed > -- > > Key: FLINK-35889 > URL: https://issues.apache.org/jira/browse/FLINK-35889 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: Darren_Han >Priority: Blocker > > version > mongodb: 3.6 > flink: 1.14.6 > mongo-cdc:2.4.2 > > Restarting job from a savepoint/checpoint which contains expired resume > token/point, job status is always running and do not capture change data, > printing logs continuously. > Here is some example logs: > 2024-07-23 11:11:04,214 INFO > com.mongodb.kafka.connect.source.MongoSourceTask [] - An > exception occurred when trying to get the next item from the Change Stream > com.mongodb.MongoQueryException: Query failed with error code 280 and error > message 'resume of change stream was not possible, as the resume token was > not found. \{_data: BinData(0, "xx")}' > 2024-07-23 17:53:27,330 INFO > com.mongodb.kafka.connect.source.MongoSourceTask [] - An > exception occurred when trying to get the next item from the Change Stream > com.mongodb.MongoQueryException: Query failed with error code 280 and error > message 'resume of change notification was not possible, as the resume point > may no longer be in the oplog. ' on server > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35889) mongo cdc restore from expired resume token and job status still running but expect failed
[ https://issues.apache.org/jira/browse/FLINK-35889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868347#comment-17868347 ] yux commented on FLINK-35889: - Sure, I noticed that such behaviour could be controlled by `MongoSourceTask#sourceConfig#tolerateErrors` configuration. Should we add a new option for this? > mongo cdc restore from expired resume token and job status still running but > expect failed > -- > > Key: FLINK-35889 > URL: https://issues.apache.org/jira/browse/FLINK-35889 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: Darren_Han >Priority: Blocker > > version > mongodb: 3.6 > flink: 1.14.6 > mongo-cdc:2.4.2 > > Restarting job from a savepoint/checpoint which contains expired resume > token/point, job status is always running and do not capture change data, > printing logs continuously. > Here is some example logs: > 2024-07-23 11:11:04,214 INFO > com.mongodb.kafka.connect.source.MongoSourceTask [] - An > exception occurred when trying to get the next item from the Change Stream > com.mongodb.MongoQueryException: Query failed with error code 280 and error > message 'resume of change stream was not possible, as the resume token was > not found. \{_data: BinData(0, "xx")}' > 2024-07-23 17:53:27,330 INFO > com.mongodb.kafka.connect.source.MongoSourceTask [] - An > exception occurred when trying to get the next item from the Change Stream > com.mongodb.MongoQueryException: Query failed with error code 280 and error > message 'resume of change notification was not possible, as the resume point > may no longer be in the oplog. ' on server > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35883) Wildcard projection inserts column at wrong place
yux created FLINK-35883: --- Summary: Wildcard projection inserts column at wrong place Key: FLINK-35883 URL: https://issues.apache.org/jira/browse/FLINK-35883 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux

In this case, where a wildcard projection was declared:

```yaml
transform:
  - projection: \*, 'extras' AS extras
```

For an upstream schema [a, b, c], the transform operator should send [a, b, c, extras] downstream. However, if another column 'd' is inserted at the end, the upstream schema becomes [a, b, c, d], and one might expect the transformed schema to be [a, b, c, d, extras]. But it's [a, b, c, extras, d], since `AddColumnEvent{d, position=LAST}` was applied to [a, b, c, extras] after the projection process.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35883) Wildcard projection inserts column at wrong place
[ https://issues.apache.org/jira/browse/FLINK-35883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868022#comment-17868022 ] yux commented on FLINK-35883: - I'd like to investigate this based on FLINK-35272. > Wildcard projection inserts column at wrong place > - > > Key: FLINK-35883 > URL: https://issues.apache.org/jira/browse/FLINK-35883 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: yux >Priority: Major > > In this case where a wildcard projection was declared: > ```yaml > transform: > - projection: \*, 'extras' AS extras > ``` > For upstream schema [a, b, c], transform operator should send [a, b, c, > extras] to downstream. > However, if another column 'd' was inserted at the end, upstream schema would > be [a, b, c, d], and one might expect transformed schema to be [a, b, c, d, > extras]. But it's [a, b, c, extras, d], since `AddColumnEvent{d, > position=LAST}` was applied to [a, b, c, extras] after the projection process. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35634) Add a CDC quickstart utility
[ https://issues.apache.org/jira/browse/FLINK-35634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17867744#comment-17867744 ] yux commented on FLINK-35634: - Paimon / kafka sink options have been added. > Add a CDC quickstart utility > > > Key: FLINK-35634 > URL: https://issues.apache.org/jira/browse/FLINK-35634 > Project: Flink > Issue Type: New Feature > Components: Flink CDC >Reporter: yux >Assignee: yux >Priority: Minor > > Currently, it's not very easy to initialize a CDC pipeline job from scratch, > requiring user to configure lots of Flink configurations manually. > This ticket suggests creating an extra component like `tiup` and `rustup` to > help user creating and submitting CDC job quickly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35868) Bump Mongo driver version to support Mongo 7.0+
yux created FLINK-35868: --- Summary: Bump Mongo driver version to support Mongo 7.0+ Key: FLINK-35868 URL: https://issues.apache.org/jira/browse/FLINK-35868 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux Currently, the MongoDB CDC connector depends on mongodb-driver v4.9.1, which doesn't support Mongo Server 7.0+[1]. Upgrading the dependency version would be nice, since Mongo 7.0 was released nearly a year ago. [1] https://www.mongodb.com/docs/drivers/java/sync/current/compatibility/ -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35634) Add a CDC quickstart utility
[ https://issues.apache.org/jira/browse/FLINK-35634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17867204#comment-17867204 ] yux commented on FLINK-35634: - Just implemented a primitive demo for `cdcup` in https://github.com/yuxiqian/cdcup. Looking forward to any comments / suggestions from [~Leonard] and [~kunni]. > Add a CDC quickstart utility > > > Key: FLINK-35634 > URL: https://issues.apache.org/jira/browse/FLINK-35634 > Project: Flink > Issue Type: New Feature > Components: Flink CDC >Reporter: yux >Priority: Minor > > Currently, it's not very easy to initialize a CDC pipeline job from scratch, > requiring user to configure lots of Flink configurations manually. > This ticket suggests creating an extra component like `tiup` and `rustup` to > help user creating and submitting CDC job quickly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35634) Add a CDC quickstart utility
[ https://issues.apache.org/jira/browse/FLINK-35634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17867228#comment-17867228 ] yux commented on FLINK-35634: - Seems Paimon / Kafka sink are missing. Will add them soon. > Add a CDC quickstart utility > > > Key: FLINK-35634 > URL: https://issues.apache.org/jira/browse/FLINK-35634 > Project: Flink > Issue Type: New Feature > Components: Flink CDC >Reporter: yux >Assignee: yux >Priority: Minor > > Currently, it's not very easy to initialize a CDC pipeline job from scratch, > requiring user to configure lots of Flink configurations manually. > This ticket suggests creating an extra component like `tiup` and `rustup` to > help user creating and submitting CDC job quickly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35852) When used through the transform function, the decimal(10,2) type field value in the MySQL source table becomes 100 times the original value after being transferred to
[ https://issues.apache.org/jira/browse/FLINK-35852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17866290#comment-17866290 ] yux commented on FLINK-35852: - After some quick debugging I noticed that this has been resolved when implementing FLINK-35272 (https://github.com/apache/flink-cdc/pull/3285). But I'm not quite sure the underlying reason behind this weird issue. Any thoughts from [~wink]? > When used through the transform function, the decimal(10,2) type field value > in the MySQL source table becomes 100 times the original value after being > transferred to the target table. > > > Key: FLINK-35852 > URL: https://issues.apache.org/jira/browse/FLINK-35852 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.1 > Environment: flink-1.18.0、 > flink-cdc-3.1.1、mysql-5.7.43、Doris-2.0.6、CentOS Linux release 8.5.2111 >Reporter: zheng_shengsheng >Priority: Major > Attachments: image-2024-07-16-14-46-53-982.png, > image-2024-07-16-14-48-07-981.png > > > When I use CDC's yaml mode to collect MySQL data to Doris, I use the latest > supported transform feature. > Finally, I found that the database table in MySQL collected and the > decimal(10,2) type fields in the Mysql source table were automatically > converted into decimal(19,0) in Doris, and then the value size became 100 > times the original size. As follows > > !image-2024-07-16-14-48-07-981.png! > !image-2024-07-16-14-46-53-982.png! > > The core part of yaml is as follows: > {code:java} > // code placeholder > source: > type: mysql > hostname: node2 > port: 3306 > username: > password: > tables: sys_canteen_consume_conf,sys_canteen_consume_rec,sys_order_course, > server-id: 5513 > connectionTimeZone: GMT+8 > scan.startup.mode: initialsink: > type: doris > fenodes: node3:8030 > username: admin > password: > table.create.properties.light_schema_change: true > table.create.properties.replication_num: 1transform: > - source-table: jeecg-boot2.sys_\.* > projection: concat('LYDB2','') as tenant_code, * > primary-keys: tenant_code,id > - source-table: jeecg-boot2.sys_user_depart > projection: concat('LYDB2','') as tenant_code, * > primary-keys: tenant_code,ID pipeline: > name: test,server-id-5513,flinkCDC-3.1.1 > parallelism: 1 {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35852) When used through the transform function, the decimal(10,2) type field value in the MySQL source table becomes 100 times the original value after being transferred to
[ https://issues.apache.org/jira/browse/FLINK-35852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17866255#comment-17866255 ] yux commented on FLINK-35852: - Thanks for [~zheng_shengsheng]'s detailed report, I will investigate this.
> When used through the transform function, the decimal(10,2) type field value in the MySQL source table becomes 100 times the original value after being transferred to the target table.
>
> Key: FLINK-35852
> URL: https://issues.apache.org/jira/browse/FLINK-35852
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.1.1
> Environment: flink-1.18.0、flink-cdc-3.1.1、mysql-5.7.43、Doris-2.0.6、CentOS Linux release 8.5.2111
> Reporter: zheng_shengsheng
> Priority: Major
> Attachments: image-2024-07-16-14-46-53-982.png, sp20240716_144157_519.png, sp20240716_144344_710.png
>
> When I use CDC's yaml mode to collect MySQL data to Doris, I use the latest supported transform feature.
> Finally, I found that the database table in MySQL collected and the decimal(10,2) type fields in the Mysql source table were automatically converted into decimal(19,0) in Doris, and then the value size became 100 times the original size. As follows
> The core part of yaml is as follows:
> {code:java}
> // code placeholder
> source:
> type: mysql
> hostname: node2
> port: 3306
> username:
> password:
> tables: sys_canteen_consume_conf,sys_canteen_consume_rec,sys_order_course,
> server-id: 5513
> connectionTimeZone: GMT+8
> scan.startup.mode: initialsink:
> type: doris
> fenodes: node3:8030
> username: admin
> password:
> table.create.properties.light_schema_change: true
> table.create.properties.replication_num: 1transform:
> - source-table: jeecg-boot2.sys_\.*
> projection: concat('LYDB2','') as tenant_code, *
> primary-keys: tenant_code,id
> - source-table: jeecg-boot2.sys_user_depart
> projection: concat('LYDB2','') as tenant_code, *
> primary-keys: tenant_code,ID pipeline:
> name: test,server-id-5513,flinkCDC-3.1.1
> parallelism: 1 {code}

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35491) [JUnit5 Migration] Module: Flink CDC modules
[ https://issues.apache.org/jira/browse/FLINK-35491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17866214#comment-17866214 ] yux commented on FLINK-35491: - I'd like to take this ticket if needed. > [JUnit5 Migration] Module: Flink CDC modules > > > Key: FLINK-35491 > URL: https://issues.apache.org/jira/browse/FLINK-35491 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: Muhammet Orazov >Priority: Major > > Migrate Junit4 tests to Junit5 for the following modules: > * flink-cdc-common > * flink-cdc-composer > * flink-cdc-runtime > * flink-cdc-connect/flink-cdc-pipeline-connectors > * flink-cdc-e2e-tests -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35834) Update Doris Dependency to 1.6.2 to use group commit.
[ https://issues.apache.org/jira/browse/FLINK-35834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865828#comment-17865828 ] yux commented on FLINK-35834: - +1, bumping doris connector to 1.6.2 should also resolve FLINK-35072 and FLINK-35090. > Update Doris Dependency to 1.6.2 to use group commit. > - > > Key: FLINK-35834 > URL: https://issues.apache.org/jira/browse/FLINK-35834 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.1 >Reporter: LvYanquan >Priority: Minor > Fix For: cdc-3.3.0 > > > Doris support streamload group commit to improve performance of writing. > Refer to https://github.com/apache/doris-flink-connector/pull/412 -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35813) Transform schema operator failed to trigger checkpoint with bounded source
yux created FLINK-35813: --- Summary: Transform schema operator failed to trigger checkpoint with bounded source Key: FLINK-35813 URL: https://issues.apache.org/jira/browse/FLINK-35813 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux

Currently, the transform schema operator clears its state field after a bounded job finishes. Any following checkpoint request in the snapshotState method will cause an NPE. Here's a minimal reproducible example:

```
source:
  type: values

sink:
  type: values

transform:
  ...
```

-- This message was sent by Atlassian Jira (v8.20.10#820010)
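A minimal sketch of the defensive pattern that would avoid the NPE (the class, field, and method names here are hypothetical, not the operator's real ones):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TransformSchemaStateSketch {
    private Map<String, String> cachedSchemas = new HashMap<>();

    void finish() {
        cachedSchemas = null; // what clearing state after a bounded job amounts to
    }

    List<String> snapshotState() {
        if (cachedSchemas == null) {
            // A checkpoint may still be triggered after finish(); snapshot
            // nothing instead of dereferencing the cleared field.
            return new ArrayList<>();
        }
        return new ArrayList<>(cachedSchemas.values());
    }
}
```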
[jira] [Created] (FLINK-35805) Add "OpType" metadata column in transform
yux created FLINK-35805: --- Summary: Add "OpType" metadata column in transform Key: FLINK-35805 URL: https://issues.apache.org/jira/browse/FLINK-35805 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux Currently, there's no way to retrieve the operation type in transform projection / filtering rules. Users may want to tag data records with it, or to implement logical delete flags. -- This message was sent by Atlassian Jira (v8.20.10#820010)
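One hypothetical shape the feature could take in a transform rule (the `__data_event_type__` column name is an assumption for illustration, not a released API):

```yaml
transform:
  - source-table: db.orders
    # Tag every record with its operation type (insert / update / delete),
    # e.g. to implement logical deletes downstream instead of physical ones.
    projection: '*, __data_event_type__ AS op_type'
```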
[jira] [Commented] (FLINK-35783) Flink CDC Could not start the yaml Job
[ https://issues.apache.org/jira/browse/FLINK-35783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863766#comment-17863766 ] yux commented on FLINK-35783: - It is probably because the pipeline jar & SQL jar have overlapping classes, which will conflict. Using the `ADD JAR` statement[1] in the SQL job to load the SQL jar dynamically, instead of putting it into `FLINK_HOME/lib`, might help. [1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/jar/
> Flink CDC Could not start the yaml Job
> --
>
> Key: FLINK-35783
> URL: https://issues.apache.org/jira/browse/FLINK-35783
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.1.1
> Reporter: layhuts
> Priority: Major
>
> * Flink version: 1.19.1
> * Flink CDC version: 3.1.1
> * Added mysql-connector-java-8.0.27.jar and flink-sql-connector-mysql-cdc-3.1.1.jar under ${FLINK_HOME}/lib
> * Added flink-cdc-pipeline-connector-mysql-3.1.1.jar and flink-cdc-pipeline-connector-doris-3.1.1.jar under flink-cdc/lib
> On the first run of
> {code:java}
> bin/flink-cdc.sh ***.yaml {code}
> submitting the job reports java.lang.NoClassDefFoundError: org/apache/flink/cdc/runtime/typeutils/EventTypeInfo
> {code:java}
> Caused by: java.lang.NoClassDefFoundError: org/apache/flink/cdc/runtime/typeutils/EventTypeInfo
> at java.lang.Class.getDeclaredFields0(Native Method)
> at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
> at java.lang.Class.getDeclaredField(Class.java:2068)
> at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1872)
> at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:79)
> at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:506)
> at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
> at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
> at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2028)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1875)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2028)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1875)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2209)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
> at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
> at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:496)
> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:294)
> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:173)
> ... 19 more
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.cdc.runtime.typeutils.EventTypeInfo
> at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> ... 52 more {code}
> After adding flink-cdc-runtime-3.1.1.jar under ${FLINK_HOME}/lib as suggested, running again produces the following problem:
> {code:java}
> Exception in thread "main" org.apache.flink.util.FlinkException: Failed to execute job 'Sync mid_cloud Database to Doris'.
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2455)
> at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421)
> at org.apache.flink.cdc.composer.f
[jira] [Resolved] (FLINK-35077) Add package license check for Flink CDC modules.
[ https://issues.apache.org/jira/browse/FLINK-35077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yux resolved FLINK-35077. - Fix Version/s: cdc-3.1.1 Release Note: Fixed in https://github.com/apache/flink-cdc/commit/ca1470d5dd48f9238d13a350be99c174ff600b7b. Resolution: Fixed > Add package license check for Flink CDC modules. > > > Key: FLINK-35077 > URL: https://issues.apache.org/jira/browse/FLINK-35077 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: yux >Assignee: yux >Priority: Minor > Labels: pull-request-available > Fix For: cdc-3.1.1 > > > Currently, Flink project has CI scripts checking if dependencies with > incompatible licenses are introduced. > Flink CDC module heavily relies on external libraries (especially > connectors), so running similar checking scripts during every CI would be > helpful preventing developers introducing questionable dependencies by > accident. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35783) Flink CDC Could not start the yaml Job
[ https://issues.apache.org/jira/browse/FLINK-35783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863759#comment-17863759 ] yux commented on FLINK-35783: - Hi [~layhuts], As you're submitting a pipeline job, the `flink-sql-connector-mysql-cdc-3.1.1.jar` for Flink SQL jobs is not necessary here. Also, `flink-cdc-runtime-3.1.1.jar` doesn't need to be copied either, since it has been packaged into the `flink-cdc-dist` jar. Please try adjusting the jar libraries as follows and retry after *restarting the Flink cluster*:

In $FLINK_HOME/lib:
* mysql-connector-java-8.0.27.jar

In $FLINK_CDC_HOME/lib:
* flink-cdc-pipeline-connector-mysql-3.1.1.jar
* flink-cdc-pipeline-connector-doris-3.1.1.jar

Please reply to this thread if the problem persists.
> Flink CDC Could not start the yaml Job
> --
>
> Key: FLINK-35783
> URL: https://issues.apache.org/jira/browse/FLINK-35783
> Project: Flink
> Issue Type: Bug
> Components: Flink CDC
> Affects Versions: cdc-3.1.1
> Reporter: layhuts
> Priority: Major
>
> * Flink version: 1.19.1
> * Flink CDC version: 3.1.1
> * Added mysql-connector-java-8.0.27.jar and flink-sql-connector-mysql-cdc-3.1.1.jar under ${FLINK_HOME}/lib
> * Added flink-cdc-pipeline-connector-mysql-3.1.1.jar and flink-cdc-pipeline-connector-doris-3.1.1.jar under flink-cdc/lib
> On the first run of
> {code:java}
> bin/flink-cdc.sh ***.yaml {code}
> submitting the job reports java.lang.NoClassDefFoundError: org/apache/flink/cdc/runtime/typeutils/EventTypeInfo
> {code:java}
> Caused by: java.lang.NoClassDefFoundError: org/apache/flink/cdc/runtime/typeutils/EventTypeInfo
> at java.lang.Class.getDeclaredFields0(Native Method)
> at java.lang.Class.privateGetDeclaredFields(Class.java:2583)
> at java.lang.Class.getDeclaredField(Class.java:2068)
> at java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1872)
> at java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:79)
> at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:506)
> at java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)
> at java.security.AccessController.doPrivileged(Native Method)
> at java.io.ObjectStreamClass.<init>(ObjectStreamClass.java:494)
> at java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)
> at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2028)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1875)
> at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2028)
> at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1875)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2209)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
> at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)
> at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)
> at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
> at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
> at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
> at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:496)
> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:294)
> at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.<init>(ExecutionJobVertex.java:173)
> ... 19 more
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.cdc.runtime.typeutils.EventTypeInfo
> at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
> ... 52 more {code}
> After adding flink-cdc-runtime-3.1.1.jar under ${FLINK_HOME}/lib as suggested, running again produces the following problem:
> {code:java}
> Exception in thread "main" org.apache.flink.util.FlinkException: Failed to execute job 'Sync mid_cloud Database to Doris'.
> at org.apache.f
[jira] [Created] (FLINK-35781) Make pipeline parallelism config optional
yux created FLINK-35781: --- Summary: Make pipeline parallelism config optional Key: FLINK-35781 URL: https://issues.apache.org/jira/browse/FLINK-35781 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux Currently, the Flink CDC `PIPELINE_PARALLELISM` option is forcefully required in the pipeline definition, which turns out to be unnecessary, since Flink already provides a fallback parallelism option. -- This message was sent by Atlassian Jira (v8.20.10#820010)
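The suggested behavior is a plain fallback. A trivial sketch of the resolution order (names are illustrative, not Flink CDC's actual API):

```java
import java.util.Optional;

class ParallelismFallbackSketch {
    // null models a pipeline YAML that omits `parallelism:` entirely.
    static int resolve(Integer pipelineParallelism, int flinkDefaultParallelism) {
        return Optional.ofNullable(pipelineParallelism).orElse(flinkDefaultParallelism);
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, 4)); // 4: falls back to Flink's own setting
        System.out.println(resolve(2, 4));    // 2: an explicit pipeline value wins
    }
}
```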
[jira] [Created] (FLINK-35762) Cache hashCode for immutable map key classes
yux created FLINK-35762: --- Summary: Cache hashCode for immutable map key classes Key: FLINK-35762 URL: https://issues.apache.org/jira/browse/FLINK-35762 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux As suggested by [~kunni], hash code caching is a common optimization in the Java world (for example, java.lang.String uses it to avoid duplicate hash calculations, since Strings are very commonly used as hashmap keys). Such an optimization could be applied in CDC to improve execution performance. -- This message was sent by Atlassian Jira (v8.20.10#820010)
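For reference, the String-style idiom looks like this (a generic sketch; the class and fields are illustrative, not CDC's actual key types):

```java
import java.util.Objects;

final class CachedHashKey {
    private final String namespace;
    private final String table;
    private int hash; // 0 doubles as "not yet computed", as in java.lang.String

    CachedHashKey(String namespace, String table) {
        this.namespace = namespace;
        this.table = table;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof CachedHashKey)) return false;
        CachedHashKey that = (CachedHashKey) o;
        return namespace.equals(that.namespace) && table.equals(that.table);
    }

    @Override
    public int hashCode() {
        int h = hash;
        if (h == 0) { // benign race: recomputing on immutable fields is idempotent
            h = Objects.hash(namespace, table);
            hash = h;
        }
        return h;
    }
}
```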
[jira] [Created] (FLINK-35736) Add E2e migration scripts for Flink CDC
yux created FLINK-35736: --- Summary: Add E2e migration scripts for Flink CDC Key: FLINK-35736 URL: https://issues.apache.org/jira/browse/FLINK-35736 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux Currently, there are no E2e migration tests in the Flink CDC CI, and it's not very convenient for testers to run migration tests to verify an RC release before each release. Adding an automated migration testing script could help verify CDC backwards compatibility better. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35615) CDC3.0, the pipeline of mysql-doris , do not support the MySQL field type timestamp(6)
[ https://issues.apache.org/jira/browse/FLINK-35615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860925#comment-17860925 ] yux commented on FLINK-35615: - I could reproduce this issue by inserting a timestamp with no nanoSeconds part into a `TIMESTAMP(6)` field. Seems it's a MySQL source side issue. > CDC3.0, the pipeline of mysql-doris , do not support the MySQL field type > timestamp(6) > --- > > Key: FLINK-35615 > URL: https://issues.apache.org/jira/browse/FLINK-35615 > Project: Flink > Issue Type: Bug >Affects Versions: cdc-3.1.0 >Reporter: ConorZhao >Priority: Major > > 2024-06-14 16:45:24 > org.apache.flink.runtime.JobException: Recovery is suppressed by > NoRestartBackoffTimeStrategy > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176) > at > org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276) > at > org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269) > at > org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764) > at > org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741) > at > org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83) > at > org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309) > at > org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222) > at > org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85) > at > org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168) > at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) > at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) > at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) > at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) > at > org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) > at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) > at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) > at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) > at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) > at 
org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) > at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280) > at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241) > at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253) > at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289) > at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056) > at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692) > at > java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157) > Caused by: java.lang.ArrayIndexOutOfBoundsException: 8 > at > com.ververica.cdc.common.data.binary.BinarySegmentUtils.getLongMultiSegments(BinarySegmentUtils.java:731) > at > com.ververica.cdc.common.data.binary.BinarySegmentUtils.getLong(BinarySegmentUtils.java:721) > at > com.ververica.cdc.common.data.binary.BinarySegmentUtils.readLocalZonedTimestampData(BinarySegmentUtils.java:1033) > at > com.ververica.cdc.common.data.binary.BinaryRecordData.getLocalZonedTimestampData(BinaryRecordData.java:244) > at > com.ververica.cdc.connectors.doris.sink.DorisRowConverter.lambda$createExternalConverter$cc45f215$1(DorisRowConverter.java:113) > at > com.v
[jira] [Commented] (FLINK-35654) Add Flink CDC verification guide in docs
[ https://issues.apache.org/jira/browse/FLINK-35654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860289#comment-17860289 ] yux commented on FLINK-35654: - Hi [~m.orazow], I've submitted a PR for this. Are you interested in reviewing this? > Add Flink CDC verification guide in docs > > > Key: FLINK-35654 > URL: https://issues.apache.org/jira/browse/FLINK-35654 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: yux >Priority: Major > > Currently, ASF voting process requires vast quality verification before > releasing any new versions, including: > * Tarball checksum verification > * Compile from source code > * Run pipeline E2e tests > * Run migration tests > * Check if jar was packaged with correct JDK version > * ... > Adding verification guide in Flink CDC docs should help developers verify > future releases more easily. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35700) Loosen CDC pipeline options validation
yux created FLINK-35700: --- Summary: Loosen CDC pipeline options validation Key: FLINK-35700 URL: https://issues.apache.org/jira/browse/FLINK-35700 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux FLINK-35121 added pipeline configuration validation, rejecting any unknown options, which turns out to be too strict: it makes it impossible to create customized configuration extensions. Also, Flink doesn't reject unknown entries in flink-conf / config.yaml; it just silently ignores them. It might be better for CDC to follow that behavior. -- This message was sent by Atlassian Jira (v8.20.10#820010)
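A minimal sketch of the suggested behavior (the option names are made up for illustration): warn on unknown keys the way Flink treats config.yaml, instead of rejecting the whole pipeline:

```java
import java.util.Map;
import java.util.Set;

class LenientConfigValidationSketch {
    private static final Set<String> KNOWN_OPTIONS =
            Set.of("name", "parallelism", "schema.change.behavior"); // illustrative subset

    static void validate(Map<String, String> pipelineConfig) {
        for (String key : pipelineConfig.keySet()) {
            if (!KNOWN_OPTIONS.contains(key)) {
                // Mirror Flink's handling of unknown config.yaml entries:
                // log and ignore, rather than fail the submission.
                System.err.println("Ignoring unrecognized pipeline option: " + key);
            }
        }
    }
}
```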
[jira] [Commented] (FLINK-35664) Flink CDC pipeline transform supports mask function
[ https://issues.apache.org/jira/browse/FLINK-35664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856665#comment-17856665 ] yux commented on FLINK-35664: - Hi [~melin], just to confirm, are you suggesting adding built-in functions like `mask_inner` and `mask_outer` from MySQL (https://dev.mysql.com/doc/refman/5.7/en/data-masking-usage.html) to the transform API?
> Flink CDC pipeline transform supports mask function
> ---
>
> Key: FLINK-35664
> URL: https://issues.apache.org/jira/browse/FLINK-35664
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Reporter: melin
> Priority: Major
> Fix For: cdc-3.2.0
>
> Flink CDC pipeline transform supports mask function

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35654) Add Flink CDC verification guide in docs
yux created FLINK-35654: --- Summary: Add Flink CDC verification guide in docs Key: FLINK-35654 URL: https://issues.apache.org/jira/browse/FLINK-35654 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux Currently, ASF voting process requires vast quality verification before releasing any new versions, including: * Tarball checksum verification * Compile from source code * Run pipeline E2e tests * Run migration tests * Check if jar was packaged with correct JDK version * ... Adding verification guide in Flink CDC docs should help developers verify future releases more easily. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35651) Unable to call argument-less built-in functions with parentheses
yux created FLINK-35651: --- Summary: Unable to call argument-less built-in functions with parentheses Key: FLINK-35651 URL: https://issues.apache.org/jira/browse/FLINK-35651 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux

Currently, the CDC pipeline transform module handles some built-in functions specially, including LOCALTIME, LOCALTIMESTAMP, CURRENT_TIME, CURRENT_DATE, and CURRENT_TIMESTAMP. In the SystemFunctionUtils implementation, they do have arguments, but these are silently bound to the current `__epoch_time__` and `__time_zone__` variables. However, users cannot call these functions with parentheses directly, because the function signature doesn't match. For example,

```yaml
transform:
  - projection: CURRENT_TIMESTAMP AS TS
```

is OK, since it is automatically expanded to something like `currentTimestamp(__epoch_time__, __time_zone__)`. But for this definition:

```yaml
transform:
  - projection: CURRENT_TIMESTAMP() AS TS
```

the transform will fail at runtime:

> org.apache.calcite.runtime.CalciteContextException: From line 1, column 12 to line 1, column 30: No match found for function signature CURRENT_TIMESTAMP()

which could be quite confusing to users.

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35648) Pipeline job doesn't support multiple routing
yux created FLINK-35648: --- Summary: Pipeline job doesn't support multiple routing Key: FLINK-35648 URL: https://issues.apache.org/jira/browse/FLINK-35648 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux

Currently, any upstream table could be routed at most once, which means that if we wrote such a route definition:

routes:
  - source-table: db.(A|B)
    sink-table: terminal.one
  - source-table: db.(B|C)
    sink-table: terminal.two

any upstream schema / data changes from db.B will be sent to terminal.one *only*, not to terminal.two, since db.B has already been handled by the first route rule.

This ticket suggests adding a route behavior option (FIRST_MATCH / COMPLETE) to configure whether all matching route rules should be applied, or only the first matched rule (the latter for backwards compatibility).

-- This message was sent by Atlassian Jira (v8.20.10#820010)
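A hypothetical sketch of what the proposal could look like (the `route-behavior` key is illustrative only, not an existing option):

```yaml
routes:
  - source-table: db.(A|B)
    sink-table: terminal.one
  - source-table: db.(B|C)
    sink-table: terminal.two
pipeline:
  # COMPLETE: apply every matching rule, so db.B reaches both terminals;
  # FIRST_MATCH: keep today's first-rule-wins behavior.
  route-behavior: COMPLETE
```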
[jira] [Created] (FLINK-35647) Pipeline route rule supports placeholder replacement
yux created FLINK-35647: --- Summary: Pipeline route rule supports placeholder replacement Key: FLINK-35647 URL: https://issues.apache.org/jira/browse/FLINK-35647 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux Currently, we must provide an explicit sink-table in pipeline route rules, which means that if we'd like to route all tables in a specific database and change their names following a pattern, there's no choice but to write all the rules separately, which is a chore. There's already an implementation (https://github.com/apache/flink-cdc/pull/2908) that uses placeholder syntax like (db.source<> -> db.sink<>) to perform the replacement in batch, which could solve this problem. -- This message was sent by Atlassian Jira (v8.20.10#820010)
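Illustrating the idea with the placeholder syntax mentioned in the ticket (the exact tokens come from the linked PR and may differ in the final design):

```yaml
routes:
  - source-table: db.source<>   # matches db.source1, db.source2, ...
    sink-table: db.sink<>       # ...routed to db.sink1, db.sink2, ...
```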
[jira] [Created] (FLINK-35634) Add a CDC quickstart utility
yux created FLINK-35634: --- Summary: Add a CDC quickstart utility Key: FLINK-35634 URL: https://issues.apache.org/jira/browse/FLINK-35634 Project: Flink Issue Type: New Feature Components: Flink CDC Reporter: yux Currently, it's not very easy to initialize a CDC pipeline job from scratch, as it requires users to configure lots of Flink settings manually. This ticket suggests creating an extra component, like `tiup` or `rustup`, to help users create and submit CDC jobs quickly. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35633) Early verification and clearer error message of pipeline YAML definition
yux created FLINK-35633: --- Summary: Early verification and clearer error message of pipeline YAML definition Key: FLINK-35633 URL: https://issues.apache.org/jira/browse/FLINK-35633 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux Currently, little verification is applied before submitting a YAML pipeline job to the Flink cluster, and errors in transform / route rules are exposed as runtime exceptions, which are hard to debug and investigate. This ticket suggests performing transform / route expression validation in the CDC CLI before submitting YAML jobs, providing clearer and more descriptive error messages early on. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35261) Flink CDC pipeline transform doesn't support decimal-type comparison
[ https://issues.apache.org/jira/browse/FLINK-35261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yux updated FLINK-35261: Parent: FLINK-34877 Issue Type: Sub-task (was: Improvement) > Flink CDC pipeline transform doesn't support decimal-type comparison > > > Key: FLINK-35261 > URL: https://issues.apache.org/jira/browse/FLINK-35261 > Project: Flink > Issue Type: Sub-task > Components: Flink CDC >Reporter: yux >Priority: Major > > It would be convenient if we can filter by comparing decimal to number > literals like: > {{transform:}} > {{ - source-table: XXX}} > {{ filter: price > 50}} > where price is a Decimal typed column. However currently such expression is > not supported, and a runtime exception will be thrown as follows: > > Caused by: org.apache.flink.api.common.InvalidProgramException: Expression > cannot be compiled. This is a bug. Please file an issue. > Expression: import static > org.apache.flink.cdc.runtime.functions.SystemFunctionUtils.*;PRICEALPHA > 50 > at > org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler.lambda$compileExpression$0(TransformExpressionCompiler.java:62) > > ~[blob_p-c3b34ad5a5a3a0bc443bb738e308b20b1da04a1f-8d419e2c927baeb0eeb40fb35c0a52dc:3.2-SNAPSHOT] > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4868) > > ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT] > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3533) > > ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT] > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2282) > > ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT] > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2159) > > ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT] > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2049) > > ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT] > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache.get(LocalCache.java:3966) > > ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT] > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4863) > > ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT] > at > org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler.compileExpression(TransformExpressionCompiler.java:46) > > ~[blob_p-c3b34ad5a5a3a0bc443bb738e308b20b1da04a1f-8d419e2c927baeb0eeb40fb35c0a52dc:3.2-SNAPSHOT] > ... 17 more > Caused by: org.codehaus.commons.compiler.CompileException: Line 1, Column 89: > Cannot compare types "java.math.BigDecimal" and "int" -- This message was sent by Atlassian Jira (v8.20.10#820010)
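The failing comparison and the coercion that would fix it, as a standalone Java sketch (illustrative of the needed type conversion, not the actual generated code):

```java
import java.math.BigDecimal;

public class DecimalCompareDemo {
    public static void main(String[] args) {
        BigDecimal price = new BigDecimal("50.50");
        // Janino cannot cook `price > 50`: there is no ">" operator between
        // java.math.BigDecimal and int, which is exactly the reported error.
        // Coercing the literal and comparing via compareTo expresses the filter:
        boolean kept = price.compareTo(BigDecimal.valueOf(50)) > 0;
        System.out.println(kept); // true
    }
}
```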
[jira] [Commented] (FLINK-35261) Flink CDC pipeline transform doesn't support decimal-type comparison
[ https://issues.apache.org/jira/browse/FLINK-35261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854308#comment-17854308 ] yux commented on FLINK-35261: - Thanks for [~wink] 's clarification! I'll put this ticket as a subtask of FLINK-34877 since it could be resolved by type conversion. > Flink CDC pipeline transform doesn't support decimal-type comparison > > > Key: FLINK-35261 > URL: https://issues.apache.org/jira/browse/FLINK-35261 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: yux >Priority: Major > > It would be convenient if we can filter by comparing decimal to number > literals like: > {{transform:}} > {{ - source-table: XXX}} > {{ filter: price > 50}} > where price is a Decimal typed column. However currently such expression is > not supported, and a runtime exception will be thrown as follows: > > Caused by: org.apache.flink.api.common.InvalidProgramException: Expression > cannot be compiled. This is a bug. Please file an issue. > Expression: import static > org.apache.flink.cdc.runtime.functions.SystemFunctionUtils.*;PRICEALPHA > 50 > at > org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler.lambda$compileExpression$0(TransformExpressionCompiler.java:62) > > ~[blob_p-c3b34ad5a5a3a0bc443bb738e308b20b1da04a1f-8d419e2c927baeb0eeb40fb35c0a52dc:3.2-SNAPSHOT] > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4868) > > ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT] > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3533) > > ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT] > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2282) > > ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT] > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2159) > > ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT] > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2049) > > ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT] > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache.get(LocalCache.java:3966) > > ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT] > at > org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4863) > > ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT] > at > org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler.compileExpression(TransformExpressionCompiler.java:46) > > ~[blob_p-c3b34ad5a5a3a0bc443bb738e308b20b1da04a1f-8d419e2c927baeb0eeb40fb35c0a52dc:3.2-SNAPSHOT] > ... 17 more > Caused by: org.codehaus.commons.compiler.CompileException: Line 1, Column 89: > Cannot compare types "java.math.BigDecimal" and "int" -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35242) Add per-type schema evolution behavior configuration
[ https://issues.apache.org/jira/browse/FLINK-35242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yux updated FLINK-35242:
Description:
> Update: Changed the `fine grained` terminology to avoid confusion with the fine-grained job resource management feature.

Currently, Flink CDC allows three SE strategies: evolving it, ignoring it, or throwing an exception. However, such a configuration strategy doesn't cover all use cases, and users want more fine-grained strategy configuration.

This ticket suggests adding one more strategy, "try_evolve" or "evolve_when_available". It's basically like the "evolving" option, but it doesn't throw an exception if such an operation fails, which provides more flexibility.

Also, this ticket suggests allowing users to configure a per-schema-event strategy, so users could evolve some types of events (like rename column) and reject some unwelcome events (like truncate table, remove column).

was:
Currently, Flink CDC allows three SE strategies: evolving it, ignoring it, or throwing an exception. However, such a configuration strategy doesn't cover all use cases, and users want more fine-grained strategy configuration.

This ticket suggests adding one more strategy, "try_evolve" or "evolve_when_available". It's basically like the "evolving" option, but it doesn't throw an exception if such an operation fails, which provides more flexibility.

Also, this ticket suggests allowing users to configure a per-schema-event strategy, so users could evolve some types of events (like rename column) and reject some dangerous events (like truncate table, remove column).

Summary: Add per-type schema evolution behavior configuration (was: Add fine-grained schema evolution strategy)
> Add per-type schema evolution behavior configuration
>
> Key: FLINK-35242
> URL: https://issues.apache.org/jira/browse/FLINK-35242
> Project: Flink
> Issue Type: Improvement
> Components: Flink CDC
> Reporter: yux
> Assignee: yux
> Priority: Major
> Labels: pull-request-available
> Fix For: cdc-3.2.0
>
> Original Estimate: 336h
> Remaining Estimate: 336h
>
> > Update: Changed the `fine grained` terminology to avoid confusion with the fine-grained job resource management feature.
> Currently, Flink CDC allows three SE strategies: evolving it, ignoring it, or throwing an exception. However, such a configuration strategy doesn't cover all use cases, and users want more fine-grained strategy configuration.
> This ticket suggests adding one more strategy, "try_evolve" or "evolve_when_available". It's basically like the "evolving" option, but it doesn't throw an exception if such an operation fails, which provides more flexibility.
> Also, this ticket suggests allowing users to configure a per-schema-event strategy, so users could evolve some types of events (like rename column) and reject some unwelcome events (like truncate table, remove column).

-- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35527) Polish quickstart guide & clean stale links in docs
yux created FLINK-35527: --- Summary: Polish quickstart guide & clean stale links in docs Key: FLINK-35527 URL: https://issues.apache.org/jira/browse/FLINK-35527 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.1.0 Reporter: yux Fix For: cdc-3.2.0 Currently, there are still a lot of stale links in the Flink CDC docs, including some download links pointing to Ververica maven repositories. They need to be cleaned up to avoid confusing users. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35503) OracleE2eITCase fails with error ORA-12528 on Mac M2
[ https://issues.apache.org/jira/browse/FLINK-35503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851479#comment-17851479 ] yux commented on FLINK-35503: - Hi [~sakkurn], currently [~gongzhongqiang] has tweaked the Oracle EE docker image for running E2e tests and repackaged it as goodboy008/oracle-19.3.0-ee:non-cdb. However, it supports amd64 only and requires Rosetta (or something similar) to run. Oracle community discussions [2] revealed that ORA-03113 also appears when running under cross-architecture emulation, perhaps caused by a bug in Rosetta 2, but this is hard to investigate and debug since Rosetta is not open source. As this PR [3] has brought official arm64 architecture support to the Oracle docker image, I think we can repackage the Oracle E2e docker image to support both the amd64 and arm64 architectures, which could help Mac developers run E2e tests more easily. I'd like to help implement this if needed. [1] https://hub.docker.com/r/gvenzl/oracle-xe/tags [2] https://github.com/oracle/docker-images/discussions/1951 [3] https://github.com/oracle/docker-images/pull/2659 > OracleE2eITCase fails with error ORA-12528 on Mac M2 > > > Key: FLINK-35503 > URL: https://issues.apache.org/jira/browse/FLINK-35503 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 > Environment: > * Mac M2 (Apple Silicon) > * using docker desktop with Rosetta enabled for amd64 emulation > >Reporter: Saketh Kurnool >Priority: Blocker > Attachments: com.ververica.cdc.connectors.tests.OracleE2eITCase.txt, > oracle-docker-setup-logs.txt > > > Hello Flink CDC community, > I am attempting to run `OracleE2eITCase` (in the cdc source connector e2e > tests), and I am running into the following runtime exception: > {code:java} > java.sql.SQLException: > Listener refused the connection with the following error: > ORA-12528, TNS:listener: all appropriate instances are blocking new > connections > > at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:854) > at > oracle.jdbc.driver.PhysicalConnection.connect(PhysicalConnection.java:793) > at > oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:57) > at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:747) > at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:562) > at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:677) > at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:228) > at > com.ververica.cdc.connectors.tests.OracleE2eITCase.getOracleJdbcConnection(OracleE2eITCase.java:197) > at > com.ververica.cdc.connectors.tests.OracleE2eITCase.testOracleCDC(OracleE2eITCase.java:149) > at java.base/java.lang.reflect.Method.invoke(Method.java:567) > at > org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45) > at > org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29) > Caused by: oracle.net.ns.NetException: Listener refused the connection with > the following error: > ORA-12528, TNS:listener: all appropriate instances are blocking new > connections > > at oracle.net.ns.NSProtocolNIO.negotiateConnection(NSProtocolNIO.java:284) > at oracle.net.ns.NSProtocol.connect(NSProtocol.java:340) > at oracle.jdbc.driver.T4CConnection.connect(T4CConnection.java:1596) > at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:588) > ... 11 more{code} > I have attached the test results to this issue.
> `OracleE2eITCase` runs the `goodboy008/oracle-19.3.0-ee:non-cdb` docker > image. I am able to reproduce the same issue when I run this docker image > locally - my observation is that the dockerized Oracle DB instance is not being > set up properly, as I notice another ORA error in the setup logs (`ORA-03113: > end-of-file on communication channel`). I have also attached the logs from > the docker image setup to this issue. To reproduce the ORA-12528 issue > locally, I: > * ran: `docker run goodboy008/oracle-19.3.0-ee:non-cdb` > * ssh'ed into the db pod > * ran: `sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba` > Any insight/workaround on getting this e2e test and the docker image running > on my machine would be much appreciated. I'm also happy to provide any other > information regarding my setup in the comments. Thank you! > -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35464) Flink CDC 3.1 breaks operator state compatibility
[ https://issues.apache.org/jira/browse/FLINK-35464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849704#comment-17849704 ] yux commented on FLINK-35464: - [~Leonard] I'm willing to investigate this. > Flink CDC 3.1 breaks operator state compatibility > > > Key: FLINK-35464 > URL: https://issues.apache.org/jira/browse/FLINK-35464 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: yux >Priority: Critical > Fix For: cdc-3.1.1 > > > Flink CDC 3.1 changed how SchemaRegistry [de]serializes state data, which > means any checkpoint state saved with an earlier version cannot be restored > in version 3.1.0. > This could be resolved by adding serialization versioning control logic. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35464) Flink CDC 3.1 breaks operator state compatibility
yux created FLINK-35464: --- Summary: Flink CDC 3.1 breaks operator state compatibility Key: FLINK-35464 URL: https://issues.apache.org/jira/browse/FLINK-35464 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.1.0 Reporter: yux Fix For: cdc-3.1.1 Flink CDC 3.1 changed how SchemaRegistry [de]serializes state data, which means any checkpoint state saved with an earlier version cannot be restored in version 3.1.0. This could be resolved by adding serialization versioning control logic. -- This message was sent by Atlassian Jira (v8.20.10#820010)
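The versioning idea mentioned above can be illustrated with a minimal sketch (the version numbers, payload layout, and class below are assumptions for illustration, not CDC's actual wire format; note also that state written before versioning existed carries no header at all, which a real fix would have to detect):

```java
import java.io.*;

// Minimal sketch of versioned state [de]serialization.
class VersionedStateSerializerSketch {
    static final int CURRENT_VERSION = 2;

    static byte[] serialize(String state) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(CURRENT_VERSION); // version header goes first
        out.writeUTF(state);
        return bos.toByteArray();
    }

    static String deserialize(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        int version = in.readInt();
        switch (version) {
            case 1:
                // a real implementation would decode the legacy layout here
                throw new IOException("legacy decoding not shown in this sketch");
            case 2:
                return in.readUTF();
            default:
                throw new IOException("unknown state version: " + version);
        }
    }

    public static void main(String[] args) throws IOException {
        System.out.println(deserialize(serialize("table: db.orders"))); // round-trips
    }
}
```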
[jira] [Updated] (FLINK-35441) Add CDC upgrade compatibility tests
[ https://issues.apache.org/jira/browse/FLINK-35441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yux updated FLINK-35441: Issue Type: Improvement (was: Bug) > Add CDC upgrade compatibility tests > --- > > Key: FLINK-35441 > URL: https://issues.apache.org/jira/browse/FLINK-35441 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: yux >Priority: Major > > Currently, there are no test cases to guarantee checkpoint state compatibility > between different CDC versions, like Flink's SerializerUpgradeTestBase does. > Adding them should help CDC users upgrade versions with more confidence. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35441) Add CDC upgrade compatibility tests
[ https://issues.apache.org/jira/browse/FLINK-35441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849142#comment-17849142 ] yux commented on FLINK-35441: - [~renqs] I'd love to take this ticket if needed. > Add CDC upgrade compatibility tests > --- > > Key: FLINK-35441 > URL: https://issues.apache.org/jira/browse/FLINK-35441 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: yux >Priority: Major > > Currently, there are no test cases to guarantee checkpoint state compatibility > between different CDC versions, like Flink's SerializerUpgradeTestBase does. > Adding them should help CDC users upgrade versions with more confidence. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35441) Add CDC upgrade compatibility tests
yux created FLINK-35441: --- Summary: Add CDC upgrade compatibility tests Key: FLINK-35441 URL: https://issues.apache.org/jira/browse/FLINK-35441 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux Currently, there are no test cases to guarantee checkpoint state compatibility between different CDC versions, like Flink's SerializerUpgradeTestBase does. Adding them should help CDC users upgrade versions with more confidence. -- This message was sent by Atlassian Jira (v8.20.10#820010)
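A hypothetical shape for such a test, loosely following the SerializerUpgradeTestBase idea (everything here is illustrative; a real test would load a snapshot file checked in from an earlier CDC release rather than producing the "old" bytes inline):

```java
import static org.junit.jupiter.api.Assertions.assertEquals;

import java.io.*;
import org.junit.jupiter.api.Test;

// Sketch of a cross-version compatibility test; classes and byte layout are
// illustrative stand-ins, not existing CDC code.
class UpgradeCompatibilityTestSketch {

    // Stand-in for a state serializer that accepts older version tags.
    static String deserialize(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        int version = in.readInt();
        if (version != 1 && version != 2) {
            throw new IOException("unsupported version " + version);
        }
        return in.readUTF();
    }

    @Test
    void restoresStateWrittenByOlderVersion() throws Exception {
        // Simulate a v1 snapshot as an earlier release would have written it.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeInt(1);
        out.writeUTF("table: db.orders");

        assertEquals("table: db.orders", deserialize(bos.toByteArray()));
    }
}
```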
[jira] [Created] (FLINK-35415) CDC Fails to create sink with Flink 1.19
yux created FLINK-35415: --- Summary: CDC Fails to create sink with Flink 1.19 Key: FLINK-35415 URL: https://issues.apache.org/jira/browse/FLINK-35415 Project: Flink Issue Type: Bug Components: Flink CDC Affects Versions: cdc-3.1.0 Reporter: yux Fix For: cdc-3.2.0 Currently, Flink CDC doesn't work with Flink 1.19, failing with the following exception: Exception in thread "main" java.lang.NoSuchMethodError: 'void org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory.<init>(org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink, boolean, boolean)' The reason is that Flink CDC uses a Flink @Internal API that was changed in the 1.19 update. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Commented] (FLINK-35385) Upgrade Flink dependency version to 1.19
[ https://issues.apache.org/jira/browse/FLINK-35385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848411#comment-17848411 ] yux commented on FLINK-35385: - It seems https://github.com/apache/flink-cdc/issues/3327 was caused by this, since the CommitterOperatorFactory API changed in Flink 1.19. [~renqs] I'd like to take this if needed. > Upgrade Flink dependency version to 1.19 > - > > Key: FLINK-35385 > URL: https://issues.apache.org/jira/browse/FLINK-35385 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Affects Versions: cdc-3.1.0 >Reporter: Hongshun Wang >Priority: Minor > Fix For: cdc-3.2.0 > > > Flink 1.19 was released on 2024-03-18 and the connectors have not yet > caught up. -- This message was sent by Atlassian Jira (v8.20.10#820010)
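One common way to cope with an @Internal constructor changing across Flink versions is to select it reflectively at runtime instead of linking against a fixed signature. This is a sketch only (the class name matches the error above, but the selection logic is an assumption, not necessarily how the actual CDC fix works):

```java
import java.lang.reflect.Constructor;

// Sketch: pick whichever 3-arg CommitterOperatorFactory constructor the
// current Flink version exposes, rather than binding one signature at
// compile time. Selecting by arity alone is a simplification.
class ReflectiveFactorySketch {
    static Object newCommitterOperatorFactory(Object sink, boolean a, boolean b)
            throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(
                "org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory");
        for (Constructor<?> ctor : clazz.getConstructors()) {
            if (ctor.getParameterCount() == 3) {
                return ctor.newInstance(sink, a, b);
            }
        }
        throw new NoSuchMethodException("no compatible 3-arg constructor found");
    }
}
```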
[jira] [Commented] (FLINK-35329) Support modify table/column comments
[ https://issues.apache.org/jira/browse/FLINK-35329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845203#comment-17845203 ] yux commented on FLINK-35329: - Hi [~melin], I'm trying to add AlterTableCommentEvent / AlterColumnCommentEvent support in PR https://github.com/apache/flink-cdc/pull/3296, PTAL. > Support modify table/column comments > > > Key: FLINK-35329 > URL: https://issues.apache.org/jira/browse/FLINK-35329 > Project: Flink > Issue Type: New Feature > Components: Flink CDC >Affects Versions: cdc-3.2.0 >Reporter: melin >Priority: Major > > Databases have DDL statements to change table/column comments, so it is > recommended to add a comment-change event. It should also be possible to leave > comments unsynchronized, since some scenarios cannot synchronize them. -- This message was sent by Atlassian Jira (v8.20.10#820010)
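For readers skimming the thread, a rough sketch of what such an event could look like (field names are guesses for illustration; see the PR linked above for the actual definition):

```java
// Illustrative shape only; not the class added by the PR.
class AlterTableCommentEventSketch {
    private final String tableId;
    private final String newComment;

    AlterTableCommentEventSketch(String tableId, String newComment) {
        this.tableId = tableId;
        this.newComment = newComment;
    }

    String tableId() { return tableId; }
    String newComment() { return newComment; }
}
```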
[jira] [Closed] (FLINK-35312) Insufficient number of arguments were supplied for the procedure or function cdc.fn_cdc_get_all_changes_
[ https://issues.apache.org/jira/browse/FLINK-35312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yux closed FLINK-35312. --- Resolution: Done This ticket has been resolved by https://github.com/apache/flink-cdc/pull/2551. > Insufficient number of arguments were supplied for the procedure or function > cdc.fn_cdc_get_all_changes_ > > > Key: FLINK-35312 > URL: https://issues.apache.org/jira/browse/FLINK-35312 > Project: Flink > Issue Type: Bug > Components: Flink CDC >Reporter: yux >Priority: Major > > h3. Flink version > 1.17.0 > h3. Flink CDC version > 2.4.1 > h3. Database and its version > sql server 2014 > h3. Minimal reproduce step > 1 > h3. What did you expect to see? > Caused by: java.lang.RuntimeException: SplitFetcher thread 22 received > unexpected exception while polling the records > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) > at > org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114) > at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > ... 1 more > Caused by: org.apache.kafka.connect.errors.RetriableException: An exception > occurred in the change event producer. This connector will be restarted. > at > io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:46) > at > io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.executeIteration(SqlServerStreamingChangeEventSource.java:458) > at > io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:138) > at > com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask$LsnSplitReadTask.execute(SqlServerStreamFetchTask.java:161) > at > com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerScanFetchTask.execute(SqlServerScanFetchTask.java:123) > at > com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$0(IncrementalSourceScanFetcher.java:95) > ... 5 more > Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: An insufficient > number of arguments were supplied for the procedure or function > cdc.fn_cdc_get_all_changes_ ... . > at > com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:265) > at > com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5471) > at > com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1794) > at > com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1052) > at > io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63) > at > io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$executeIteration$1(SqlServerStreamingChangeEventSource.java:269) > at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:606) > at > io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:329) > at > io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.executeIteration(SqlServerStreamingChangeEventSource.java:251) > ... 9 more -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35316) Add CDC e2e test case for Flink 1.19
[ https://issues.apache.org/jira/browse/FLINK-35316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yux updated FLINK-35316: Priority: Minor (was: Major) > Add CDC e2e test case for Flink 1.19 > --- > > Key: FLINK-35316 > URL: https://issues.apache.org/jira/browse/FLINK-35316 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: yux >Priority: Minor > > Since Flink 1.19 is now generally available, Flink CDC is expected to be > used with it. E2e test cases should cover this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Updated] (FLINK-35316) Add CDC e2e test case for Flink 1.19
[ https://issues.apache.org/jira/browse/FLINK-35316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] yux updated FLINK-35316: Issue Type: Improvement (was: Bug) > Add CDC e2e test case for Flink 1.19 > --- > > Key: FLINK-35316 > URL: https://issues.apache.org/jira/browse/FLINK-35316 > Project: Flink > Issue Type: Improvement > Components: Flink CDC >Reporter: yux >Priority: Major > > Since Flink 1.19 is now generally available, Flink CDC is expected to be > used with it. E2e test cases should cover this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35317) Flink CDC CLI supports submitting multiple YAML jobs at once
yux created FLINK-35317: --- Summary: Flink CDC CLI supports submitting multiple YAML jobs at once Key: FLINK-35317 URL: https://issues.apache.org/jira/browse/FLINK-35317 Project: Flink Issue Type: Improvement Components: Flink CDC Reporter: yux Currently, Flink CDC CLI only allows submitting one YAML pipeline job at a time. It would be convenient if users could submit multiple .yml files at once, like this: {{./bin/flink-cdc.sh job1.yml job2.yml --flink-home /opt/flink ...}} -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35316) Add CDC e2e test case for Flink 1.19
yux created FLINK-35316: --- Summary: Add CDC e2e test case for Flink 1.19 Key: FLINK-35316 URL: https://issues.apache.org/jira/browse/FLINK-35316 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux Since Flink 1.19 is now generally available, Flink CDC is expected to be used with it. E2e test cases should cover this. -- This message was sent by Atlassian Jira (v8.20.10#820010)
[jira] [Created] (FLINK-35312) Insufficient number of arguments were supplied for the procedure or function cdc.fn_cdc_get_all_changes_
yux created FLINK-35312: --- Summary: Insufficient number of arguments were supplied for the procedure or function cdc.fn_cdc_get_all_changes_ Key: FLINK-35312 URL: https://issues.apache.org/jira/browse/FLINK-35312 Project: Flink Issue Type: Bug Components: Flink CDC Reporter: yux h3. Flink version 1.17.0 h3. Flink CDC version 2.4.1 h3. Database and its version sql server 2014 h3. Minimal reproduce step 1 h3. What did you expect to see? Caused by: java.lang.RuntimeException: SplitFetcher thread 22 received unexpected exception while polling the records at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165) at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more Caused by: org.apache.kafka.connect.errors.RetriableException: An exception occurred in the change event producer. This connector will be restarted. at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:46) at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.executeIteration(SqlServerStreamingChangeEventSource.java:458) at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:138) at com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask$LsnSplitReadTask.execute(SqlServerStreamFetchTask.java:161) at com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerScanFetchTask.execute(SqlServerScanFetchTask.java:123) at com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$0(IncrementalSourceScanFetcher.java:95) ... 5 more Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: An insufficient number of arguments were supplied for the procedure or function cdc.fn_cdc_get_all_changes_ ... . at com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:265) at com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5471) at com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1794) at com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1052) at io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63) at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$executeIteration$1(SqlServerStreamingChangeEventSource.java:269) at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:606) at io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:329) at io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.executeIteration(SqlServerStreamingChangeEventSource.java:251) ... 9 more -- This message was sent by Atlassian Jira (v8.20.10#820010)