lisa created FLINK-40051:
----------------------------

             Summary: Error "No coordinator registered for operator" when 
operator.uid.prefix is set in a MySQL to Hudi pipeline YAML
                 Key: FLINK-40051
                 URL: https://issues.apache.org/jira/browse/FLINK-40051
             Project: Flink
          Issue Type: Bug
          Components: Flink CDC
    Affects Versions: 1.20.5
         Environment: hadoop: 3.0.0-cdh6.2.0 (jdk8)
hive: 2.1.1-cdh6.2.0 (jdk8)
flink: 1.20.5 (jdk11)
flink-cdc: 3.6.0 (jdk11)
hudi: 1.1.1 (jdk11)
jobmanager and taskmanager runtime jdk: jdk11
            Reporter: lisa


Error message:

```

2026-07-02 15:58:13,538 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph       [] - flush_event_alignment -> Map -> multi_table_write -> Flat Map -> Sink Writer: Hudi Sink (1/1) (e3b354fc809643b605c0cefc06b8ae08_02a786a3af36d73d4b8a95dab13f5e8d_0_2) switched from RUNNING to FAILED on container_e17_1767939498468_0254_01_000002 @ your_machine.com (dataPort=37289).
org.apache.flink.util.FlinkException: No coordinator registered for operator 9899a42c64d67ef3172b7e3be3c1bbb9
    at org.apache.flink.runtime.scheduler.DefaultOperatorCoordinatorHandler.deliverOperatorEventToCoordinator(DefaultOperatorCoordinatorHandler.java:120) ~[flink-dist-1.20.5.jar:1.20.5]
    at org.apache.flink.runtime.scheduler.SchedulerBase.deliverOperatorEventToCoordinator(SchedulerBase.java:1099) ~[flink-dist-1.20.5.jar:1.20.5]
    at org.apache.flink.runtime.jobmaster.JobMaster.sendOperatorEventToCoordinator(JobMaster.java:638) ~[flink-dist-1.20.5.jar:1.20.5]
    at jdk.internal.reflect.GeneratedMethodAccessor26.invoke(Unknown Source) ~[?:?]
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:?]
    at java.base/java.lang.reflect.Method.invoke(Method.java:566) ~[?:?]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.lambda$handleRpcInvocation$1(PekkoRpcActor.java:318) ~[flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at org.apache.flink.runtime.concurrent.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83) ~[flink-dist-1.20.5.jar:1.20.5]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcInvocation(PekkoRpcActor.java:316) ~[flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleRpcMessage(PekkoRpcActor.java:229) ~[flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at org.apache.flink.runtime.rpc.pekko.FencedPekkoRpcActor.handleRpcMessage(FencedPekkoRpcActor.java:88) ~[flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleMessage(PekkoRpcActor.java:174) ~[flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at scala.PartialFunction.applyOrElse(PartialFunction.scala:127) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:272) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:233) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at org.apache.pekko.dispatch.Mailbox.exec(Mailbox.scala:245) [flink-rpc-akka6c2fb337-a8fa-4d72-8726-ead5316a2a55.jar:1.20.5]
    at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290) [?:?]
    at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020) [?:?]
    at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656) [?:?]
    at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594) [?:?]
    at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183) [?:?]

```

 

(1) After removing the config "operator.uid.prefix: mysql_hudi_user_behavior" 
from the YAML file, the job runs fine.

(2) The operator ID 9899a42c64d67ef3172b7e3be3c1bbb9 in the error never 
changes, even if I restart the job or change the prefix from 
"mysql_hudi_user_behavior" to "mysql_hudi_user_behavior1".

Pipeline job file:

```yaml

source:
  type: mysql
  hostname: 10.192.100.01
  port: 3306
  username: root
  password: qaz123
  tables: test.user_behavior
  server-id: 5400-5403
  server-time-zone: Asia/Shanghai
  scan.startup.mode: initial
  heartbeat.interval: 30s

sink:
  type: hudi
  name: Hudi Sink
  path: "hdfs://mycluster/apps/hive/warehouse/managed/dc"
  hoodie.table.type: MERGE_ON_READ

transform:
  - source-table: test.user_behavior
    table-options:
      ordering.fields: ts

pipeline:
  name: "MySQL to Hudi Pipeline"
  parallelism: 1
  operator.uid.prefix: mysql_hudi_user_behavior

```
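
For comparison, observation (1) implies that the only change needed for the job to run is dropping the prefix line. A minimal sketch of the `pipeline` block in that working variant, assuming the rest of the file stays exactly as above:

```yaml
pipeline:
  name: "MySQL to Hudi Pipeline"
  parallelism: 1
  # operator.uid.prefix: mysql_hudi_user_behavior
  # ^ commented out: with this line set, the Hudi sink task fails with
  #   "No coordinator registered for operator 9899a42c64d67ef3172b7e3be3c1bbb9"
```

This is only a workaround for reproducing the difference, not a fix. Without the prefix, operator UIDs are presumably left to Flink's automatic generation, so the stable UIDs the option is meant to provide are lost.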



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
