[jira] [Created] (FLINK-36565) Pipeline YAML should allow merging decimal with different precisions

2024-10-17 Thread yux (Jira)
yux created FLINK-36565:
---

 Summary: Pipeline YAML should allow merging decimal with different 
precisions
 Key: FLINK-36565
 URL: https://issues.apache.org/jira/browse/FLINK-36565
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Currently, it's not possible to merge two decimal-typed fields with different 
precision or scale. Since DECIMAL(p1, s1) and DECIMAL(p2, s2) can be converted 
to DECIMAL(MAX(p1 - s1, p2 - s2) + MAX(s1, s2), MAX(s1, s2)) without any loss, 
this conversion path seems reasonable and worth adding.
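As an illustration of the widening rule above, here is a minimal sketch in plain Java (not the CDC API) that computes the merged precision and scale for two decimal types:

```java
// Illustrative only: computes the lossless merged DECIMAL(p, s) for two
// decimal types, following the rule described above.
final class DecimalWidening {

    /** Simple value holder for a decimal's precision and scale. */
    record DecimalType(int precision, int scale) {}

    static DecimalType merge(DecimalType a, DecimalType b) {
        // Keep enough integer digits for both sides...
        int integerDigits = Math.max(a.precision() - a.scale(), b.precision() - b.scale());
        // ...and enough fractional digits for both sides.
        int scale = Math.max(a.scale(), b.scale());
        return new DecimalType(integerDigits + scale, scale);
    }

    public static void main(String[] args) {
        // DECIMAL(10, 2) merged with DECIMAL(8, 4) -> DECIMAL(12, 4)
        System.out.println(merge(new DecimalType(10, 2), new DecimalType(8, 4)));
    }
}
```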



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36558) MySQL CDC Fails to parse array-typed key index binlog created between 8.0.17 and 8.0.18

2024-10-16 Thread yux (Jira)
yux created FLINK-36558:
---

 Summary: MySQL CDC Fails to parse array-typed key index binlog 
created between 8.0.17 and 8.0.18
 Key: FLINK-36558
 URL: https://issues.apache.org/jira/browse/FLINK-36558
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


MySQL 8.0.17 and 8.0.18 write array-typed key indexes into the binlog differently:
 * MySQL 8.0.17 used a different internal enum value for the TYPED_ARRAY column type
 * MySQL 8.0.17 and 8.0.18 write extraneous bytes after the expected column metadata

This has been fixed since MySQL 8.0.19. Since MySQL treats the binlog format as an 
internal implementation detail [1] and doesn't guarantee backwards compatibility, 
MySQL CDC has to handle the versioning problem itself.

[1]   https://bugs.mysql.com/bug.php?id=105545
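A rough sketch of the kind of version gating this would require on the connector side; `parseColumnMetadata`-style helpers and the version predicate below are hypothetical placeholders, not actual MySQL CDC APIs:

```java
// Hypothetical sketch: gate binlog metadata parsing on the MySQL server
// version, since 8.0.17 and 8.0.18 serialize TYPED_ARRAY metadata differently.
final class BinlogMetadataParser {

    static boolean needsLegacyTypedArrayHandling(int major, int minor, int patch) {
        // Affected range: 8.0.17 and 8.0.18; fixed in 8.0.19 and later.
        return major == 8 && minor == 0 && (patch == 17 || patch == 18);
    }

    static ColumnMetadata parse(byte[] raw, int major, int minor, int patch) {
        if (needsLegacyTypedArrayHandling(major, minor, patch)) {
            // Hypothetical workaround path: remap the non-standard enum value
            // and skip the extraneous trailing bytes.
            return parseLegacy(raw);
        }
        return parseStandard(raw);
    }

    // Placeholders for the two parsing paths.
    static ColumnMetadata parseStandard(byte[] raw) { return new ColumnMetadata(); }
    static ColumnMetadata parseLegacy(byte[] raw) { return new ColumnMetadata(); }

    static final class ColumnMetadata {}
}
```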



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36075) Add paginate partition strategy for mongodb connector

2024-10-15 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36075?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17889926#comment-17889926
 ] 

yux commented on FLINK-36075:
-

Sure! Please assign it to me.

> Add paginate partition strategy for mongodb connector
> -
>
> Key: FLINK-36075
> URL: https://issues.apache.org/jira/browse/FLINK-36075
> Project: Flink
>  Issue Type: Improvement
>  Components: Connectors / MongoDB
>Affects Versions: mongodb-1.2.0
>Reporter: Jiabao Sun
>Assignee: Jiabao Sun
>Priority: Major
> Fix For: mongodb-1.3.0
>
>
> Add paginate partition strategy for mongodb connector



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36515) Adds high-precision TIME type support in YAML pipeline

2024-10-11 Thread yux (Jira)
yux created FLINK-36515:
---

 Summary: Adds high-precision TIME type support in YAML pipeline
 Key: FLINK-36515
 URL: https://issues.apache.org/jira/browse/FLINK-36515
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Currently, CDC treats `TimeType` the same way the Flink Table API does and only 
supports zero-precision time types, but an upstream MySQL source may generate 
time data with precision up to 9.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36514) Unable to override include/exclude schema types in lenient mode

2024-10-11 Thread yux (Jira)
yux created FLINK-36514:
---

 Summary: Unable to override include/exclude schema types in 
lenient mode
 Key: FLINK-36514
 URL: https://issues.apache.org/jira/browse/FLINK-36514
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


If schema evolution behavior is set to LENIENT, Truncate / Drop table events 
will be ignored by default. However, there's currently no way for users to 
override this behavior due to the following code:

```java
if (excludedSETypes.isEmpty()
        && SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) {
    // In lenient mode, we exclude DROP_TABLE and TRUNCATE_TABLE by default.
    // This could be overridden by manually specifying excluded types.
    Stream.of(SchemaChangeEventType.DROP_TABLE, SchemaChangeEventType.TRUNCATE_TABLE)
            .map(SchemaChangeEventType::getTag)
            .forEach(excludedSETypes::add);
}
```
 
If one wants to exclude no types, it's actually not possible since passing `[]` 
is equivalent to passing nothing, and `DROP` and `TRUNCATE` events will still 
be ignored.
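One possible direction, purely as an illustration (the `excludedTypesExplicitlySet` flag below is hypothetical and not an existing configuration field): only apply the lenient-mode defaults when the option was never set, so an explicit `[]` means "exclude nothing".

```java
// Hypothetical sketch: apply the lenient-mode defaults only when the user did
// not set the excluded-types option at all, so an explicit empty list disables
// the implicit DROP_TABLE / TRUNCATE_TABLE exclusion.
if (!excludedTypesExplicitlySet
        && SchemaChangeBehavior.LENIENT.equals(schemaChangeBehavior)) {
    Stream.of(SchemaChangeEventType.DROP_TABLE, SchemaChangeEventType.TRUNCATE_TABLE)
            .map(SchemaChangeEventType::getTag)
            .forEach(excludedSETypes::add);
}
```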



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36475) YAML Transform supports case-insensitive column names conversion

2024-10-10 Thread yux (Jira)
yux created FLINK-36475:
---

 Summary: YAML Transform supports case-insensitive column names 
conversion
 Key: FLINK-36475
 URL: https://issues.apache.org/jira/browse/FLINK-36475
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Some YAML pipeline sinks have case-insensitive semantics (Paimon, Hive, etc.), 
which means all given column names should be converted to lowercase. It's a 
chore for users to write individual transform rules just to change column 
names, so a meta-option that automatically lowercases column names would be nice.

> Originally discussed in 
> https://github.com/apache/flink-cdc/discussions/3612#discussioncomment-10910326

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36474) YAML Table-merging route should accept more type widening cases

2024-10-10 Thread yux (Jira)
yux created FLINK-36474:
---

 Summary: YAML Table-merging route should accept more type widening 
cases
 Key: FLINK-36474
 URL: https://issues.apache.org/jira/browse/FLINK-36474
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


Currently, only three specific type widening cases are accepted when 
performing table-merging routes:
 * Integer Type Promotions
 * Float to Double Promotion
 * Char-like Type to String

Any other combination (including some reasonable ones, like temporal types) will be 
rejected during wider-type inference, which is a little surprising, since 
table-merging routes can already tolerate columns with varying names and counts.
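For reference, a rough sketch (helper predicates like `isIntegerType` are placeholders, not the actual CDC utilities) of what the current three-case widening logic effectively amounts to; every other combination, temporal types included, falls through to rejection:

```java
// Illustrative sketch of the three widening cases currently accepted when
// merging schemas; any other combination falls through and is rejected.
static DataType getWiderType(DataType a, DataType b) {
    if (isIntegerType(a) && isIntegerType(b)) {
        return widerIntegerTypeOf(a, b);   // e.g. INT merged with BIGINT -> BIGINT
    }
    if (isFloatingPointType(a) && isFloatingPointType(b)) {
        return doubleType();               // FLOAT merged with DOUBLE -> DOUBLE
    }
    if (isCharLikeType(a) && isCharLikeType(b)) {
        return stringType();               // CHAR / VARCHAR variants -> STRING
    }
    // Temporal types and other reasonable combinations currently end up here.
    throw new IllegalStateException("Incompatible types: " + a + " and " + b);
}
```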



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36461) YAML job failed to schema evolve with unmatched transform tables

2024-10-09 Thread yux (Jira)
yux created FLINK-36461:
---

 Summary: YAML job failed to schema evolve with unmatched transform 
tables
 Key: FLINK-36461
 URL: https://issues.apache.org/jira/browse/FLINK-36461
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


Currently, such a transform configuration will fail on any schema change event 
from tables other than `foo.bar.baz`:

```yaml
transform:
  - source-table: foo.bar.baz
    projection: \*
```

The exception message is as follows:

 

{{2024-10-10 11:04:43:
java.lang.ArrayIndexOutOfBoundsException: -1
at java.util.ArrayList.elementData(ArrayList.java:424)
at java.util.ArrayList.get(ArrayList.java:437)
at 
org.apache.flink.cdc.common.utils.SchemaUtils.lambda$transformSchemaChangeEvent$11(SchemaUtils.java:371)
at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
at 
java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1384)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at 
java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472)
at 
java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566)
at 
org.apache.flink.cdc.common.utils.SchemaUtils.transformSchemaChangeEvent(SchemaUtils.java:386)
at 
org.apache.flink.cdc.runtime.operators.transform.PreTransformOperator.cacheChangeSchema(PreTransformOperator.java:272)
at 
org.apache.flink.cdc.runtime.operators.transform.PreTransformOperator.processElement(PreTransformOperator.java:248)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:310)
at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
at 
org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase.pollNext(IteratorSourceReaderBase.java:103)
at 
org.apache.flink.cdc.connectors.values.source.ValuesDataSource$EventIteratorReader.pollNext(ValuesDataSource.java:294)
at 
org.apache.flink.streaming.api.operators.SourceOperator.pollNext(SourceOperator.java:779)
at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:457)
at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:70)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:616)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:1071)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:1020)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:959)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:938)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:751)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:567)
at java.lang.Thread.run(Thread.java:879)}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36453) Backport DBZ-6502 patch to fix Oracle 23.x JDBC driver compatibility

2024-10-09 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17887877#comment-17887877
 ] 

yux commented on FLINK-36453:
-

I'd love to take this ticket. [~leonard]

> Backport DBZ-6502 patch to fix Oracle 23.x JDBC driver compatibility
> ---
>
> Key: FLINK-36453
> URL: https://issues.apache.org/jira/browse/FLINK-36453
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>
> DBZ-6502 fixes Oracle JDBC 23.x compatibility, but it wasn't shipped in 
> versions prior to 2.2.
> We may backport this bugfix for now before updating to Debezium 2.x.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36453) Backport DBZ-6502 patch to fix Oracle 23.x JDBC driver compatibility

2024-10-09 Thread yux (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yux updated FLINK-36453:

Description: 
DBZ-6502 fixes Oracle JDBC 23.x compatibility, but it wasn't shipped in 
versions prior to 2.2.

We may backport this bugfix for now before updating to Debezium 2.x.

> Backport DBZ-6502 patch to fix Oracle 23.x JDBC driver compatibility
> ---
>
> Key: FLINK-36453
> URL: https://issues.apache.org/jira/browse/FLINK-36453
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>
> DBZ-6502 fixes Oracle JDBC 23.x compatibility, but it wasn't shipped in 
> versions prior to 2.2.
> We may backport this bugfix for now before updating to Debezium 2.x.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36453) Backport DBZ-6502 patch to fix Oracle 23.x JDBC driver compatibility

2024-10-09 Thread yux (Jira)
yux created FLINK-36453:
---

 Summary: Backport DBZ-6502 patch to fix Oracle 23.x JDBC driver 
compatibility
 Key: FLINK-36453
 URL: https://issues.apache.org/jira/browse/FLINK-36453
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux






--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36408) MySQL pipeline connector could not work with FLOAT type with precision

2024-09-29 Thread yux (Jira)
yux created FLINK-36408:
---

 Summary: MySQL pipeline connector could not work with FLOAT type 
with precision
 Key: FLINK-36408
 URL: https://issues.apache.org/jira/browse/FLINK-36408
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.2.0
Reporter: yux


Currently, the MySQL SourceRecordEmitter treats FLOAT(x) (a float with explicit 
precision) as DOUBLE, but the MetadataAccessor reports a plain FLOAT, which 
causes a discrepancy and corrupts deserialized data.
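For context, MySQL itself maps FLOAT(p) with p in [0, 23] to single precision and p in [24, 53] to double precision. A consistent mapping used by both code paths might look roughly like the sketch below; the `DataTypes` factory calls are assumptions about the CDC type API:

```java
// Sketch only: map MySQL FLOAT(p) the same way in the record emitter and the
// metadata accessor, following MySQL's own precision rule.
static DataType mapMySqlFloat(int precision) {
    // p in [0, 23] is stored as single precision, [24, 53] as double precision.
    return precision <= 23 ? DataTypes.FLOAT() : DataTypes.DOUBLE();
}
```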



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36403) K8s native mode Use ConfigMap to get a better experience

2024-09-29 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36403?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17885758#comment-17885758
 ] 

yux commented on FLINK-36403:
-

+1, repackaging images is neither convenient nor necessary considering most 
parts are unchanged, and we may be able to publish generic Docker images without 
requiring users to build their own. Just two minor questions:

* How will this approach handle dependencies (including JDBC drivers, UDF jars, 
etc.)?
* Currently, the deploy target is configured via CLI arguments. Will there be 
discrepancies if we also have a `deploy:` configuration in YAML files?

P.S. I guess the design intent is that the pipeline description YAML should be a 
target-neutral, abstract representation, so one may deploy a job anywhere 
(for example, on a local cluster when testing and on a remote one when deploying) 
without changing the YAML file.

cc [~renqs] [~ConradJam]

> K8s native mode Use ConfigMap to get a better experience
> 
>
> Key: FLINK-36403
> URL: https://issues.apache.org/jira/browse/FLINK-36403
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: gaoyan
>Priority: Major
>
> Currently, the operation of CDC's k8s native mode requires users to repackage 
> the YAML into the Docker image each time to update the task, which is a poor 
> experience. However, this issue can be resolved with a very simple method.
> When submitting a task, create a ConfigMap and write the YAML content into 
> it, then mount it to the container. At the same time, there is no need to 
> worry about resource release and deletion. The ConfigMap can be strongly 
> bound to the container, and k8s will automatically handle these issues for 
> us. If there are security concerns, we can also use a Secret.
> This method is simple and convenient, and does not require additional tools 
> such as a k8s Operator.
> Regarding deployment, I also have some suggestions. There are many 
> customizable parameters that can be configured for k8s native submissions. 
> Currently, doing these things only through CLI args is very difficult for 
> users. Writing them into the YAML for configuration will provide a better 
> experience, as shown below:
> {code:java}
> source:
>  x
> sink:
>  xx
> pipeline:
>  
> deploy:
>   target: k8s-native
>   jar: "/opt/cdc/cdc.jar"
>   namespace:
>   account:
>   {code}
> Would this approach be more user-friendly?
> I have done similar things in other communities. If the CDC community thinks 
> this idea is feasible, I am willing to work on this issue and submit a PR.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36221) Add specification about CAST ... AS ... built-in functions

2024-09-04 Thread yux (Jira)
yux created FLINK-36221:
---

 Summary: Add specification about CAST ... AS ... built-in functions
 Key: FLINK-36221
 URL: https://issues.apache.org/jira/browse/FLINK-36221
 Project: Flink
  Issue Type: Sub-task
Reporter: yux
 Fix For: cdc-3.2.0


FLINK-34877 adds the CAST ... AS ... syntax to transform expressions, but there's 
no corresponding documentation yet.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36219) Add Flink compatibility matrix of CDC releases

2024-09-04 Thread yux (Jira)
yux created FLINK-36219:
---

 Summary: Add Flink compatibility matrix of CDC releases
 Key: FLINK-36219
 URL: https://issues.apache.org/jira/browse/FLINK-36219
 Project: Flink
  Issue Type: Sub-task
Reporter: yux
 Fix For: cdc-3.2.0


Currently, CDC releases each have their own requirements on Flink versions. For example, 
CDC 3.1.0 and earlier don't work with Flink 1.19.

Adding a compatibility table would make this much clearer.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36214) Error log when building flink-cdc-pipeline-udf-examples from source code

2024-09-04 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36214?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17879401#comment-17879401
 ] 

yux commented on FLINK-36214:
-

It seems this was introduced by FLINK-34876; I will investigate.

> Error log when building flink-cdc-pipeline-udf-examples from source code
> 
>
> Key: FLINK-36214
> URL: https://issues.apache.org/jira/browse/FLINK-36214
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: lincoln lee
>Priority: Minor
>
> There's an error log when building from source code(encountered on 3.2.0 rc & 
> master branch), but not fail the build. 
> {code}
> [INFO] --< org.apache.flink:flink-cdc-pipeline-udf-examples 
> >--
> [INFO] Building flink-cdc-pipeline-udf-examples 3.2.0                    
> [3/42]
> [INFO] [ jar 
> ]-
> [INFO]
> [INFO] --- maven-clean-plugin:3.1.0:clean (default-clean) @ 
> flink-cdc-pipeline-udf-examples ---
> [INFO] Deleting 
> /Users/lilin/Downloads/veri-cdc/flink-cdc-3.2.0/flink-cdc-pipeline-udf-examples/target
> [INFO]
> [INFO] --- flatten-maven-plugin:1.5.0:clean (flatten.clean) @ 
> flink-cdc-pipeline-udf-examples ---
> [INFO] Deleting 
> /Users/lilin/Downloads/veri-cdc/flink-cdc-3.2.0/flink-cdc-pipeline-udf-examples/.flattened-pom.xml
> [INFO]
> [INFO] --- spotless-maven-plugin:2.4.2:check (spotless-check) @ 
> flink-cdc-pipeline-udf-examples ---
> [INFO]
> [INFO] --- maven-checkstyle-plugin:2.17:check (validate) @ 
> flink-cdc-pipeline-udf-examples ---
> [INFO]
> [INFO] --- maven-enforcer-plugin:3.0.0-M1:enforce (enforce-maven-version) @ 
> flink-cdc-pipeline-udf-examples ---
> [INFO]
> [INFO] --- maven-remote-resources-plugin:1.5:process 
> (process-resource-bundles) @ flink-cdc-pipeline-udf-examples ---
> [INFO]
> [INFO] --- maven-resources-plugin:3.1.0:resources (default-resources) @ 
> flink-cdc-pipeline-udf-examples ---
> [INFO] Using 'UTF-8' encoding to copy filtered resources.
> [INFO] skip non existing resourceDirectory 
> /Users/lilin/Downloads/veri-cdc/flink-cdc-3.2.0/flink-cdc-pipeline-udf-examples/src/main/resources
> [INFO] Copying 3 resources
> [INFO]
> [INFO] --- flatten-maven-plugin:1.5.0:flatten (flatten) @ 
> flink-cdc-pipeline-udf-examples ---
> [INFO] Generating flattened POM of project 
> org.apache.flink:flink-cdc-pipeline-udf-examples:jar:3.2.0...
> [INFO]
> [INFO] --- scala-maven-plugin:4.9.2:add-source (scala-compile-first) @ 
> flink-cdc-pipeline-udf-examples ---
> [INFO] Add Source directory: 
> /Users/lilin/Downloads/veri-cdc/flink-cdc-3.2.0/flink-cdc-pipeline-udf-examples/src/main/scala
> [INFO] Add Test Source directory: 
> /Users/lilin/Downloads/veri-cdc/flink-cdc-3.2.0/flink-cdc-pipeline-udf-examples/src/test/scala
> [INFO]
> [INFO] --- scala-maven-plugin:4.9.2:compile (scala-compile-first) @ 
> flink-cdc-pipeline-udf-examples ---
> [INFO] Compiler bridge file: 
> /Users/lilin/.sbt/1.0/zinc/org.scala-sbt/org.scala-sbt-compiler-bridge_2.12-1.10.0-bin_2.12.16__52.0-1.10.0_20240505T232140.jar
> [INFO] compiling 8 Scala sources and 8 Java sources to 
> /Users/lilin/Downloads/veri-cdc/flink-cdc-3.2.0/flink-cdc-pipeline-udf-examples/target/classes
>  ...
> [ERROR] -release is only supported on Java 9 and higher
> [INFO] done compiling
> [INFO] compile in 8.2 s
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36193) Allow applying new SchemaChangeEvents to sink connectors

2024-09-01 Thread yux (Jira)
yux created FLINK-36193:
---

 Summary: Allow applying new SchemaChangeEvents to sink connectors
 Key: FLINK-36193
 URL: https://issues.apache.org/jira/browse/FLINK-36193
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


We added `DropTableEvent` and `TruncateTableEvent` support and provided the 
pre-schema backfill feature in FLINK-35243. However, no sink connectors 
support them for now.

This could be done for sink connectors including Doris, StarRocks, and Paimon.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36184) Transform Operator swallows schema changes from tables not present in transform rules

2024-08-29 Thread yux (Jira)
yux created FLINK-36184:
---

 Summary: Transform Operator swallows schema changes from tables 
not present in transform rules
 Key: FLINK-36184
 URL: https://issues.apache.org/jira/browse/FLINK-36184
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.2.0, cdc-3.3.0
Reporter: yux


Currently, tables that are not present in any transform block should be treated as 
if there were such a dummy fallback block:

transform:
  - source-table: "\.*.\.*"   # capture all unmentioned tables
    projection: "*"           # keep all columns, without filtering any rows

There's a bug in #3557's implementation: schema change events should be 
filtered out if there's no wildcard (*) in the transform rules, but the check also 
filters out tables that are not defined in any transform rule, which causes 
schema change events to be lost in the following example:

transform:
  - source-table: foo.bar.baz   # another table that doesn't really matter
    projection: ...

Here, since there's one transform block, the TransformOperator will be added to 
the operator chain. Now, if a schema change event happens on another table 
(like db.table), it will be filtered out, since the TransformOperator regards 
that table as asterisk-less and assumes it does not require schema change events.

By checking whether a table is transformed or not, we could set the hasAsterisk 
flag map correctly and resolve this problem.
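A rough sketch of that check (names are illustrative, not the actual operator fields): tables matched by no transform rule should default to wildcard behavior so their schema change events pass through.

```java
// Illustrative sketch: a table not covered by any transform rule behaves as if
// it had the implicit "*" projection, so its schema change events must pass.
private boolean hasAsterisk(TableId tableId) {
    List<TransformRule> matched = findMatchingRules(tableId);
    if (matched.isEmpty()) {
        return true; // untouched tables keep all columns and all schema changes
    }
    return matched.stream().anyMatch(TransformRule::projectionContainsAsterisk);
}
```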



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36183) Lenient mode doesn't work with route blocks

2024-08-29 Thread yux (Jira)
yux created FLINK-36183:
---

 Summary: Lenient mode doesn't work with route blocks
 Key: FLINK-36183
 URL: https://issues.apache.org/jira/browse/FLINK-36183
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.2.0, cdc-3.3.0
Reporter: yux


We should applySchemaChange (where the route rules take effect) first and lenientize 
its result afterwards; otherwise we may not be able to get the evolved schema, 
since the tableId isn't routed yet:

Caused by: java.lang.IllegalStateException: Evolved schema does not exist, not 
ready for schema change event AddColumnEvent{tableId=kunni_test.customers, 
addedColumns=[ColumnWithPosition{column=`newCol2` VARCHAR(100), position=AFTER, 
existedColumnName=newCol}]}
at 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.lambda$lenientizeSchemaChangeEvent$3(SchemaRegistryRequestHandler.java:378)
at java.util.Optional.orElseThrow(Optional.java:290)
at 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.lenientizeSchemaChangeEvent(SchemaRegistryRequestHandler.java:376)
at 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.calculateDerivedSchemaChangeEvents(SchemaRegistryRequestHandler.java:360)
at 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistryRequestHandler.handleSchemaChangeRequest(SchemaRegistryRequestHandler.java:184)
at 
org.apache.flink.cdc.runtime.operators.schema.coordinator.SchemaRegistry.lambda$handleCoordinationRequest$3(SchemaRegistry.java:273)
... 4 more



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36153) MySQL fails to handle schema change events In Timestamp or Earliest Offset startup mode

2024-08-25 Thread yux (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yux updated FLINK-36153:

Issue Type: Bug  (was: Improvement)

> MySQL fails to handle schema change events In Timestamp or Earliest Offset 
> startup mode
> ---
>
> Key: FLINK-36153
> URL: https://issues.apache.org/jira/browse/FLINK-36153
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>
> Currently, if the MySQL source tries to start up from a binlog position with 
> schema changes within range, the job will fail due to non-replayable 
> schema change events.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36153) MySQL fails to handle schema change events In Timestamp or Earliest Offset startup mode

2024-08-25 Thread yux (Jira)
yux created FLINK-36153:
---

 Summary: MySQL fails to handle schema change events In Timestamp 
or Earliest Offset startup mode
 Key: FLINK-36153
 URL: https://issues.apache.org/jira/browse/FLINK-36153
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Currently, if the MySQL source tries to start up from a binlog position with 
schema changes within range, the job will fail due to non-replayable 
schema change events.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36153) MySQL fails to handle schema change events In Timestamp or Earliest Offset startup mode

2024-08-25 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17876559#comment-17876559
 ] 

yux commented on FLINK-36153:
-

[~Leonard] Please assign this to me.

> MySQL fails to handle schema change events In Timestamp or Earliest Offset 
> startup mode
> ---
>
> Key: FLINK-36153
> URL: https://issues.apache.org/jira/browse/FLINK-36153
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>
> Currently, if the MySQL source tries to start up from a binlog position with 
> schema changes within range, the job will fail due to non-replayable 
> schema change events.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36151) Add documentations for Schema Evolution related options

2024-08-25 Thread yux (Jira)
yux created FLINK-36151:
---

 Summary: Add documentations for Schema Evolution related options
 Key: FLINK-36151
 URL: https://issues.apache.org/jira/browse/FLINK-36151
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


The CDC documentation should be updated to reflect recent changes to the schema change 
features, such as the new TRY_EVOLVE and LENIENT modes and the `include.schema.change` 
and `exclude.schema.change` options.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36114) Schema registry should block when handling existing requests

2024-08-21 Thread yux (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yux updated FLINK-36114:

Issue Type: Bug  (was: Improvement)

> Schema registry should block when handling existing requests
> 
>
> Key: FLINK-36114
> URL: https://issues.apache.org/jira/browse/FLINK-36114
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>  Labels: pull-request-available
>
> Currently, SchemaRegistry asynchronously receives schema change requests from 
> SchemaOperator, and the results of multiple requests might get mixed up, 
> causing incorrect logic flow when running with multiple parallelism.
> Changing SchemaRegistry's behavior to accept requests serially should 
> resolve this problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36105) CDC pipeline job could not restore from state in Flink 1.20

2024-08-21 Thread yux (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36105?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yux updated FLINK-36105:

Priority: Blocker  (was: Major)

> CDC pipeline job could not restore from state in Flink 1.20
> ---
>
> Key: FLINK-36105
> URL: https://issues.apache.org/jira/browse/FLINK-36105
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: yux
>Assignee: yux
>Priority: Blocker
>
> Currently, it is not possible to restore pipeline job with Flink 1.20 with 
> the following exception:
> Exception in thread "main" java.lang.NoClassDefFoundError: 
> org/apache/flink/runtime/jobgraph/RestoreMode
>   at 
> org.apache.flink.cdc.cli.CliFrontend.createSavepointRestoreSettings(CliFrontend.java:130)
>   at 
> org.apache.flink.cdc.cli.CliFrontend.createExecutor(CliFrontend.java:95)
>   at org.apache.flink.cdc.cli.CliFrontend.main(CliFrontend.java:69)
> Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.runtime.jobgraph.RestoreMode
>   at 
> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>   at 
> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>   at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
>   ... 3 more
> It seems the Flink API mentioned above has changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36114) Schema registry should block when handling existing requests

2024-08-21 Thread yux (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36114?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yux updated FLINK-36114:

Priority: Blocker  (was: Major)

> Schema registry should block when handling existing requests
> 
>
> Key: FLINK-36114
> URL: https://issues.apache.org/jira/browse/FLINK-36114
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: yux
>Priority: Blocker
>  Labels: pull-request-available
>
> Currently, SchemaRegistry asynchronously receives schema change requests from 
> SchemaOperator, and the results of multiple requests might get mixed up, 
> causing incorrect logic flow when running with multiple parallelism.
> Changing SchemaRegistry's behavior to accept requests serially should 
> resolve this problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36128) Promote LENIENT mode as the default schema evolution behavior

2024-08-21 Thread yux (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yux updated FLINK-36128:

Issue Type: Bug  (was: Improvement)

> Promote LENIENT mode as the default schema evolution behavior
> -
>
> Key: FLINK-36128
> URL: https://issues.apache.org/jira/browse/FLINK-36128
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>
> Currently, the default schema evolution mode "EVOLVE" cannot handle 
> exceptions and might not be able to restore from existing state correctly 
> after failover. Until we can rely on the "manually triggered checkpoint" 
> feature introduced in Flink 1.19, making "LENIENT" the default option might 
> be more suitable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-36128) Promote LENIENT mode as the default schema evolution behavior

2024-08-21 Thread yux (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-36128?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yux updated FLINK-36128:

Priority: Blocker  (was: Major)

> Promote LENIENT mode as the default schema evolution behavior
> -
>
> Key: FLINK-36128
> URL: https://issues.apache.org/jira/browse/FLINK-36128
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: yux
>Priority: Blocker
>
> Currently, the default schema evolution mode "EVOLVE" cannot handle 
> exceptions and might not be able to restore from existing state correctly 
> after failover. Until we can rely on the "manually triggered checkpoint" 
> feature introduced in Flink 1.19, making "LENIENT" the default option might 
> be more suitable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36128) Promote LENIENT mode as the default schema evolution behavior

2024-08-21 Thread yux (Jira)
yux created FLINK-36128:
---

 Summary: Promote LENIENT mode as the default schema evolution 
behavior
 Key: FLINK-36128
 URL: https://issues.apache.org/jira/browse/FLINK-36128
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Currently, the default schema evolution mode "EVOLVE" cannot handle exceptions 
and might not be able to restore from existing state correctly after failover. 
Until we can rely on the "manually triggered checkpoint" feature introduced in 
Flink 1.19, making "LENIENT" the default option might be more suitable.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36114) Schema registry should block when handling existing requests

2024-08-20 Thread yux (Jira)
yux created FLINK-36114:
---

 Summary: Schema registry should block when handling existing 
requests
 Key: FLINK-36114
 URL: https://issues.apache.org/jira/browse/FLINK-36114
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Currently, SchemaRegistry asynchronously receives schema change requests from 
SchemaOperator, and the results of multiple requests might get mixed up, 
causing incorrect logic flow when running with multiple parallelism.

Changing SchemaRegistry's behavior to accept requests serially should resolve 
this problem.
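A minimal sketch of the intended serialization using a plain lock; the request and response types here are placeholders rather than the actual SchemaRegistry classes:

```java
// Hypothetical sketch: handle schema change requests strictly one at a time so
// responses from concurrent subtasks can no longer interleave.
final class SerialSchemaRequestHandler {
    private final Object lock = new Object();

    SchemaChangeResponse handle(SchemaChangeRequest request) {
        synchronized (lock) {
            // Fully process this request (and emit its response) before the
            // next pending request is admitted.
            return process(request);
        }
    }

    private SchemaChangeResponse process(SchemaChangeRequest request) {
        return new SchemaChangeResponse(); // placeholder for the evolution logic
    }

    static final class SchemaChangeRequest {}
    static final class SchemaChangeResponse {}
}
```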



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36105) CDC pipeline job could not restore from state in Flink 1.20

2024-08-19 Thread yux (Jira)
yux created FLINK-36105:
---

 Summary: CDC pipeline job could not restore from state in Flink 
1.20
 Key: FLINK-36105
 URL: https://issues.apache.org/jira/browse/FLINK-36105
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Currently, it is not possible to restore a pipeline job with Flink 1.20; it 
fails with the following exception:

Exception in thread "main" java.lang.NoClassDefFoundError: 
org/apache/flink/runtime/jobgraph/RestoreMode
at 
org.apache.flink.cdc.cli.CliFrontend.createSavepointRestoreSettings(CliFrontend.java:130)
at 
org.apache.flink.cdc.cli.CliFrontend.createExecutor(CliFrontend.java:95)
at org.apache.flink.cdc.cli.CliFrontend.main(CliFrontend.java:69)
Caused by: java.lang.ClassNotFoundException: 
org.apache.flink.runtime.jobgraph.RestoreMode
at 
java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
at 
java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
... 3 more

It seems the Flink API mentioned above has changed.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36093) PreTransform operator wrongly filters out columns when multiple transform rules were defined

2024-08-19 Thread yux (Jira)
yux created FLINK-36093:
---

 Summary: PreTransform operator wrongly filters out columns when 
multiple transform rules were defined
 Key: FLINK-36093
 URL: https://issues.apache.org/jira/browse/FLINK-36093
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Currently, such a transform rule does not work:

```yaml
transform:
  - projection: 'A' as result
    filter: tag >= 0
  - projection: score as result
    filter: tag < 0
```

Here, the `score` column will be filtered out in the PreTransform stage, since it 
isn't referenced in the first transform rule. As a result, the second 
transform rule will fail because `score` no longer exists for the PostTransform operator.
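One way to reason about a fix, sketched with hypothetical helpers: the PreTransform stage should retain the union of columns referenced by all transform rules matching a table, not only those referenced by the first matching rule.

```java
// Illustrative sketch: keep every column referenced by any matching rule so a
// later rule's references (like `score` above) survive the PreTransform stage.
static Set<String> columnsToRetain(TableId tableId, List<TransformRule> rules) {
    Set<String> retained = new HashSet<>();
    for (TransformRule rule : rules) {
        if (rule.matches(tableId)) {
            retained.addAll(rule.referencedColumns()); // projection + filter references
        }
    }
    return retained;
}
```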



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36092) Transform doesn't fully support with schema evolution

2024-08-19 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36092?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17874799#comment-17874799
 ] 

yux commented on FLINK-36092:
-

Noticed this bug when running DataGen -> StarRocks fuzzing test. [~Leonard] 
Please assign this to me.

> Transform doesn't fully support with schema evolution
> -
>
> Key: FLINK-36092
> URL: https://issues.apache.org/jira/browse/FLINK-36092
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>
> Currently, transformed tables cannot fully support upstream schema 
> evolution.
> We need to add more test cases verifying that add / alterType / rename / 
> drop column events work with both wildcard and non-wildcard matchers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36092) Transform doesn't fully support with schema evolution

2024-08-19 Thread yux (Jira)
yux created FLINK-36092:
---

 Summary: Transform doesn't fully support with schema evolution
 Key: FLINK-36092
 URL: https://issues.apache.org/jira/browse/FLINK-36092
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Currently, transformed tables cannot fully support upstream schema evolution.

We need to add more test cases verifying that add / alterType / rename / drop column 
events work with both wildcard and non-wildcard matchers.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36041) Eliminate Calcite dependency during runtime

2024-08-12 Thread yux (Jira)
yux created FLINK-36041:
---

 Summary: Eliminate Calcite dependency during runtime
 Key: FLINK-36041
 URL: https://issues.apache.org/jira/browse/FLINK-36041
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Currently, the runtime operators `PreTransformOp` and `PostTransformOp` rely heavily 
on Calcite to parse expression rules. Calcite is a heavy dependency and can 
easily cause conflicts (since it's a Flink dependency, too).

It would be better if we could construct those abstract grammar data structures 
early (in the composer, before submitting jobs). That would improve runtime execution 
performance and provide better error messages for malformed expressions.
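A rough sketch of that direction, with entirely hypothetical class names: parse and validate with Calcite once in the composer, then ship a serializable, Calcite-free representation to the runtime operators.

```java
// Hypothetical sketch: move Calcite parsing to job-composition time so the
// runtime only evaluates a pre-compiled, serializable expression.
final class ComposeTimeExpressionCompiler {

    /** Serializable, Calcite-free representation shipped inside the job graph. */
    interface CompiledExpression extends java.io.Serializable {
        Object evaluate(Object[] columnValues);
    }

    CompiledExpression compile(String expression) {
        // Parse and validate with Calcite here, in the composer, so malformed
        // expressions fail before submission and the runtime never needs Calcite.
        return columnValues -> null; // placeholder evaluation logic
    }
}
```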



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35985) SUBSTRING function not available in transform rules

2024-08-12 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872902#comment-17872902
 ] 

yux commented on FLINK-35985:
-

Thanks for [~MOBIN]'s suggestion! +1 for providing both functions to keep 
consistency with Flink SQL.
A kind reminder that FLINK-35991 just refactored the built-in function declarations, 
so make sure you've rebased before working on this :)

> SUBSTRING function not available in transform rules
> ---
>
> Key: FLINK-35985
> URL: https://issues.apache.org/jira/browse/FLINK-35985
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>
> Currently, one cannot write `SUBSTRING(str FROM idx FOR len)` in the way 
> suggested in the documentation[1], while the function that does work, 
> `SUBSTR(str, idx, len)`, isn't mentioned anywhere.
> Either the code or the docs needs an update to avoid confusing users.
> [1] 
> https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/core-concept/transform/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-36034) Remove dependencies of Flink table planner in Transform module

2024-08-11 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-36034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872765#comment-17872765
 ] 

yux commented on FLINK-36034:
-

[~Leonard] I'm willing to investigate this.

> Remove dependencies of Flink table planner in Transform module
> --
>
> Key: FLINK-36034
> URL: https://issues.apache.org/jira/browse/FLINK-36034
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>
> Currently, the transform module's `BuiltinScalarFunction` redundantly relies on 
> `org.apache.flink.table.functions.BuiltInFunctionDefinition`. Getting rid of 
> it should allow CDC jobs to run on Kubernetes clusters where such a dependency 
> isn't provided.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36034) Remove dependencies of Flink table planner in Transform module

2024-08-11 Thread yux (Jira)
yux created FLINK-36034:
---

 Summary: Remove dependencies of Flink table planner in Transform 
module
 Key: FLINK-36034
 URL: https://issues.apache.org/jira/browse/FLINK-36034
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


Currently, the transform module's `BuiltinScalarFunction` redundantly relies on 
`org.apache.flink.table.functions.BuiltInFunctionDefinition`. Getting rid of it 
should allow CDC jobs to run on Kubernetes clusters where such a dependency 
isn't provided.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35615) CDC3.0, the pipeline of mysql-doris , do not support the MySQL field type timestamp(6)

2024-08-08 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872198#comment-17872198
 ] 

yux commented on FLINK-35615:
-

This has been resolved in https://github.com/apache/flink-cdc/pull/3511.

> CDC3.0, the pipeline of mysql-doris ,  do not support the MySQL field type 
> timestamp(6)
> ---
>
> Key: FLINK-35615
> URL: https://issues.apache.org/jira/browse/FLINK-35615
> Project: Flink
>  Issue Type: Bug
>Affects Versions: cdc-3.1.0
>Reporter: ConorZhao
>Priority: Major
>  Labels: pull-request-available
>
> 2024-06-14 16:45:24
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
> at 
> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
> at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
> at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
> at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
> at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
> at 
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
> at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
> at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
> at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
> at 
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
> at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
> at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
> at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
> at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
> at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
> at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
> at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 8
> at 
> com.ververica.cdc.common.data.binary.BinarySegmentUtils.getLongMultiSegments(BinarySegmentUtils.java:731)
> at 
> com.ververica.cdc.common.data.binary.BinarySegmentUtils.getLong(BinarySegmentUtils.java:721)
> at 
> com.ververica.cdc.common.data.binary.BinarySegmentUtils.readLocalZonedTimestampData(BinarySegmentUtils.java:1033)
> at 
> com.ververica.cdc.common.data.binary.BinaryRecordData.getLocalZonedTimestampData(BinaryRecordData.java:244)
> at 
> com.ververica.cdc.connectors.doris.sink.DorisRowConverter.lambda$createExternalConverter$cc45f215$1(DorisRowConverter.java:113)
> at 
> com.ververica.cdc.connectors.dori

[jira] [Commented] (FLINK-35774) The cache of transform is not updated after process schema change event

2024-08-08 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872193#comment-17872193
 ] 

yux commented on FLINK-35774:
-

Resolved by https://github.com/apache/flink-cdc/pull/3285.

> The cache of transform is not updated after process schema change event
> ---
>
> Key: FLINK-35774
> URL: https://issues.apache.org/jira/browse/FLINK-35774
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: Wenkai Qi
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 1m
>  Remaining Estimate: 1m
>
> The cache of transform is not updated after process schema change event.
> For example, when add column event, tableInfo is not updated in 
> TransformSchema and TransformData.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35852) When used through the transform function, the decimal(10,2) type field value in the MySQL source table becomes 100 times the original value after being transferred to

2024-08-08 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17872192#comment-17872192
 ] 

yux commented on FLINK-35852:
-

Resolved in https://github.com/apache/flink-cdc/pull/3285.

> When used through the transform function, the decimal(10,2) type field value 
> in the MySQL source table becomes 100 times the original value after being 
> transferred to the target table.
> 
>
> Key: FLINK-35852
> URL: https://issues.apache.org/jira/browse/FLINK-35852
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
> Environment: flink-1.18.0、 
> flink-cdc-3.1.1、mysql-5.7.43、Doris-2.0.6、CentOS Linux release 8.5.2111
>Reporter: zheng_shengsheng
>Priority: Major
> Attachments: image-2024-07-16-14-46-53-982.png, 
> image-2024-07-16-14-48-07-981.png
>
>
> When I use CDC's yaml mode to collect MySQL data to Doris, I use the latest 
> supported transform feature.
> Finally, I found that the database table in MySQL collected and the 
> decimal(10,2) type fields in the Mysql source table were automatically 
> converted into decimal(19,0) in Doris, and then the value size became 100 
> times the original size. As follows
>  
> !image-2024-07-16-14-48-07-981.png!
> !image-2024-07-16-14-46-53-982.png!
>  
> The core part of yaml is as follows:
> {code:java}
> // code placeholder
> source:
>   type: mysql
>   hostname: node2
>   port: 3306
>   username: 
>   password: 
>   tables: sys_canteen_consume_conf,sys_canteen_consume_rec,sys_order_course,
>   server-id: 5513
>   connectionTimeZone: GMT+8
>   scan.startup.mode: initial
> sink:
>   type: doris
>   fenodes: node3:8030
>   username: admin
>   password: 
>   table.create.properties.light_schema_change: true
>   table.create.properties.replication_num: 1
> transform:
>   - source-table: jeecg-boot2.sys_\.*
>     projection: concat('LYDB2','') as tenant_code, *
>     primary-keys: tenant_code,id
>   - source-table: jeecg-boot2.sys_user_depart
>     projection: concat('LYDB2','') as tenant_code, *
>     primary-keys: tenant_code,ID  
> pipeline:
>   name: test,server-id-5513,flinkCDC-3.1.1
>   parallelism: 1 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-36006) Add documentations for Flink CDC CLI

2024-08-08 Thread yux (Jira)
yux created FLINK-36006:
---

 Summary: Add documentations for Flink CDC CLI
 Key: FLINK-36006
 URL: https://issues.apache.org/jira/browse/FLINK-36006
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


Currently, there's no description of CDC CLI usage in the documentation; 
such information is only available via `./flink-cdc.sh --help`. Adding 
it should help users look up options more easily.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35991) Resolve conflicting definitions in transform SqlFunctionTable definition

2024-08-06 Thread yux (Jira)
yux created FLINK-35991:
---

 Summary: Resolve conflicting definitions in transform 
SqlFunctionTable definition
 Key: FLINK-35991
 URL: https://issues.apache.org/jira/browse/FLINK-35991
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


Currently, the transform operator chains self-defined SqlFunctions with the standard 
function table, which might be a problem since there are some conflicting 
function declarations.

Getting rid of `SqlStdOperatorTable` and declaring only the supported functions 
should resolve this problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35986) NULL literal is not supported in Transform rules

2024-08-05 Thread yux (Jira)
yux created FLINK-35986:
---

 Summary: NULL literal is not supported in Transform rules
 Key: FLINK-35986
 URL: https://issues.apache.org/jira/browse/FLINK-35986
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


Sometimes one may want to explicitly write a NULL value, for example in 
CASE ... WHEN branches or when calling UDF functions. However, it is 
not possible to write such expressions; an exception will be thrown:

Caused by: java.lang.UnsupportedOperationException: Unsupported type: NULL
at 
org.apache.flink.cdc.runtime.typeutils.DataTypeConverter.convertCalciteRelDataTypeToDataType(DataTypeConverter.java:181)
at 
org.apache.flink.cdc.runtime.parser.TransformParser.generateProjectionColumns(TransformParser.java:189)
   ...



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35985) SUBSTRING function not available in transform rules

2024-08-05 Thread yux (Jira)
yux created FLINK-35985:
---

 Summary: SUBSTRING function not available in transform rules
 Key: FLINK-35985
 URL: https://issues.apache.org/jira/browse/FLINK-35985
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


Currently, one cannot write `SUBSTRING(str FROM idx FOR len)` in the way 
suggested in the documentation[1], while the function that does work, 
`SUBSTR(str, idx, len)`, isn't mentioned anywhere.

Either the code or the docs needs an update to avoid confusing users.

[1] 
https://nightlies.apache.org/flink/flink-cdc-docs-release-3.1/docs/core-concept/transform/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35984) Job crashes when metadata column names present in transform rules

2024-08-05 Thread yux (Jira)
yux created FLINK-35984:
---

 Summary: Job crashes when metadata column names present in 
transform rules
 Key: FLINK-35984
 URL: https://issues.apache.org/jira/browse/FLINK-35984
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


Given such a transform rule:

transform:
  projection: \*, '__namespace_name__schema_name__table_name__' AS string_literal

this obviously shouldn't insert any metadata columns. However, the metadata 
column existence check is done by searching for the identifier string in the 
statement, without considering any syntax info:

```java
// TransformParser.java#L357
if (transformStatement.contains(DEFAULT_NAMESPACE_NAME) ...
```

so the transform operator will mistakenly append metadata columns to the Janino 
argument list and crash the job:

Caused by: java.lang.IllegalArgumentException: wrong number of arguments
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.codehaus.janino.ExpressionEvaluator.evaluate(ExpressionEvaluator.java:541)
at 
org.codehaus.janino.ExpressionEvaluator.evaluate(ExpressionEvaluator.java:533)
at 
org.apache.flink.cdc.runtime.operators.transform.ProjectionColumnProcessor.evaluate(ProjectionColumnProcessor.java:64)
at 
org.apache.flink.cdc.runtime.operators.transform.TransformProjectionProcessor.processData(TransformProjectionProcessor.java:153)
at 
org.apache.flink.cdc.runtime.operators.transform.TransformDataOperator.processProjection(TransformDataOperator.java:387)
at 
org.apache.flink.cdc.runtime.operators.transform.TransformDataOperator.processDataChangeEvent(TransformDataOperator.java:328)
at 
org.apache.flink.cdc.runtime.operators.transform.TransformDataOperator.processElement(TransformDataOperator.java:190)
at 
org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:237)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:146)
at 
org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35983) Job crashes when using wildcard (*) match with metadata column

2024-08-05 Thread yux (Jira)
yux created FLINK-35983:
---

 Summary: Job crashes when using wildcard (*) match with metadata 
column
 Key: FLINK-35983
 URL: https://issues.apache.org/jira/browse/FLINK-35983
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


One may write such a projection rule:

transform:
  - projection: '*, __namespace_name__, __schema_name__, __table_name__'

to append some metadata columns at the end of the existing columns. However, this 
crashes the job once a metadata column is declared, since the wildcard expansion 
matches metadata columns too (see the sketch after the stack trace below):

Caused by: java.lang.IllegalArgumentException: Field names must be unique. 
Found duplicates: [__namespace_name__, __schema_name__, __table_name__]
at 
org.apache.flink.cdc.common.types.RowType.validateFields(RowType.java:158)
at org.apache.flink.cdc.common.types.RowType.(RowType.java:54)
at org.apache.flink.cdc.common.types.RowType.of(RowType.java:183)
at org.apache.flink.cdc.common.types.RowType.of(RowType.java:175)
at 
org.apache.flink.cdc.runtime.typeutils.DataTypeConverter.toRowType(DataTypeConverter.java:55)
at 
org.apache.flink.cdc.runtime.operators.transform.TableChangeInfo.of(TableChangeInfo.java:100)
at 
org.apache.flink.cdc.runtime.operators.transform.TransformSchemaOperator.cacheCreateTable(TransformSchemaOperator.java:183)
at 
org.apache.flink.cdc.runtime.operators.transform.TransformSchemaOperator.processElement(TransformSchemaOperator.java:168)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:75)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:50)
at 
org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
at 
org.apache.flink.streaming.runtime.tasks.SourceOperatorStreamTask$AsyncDataOutputToOutput.emitRecord(SourceOperatorStreamTask.java:309)
at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:110)
at 
org.apache.flink.streaming.api.operators.source.SourceOutputWithWatermarks.collect(SourceOutputWithWatermarks.java:101)
at 
org.apache.flink.api.connector.source.lib.util.IteratorSourceReaderBase.pollNext(IteratorSourceReaderBase.java:111)
at 
org.apache.flink.cdc.connectors.values.source.ValuesDataSource$EventIteratorReader.pollNext(ValuesDataSource.java:294)
at 
org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:419)
at 
org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
at 
org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:562)
at 
org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:858)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:807)
at 
org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
at 
org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:932)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
at java.lang.Thread.run(Thread.java:748)
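
A minimal sketch (hypothetical helper names, not the actual operator code) of a 
de-duplicating wildcard expansion that keeps the output field list unique even 
when metadata columns are both matched by '*' and declared explicitly:

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Minimal sketch: expand '*' plus explicit columns without duplicate field names. */
public class WildcardExpansionSketch {

    static List<String> expandProjection(List<String> wildcardColumns, List<String> explicitColumns) {
        // LinkedHashSet keeps declaration order while dropping duplicates such as
        // metadata columns that appear both via the wildcard and explicitly.
        Set<String> expanded = new LinkedHashSet<>(wildcardColumns);
        expanded.addAll(explicitColumns);
        return new ArrayList<>(expanded);
    }

    public static void main(String[] args) {
        List<String> wildcard = List.of("id", "name", "__namespace_name__");
        List<String> explicit = List.of("__namespace_name__", "__schema_name__", "__table_name__");
        // [id, name, __namespace_name__, __schema_name__, __table_name__] -- unique names only
        System.out.println(expandProjection(wildcard, explicit));
    }
}
```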



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35982) Transform metadata config doesn't work if no projection block was provided

2024-08-05 Thread yux (Jira)
yux created FLINK-35982:
---

 Summary: Transform metadata config doesn't work if no projection 
block was provided
 Key: FLINK-35982
 URL: https://issues.apache.org/jira/browse/FLINK-35982
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


One may readjust source table primary keys, partition keys, table options by 
specifying them in a Transform block like this:

transform:
  - projection: '*'
primary-keys: order_id, product_name
partition-keys: order_id
table-options: bucket=1

However, if the projection field is omitted (which falls back to the default behavior 
of not changing any source table columns), such configuration will not 
take effect:

transform:
  - primary-keys: order_id, product_name
partition-keys: order_id
table-options: bucket=1 # These options will not apply



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35981) Transform rule doesn't support referencing one column more than once

2024-08-05 Thread yux (Jira)
yux created FLINK-35981:
---

 Summary: Transform rule doesn't support referencing one column 
more than once
 Key: FLINK-35981
 URL: https://issues.apache.org/jira/browse/FLINK-35981
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


Currently, transform rules (projection / filtering) don't support referencing 
one column more than once, which means if we write a projection rule like this:

transform:
  - projection: \*, age * age AS age_square
filter: age < 18 OR age > 60

The Janino compiler will crash with the following exception stack. It seems duplicated 
columns were added to the Janino arguments list (see the sketch after the stack trace below):

Caused by: 
org.apache.flink.shaded.guava31.com.google.common.util.concurrent.UncheckedExecutionException:
 org.apache.flink.api.common.InvalidProgramException: Expression cannot be 
compiled. This is a bug. Please file an issue.
Expression: import static 
org.apache.flink.cdc.runtime.functions.SystemFunctionUtils.*;age * age
at 
org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2055)
at 
org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache.get(LocalCache.java:3966)
at 
org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4863)
at 
org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler.compileExpression(TransformExpressionCompiler.java:46)
... 18 more
Caused by: org.apache.flink.api.common.InvalidProgramException: Expression 
cannot be compiled. This is a bug. Please file an issue.
Expression: import static 
org.apache.flink.cdc.runtime.functions.SystemFunctionUtils.*;age * age
at 
org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler.lambda$compileExpression$0(TransformExpressionCompiler.java:62)
at 
org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4868)
at 
org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3533)
at 
org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2282)
at 
org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2159)
at 
org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2049)
... 21 more
Caused by: org.codehaus.commons.compiler.CompileException: Line 1, Column 82: 
Redefinition of parameter "age"
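
A minimal sketch (hypothetical helper, not the actual expression compiler code) 
of the de-duplication that would let "age * age" compile: the columns referenced 
by an expression are collected into a parameter list that contains each column 
only once.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;

/** Minimal sketch: build a Janino parameter list without repeated column names. */
public class ExpressionParamsSketch {

    static List<String> collectParameters(List<String> referencedColumns) {
        // LinkedHashSet keeps first-reference order while dropping repeats,
        // so the generated method declares each parameter exactly once.
        return new ArrayList<>(new LinkedHashSet<>(referencedColumns));
    }

    public static void main(String[] args) {
        // "age * age" references the same column twice ...
        List<String> referenced = List.of("age", "age");
        // ... but should be compiled with a single "age" parameter.
        System.out.println(collectParameters(referenced)); // [age]
    }
}
```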




--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35980) Add transform test coverage in Integrated / E2e tests

2024-08-05 Thread yux (Jira)
yux created FLINK-35980:
---

 Summary: Add transform test coverage in Integrated / E2e tests
 Key: FLINK-35980
 URL: https://issues.apache.org/jira/browse/FLINK-35980
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


Currently, there are not enough UT/E2E test cases to cover transform features 
like built-in scalar functions, temporary functions, complex expressions, etc. 
Adding them should cover more use cases and reduce the possibility of 
accidentally introduced mistakes.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35912) SqlServer CDC doesn't chunk UUID-typed columns correctly

2024-07-28 Thread yux (Jira)
yux created FLINK-35912:
---

 Summary: SqlServer CDC doesn't chunk UUID-typed columns correctly
 Key: FLINK-35912
 URL: https://issues.apache.org/jira/browse/FLINK-35912
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


As reported by GitHub user @LiPL2017, SqlServer CDC doesn't chunk UUID-typed 
columns correctly, since UUID comparison doesn't follow SQL Server's uniqueidentifier ordering[1].

[1] 
https://learn.microsoft.com/en-us/sql/connect/ado-net/sql/compare-guid-uniqueidentifier-values?view=sql-server-ver16



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Comment Edited] (FLINK-35889) mongo cdc restore from expired resume token and job status still running but expect failed

2024-07-24 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868347#comment-17868347
 ] 

yux edited comment on FLINK-35889 at 7/24/24 12:08 PM:
---

Sure, I noticed that such behaviour could be controlled by 
`MongoSourceTask#sourceConfig#tolerateErrors` configuration. Could you please 
share more information about your configuration, like are you setting 
`errors.tolerance` to `none`?


was (Author: JIRAUSER300787):
Sure, I noticed that such behaviour could be controlled by 
`MongoSourceTask#sourceConfig#tolerateErrors` configuration. Should we add a 
new option for this?

> mongo cdc restore from expired resume token and job status still running but 
> expect failed
> --
>
> Key: FLINK-35889
> URL: https://issues.apache.org/jira/browse/FLINK-35889
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Darren_Han
>Priority: Blocker
>
> version
> mongodb: 3.6
> flink: 1.14.6
> mongo-cdc:2.4.2
>  
> Restarting job from a savepoint/checpoint which contains expired resume 
> token/point, job status is always running and  do not capture change data, 
> printing logs continuously.
> Here is some example logs:
> 2024-07-23 11:11:04,214 INFO  
> com.mongodb.kafka.connect.source.MongoSourceTask             [] - An 
> exception occurred when trying to get the next item from the Change Stream
> com.mongodb.MongoQueryException: Query failed with error code 280 and error 
> message 'resume of change stream was not possible, as the resume token was 
> not found. \{_data: BinData(0, "xx")}'
> 2024-07-23 17:53:27,330 INFO  
> com.mongodb.kafka.connect.source.MongoSourceTask             [] - An 
> exception occurred when trying to get the next item from the Change Stream
> com.mongodb.MongoQueryException: Query failed with error code 280 and error 
> message 'resume of change notification was not possible, as the resume point 
> may no longer be in the oplog. ' on server
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35889) mongo cdc restore from expired resume token and job status still running but expect failed

2024-07-24 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35889?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868347#comment-17868347
 ] 

yux commented on FLINK-35889:
-

Sure, I noticed that such behaviour could be controlled by 
`MongoSourceTask#sourceConfig#tolerateErrors` configuration. Should we add a 
new option for this?

> mongo cdc restore from expired resume token and job status still running but 
> expect failed
> --
>
> Key: FLINK-35889
> URL: https://issues.apache.org/jira/browse/FLINK-35889
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Darren_Han
>Priority: Blocker
>
> version
> mongodb: 3.6
> flink: 1.14.6
> mongo-cdc:2.4.2
>  
> Restarting job from a savepoint/checpoint which contains expired resume 
> token/point, job status is always running and  do not capture change data, 
> printing logs continuously.
> Here is some example logs:
> 2024-07-23 11:11:04,214 INFO  
> com.mongodb.kafka.connect.source.MongoSourceTask             [] - An 
> exception occurred when trying to get the next item from the Change Stream
> com.mongodb.MongoQueryException: Query failed with error code 280 and error 
> message 'resume of change stream was not possible, as the resume token was 
> not found. \{_data: BinData(0, "xx")}'
> 2024-07-23 17:53:27,330 INFO  
> com.mongodb.kafka.connect.source.MongoSourceTask             [] - An 
> exception occurred when trying to get the next item from the Change Stream
> com.mongodb.MongoQueryException: Query failed with error code 280 and error 
> message 'resume of change notification was not possible, as the resume point 
> may no longer be in the oplog. ' on server
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35883) Wildcard projection inserts column at wrong place

2024-07-23 Thread yux (Jira)
yux created FLINK-35883:
---

 Summary: Wildcard projection inserts column at wrong place
 Key: FLINK-35883
 URL: https://issues.apache.org/jira/browse/FLINK-35883
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


In this case where a wildcard projection was declared:
```yaml
transform:
  - projection: \*, 'extras' AS extras
```
For upstream schema [a, b, c], the transform operator should send [a, b, c, extras] 
downstream.

However, if another column 'd' is inserted at the end, the upstream schema becomes 
[a, b, c, d], and one might expect the transformed schema to be [a, b, c, d, 
extras]. But it's [a, b, c, extras, d], since `AddColumnEvent{d, 
position=LAST}` was applied to [a, b, c, extras] after the projection process.
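
A minimal sketch reproducing the ordering mismatch (plain Java, not the actual 
operator code): applying the AddColumnEvent to the already-projected schema puts 
'd' after 'extras', while applying it to the upstream schema and re-running the 
projection yields the expected order.

```java
import java.util.ArrayList;
import java.util.List;

/** Minimal sketch: where the appended column ends up depends on when the event is applied. */
public class WildcardOrderingSketch {

    static List<String> project(List<String> upstream) {
        // '*, extras' -> copy the upstream columns, then append the projected column.
        List<String> projected = new ArrayList<>(upstream);
        projected.add("extras");
        return projected;
    }

    public static void main(String[] args) {
        List<String> upstream = new ArrayList<>(List.of("a", "b", "c"));

        // Current behavior: AddColumnEvent{d, position=LAST} applied to the projected schema.
        List<String> current = project(upstream);
        current.add("d");
        System.out.println(current); // [a, b, c, extras, d]

        // Expected behavior: apply the upstream change first, then re-run the projection.
        upstream.add("d");
        System.out.println(project(upstream)); // [a, b, c, d, extras]
    }
}
```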



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35883) Wildcard projection inserts column at wrong place

2024-07-23 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35883?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868022#comment-17868022
 ] 

yux commented on FLINK-35883:
-

I'd like to investigate this based on FLINK-35272.

> Wildcard projection inserts column at wrong place
> -
>
> Key: FLINK-35883
> URL: https://issues.apache.org/jira/browse/FLINK-35883
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>
> In this case where a wildcard projection was declared:
> ```yaml
> transform:
>   - projection: \*, 'extras' AS extras
> ```
> For upstream schema [a, b, c], transform operator should send [a, b, c, 
> extras] to downstream.
> However, if another column 'd' was inserted at the end, upstream schema would 
> be [a, b, c, d], and one might expect transformed schema to be [a, b, c, d, 
> extras]. But it's [a, b, c, extras, d], since `AddColumnEvent{d, 
> position=LAST}` was applied to [a, b, c, extras] after the projection process.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35634) Add a CDC quickstart utility

2024-07-22 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17867744#comment-17867744
 ] 

yux commented on FLINK-35634:
-

Paimon / Kafka sink options have been added.

> Add a CDC quickstart utility
> 
>
> Key: FLINK-35634
> URL: https://issues.apache.org/jira/browse/FLINK-35634
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: yux
>Assignee: yux
>Priority: Minor
>
> Currently, it's not very easy to initialize a CDC pipeline job from scratch, 
> requiring user to configure lots of Flink configurations manually.
> This ticket suggests creating an extra component like `tiup` and `rustup` to 
> help user creating and submitting CDC job quickly. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35868) Bump Mongo driver version to support Mongo 7.0+

2024-07-19 Thread yux (Jira)
yux created FLINK-35868:
---

 Summary: Bump Mongo driver version to support Mongo 7.0+
 Key: FLINK-35868
 URL: https://issues.apache.org/jira/browse/FLINK-35868
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Currently, the MongoDB CDC connector depends on mongodb-driver v4.9.1, which 
doesn't support Mongo Server 7.0+[1]. Upgrading the dependency version would be 
nice since Mongo 7.0 was released nearly a year ago.

[1] https://www.mongodb.com/docs/drivers/java/sync/current/compatibility/



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35634) Add a CDC quickstart utility

2024-07-19 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17867204#comment-17867204
 ] 

yux commented on FLINK-35634:
-

Just implemented a primitive demo for `cdcup` in 
https://github.com/yuxiqian/cdcup. Looking forward to any comments / 
suggestions from [~Leonard] and [~kunni].

> Add a CDC quickstart utility
> 
>
> Key: FLINK-35634
> URL: https://issues.apache.org/jira/browse/FLINK-35634
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: yux
>Priority: Minor
>
> Currently, it's not very easy to initialize a CDC pipeline job from scratch, 
> requiring user to configure lots of Flink configurations manually.
> This ticket suggests creating an extra component like `tiup` and `rustup` to 
> help user creating and submitting CDC job quickly. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35634) Add a CDC quickstart utility

2024-07-19 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35634?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17867228#comment-17867228
 ] 

yux commented on FLINK-35634:
-

Seems the Paimon / Kafka sinks are missing. Will add them soon.

> Add a CDC quickstart utility
> 
>
> Key: FLINK-35634
> URL: https://issues.apache.org/jira/browse/FLINK-35634
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Reporter: yux
>Assignee: yux
>Priority: Minor
>
> Currently, it's not very easy to initialize a CDC pipeline job from scratch, 
> requiring user to configure lots of Flink configurations manually.
> This ticket suggests creating an extra component like `tiup` and `rustup` to 
> help user creating and submitting CDC job quickly. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35852) When used through the transform function, the decimal(10,2) type field value in the MySQL source table becomes 100 times the original value after being transferred to

2024-07-16 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17866290#comment-17866290
 ] 

yux commented on FLINK-35852:
-

After some quick debugging I noticed that this has been resolved by the implementation 
of FLINK-35272 (https://github.com/apache/flink-cdc/pull/3285). But 
I'm not quite sure about the underlying reason behind this weird issue. Any thoughts 
from [~wink]?

> When used through the transform function, the decimal(10,2) type field value 
> in the MySQL source table becomes 100 times the original value after being 
> transferred to the target table.
> 
>
> Key: FLINK-35852
> URL: https://issues.apache.org/jira/browse/FLINK-35852
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
> Environment: flink-1.18.0、 
> flink-cdc-3.1.1、mysql-5.7.43、Doris-2.0.6、CentOS Linux release 8.5.2111
>Reporter: zheng_shengsheng
>Priority: Major
> Attachments: image-2024-07-16-14-46-53-982.png, 
> image-2024-07-16-14-48-07-981.png
>
>
> When I use CDC's yaml mode to collect MySQL data to Doris, I use the latest 
> supported transform feature.
> Finally, I found that the database table in MySQL collected and the 
> decimal(10,2) type fields in the Mysql source table were automatically 
> converted into decimal(19,0) in Doris, and then the value size became 100 
> times the original size. As follows
>  
> !image-2024-07-16-14-48-07-981.png!
> !image-2024-07-16-14-46-53-982.png!
>  
> The core part of yaml is as follows:
> {code:java}
> // code placeholder
> source:
>   type: mysql
>   hostname: node2
>   port: 3306
>   username: 
>   password: 
>   tables: sys_canteen_consume_conf,sys_canteen_consume_rec,sys_order_course,
>   server-id: 5513
>   connectionTimeZone: GMT+8
>   scan.startup.mode: initialsink:
>   type: doris
>   fenodes: node3:8030
>   username: admin
>   password: 
>   table.create.properties.light_schema_change: true
>   table.create.properties.replication_num: 1transform:
>   - source-table: jeecg-boot2.sys_\.*
>     projection: concat('LYDB2','') as tenant_code, *
>     primary-keys: tenant_code,id
>   - source-table: jeecg-boot2.sys_user_depart
>     projection: concat('LYDB2','') as tenant_code, *
>     primary-keys: tenant_code,ID  pipeline:
>   name: test,server-id-5513,flinkCDC-3.1.1
>   parallelism: 1 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35852) When used through the transform function, the decimal(10,2) type field value in the MySQL source table becomes 100 times the original value after being transferred to

2024-07-15 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17866255#comment-17866255
 ] 

yux commented on FLINK-35852:
-

Thanks for [~zheng_shengsheng]'s detailed report, I will investigate this.

> When used through the transform function, the decimal(10,2) type field value 
> in the MySQL source table becomes 100 times the original value after being 
> transferred to the target table.
> 
>
> Key: FLINK-35852
> URL: https://issues.apache.org/jira/browse/FLINK-35852
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
> Environment: flink-1.18.0、 
> flink-cdc-3.1.1、mysql-5.7.43、Doris-2.0.6、CentOS Linux release 8.5.2111
>Reporter: zheng_shengsheng
>Priority: Major
> Attachments: image-2024-07-16-14-46-53-982.png, 
> sp20240716_144157_519.png, sp20240716_144344_710.png
>
>
> When I use CDC's yaml mode to collect MySQL data to Doris, I use the latest 
> supported transform feature.
> Finally, I found that the database table in MySQL collected and the 
> decimal(10,2) type fields in the Mysql source table were automatically 
> converted into decimal(19,0) in Doris, and then the value size became 100 
> times the original size. As follows
> The core part of yaml is as follows:
> {code:java}
> // code placeholder
> source:
>   type: mysql
>   hostname: node2
>   port: 3306
>   username: 
>   password: 
>   tables: sys_canteen_consume_conf,sys_canteen_consume_rec,sys_order_course,
>   server-id: 5513
>   connectionTimeZone: GMT+8
>   scan.startup.mode: initialsink:
>   type: doris
>   fenodes: node3:8030
>   username: admin
>   password: 
>   table.create.properties.light_schema_change: true
>   table.create.properties.replication_num: 1transform:
>   - source-table: jeecg-boot2.sys_\.*
>     projection: concat('LYDB2','') as tenant_code, *
>     primary-keys: tenant_code,id
>   - source-table: jeecg-boot2.sys_user_depart
>     projection: concat('LYDB2','') as tenant_code, *
>     primary-keys: tenant_code,ID  pipeline:
>   name: test,server-id-5513,flinkCDC-3.1.1
>   parallelism: 1 {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35491) [JUnit5 Migration] Module: Flink CDC modules

2024-07-15 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35491?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17866214#comment-17866214
 ] 

yux commented on FLINK-35491:
-

I'd like to take this ticket if needed.

> [JUnit5 Migration] Module: Flink CDC modules
> 
>
> Key: FLINK-35491
> URL: https://issues.apache.org/jira/browse/FLINK-35491
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: Muhammet Orazov
>Priority: Major
>
> Migrate Junit4 tests to Junit5 for the following modules:
>  * flink-cdc-common
>  * flink-cdc-composer
>  * flink-cdc-runtime
>  * flink-cdc-connect/flink-cdc-pipeline-connectors
>  * flink-cdc-e2e-tests



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35834) Update Doris Dependency to 1.6.2 to use group commit.

2024-07-14 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35834?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17865828#comment-17865828
 ] 

yux commented on FLINK-35834:
-

+1, bumping doris connector to 1.6.2 should also resolve FLINK-35072 and 
FLINK-35090.

> Update Doris Dependency to 1.6.2 to use group commit.
> -
>
> Key: FLINK-35834
> URL: https://issues.apache.org/jira/browse/FLINK-35834
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: LvYanquan
>Priority: Minor
> Fix For: cdc-3.3.0
>
>
> Doris support streamload group commit to improve performance of writing.
> Refer to https://github.com/apache/doris-flink-connector/pull/412



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35813) Transform schema operator failed to trigger checkpoint with bounded source

2024-07-11 Thread yux (Jira)
yux created FLINK-35813:
---

 Summary: Transform schema operator failed to trigger checkpoint 
with bounded source
 Key: FLINK-35813
 URL: https://issues.apache.org/jira/browse/FLINK-35813
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Currently, the transform schema operator clears its state field after a bounded job 
finishes. Any subsequent checkpoint request in the snapshotState method will cause 
an NPE. Here's a minimal reproducible example:

```
source:
  type: values
sink:
  type: values
transform:
   ...
```
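
A minimal sketch (hypothetical field and method names, not the actual operator) 
of the kind of guard that would avoid the NPE: a checkpoint that arrives after 
the state field has been cleared snapshots an empty state instead of 
dereferencing null.

```java
import java.util.List;

/** Minimal sketch: tolerate snapshot requests after the cached state has been cleared. */
public class TransformSnapshotSketch {

    // Cleared (set to null) once the bounded job finishes.
    private List<String> cachedTableChanges;

    public List<String> snapshotState() {
        if (cachedTableChanges == null) {
            // Bounded source already finished; nothing left to snapshot.
            return List.of();
        }
        return List.copyOf(cachedTableChanges);
    }

    public static void main(String[] args) {
        TransformSnapshotSketch operator = new TransformSnapshotSketch();
        operator.cachedTableChanges = null; // simulate the state cleared at end of input
        System.out.println(operator.snapshotState()); // [] instead of a NullPointerException
    }
}
```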



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35805) Add "OpType" metadata column in transform

2024-07-10 Thread yux (Jira)
yux created FLINK-35805:
---

 Summary: Add "OpType" metadata column in transform
 Key: FLINK-35805
 URL: https://issues.apache.org/jira/browse/FLINK-35805
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Currently, there's no way to retrieve the operation type in transform projection / 
filtering rules. Users may want to tag data records or implement logical delete 
flags.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35783) Flink CDC Could not start the yaml Job

2024-07-08 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863766#comment-17863766
 ] 

yux commented on FLINK-35783:
-

It is probably because the pipeline jar and the SQL jar have overlapping classes, 
which conflict. Using the `ADD JAR` statement[1] in the SQL job to load the SQL jar 
dynamically, instead of putting it into `FLINK_HOME/lib`, might help.

[1] https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/sql/jar/

> Flink CDC Could not start the yaml Job
> --
>
> Key: FLINK-35783
> URL: https://issues.apache.org/jira/browse/FLINK-35783
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: layhuts
>Priority: Major
>
> * flink版本1.19.1
>  * flink CDC版本3.1.1
>  * 在${FLINK_HOME}/lib下增加了 mysql-connector-java-8.0.27.jar 和 
> flink-sql-connector-mysql-cdc-3.1.1.jar
>  * 在flink-cdc/lib下增加了flink-cdc-pipeline-connector-mysql-3.1.1.jar 和 
> flink-cdc-pipeline-connector-doris-3.1.1.jar
> 第一次使用
> {code:java}
> bin/flink-cdc.sh ***.yaml {code}
>  
> 提交作业提示java.lang.NoClassDefFoundError:org/apache/flink/cdc/runtime/typeutils/EventTypeInfo
> {code:java}
> Caused by: java.lang.NoClassDefFoundError: 
> org/apache/flink/cdc/runtime/typeutils/EventTypeInfo   at 
> java.lang.Class.getDeclaredFields0(Native Method)   at 
> java.lang.Class.privateGetDeclaredFields(Class.java:2583)   at 
> java.lang.Class.getDeclaredField(Class.java:2068)   at 
> java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1872)   at 
> java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:79)   at 
> java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:506)   at 
> java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)   at 
> java.security.AccessController.doPrivileged(Native Method)   at 
> java.io.ObjectStreamClass.(ObjectStreamClass.java:494)   at 
> java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)   at 
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2028)   at 
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1875)   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2028)   at 
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1875)   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2209)   
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)   at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)   
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)   at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)   
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)   at 
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)   at 
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
>    at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
>    at 
> org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
>    at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:496)
>    at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:294)
>    at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:173)
>    ... 19 more Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.cdc.runtime.typeutils.EventTypeInfo   at 
> java.net.URLClassLoader.findClass(URLClassLoader.java:387)   at 
> java.lang.ClassLoader.loadClass(ClassLoader.java:418)   at 
> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)   at 
> java.lang.ClassLoader.loadClass(ClassLoader.java:351)   ... 52 more {code}
> 按照提示在${FLINK_HOME}/lib下增加了 flink-cdc-runtime-3.1.1.jar 后再次运行出现如下问题:
> {code:java}
> Exception in thread "main" org.apache.flink.util.FlinkException: Failed to 
> execute job 'Sync mid_cloud Database to Doris'.
>     at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2455)
>     at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:2421)
>     at 
> org.apache.flink.cdc.composer.f

[jira] [Resolved] (FLINK-35077) Add package license check for Flink CDC modules.

2024-07-08 Thread yux (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35077?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yux resolved FLINK-35077.
-
Fix Version/s: cdc-3.1.1
 Release Note: Fixed in 
https://github.com/apache/flink-cdc/commit/ca1470d5dd48f9238d13a350be99c174ff600b7b.
   Resolution: Fixed

> Add package license check for Flink CDC modules.
> 
>
> Key: FLINK-35077
> URL: https://issues.apache.org/jira/browse/FLINK-35077
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: yux
>Assignee: yux
>Priority: Minor
>  Labels: pull-request-available
> Fix For: cdc-3.1.1
>
>
> Currently, Flink project has CI scripts checking if dependencies with 
> incompatible licenses are introduced.
> Flink CDC module heavily relies on external libraries (especially 
> connectors), so running similar checking scripts during every CI would be 
> helpful preventing developers introducing questionable dependencies by 
> accident.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35783) Flink CDC Could not start the yaml Job

2024-07-08 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35783?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17863759#comment-17863759
 ] 

yux commented on FLINK-35783:
-

Hi [~layhuts],
As you're submitting a pipeline job, `flink-sql-connector-mysql-cdc-3.1.1.jar` 
for Flink SQL jobs is not necessary here. Also, `flink-cdc-runtime-3.1.1.jar` 
doesn't need to be copied either, since it has been packaged into the 
`flink-cdc-dist` jar.

Please try adjusting the jar libraries as follows and retry after *restarting the Flink 
cluster*:
In $FLINK_HOME/lib:
* mysql-connector-java-8.0.27.jar
In $FLINK_CDC_HOME/lib:
* flink-cdc-pipeline-connector-mysql-3.1.1.jar
* flink-cdc-pipeline-connector-doris-3.1.1.jar

Please reply to this thread if the problem persists.

> Flink CDC Could not start the yaml Job
> --
>
> Key: FLINK-35783
> URL: https://issues.apache.org/jira/browse/FLINK-35783
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.1
>Reporter: layhuts
>Priority: Major
>
> * flink版本1.19.1
>  * flink CDC版本3.1.1
>  * 在${FLINK_HOME}/lib下增加了 mysql-connector-java-8.0.27.jar 和 
> flink-sql-connector-mysql-cdc-3.1.1.jar
>  * 在flink-cdc/lib下增加了flink-cdc-pipeline-connector-mysql-3.1.1.jar 和 
> flink-cdc-pipeline-connector-doris-3.1.1.jar
> 第一次使用
> {code:java}
> bin/flink-cdc.sh ***.yaml {code}
>  
> 提交作业提示java.lang.NoClassDefFoundError:org/apache/flink/cdc/runtime/typeutils/EventTypeInfo
> {code:java}
> Caused by: java.lang.NoClassDefFoundError: 
> org/apache/flink/cdc/runtime/typeutils/EventTypeInfo   at 
> java.lang.Class.getDeclaredFields0(Native Method)   at 
> java.lang.Class.privateGetDeclaredFields(Class.java:2583)   at 
> java.lang.Class.getDeclaredField(Class.java:2068)   at 
> java.io.ObjectStreamClass.getDeclaredSUID(ObjectStreamClass.java:1872)   at 
> java.io.ObjectStreamClass.access$700(ObjectStreamClass.java:79)   at 
> java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:506)   at 
> java.io.ObjectStreamClass$3.run(ObjectStreamClass.java:494)   at 
> java.security.AccessController.doPrivileged(Native Method)   at 
> java.io.ObjectStreamClass.(ObjectStreamClass.java:494)   at 
> java.io.ObjectStreamClass.lookup(ObjectStreamClass.java:391)   at 
> java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:681)   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2028)   at 
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1875)   at 
> java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2028)   at 
> java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1875)   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2209)   
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)   at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)   
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)   at 
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2454)   at 
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2378)   at 
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2236)   
> at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1692)   at 
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:508)   at 
> java.io.ObjectInputStream.readObject(ObjectInputStream.java:466)   at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:539)
>    at 
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:527)
>    at 
> org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
>    at 
> org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:496)
>    at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:294)
>    at 
> org.apache.flink.runtime.executiongraph.ExecutionJobVertex.(ExecutionJobVertex.java:173)
>    ... 19 more Caused by: java.lang.ClassNotFoundException: 
> org.apache.flink.cdc.runtime.typeutils.EventTypeInfo   at 
> java.net.URLClassLoader.findClass(URLClassLoader.java:387)   at 
> java.lang.ClassLoader.loadClass(ClassLoader.java:418)   at 
> sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:355)   at 
> java.lang.ClassLoader.loadClass(ClassLoader.java:351)   ... 52 more {code}
> 按照提示在${FLINK_HOME}/lib下增加了 flink-cdc-runtime-3.1.1.jar 后再次运行出现如下问题:
> {code:java}
> Exception in thread "main" org.apache.flink.util.FlinkException: Failed to 
> execute job 'Sync mid_cloud Database to Doris'.
>     at 
> org.apache.f

[jira] [Created] (FLINK-35781) Make pipeline parallelism config optional

2024-07-07 Thread yux (Jira)
yux created FLINK-35781:
---

 Summary: Make pipeline parallelism config optional
 Key: FLINK-35781
 URL: https://issues.apache.org/jira/browse/FLINK-35781
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Currently, the Flink CDC `PIPELINE_PARALLELISM` option is mandatory in the 
pipeline definition, which turns out to be unnecessary since Flink already has 
a fallback parallelism option.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35762) Cache hashCode for immutable map key classes

2024-07-04 Thread yux (Jira)
yux created FLINK-35762:
---

 Summary: Cache hashCode for immutable map key classes
 Key: FLINK-35762
 URL: https://issues.apache.org/jira/browse/FLINK-35762
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


As suggested by [~kunni], hash code caching is a common optimization in the Java 
world (for example, java.lang.String uses it to avoid recomputing the hash, since 
String is a very common hashmap key). The same optimization could be applied in 
CDC to improve execution performance.
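
A minimal sketch of the optimization for an immutable key class (hypothetical 
class name, not an actual CDC class): since all fields are final, the hash can 
be computed once in the constructor and reused on every map lookup.

```java
import java.util.Objects;

/** Minimal sketch: cache the hash code of an immutable key, as java.lang.String does. */
public final class TableIdSketch {

    private final String namespace;
    private final String tableName;
    private final int cachedHashCode; // safe to precompute because all fields are final

    public TableIdSketch(String namespace, String tableName) {
        this.namespace = namespace;
        this.tableName = tableName;
        this.cachedHashCode = Objects.hash(namespace, tableName);
    }

    @Override
    public int hashCode() {
        return cachedHashCode; // no recomputation on repeated hashmap lookups
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof TableIdSketch)) {
            return false;
        }
        TableIdSketch that = (TableIdSketch) o;
        return namespace.equals(that.namespace) && tableName.equals(that.tableName);
    }
}
```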



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35736) Add E2e migration scripts for Flink CDC

2024-07-01 Thread yux (Jira)
yux created FLINK-35736:
---

 Summary: Add E2e migration scripts for Flink CDC
 Key: FLINK-35736
 URL: https://issues.apache.org/jira/browse/FLINK-35736
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Currently, there are no E2E migration tests in the Flink CDC CI, and it's not very 
convenient for testers to run migration tests to verify release candidates before 
each release.

Adding an automated migration testing script would help verify CDC backwards 
compatibility better.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35615) CDC3.0, the pipeline of mysql-doris , do not support the MySQL field type timestamp(6)

2024-06-28 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35615?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860925#comment-17860925
 ] 

yux commented on FLINK-35615:
-

I could reproduce this issue by inserting a timestamp with no nanoseconds part 
into a `TIMESTAMP(6)` field. It seems to be a MySQL source-side issue.

> CDC3.0, the pipeline of mysql-doris ,  do not support the MySQL field type 
> timestamp(6)
> ---
>
> Key: FLINK-35615
> URL: https://issues.apache.org/jira/browse/FLINK-35615
> Project: Flink
>  Issue Type: Bug
>Affects Versions: cdc-3.1.0
>Reporter: ConorZhao
>Priority: Major
>
> 2024-06-14 16:45:24
> org.apache.flink.runtime.JobException: Recovery is suppressed by 
> NoRestartBackoffTimeStrategy
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:176)
> at 
> org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:107)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.recordTaskFailure(DefaultScheduler.java:285)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:276)
> at 
> org.apache.flink.runtime.scheduler.DefaultScheduler.onTaskFailed(DefaultScheduler.java:269)
> at 
> org.apache.flink.runtime.scheduler.SchedulerBase.onTaskExecutionStateUpdate(SchedulerBase.java:764)
> at 
> org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:741)
> at 
> org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:83)
> at 
> org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:488)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:309)
> at 
> org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
> at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:307)
> at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:222)
> at 
> org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:85)
> at 
> org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:168)
> at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
> at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
> at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
> at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
> at 
> org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
> at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
> at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
> at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
> at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
> at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
> at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
> at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
> at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:253)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> at 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> Caused by: java.lang.ArrayIndexOutOfBoundsException: 8
> at 
> com.ververica.cdc.common.data.binary.BinarySegmentUtils.getLongMultiSegments(BinarySegmentUtils.java:731)
> at 
> com.ververica.cdc.common.data.binary.BinarySegmentUtils.getLong(BinarySegmentUtils.java:721)
> at 
> com.ververica.cdc.common.data.binary.BinarySegmentUtils.readLocalZonedTimestampData(BinarySegmentUtils.java:1033)
> at 
> com.ververica.cdc.common.data.binary.BinaryRecordData.getLocalZonedTimestampData(BinaryRecordData.java:244)
> at 
> com.ververica.cdc.connectors.doris.sink.DorisRowConverter.lambda$createExternalConverter$cc45f215$1(DorisRowConverter.java:113)
> at 
> com.v

[jira] [Commented] (FLINK-35654) Add Flink CDC verification guide in docs

2024-06-26 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35654?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17860289#comment-17860289
 ] 

yux commented on FLINK-35654:
-

Hi [~m.orazow], I've submitted a PR for this. Are you interested in reviewing 
this?

> Add Flink CDC verification guide in docs
> 
>
> Key: FLINK-35654
> URL: https://issues.apache.org/jira/browse/FLINK-35654
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>
> Currently, ASF voting process requires vast quality verification before 
> releasing any new versions, including:
>  * Tarball checksum verification
>  * Compile from source code
>  * Run pipeline E2e tests
>  * Run migration tests
>  * Check if jar was packaged with correct JDK version
>  * ...
> Adding verification guide in Flink CDC docs should help developers verify 
> future releases more easily.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35700) Loosen CDC pipeline options validation

2024-06-25 Thread yux (Jira)
yux created FLINK-35700:
---

 Summary: Loosen CDC pipeline options validation
 Key: FLINK-35700
 URL: https://issues.apache.org/jira/browse/FLINK-35700
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


FLINK-35121 adds pipeline configuration validation that rejects any unknown 
options, which turns out to be too strict and makes it impossible to create 
customized configuration extensions. Also, Flink doesn't reject unknown entries 
in flink-conf / config.yaml; it just silently ignores them. It might be better for 
CDC to follow that behavior.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35664) Flink CDC pipeline transform supports mask function

2024-06-20 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17856665#comment-17856665
 ] 

yux commented on FLINK-35664:
-

Hi [~melin], just to confirm, are you suggesting adding built-in functions like 
`mask_inner` and `mask_outer` from MySQL 
(https://dev.mysql.com/doc/refman/5.7/en/data-masking-usage.html) to the transform API?

> Flink CDC pipeline transform supports mask function
> ---
>
> Key: FLINK-35664
> URL: https://issues.apache.org/jira/browse/FLINK-35664
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: melin
>Priority: Major
> Fix For: cdc-3.2.0
>
>
> Flink CDC pipeline transform supports mask function



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35654) Add Flink CDC verification guide in docs

2024-06-19 Thread yux (Jira)
yux created FLINK-35654:
---

 Summary: Add Flink CDC verification guide in docs
 Key: FLINK-35654
 URL: https://issues.apache.org/jira/browse/FLINK-35654
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


Currently, the ASF voting process requires extensive quality verification before 
releasing any new version, including:
 * Tarball checksum verification
 * Compile from source code
 * Run pipeline E2e tests
 * Run migration tests
 * Check if jar was packaged with correct JDK version
 * ...

Adding a verification guide to the Flink CDC docs should help developers verify 
future releases more easily.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35651) Unable to call argument-less built-in functions with parentheses

2024-06-19 Thread yux (Jira)
yux created FLINK-35651:
---

 Summary: Unable to call argument-less built-in functions with 
parentheses
 Key: FLINK-35651
 URL: https://issues.apache.org/jira/browse/FLINK-35651
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


Currently, the CDC pipeline transform module handles some built-in functions 
specially, including LOCALTIME, LOCALTIMESTAMP, CURRENT_TIME, CURRENT_DATE, and 
CURRENT_TIMESTAMP.

In the SystemFunctionUtils implementation, they do take arguments, but are 
silently bound to the current __epoch_time__ and __time_zone__ variables. 
However, users cannot call these functions with parentheses directly because 
the function signature doesn't match.

For example,

```yaml

transform:

  - projection: CURRENT_TIMESTAMP AS TS

```

is OK since it was automatically expanded to something like 
`currentTimestamp(__epoch_time__, __time_zone__)`.

But for this definition:

```yaml

transform:

  - projection: CURRENT_TIMESTAMP() AS TS

```

Transform will fail at runtime:

> org.apache.calcite.runtime.CalciteContextException: From line 1, column 12 to 
> line 1, column 30: No match found for function signature CURRENT_TIMESTAMP()

which could be quite confusing to users.
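
A minimal sketch (hypothetical rewrite step, not the actual Calcite/Janino code) 
of one way to fix it: normalize the argument-less call form so that both 
spellings expand to the same internal invocation before compilation.

```java
/** Minimal sketch: treat CURRENT_TIMESTAMP() the same as CURRENT_TIMESTAMP. */
public class ZeroArgFunctionSketch {

    static String expand(String expression) {
        // Accept both "CURRENT_TIMESTAMP" and "CURRENT_TIMESTAMP()" spellings.
        String normalized = expression.replace("CURRENT_TIMESTAMP()", "CURRENT_TIMESTAMP");
        return normalized.replace(
                "CURRENT_TIMESTAMP", "currentTimestamp(__epoch_time__, __time_zone__)");
    }

    public static void main(String[] args) {
        System.out.println(expand("CURRENT_TIMESTAMP AS TS"));
        System.out.println(expand("CURRENT_TIMESTAMP() AS TS"));
        // Both print: currentTimestamp(__epoch_time__, __time_zone__) AS TS
    }
}
```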



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35648) Pipeline job doesn't support multiple routing

2024-06-18 Thread yux (Jira)
yux created FLINK-35648:
---

 Summary: Pipeline job doesn't support multiple routing
 Key: FLINK-35648
 URL: https://issues.apache.org/jira/browse/FLINK-35648
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Currently, any upstream table can be routed at most once, which means if we write 
a route definition like:

routes:

  - source-table: db.(A|B)

     sink-table: terminal.one

  - source-table: db.(B|C)

     sink-table: terminal.two

Any upstream schema / data change from db.B will be sent to terminal.one 
*only*, not to terminal.two, since it has already been handled by the first route 
rule.

This ticket suggests adding a route behavior option (FIRST_MATCH / COMPLETE) to 
configure whether all route rules should be applied or only the first matched rule 
(for backwards compatibility).
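
A minimal sketch (hypothetical option names) of the two proposed behaviors: with 
FIRST_MATCH an event from db.B only reaches terminal.one, while COMPLETE fans it 
out to every matching route.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

/** Minimal sketch: FIRST_MATCH stops at the first matching rule, COMPLETE applies all of them. */
public class RouteBehaviorSketch {

    record Route(Pattern sourceTable, String sinkTable) {}

    static List<String> route(String table, List<Route> routes, boolean firstMatchOnly) {
        List<String> sinks = new ArrayList<>();
        for (Route route : routes) {
            if (route.sourceTable().matcher(table).matches()) {
                sinks.add(route.sinkTable());
                if (firstMatchOnly) {
                    break; // current behavior: the first matching rule wins
                }
            }
        }
        return sinks;
    }

    public static void main(String[] args) {
        List<Route> routes = List.of(
                new Route(Pattern.compile("db\\.(A|B)"), "terminal.one"),
                new Route(Pattern.compile("db\\.(B|C)"), "terminal.two"));
        System.out.println(route("db.B", routes, true));  // [terminal.one]               (FIRST_MATCH)
        System.out.println(route("db.B", routes, false)); // [terminal.one, terminal.two] (COMPLETE)
    }
}
```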



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35647) Pipeline route rule supports placeholder replacement

2024-06-18 Thread yux (Jira)
yux created FLINK-35647:
---

 Summary: Pipeline route rule supports placeholder replacement
 Key: FLINK-35647
 URL: https://issues.apache.org/jira/browse/FLINK-35647
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Currently, we must provide an explicit sink-table in pipeline route rules, which 
means if we'd like to route all tables in a specific database and rename them 
following a pattern, there's no choice but to write all the rules separately, 
which is a chore.

There's already an implementation 
(https://github.com/apache/flink-cdc/pull/2908) that uses placeholder 
syntax like (db.source<> -> db.sink<>) to perform replacement in batch, which 
could solve this problem.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35634) Add a CDC quickstart utility

2024-06-18 Thread yux (Jira)
yux created FLINK-35634:
---

 Summary: Add a CDC quickstart utility
 Key: FLINK-35634
 URL: https://issues.apache.org/jira/browse/FLINK-35634
 Project: Flink
  Issue Type: New Feature
  Components: Flink CDC
Reporter: yux


Currently, it's not very easy to initialize a CDC pipeline job from scratch, 
requiring users to configure lots of Flink settings manually.

This ticket suggests creating an extra component like `tiup` or `rustup` to 
help users create and submit CDC jobs quickly.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35633) Early verification and clearer error message of pipeline YAML definition

2024-06-18 Thread yux (Jira)
yux created FLINK-35633:
---

 Summary: Early verification and clearer error message of pipeline 
YAML definition 
 Key: FLINK-35633
 URL: https://issues.apache.org/jira/browse/FLINK-35633
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Currently, little verification is applied before submitting a YAML pipeline job 
to the Flink cluster, and errors in Transform / Route rules are exposed as 
runtime exceptions, which are hard to debug and investigate.

This ticket suggests performing Transform / Route expression validation in the CDC 
CLI before submitting YAML jobs, providing clearer and more descriptive error 
messages early on.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35261) Flink CDC pipeline transform doesn't support decimal-type comparison

2024-06-12 Thread yux (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yux updated FLINK-35261:

Parent: FLINK-34877
Issue Type: Sub-task  (was: Improvement)

> Flink CDC pipeline transform doesn't support decimal-type comparison
> 
>
> Key: FLINK-35261
> URL: https://issues.apache.org/jira/browse/FLINK-35261
> Project: Flink
>  Issue Type: Sub-task
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>
> It would be convenient if we can filter by comparing decimal to number 
> literals like:
> {{transform:}}
> {{  - source-table: XXX}}
> {{    filter: price > 50}}
> where price is a Decimal typed column. However currently such expression is 
> not supported, and a runtime exception will be thrown as follows:
>  
> Caused by: org.apache.flink.api.common.InvalidProgramException: Expression 
> cannot be compiled. This is a bug. Please file an issue.
> Expression: import static 
> org.apache.flink.cdc.runtime.functions.SystemFunctionUtils.*;PRICEALPHA > 50
>     at 
> org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler.lambda$compileExpression$0(TransformExpressionCompiler.java:62)
>  
> ~[blob_p-c3b34ad5a5a3a0bc443bb738e308b20b1da04a1f-8d419e2c927baeb0eeb40fb35c0a52dc:3.2-SNAPSHOT]
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4868)
>  
> ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT]
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3533)
>  
> ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT]
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2282)
>  
> ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT]
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2159)
>  
> ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT]
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2049)
>  
> ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT]
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache.get(LocalCache.java:3966)
>  
> ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT]
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4863)
>  
> ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT]
>     at 
> org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler.compileExpression(TransformExpressionCompiler.java:46)
>  
> ~[blob_p-c3b34ad5a5a3a0bc443bb738e308b20b1da04a1f-8d419e2c927baeb0eeb40fb35c0a52dc:3.2-SNAPSHOT]
>     ... 17 more
> Caused by: org.codehaus.commons.compiler.CompileException: Line 1, Column 89: 
> Cannot compare types "java.math.BigDecimal" and "int"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35261) Flink CDC pipeline transform doesn't support decimal-type comparison

2024-06-12 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17854308#comment-17854308
 ] 

yux commented on FLINK-35261:
-

Thanks for [~wink] 's clarification! I'll put this ticket as a subtask of 
FLINK-34877 since it could be resolved by type conversion.

> Flink CDC pipeline transform doesn't support decimal-type comparison
> 
>
> Key: FLINK-35261
> URL: https://issues.apache.org/jira/browse/FLINK-35261
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>
> It would be convenient if we could filter by comparing a decimal column to 
> numeric literals, like:
> {{transform:}}
> {{  - source-table: XXX}}
> {{    filter: price > 50}}
> where price is a Decimal-typed column. However, such an expression is currently 
> not supported, and a runtime exception will be thrown as follows:
>  
> Caused by: org.apache.flink.api.common.InvalidProgramException: Expression 
> cannot be compiled. This is a bug. Please file an issue.
> Expression: import static 
> org.apache.flink.cdc.runtime.functions.SystemFunctionUtils.*;PRICEALPHA > 50
>     at 
> org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler.lambda$compileExpression$0(TransformExpressionCompiler.java:62)
>  
> ~[blob_p-c3b34ad5a5a3a0bc443bb738e308b20b1da04a1f-8d419e2c927baeb0eeb40fb35c0a52dc:3.2-SNAPSHOT]
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4868)
>  
> ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT]
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3533)
>  
> ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT]
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2282)
>  
> ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT]
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2159)
>  
> ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT]
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2049)
>  
> ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT]
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache.get(LocalCache.java:3966)
>  
> ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT]
>     at 
> org.apache.flink.shaded.guava31.com.google.common.cache.LocalCache$LocalManualCache.get(LocalCache.java:4863)
>  
> ~[blob_p-d4bee45ff47f13d247ff78e6f3d164170cf71835-4816fa00c7fd16a7096498e5ed3caaee:3.2-SNAPSHOT]
>     at 
> org.apache.flink.cdc.runtime.operators.transform.TransformExpressionCompiler.compileExpression(TransformExpressionCompiler.java:46)
>  
> ~[blob_p-c3b34ad5a5a3a0bc443bb738e308b20b1da04a1f-8d419e2c927baeb0eeb40fb35c0a52dc:3.2-SNAPSHOT]
>     ... 17 more
> Caused by: org.codehaus.commons.compiler.CompileException: Line 1, Column 89: 
> Cannot compare types "java.math.BigDecimal" and "int"



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35242) Add per-type schema evolution behavior configuration

2024-06-11 Thread yux (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35242?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yux updated FLINK-35242:

Description: 
> Update: Changed the `fine grained` terminology to avoid confusion with the 
> fine-grained job resource management feature.

Currently, Flink CDC allows three schema evolution strategies: evolving the 
change, ignoring it, or throwing an exception. However, such a configuration 
doesn't cover all use cases, and users want more fine-grained strategy 
configuration.

This ticket suggests adding one more strategy, "try_evolve" or 
"evolve_when_available". It's basically like the "evolving" option, but doesn't 
throw an exception if the operation fails, which provides more flexibility.

Also, this ticket suggests allowing users to configure a per-schema-event 
strategy, so they could evolve some types of events (like renaming a column) and 
reject some unwelcome events (like truncating a table or removing a column).
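
For illustration only, a per-event-type configuration might look roughly like 
the following (the option and event names below are hypothetical, not a 
finalized design):

{{pipeline:}}
{{  schema.change.behavior:}}
{{    add.column: evolve}}
{{    alter.column.type: try_evolve}}
{{    drop.column: ignore}}
{{    truncate.table: exception}}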

  was:
Currently, Flink CDC allows three schema evolution strategies: evolving the 
change, ignoring it, or throwing an exception. However, such a configuration 
doesn't cover all use cases, and users want more fine-grained strategy 
configuration.

This ticket suggests adding one more strategy, "try_evolve" or 
"evolve_when_available". It's basically like the "evolving" option, but doesn't 
throw an exception if the operation fails, which provides more flexibility.

Also, this ticket suggests allowing users to configure a per-schema-event 
strategy, so they could evolve some types of events (like renaming a column) and 
reject some dangerous events (like truncating a table or removing a column).

Summary: Add per-type schema evolution behavior configuration  (was: 
Add fine-grained schema evolution strategy)

> Add per-type schema evolution behavior configuration
> 
>
> Key: FLINK-35242
> URL: https://issues.apache.org/jira/browse/FLINK-35242
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: yux
>Assignee: yux
>Priority: Major
>  Labels: pull-request-available
> Fix For: cdc-3.2.0
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> > Update: Changed the `fine grained` terminology to avoid confusion with the 
> > fine-grained job resource management feature.
> Currently, Flink CDC allows three schema evolution strategies: evolving the 
> change, ignoring it, or throwing an exception. However, such a configuration 
> doesn't cover all use cases, and users want more fine-grained strategy 
> configuration.
> This ticket suggests adding one more strategy, "try_evolve" or 
> "evolve_when_available". It's basically like the "evolving" option, but doesn't 
> throw an exception if the operation fails, which provides more flexibility.
> Also, this ticket suggests allowing users to configure a per-schema-event 
> strategy, so they could evolve some types of events (like renaming a column) 
> and reject some unwelcome events (like truncating a table or removing a column).



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35527) Polish quickstart guide & clean stale links in docs

2024-06-05 Thread yux (Jira)
yux created FLINK-35527:
---

 Summary: Polish quickstart guide & clean stale links in docs
 Key: FLINK-35527
 URL: https://issues.apache.org/jira/browse/FLINK-35527
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: yux
 Fix For: cdc-3.2.0


Currently, there are still a lot of stale links in the Flink CDC docs, including 
some download links pointing to Ververica maven repositories. They need to be 
cleaned up to avoid confusing users.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35503) OracleE2eITCase fails with error ORA-12528 on Mac M2

2024-06-02 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35503?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17851479#comment-17851479
 ] 

yux commented on FLINK-35503:
-

Hi [~sakkurn],
Currently, [~gongzhongqiang] has tweaked the Oracle EE docker image for running 
E2e tests and repackaged it as goodboy008/oracle-19.3.0-ee:non-cdb. However, it 
supports amd64 only and requires Rosetta (or something similar) to run on arm64 
machines.

Oracle community discussions [2] revealed that ORA-03113 also appears when 
running under cross-architecture emulation, perhaps caused by a bug in Rosetta 2, 
but this is hard to investigate and debug since neither is open source.

As this PR [3] has brought official arm64 architecture support to the Oracle 
docker image, I think we can repackage the Oracle E2e docker image to support 
both the amd64 and arm64 architectures, which would help Mac developers run E2e 
tests more easily. I'd be glad to help implement this if needed.

[1] https://hub.docker.com/r/gvenzl/oracle-xe/tags
[2] https://github.com/oracle/docker-images/discussions/1951
[3] https://github.com/oracle/docker-images/pull/2659 
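
For reference, assuming the image's Dockerfile builds cleanly on both platforms, 
the repackaging could be as simple as a multi-arch build along the lines of 
{{docker buildx build --platform linux/amd64,linux/arm64 -t <repo>/oracle-19.3.0-ee:non-cdb --push .}} 
(the tag and build context here are placeholders, not a confirmed plan).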

> OracleE2eITCase fails with error ORA-12528 on Mac M2
> 
>
> Key: FLINK-35503
> URL: https://issues.apache.org/jira/browse/FLINK-35503
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
> Environment:  
>  * Mac M2 (Apple Silicon)
>  * using docker desktop with Rosetta enabled for amd64 emulation
>  
>Reporter: Saketh Kurnool
>Priority: Blocker
> Attachments: com.ververica.cdc.connectors.tests.OracleE2eITCase.txt, 
> oracle-docker-setup-logs.txt
>
>
> Hello Flink CDC community,
> I am attempting to run `OracleE2eITCase` (in the cdc source connector e2e 
> tests), and I am running into the following runtime exception: 
> {code:java}
> java.sql.SQLException: 
> Listener refused the connection with the following error:
> ORA-12528, TNS:listener: all appropriate instances are blocking new 
> connections
>  
>     at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:854)
>     at 
> oracle.jdbc.driver.PhysicalConnection.connect(PhysicalConnection.java:793)
>     at 
> oracle.jdbc.driver.T4CDriverExtension.getConnection(T4CDriverExtension.java:57)
>     at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:747)
>     at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:562)
>     at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:677)
>     at java.sql/java.sql.DriverManager.getConnection(DriverManager.java:228)
>     at 
> com.ververica.cdc.connectors.tests.OracleE2eITCase.getOracleJdbcConnection(OracleE2eITCase.java:197)
>     at 
> com.ververica.cdc.connectors.tests.OracleE2eITCase.testOracleCDC(OracleE2eITCase.java:149)
>     at java.base/java.lang.reflect.Method.invoke(Method.java:567)
>     at 
> org.apache.flink.util.TestNameProvider$1.evaluate(TestNameProvider.java:45)
>     at 
> org.testcontainers.containers.FailureDetectingExternalResource$1.evaluate(FailureDetectingExternalResource.java:29)
> Caused by: oracle.net.ns.NetException: Listener refused the connection with 
> the following error:
> ORA-12528, TNS:listener: all appropriate instances are blocking new 
> connections
>  
>     at oracle.net.ns.NSProtocolNIO.negotiateConnection(NSProtocolNIO.java:284)
>     at oracle.net.ns.NSProtocol.connect(NSProtocol.java:340)
>     at oracle.jdbc.driver.T4CConnection.connect(T4CConnection.java:1596)
>     at oracle.jdbc.driver.T4CConnection.logon(T4CConnection.java:588)
>     ... 11 more{code}
> I have attached the test results to this issue.
> `OracleE2eITCase` runs the `goodboy008/oracle-19.3.0-ee:non-cdb` docker 
> image. I am able to reproduce the same issue when I run this docker image 
> locally - my observation is that dockerized Oracle DB instance is not being 
> set up properly, as I notice another ORA in the setup logs (`ORA-03113: 
> end-of-file on communication channel`). I have also attached the logs from 
> the docker image setup to this issue. To reproduce the ORA-12528 issue 
> locally, I:
>  * ran: `docker run goodboy008/oracle-19.3.0-ee:non-cdb`
>  * ssh'ed into the db pod
>  * ran: `sqlplus sys/top_secret@//localhost:1521/ORCLCDB as sysdba`
> Any insight/workaround on getting this e2e test and the docker image running 
> on my machine would be much appreciated. I'm also happy to provide any other 
> information regarding my setup in the comments. Thank you!
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35464) Flink CDC 3.1 breaks operator state compatibility

2024-05-27 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849704#comment-17849704
 ] 

yux commented on FLINK-35464:
-

[~Leonard] I'm willing to investigate this.

> Flink CDC 3.1 breaks operator state compatibility
> 
>
> Key: FLINK-35464
> URL: https://issues.apache.org/jira/browse/FLINK-35464
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: yux
>Priority: Critical
> Fix For: cdc-3.1.1
>
>
> Flink CDC 3.1 changed how SchemaRegistry [de]serializes state data, which 
> means that checkpoint state saved with an earlier version cannot be restored 
> in version 3.1.0.
> This could be resolved by adding serialization versioning control logic.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35464) Flink CDC 3.1 breaks operator state compatibility

2024-05-27 Thread yux (Jira)
yux created FLINK-35464:
---

 Summary: Flink CDC 3.1 breaks operator state compatibility
 Key: FLINK-35464
 URL: https://issues.apache.org/jira/browse/FLINK-35464
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: yux
 Fix For: cdc-3.1.1


Flink CDC 3.1 changed how SchemaRegistry [de]serializes state data, which means 
that checkpoint state saved with an earlier version cannot be restored in 
version 3.1.0.

This could be resolved by adding serialization versioning control logic.
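
A generic sketch of what such versioning logic could look like (plain Java, 
illustrative only; this is not the actual SchemaRegistry serializer, and the 
version numbers are made up):

{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;

public class VersionedStateSerializer {

    private static final int CURRENT_VERSION = 2;

    // Prefix the payload with a version number so that future releases can
    // recognize which layout was used and migrate it if necessary.
    public static byte[] serialize(byte[] payload) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bos)) {
            out.writeInt(CURRENT_VERSION);
            out.writeInt(payload.length);
            out.write(payload);
        }
        return bos.toByteArray();
    }

    // Read the version prefix and dispatch on it; anything unrecognized is
    // treated as the legacy (un-versioned) layout written by older releases.
    public static byte[] deserialize(byte[] state) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(state))) {
            int version = in.readInt();
            if (version == CURRENT_VERSION) {
                byte[] payload = new byte[in.readInt()];
                in.readFully(payload);
                return payload;
            }
        } catch (EOFException ignored) {
            // Too short to carry a version prefix -> fall through to legacy handling.
        }
        return state;
    }
}
{code}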



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35441) Add CDC upgrade compatibility tests

2024-05-23 Thread yux (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35441?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yux updated FLINK-35441:

Issue Type: Improvement  (was: Bug)

> Add CDC upgrade compatibility tests
> ---
>
> Key: FLINK-35441
> URL: https://issues.apache.org/jira/browse/FLINK-35441
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>
> Currently, there are no test cases guaranteeing checkpoint state compatibility 
> between different CDC versions, like Flink's SerializerUpgradeTestBase does.
> Adding them should help CDC users upgrade versions with more confidence.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35441) Add CDC upgrade compatibility tests

2024-05-23 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35441?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17849142#comment-17849142
 ] 

yux commented on FLINK-35441:
-

[~renqs] I'd love to take this ticket if needed.

> Add CDC upgrade compatibility tests
> ---
>
> Key: FLINK-35441
> URL: https://issues.apache.org/jira/browse/FLINK-35441
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>
> Currently, there are no test cases guaranteeing checkpoint state compatibility 
> between different CDC versions, like Flink's SerializerUpgradeTestBase does.
> Adding them should help CDC users upgrade versions with more confidence.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35441) Add CDC upgrade compatibility tests

2024-05-23 Thread yux (Jira)
yux created FLINK-35441:
---

 Summary: Add CDC upgrade compatibility tests
 Key: FLINK-35441
 URL: https://issues.apache.org/jira/browse/FLINK-35441
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


Currently, there are no test cases guaranteeing checkpoint state compatibility 
between different CDC versions, like Flink's SerializerUpgradeTestBase does.

Adding them should help CDC users upgrade versions with more confidence.
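
As a rough illustration (not an existing test; the resource path and class names 
are made up, and it reuses the VersionedStateSerializer sketch from the 
FLINK-35464 entry above, assumed to be in the same package), such a test could 
restore a snapshot produced by an older release and round-trip it with the 
current serializer:

{code:java}
import static org.junit.jupiter.api.Assertions.assertArrayEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import java.io.InputStream;
import org.junit.jupiter.api.Test;

class SchemaRegistryStateCompatibilityTest {

    // A state snapshot produced by an older CDC release, checked into test resources.
    private static final String SNAPSHOT = "/compat/schema-registry-state-cdc-3.0.bin";

    @Test
    void restoresStateFromPreviousRelease() throws Exception {
        try (InputStream in = getClass().getResourceAsStream(SNAPSHOT)) {
            assertNotNull(in, "snapshot from the previous release should be bundled as a test resource");
            byte[] oldState = in.readAllBytes();

            // Restoring with the current deserializer must not throw, and writing the
            // payload back out and reading it again must be lossless.
            byte[] payload = VersionedStateSerializer.deserialize(oldState);
            byte[] reserialized = VersionedStateSerializer.serialize(payload);
            assertArrayEquals(payload, VersionedStateSerializer.deserialize(reserialized));
        }
    }
}
{code}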



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35415) CDC Fails to create sink with Flink 1.19

2024-05-21 Thread yux (Jira)
yux created FLINK-35415:
---

 Summary: CDC Fails to create sink with Flink 1.19
 Key: FLINK-35415
 URL: https://issues.apache.org/jira/browse/FLINK-35415
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Affects Versions: cdc-3.1.0
Reporter: yux
 Fix For: cdc-3.2.0


Currently, Flink CDC doesn't work with Flink 1.19 and fails with the following exception:

Exception in thread "main" java.lang.NoSuchMethodError: 'void 
org.apache.flink.streaming.runtime.operators.sink.CommitterOperatorFactory.(org.apache.flink.api.connector.sink2.TwoPhaseCommittingSink,
 boolean, boolean)'

The reason is that Flink CDC uses a Flink @Internal API that was changed in the 
1.19 release.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35385) Upgrade Flink dependency version to 1.19

2024-05-21 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35385?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17848411#comment-17848411
 ] 

yux commented on FLINK-35385:
-

It seems https://github.com/apache/flink-cdc/issues/3327 was caused by this, 
since the CommitterOperatorFactory API changed in Flink 1.19.

[~renqs] I'd like to take this if needed.

> Upgrade Flink dependency version to 1.19
> -
>
> Key: FLINK-35385
> URL: https://issues.apache.org/jira/browse/FLINK-35385
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Affects Versions: cdc-3.1.0
>Reporter: Hongshun Wang
>Priority: Minor
> Fix For: cdc-3.2.0
>
>
> Flink 1.19 was released on 2024-03-18  and the connectors have not yet
> caught up. 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-35329) Support modify table/column comments

2024-05-09 Thread yux (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-35329?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17845203#comment-17845203
 ] 

yux commented on FLINK-35329:
-

Hi [~melin], I'm trying to add AlterTableCommentEvent / AlterColumnCommentEvent 
support in PR https://github.com/apache/flink-cdc/pull/3296, PTAL.

> Support modify table/column comments
> 
>
> Key: FLINK-35329
> URL: https://issues.apache.org/jira/browse/FLINK-35329
> Project: Flink
>  Issue Type: New Feature
>  Components: Flink CDC
>Affects Versions: cdc-3.2.0
>Reporter: melin
>Priority: Major
>
> Databases provide DDL to change table/column comments. It is recommended to 
> add a comment-change event for this. It should also be possible to opt out of 
> synchronizing comments, since some scenarios cannot synchronize them.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Closed] (FLINK-35312) Insufficient number of arguments were supplied for the procedure or function cdc.fn_cdc_get_all_changes_

2024-05-09 Thread yux (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35312?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yux closed FLINK-35312.
---
Resolution: Done

This ticket has been resolved by https://github.com/apache/flink-cdc/pull/2551.

> Insufficient number of arguments were supplied for the procedure or function 
> cdc.fn_cdc_get_all_changes_
> 
>
> Key: FLINK-35312
> URL: https://issues.apache.org/jira/browse/FLINK-35312
> Project: Flink
>  Issue Type: Bug
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>
> h3. Flink version
> 1.17.0
> h3. Flink CDC version
> 2.4.1
> h3. Database and its version
> sql server 2014
> h3. Minimal reproduce step
> 1
> h3. What did you expect to see?
> Caused by: java.lang.RuntimeException: SplitFetcher thread 22 received 
> unexpected exception while polling the records
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
> at 
> org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
> at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
> at java.util.concurrent.FutureTask.run(FutureTask.java:266)
> at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
> at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
> ... 1 more
> Caused by: org.apache.kafka.connect.errors.RetriableException: An exception 
> occurred in the change event producer. This connector will be restarted.
> at 
> io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:46)
> at 
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.executeIteration(SqlServerStreamingChangeEventSource.java:458)
> at 
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:138)
> at 
> com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask$LsnSplitReadTask.execute(SqlServerStreamFetchTask.java:161)
> at 
> com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerScanFetchTask.execute(SqlServerScanFetchTask.java:123)
> at 
> com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$0(IncrementalSourceScanFetcher.java:95)
> ... 5 more
> Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: An insufficient 
> number of arguments were supplied for the procedure or function 
> cdc.fn_cdc_get_all_changes_ ... .
> at 
> com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:265)
> at 
> com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5471)
> at 
> com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1794)
> at 
> com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1052)
> at 
> io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63)
> at 
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$executeIteration$1(SqlServerStreamingChangeEventSource.java:269)
> at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:606)
> at 
> io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:329)
> at 
> io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.executeIteration(SqlServerStreamingChangeEventSource.java:251)
> ... 9 more



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35316) Add CDC e2e test case for Flink 1.19

2024-05-08 Thread yux (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yux updated FLINK-35316:

Priority: Minor  (was: Major)

> Add CDC e2e test case for Flink 1.19
> ---
>
> Key: FLINK-35316
> URL: https://issues.apache.org/jira/browse/FLINK-35316
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: yux
>Priority: Minor
>
> Since Flink 1.19 has been generally available, Flink CDC is expected to be 
> used with it. E2e test cases should cover this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Updated] (FLINK-35316) Add CDC e2e test case for Flink 1.19

2024-05-08 Thread yux (Jira)


 [ 
https://issues.apache.org/jira/browse/FLINK-35316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

yux updated FLINK-35316:

Issue Type: Improvement  (was: Bug)

> Add CDC e2e test case for Flink 1.19
> ---
>
> Key: FLINK-35316
> URL: https://issues.apache.org/jira/browse/FLINK-35316
> Project: Flink
>  Issue Type: Improvement
>  Components: Flink CDC
>Reporter: yux
>Priority: Major
>
> Since Flink 1.19 has been generally available, Flink CDC is expected to be 
> used with it. E2e test cases should cover this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35317) Flink CDC CLI supports submitting multiple YAML jobs at once

2024-05-08 Thread yux (Jira)
yux created FLINK-35317:
---

 Summary: Flink CDC CLI supports submitting multiple YAML jobs at 
once
 Key: FLINK-35317
 URL: https://issues.apache.org/jira/browse/FLINK-35317
 Project: Flink
  Issue Type: Improvement
  Components: Flink CDC
Reporter: yux


Currently, the Flink CDC CLI only allows submitting one YAML pipeline job at a 
time. It would be convenient if users could submit multiple .yml files at once, 
like this:

{{./bin/flink-cdc.sh job1.yml job2.yml --flink-home /opt/flink ...}}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35316) Add CDC e2e test case for Flink 1.19

2024-05-08 Thread yux (Jira)
yux created FLINK-35316:
---

 Summary: Add CDC e2e test case for Flink 1.19
 Key: FLINK-35316
 URL: https://issues.apache.org/jira/browse/FLINK-35316
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


Since Flink 1.19 has been generally available, Flink CDC is expected to be used 
with it. E2e test cases should cover this.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Created] (FLINK-35312) Insufficient number of arguments were supplied for the procedure or function cdc.fn_cdc_get_all_changes_

2024-05-08 Thread yux (Jira)
yux created FLINK-35312:
---

 Summary: Insufficient number of arguments were supplied for the 
procedure or function cdc.fn_cdc_get_all_changes_
 Key: FLINK-35312
 URL: https://issues.apache.org/jira/browse/FLINK-35312
 Project: Flink
  Issue Type: Bug
  Components: Flink CDC
Reporter: yux


h3. Flink version

1.17.0
h3. Flink CDC version

2.4.1
h3. Database and its version

sql server 2014
h3. Minimal reproduce step

1
h3. What did you expect to see?

Caused by: java.lang.RuntimeException: SplitFetcher thread 22 received 
unexpected exception while polling the records
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:165)
at 
org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:114)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Caused by: org.apache.kafka.connect.errors.RetriableException: An exception 
occurred in the change event producer. This connector will be restarted.
at io.debezium.pipeline.ErrorHandler.setProducerThrowable(ErrorHandler.java:46)
at 
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.executeIteration(SqlServerStreamingChangeEventSource.java:458)
at 
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.execute(SqlServerStreamingChangeEventSource.java:138)
at 
com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerStreamFetchTask$LsnSplitReadTask.execute(SqlServerStreamFetchTask.java:161)
at 
com.ververica.cdc.connectors.sqlserver.source.reader.fetch.SqlServerScanFetchTask.execute(SqlServerScanFetchTask.java:123)
at 
com.ververica.cdc.connectors.base.source.reader.external.IncrementalSourceScanFetcher.lambda$submitTask$0(IncrementalSourceScanFetcher.java:95)
... 5 more
Caused by: com.microsoft.sqlserver.jdbc.SQLServerException: An insufficient 
number of arguments were supplied for the procedure or function 
cdc.fn_cdc_get_all_changes_ ... .
at 
com.microsoft.sqlserver.jdbc.SQLServerException.makeFromDatabaseError(SQLServerException.java:265)
at 
com.microsoft.sqlserver.jdbc.SQLServerResultSet$FetchBuffer.nextRow(SQLServerResultSet.java:5471)
at 
com.microsoft.sqlserver.jdbc.SQLServerResultSet.fetchBufferNext(SQLServerResultSet.java:1794)
at 
com.microsoft.sqlserver.jdbc.SQLServerResultSet.next(SQLServerResultSet.java:1052)
at 
io.debezium.pipeline.source.spi.ChangeTableResultSet.next(ChangeTableResultSet.java:63)
at 
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.lambda$executeIteration$1(SqlServerStreamingChangeEventSource.java:269)
at io.debezium.jdbc.JdbcConnection.prepareQuery(JdbcConnection.java:606)
at 
io.debezium.connector.sqlserver.SqlServerConnection.getChangesForTables(SqlServerConnection.java:329)
at 
io.debezium.connector.sqlserver.SqlServerStreamingChangeEventSource.executeIteration(SqlServerStreamingChangeEventSource.java:251)
... 9 more



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

