Hello Mario.
I didn't look into this in detail, but are you using the new plugin discovery 
mechanism? 
https://kafka.apache.org/documentation/#connectconfigs_plugin.discovery

Regards,
________________________________
From: Mario Fiore Vitale <[email protected]>
Sent: Friday, October 3, 2025 17:14
To: [email protected] <[email protected]>
Subject: Weird behavior on Kafka Connect class loading

Hi all,

I have the DebeziumOpenLineageEmitter[1] class in the debezium-openlineage-api 
module. Internally it keeps a static map of the registered emitters; the key of 
this map is "connectorLogicalName-taskId".
Then there is the OpenLineage[2] SMT, which is part of the Debezium core. In 
this SMT, I simply pass the same context so that it gets the same emitter that 
was instantiated via the connector.
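
A minimal sketch of the pattern described above, to make the expected behavior concrete. This is not the actual Debezium code: the class name EmitterRegistrySketch, the Emitter type, and the method signatures are hypothetical and only mirror the description (a static map keyed by "connectorLogicalName-taskId", populated by init() and read by getEmitter()).

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-in for a static emitter registry.
public class EmitterRegistrySketch {

    // Hypothetical emitter type; it only remembers the key it was created for.
    public static final class Emitter {
        final String key;

        Emitter(String key) {
            this.key = key;
        }
    }

    // One map per loaded copy of this class.
    private static final Map<String, Emitter> EMITTERS = new ConcurrentHashMap<>();

    static String key(String connectorLogicalName, String taskId) {
        return connectorLogicalName + "-" + taskId;
    }

    // Called from the connector side: registers the emitter if it is absent.
    public static Emitter init(String connectorLogicalName, String taskId) {
        return EMITTERS.computeIfAbsent(key(connectorLogicalName, taskId), Emitter::new);
    }

    // Called from the SMT side: fails if init() never ran against this map.
    public static Emitter getEmitter(String connectorLogicalName, String taskId) {
        String k = key(connectorLogicalName, taskId);
        Emitter emitter = EMITTERS.get(k);
        if (emitter == null) {
            throw new IllegalStateException("Emitter not initialized for " + k + ". Call init() first.");
        }
        return emitter;
    }

    public static void main(String[] args) {
        Emitter fromConnector = init("inventory", "0");
        Emitter fromSmt = getEmitter("inventory", "0");
        System.out.println("same instance: " + (fromConnector == fromSmt));
    }
}
```

As long as both callers see the same loaded copy of the class, the lookup returns the instance that init() registered.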

Now I'm running the following image

FROM quay.io/debezium/connect:3.3.0.Final

ENV MAVEN_REPO="https://repo1.maven.org/maven2"
ENV GROUP_ID="io/debezium"
ENV DEBEZIUM_VERSION="3.3.0.Final"
ENV ARTIFACT_ID="debezium-openlineage-core"
ENV CLASSIFIER="-libs"

COPY log4j.properties /kafka/config/log4j.properties

# Add OpenLineage
RUN mkdir -p /tmp/openlineage-libs && \
    curl "$MAVEN_REPO/$GROUP_ID/$ARTIFACT_ID/$DEBEZIUM_VERSION/$ARTIFACT_ID-${DEBEZIUM_VERSION}${CLASSIFIER}.tar.gz" -o /tmp/debezium-openlineage-core-libs.tar.gz && \
    tar -xzvf /tmp/debezium-openlineage-core-libs.tar.gz -C /tmp/openlineage-libs --strip-components=1

RUN cp -r /tmp/openlineage-libs/* /kafka/connect/debezium-connector-postgres/
RUN cp -r /tmp/openlineage-libs/* /kafka/connect/debezium-connector-mongodb/
ADD openlineage.yml /kafka/

So it is practically the Debezium Connect image with just the OpenLineage jars 
copied into the PostgreSQL and MongoDB connector folders.

When I register the PostgreSQL connector

{
  "name": "inventory-connector-postgres",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.server.id": "184054",
    "database.dbname": "postgres",
    "topic.prefix": "inventory",
    "snapshot.mode": "initial",
    "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
    "schema.history.internal.kafka.topic": "schema-changes.inventory",
    "slot.name": "postgres",
    "openlineage.integration.enabled": "true",
    "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
    "openlineage.integration.job.description": "This connector does cdc for products",
    "openlineage.integration.tags": "env=prod,team=cdc",
    "openlineage.integration.owners": "Mario=maintainer,John Doe=Data scientist,IronMan=superero",
    "transforms": "openlineage",
    "transforms.openlineage.type": "io.debezium.transforms.openlineage.OpenLineage"
  }
}

I get the following error

2025-10-03T14:22:09,761 ERROR  ||  WorkerSourceTask{id=inventory-connector-postgres-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:260) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:180) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:58) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:415) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:376) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:243) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:298) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:83) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:254) ~[connect-runtime-4.1.0.jar:?]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572) ~[?:?]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[?:?]
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[?:?]
    at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
Caused by: java.lang.IllegalStateException: DebeziumOpenLineageEmitter not initialized for connector ConnectorContext[connectorLogicalName=inventory, connectorName=postgresql, taskId=0, version=null, config=null]. Call init() first.
    at io.debezium.openlineage.DebeziumOpenLineageEmitter.getEmitter(DebeziumOpenLineageEmitter.java:176) ~[debezium-openlineage-api-3.3.0.Final.jar:3.3.0.Final]
    at io.debezium.openlineage.DebeziumOpenLineageEmitter.emit(DebeziumOpenLineageEmitter.java:153) ~[debezium-openlineage-api-3.3.0.Final.jar:3.3.0.Final]
    at io.debezium.transforms.openlineage.OpenLineage.apply(OpenLineage.java:74) ~[debezium-core-3.3.0.Final.jar:3.3.0.Final]
    at org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:95) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:58) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:208) ~[connect-runtime-4.1.0.jar:?]
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:244) ~[connect-runtime-4.1.0.jar:?]
    ... 13 more

Full log attached

This is evidence that the emitters map is not shared between the connector and 
the SMT.
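
For illustration only, here is a small Java sketch of one way static state can end up unshared: when the same class bytes are defined by two different class loaders, each loader gets its own copy of the class and therefore its own static map. Whether this is what actually happens inside Kafka Connect in this setup is not established. The names ClassLoaderIsolationDemo, Registry, and IsolatingLoader are hypothetical and unrelated to the Kafka Connect or Debezium code.

```java
import java.io.InputStream;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class ClassLoaderIsolationDemo {

    // Hypothetical stand-in for a class that holds static state.
    public static class Registry {
        public static final Map<String, String> EMITTERS = new ConcurrentHashMap<>();
    }

    // Defines its own copy of Registry instead of delegating to the parent loader.
    static class IsolatingLoader extends ClassLoader {
        private final byte[] registryBytes;

        IsolatingLoader(ClassLoader parent, byte[] registryBytes) {
            super(parent);
            this.registryBytes = registryBytes;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            synchronized (getClassLoadingLock(name)) {
                if (name.equals(Registry.class.getName())) {
                    Class<?> c = findLoadedClass(name);
                    if (c == null) {
                        c = defineClass(name, registryBytes, 0, registryBytes.length);
                    }
                    return c;
                }
                // Everything else (Map, ConcurrentHashMap, ...) comes from the parent.
                return super.loadClass(name, resolve);
            }
        }
    }

    // Reads the compiled bytes of Registry from wherever this demo was loaded.
    static byte[] readRegistryBytes() throws Exception {
        String resource = Registry.class.getName().replace('.', '/') + ".class";
        try (InputStream in = Registry.class.getClassLoader().getResourceAsStream(resource)) {
            if (in == null) {
                throw new IllegalStateException("Cannot find " + resource);
            }
            return in.readAllBytes();
        }
    }

    // Returns the static EMITTERS map of the Registry copy owned by the given loader.
    @SuppressWarnings("unchecked")
    static Map<String, String> emittersOf(ClassLoader loader) throws Exception {
        Class<?> registry = loader.loadClass(Registry.class.getName());
        return (Map<String, String>) registry.getField("EMITTERS").get(null);
    }

    public static void main(String[] args) throws Exception {
        ClassLoader parent = ClassLoaderIsolationDemo.class.getClassLoader();
        byte[] bytes = readRegistryBytes();
        ClassLoader connectorSide = new IsolatingLoader(parent, bytes);
        ClassLoader smtSide = new IsolatingLoader(parent, bytes);

        emittersOf(connectorSide).put("inventory-0", "registered by the connector");

        String name = Registry.class.getName();
        System.out.println("same class object: " + (connectorSide.loadClass(name) == smtSide.loadClass(name)));
        System.out.println("visible via second loader: " + emittersOf(smtSide).containsKey("inventory-0"));
    }
}
```

If the connector and the SMT resolved DebeziumOpenLineageEmitter through different loaders in a similar way, a lookup from the SMT would miss the entry registered by the connector, which would be consistent with the exception above.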

The situation becomes weirder if I remove all connectors from the image except 
PostgreSQL and MongoDB.
In that case, the PostgreSQL connector works perfectly.

The plugins are in the folder /kafka/connect (the only folder configured in 
plugin.path), each in a dedicated subfolder together with its dependencies.

I then started to add more connectors, and it continued to work until I added 
the SQL Server connector.
To summarize, the problem arises when I add one or more of the [sqlserver, 
spanner, vitess] connectors.

Am I correct that Kafka Connect guarantees that each connector is loaded with 
an isolated class loader with its dependencies so that the static emitters 
should be shared between the Connector and the SMT?

To add more, if I run the image from 3.2.0.Final (so Kafka 4.0.0) with all 
connectors, it works fine.

Any help is much appreciated.

[1] 
https://github.com/debezium/debezium/blob/main/debezium-openlineage/debezium-openlineage-api/src/main/java/io/debezium/openlineage/DebeziumOpenLineageEmitter.java
[2] 
https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/transforms/openlineage/OpenLineage.java