[ 
https://issues.apache.org/jira/browse/KAFKA-19758?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18028391#comment-18028391
 ] 

Mario Fiore Vitale edited comment on KAFKA-19758 at 10/8/25 3:04 PM:
---------------------------------------------------------------------

> I don't think you and I are using the word "isolation" in the same way, and 
> maybe that's the cause of some of the friction here. The boundaries of 
> isolation are still specified at installation time in the same way as they 
> were before, only now users can choose older versions for each plugin.

For me, isolation means loading with the same classLoader. 

> It feels strange (that 1.8+1.1 and 1.9+1.0 are broken) because static field 
> sharing adds context that operators have to keep in mind when doing upgrades. 

This could happen not only with static field sharing but with any breaking 
change. For example, imagine a source connector that renames some field in a 
non-backward-compatible manner (in a major release), and an SMT has to adapt to 
that change. An operator has to keep this in mind, right?

> FWIW, static field sharing between plugins has never been in the mental model 
> for Connect plugins, and the threading model/initialization order/lifecycle 
> of plugins isn't well defined enough for static fields to be safe to use. 
> That is why I want to push back strongly on this practice,

As I said previously, even before this issue I had a bad feeling about this 
static field sharing, and now I have proof that it was a design flaw. I'm 
currently looking for a different way to achieve it. That said, the problem 
here is a different one. 

If my understanding is correct (please correct me if I'm wrong), in KC versions 
up to 4.0 connector isolation was done just by directory 
separation inside the `plugin.path` folder. Level 1 directories are isolated 
from each other. This is similar to what happens with WAR files in application 
servers, where each WAR is loaded with a different class loader. 

With KC 4.1 this is no longer true, since everything is driven by version and the 
level 1 directory no longer matters for class loader isolation: you could 
have a connector in a level 0 folder, let's say connector A, that then 
uses a plugin (SMT, Converter, etc.) from another level 1 folder, and is therefore 
loaded with a different class loader, just because it is the one that matches the 
desired version. 
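
To make the consequence concrete, here is a minimal sketch of why a static map 
stops being shared once two plugins are loaded by different class loaders. All 
names in it (`StaticStateDemo`, `EmitterRegistry`, `IsolatedLoader`) are 
hypothetical stand-ins, not Debezium or Kafka Connect classes, and the loader 
only imitates a child-first plugin loader for one class. It assumes the 
compiled classes are readable as classpath resources.

```java
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class StaticStateDemo {

    /** Hypothetical stand-in for a class that keeps per-task state in a static map. */
    public static class EmitterRegistry {
        private static final Map<String, String> EMITTERS = new ConcurrentHashMap<>();

        public static void init(String key) { EMITTERS.put(key, "emitter"); }

        public static boolean isInitialized(String key) { return EMITTERS.containsKey(key); }
    }

    static final String REGISTRY = EmitterRegistry.class.getName();

    /** Defines EmitterRegistry itself (child-first); everything else goes to the parent. */
    static final class IsolatedLoader extends ClassLoader {
        IsolatedLoader() { super(StaticStateDemo.class.getClassLoader()); }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            if (!name.equals(REGISTRY)) {
                return super.loadClass(name, resolve);
            }
            synchronized (getClassLoadingLock(name)) {
                Class<?> loaded = findLoadedClass(name);
                if (loaded == null) {
                    byte[] bytes = readClassBytes(name);
                    loaded = defineClass(name, bytes, 0, bytes.length);
                }
                return loaded;
            }
        }
    }

    /** Reads the compiled class file from the classpath (assumption: it is available there). */
    private static byte[] readClassBytes(String name) throws ClassNotFoundException {
        String resource = "/" + name.replace('.', '/') + ".class";
        try (InputStream in = StaticStateDemo.class.getResourceAsStream(resource)) {
            if (in == null) {
                throw new ClassNotFoundException(name);
            }
            return in.readAllBytes();
        } catch (IOException e) {
            throw new ClassNotFoundException(name, e);
        }
    }

    /** Invokes a static String-argument method on the registry copy owned by the given loader. */
    static Object call(ClassLoader loader, String method, String key) {
        try {
            Class<?> registry = Class.forName(REGISTRY, true, loader);
            Method m = registry.getMethod(method, String.class);
            m.setAccessible(true);
            return m.invoke(null, key);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
    }

    /** The "connector" side calls init(); the "SMT" side then checks for the same key. */
    static boolean smtSeesEmitter(ClassLoader connectorLoader, ClassLoader smtLoader, String key) {
        call(connectorLoader, "init", key);
        return (Boolean) call(smtLoader, "isInitialized", key);
    }

    public static void main(String[] args) {
        ClassLoader shared = new IsolatedLoader();
        System.out.println("same loader:       "
                + smtSeesEmitter(shared, shared, "inventory-0"));
        System.out.println("different loaders: "
                + smtSeesEmitter(new IsolatedLoader(), new IsolatedLoader(), "inventory-0"));
    }
}
```

With one loader the lookup succeeds; with two loaders each one defines its own 
copy of the class, so the second copy's map is empty. That is the same shape 
as the "not initialized ... Call init() first" failure in the report.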

If my understanding of the past and current behavior is correct, then for me this 
is a huge behavior change that shouldn't have happened in a minor release, and so 
we need to be as backward compatible as possible. 


> Weird behavior on Kafka Connect 4.1 class loading
> -------------------------------------------------
>
>                 Key: KAFKA-19758
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19758
>             Project: Kafka
>          Issue Type: Bug
>          Components: connect
>    Affects Versions: 4.1.0
>            Reporter: Mario Fiore Vitale
>            Assignee: Mickael Maison
>            Priority: Blocker
>         Attachments: connect-service.log
>
>
> I have the 
> [DebeziumOpenLineageEmitter|https://github.com/debezium/debezium/blob/main/debezium-openlineage/debezium-openlineage-api/src/main/java/io/debezium/openlineage/DebeziumOpenLineageEmitter.java]
>  class in *debezium-openlineage-api* that internally has a static map to 
> maintain the registered emitters; the key of this map is 
> "connectorLogicalName-taskid".
> Then there is the [OpenLineage 
> SMT|https://github.com/debezium/debezium/blob/main/debezium-core/src/main/java/io/debezium/transforms/openlineage/OpenLineage.java],
>  which is part of *debezium-core*. In this SMT, I simply pass the same 
> context to instantiate the same emitter via the connector.
> Now I'm running the following image:
> {code:java}
> FROM quay.io/debezium/connect:3.3.0.Final
> ENV MAVEN_REPO="https://repo1.maven.org/maven2"
> ENV GROUP_ID="io/debezium"
> ENV DEBEZIUM_VERSION="3.3.0.Final"
> ENV ARTIFACT_ID="debezium-openlineage-core"
> ENV CLASSIFIER="-libs"
> COPY log4j.properties /kafka/config/log4j.properties
> # Add OpenLineage
> RUN mkdir -p /tmp/openlineage-libs && \
>     curl "$MAVEN_REPO/$GROUP_ID/$ARTIFACT_ID/$DEBEZIUM_VERSION/$ARTIFACT_ID-${DEBEZIUM_VERSION}${CLASSIFIER}.tar.gz" \
>         -o /tmp/debezium-openlineage-core-libs.tar.gz && \
>     tar -xzvf /tmp/debezium-openlineage-core-libs.tar.gz \
>         -C /tmp/openlineage-libs --strip-components=1
> RUN cp -r /tmp/openlineage-libs/* /kafka/connect/debezium-connector-postgres/
> RUN cp -r /tmp/openlineage-libs/* /kafka/connect/debezium-connector-mongodb/
> ADD openlineage.yml /kafka/ {code}
> So it is practically the Debezium Connect image with just the OpenLineage jars 
> copied into the postgres and mongodb connector folders.
> When I register the PostgreSQL connector:
> {code:java}
> {
>   "name": "inventory-connector-postgres",
>   "config": {
>     "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
>     "tasks.max": "1",
>     "database.hostname": "postgres",
>     "database.port": "5432",
>     "database.user": "postgres",
>     "database.password": "postgres",
>     "database.server.id": "184054",
>     "database.dbname": "postgres",
>     "topic.prefix": "inventory",
>     "snapshot.mode": "initial",
>     "schema.history.internal.kafka.bootstrap.servers": "kafka:9092",
>     "schema.history.internal.kafka.topic": "schema-changes.inventory",
>     "slot.name": "postgres",
>     "openlineage.integration.enabled": "true",
>     "openlineage.integration.config.file.path": "/kafka/openlineage.yml",
>     "openlineage.integration.job.description": "This connector does cdc for products",
>     "openlineage.integration.tags": "env=prod,team=cdc",
>     "openlineage.integration.owners": "Mario=maintainer,John Doe=Data scientist,IronMan=superero",
>     "transforms": "openlineage",
>     "transforms.openlineage.type": 
> "io.debezium.transforms.openlineage.OpenLineage"
>   }
> } {code}
>  
> I get the following error
> {code:java}
> 2025-10-03T14:22:09,761 ERROR  ||  
> WorkerSourceTask{id=inventory-connector-postgres-0} Task threw an uncaught 
> and unrecoverable exception. Task is being killed and will not recover until 
> manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
> org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error 
> handler
>     at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:260)
>  ~[connect-runtime-4.1.0.jar:?]
>     at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:180)
>  ~[connect-runtime-4.1.0.jar:?]
>     at 
> org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:58)
>  ~[connect-runtime-4.1.0.jar:?]
>     at 
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:415)
>  ~[connect-runtime-4.1.0.jar:?]
>     at 
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:376)
>  ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:243) 
> ~[connect-runtime-4.1.0.jar:?]
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:298) 
> ~[connect-runtime-4.1.0.jar:?]
>     at 
> org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:83)
>  ~[connect-runtime-4.1.0.jar:?]
>     at 
> org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:254)
>  ~[connect-runtime-4.1.0.jar:?]
>     at 
> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
>  ~[?:?]
>     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317) 
> ~[?:?]
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
>  ~[?:?]
>     at 
> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
>  ~[?:?]
>     at java.base/java.lang.Thread.run(Thread.java:1583) [?:?]
> Caused by: java.lang.IllegalStateException: DebeziumOpenLineageEmitter not 
> initialized for connector ConnectorContext[connectorLogicalName=inventory, 
> connectorName=postgresql, taskId=0, version=null, config=null]. Call init() 
> first.
>     at 
> io.debezium.openlineage.DebeziumOpenLineageEmitter.getEmitter(DebeziumOpenLineageEmitter.java:176)
>  ~[debezium-openlineage-api-3.3.0.Final.jar:3.3.0.Final]
>     at 
> io.debezium.openlineage.DebeziumOpenLineageEmitter.emit(DebeziumOpenLineageEmitter.java:153)
>  ~[debezium-openlineage-api-3.3.0.Final.jar:3.3.0.Final]
>     at 
> io.debezium.transforms.openlineage.OpenLineage.apply(OpenLineage.java:74) 
> ~[debezium-core-3.3.0.Final.jar:3.3.0.Final]
>     at 
> org.apache.kafka.connect.runtime.TransformationStage.apply(TransformationStage.java:95)
>  ~[connect-runtime-4.1.0.jar:?]
>     at 
> org.apache.kafka.connect.runtime.TransformationChain.lambda$apply$0(TransformationChain.java:58)
>  ~[connect-runtime-4.1.0.jar:?]
>     at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:208)
>  ~[connect-runtime-4.1.0.jar:?]
>     at 
> org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:244)
>  ~[connect-runtime-4.1.0.jar:?]
>     ... 13 more {code}
> Full logs [^connect-service.log]
>  
> This is evidence that the emitters map is not shared between the connector 
> and the SMT.
> The situation becomes weirder if I remove all connectors from the image 
> except PostgreSQL and MongoDB.
> In that case, the PostgreSQL connector works perfectly.
> The plugins are in the folder */kafka/connect* (that is, the only 
> `plugin.path` configured folder), each under a dedicated folder with their 
> dependencies. 
> I then started to add more connectors, and it continued to work until I added 
> the SQL Server connector.
> To summarize, the problem arises when I add one or more of [sqlserver, 
> spanner, vitess].
>  
> The commonality for these connectors seems to be that they support 
> multi-task. The others don't. 
> Am I correct that Kafka Connect guarantees that each connector is loaded with 
> an isolated class loader with its dependencies so that the static emitters 
> should be shared between the Connector and the SMT?
> To add more, if I run the image from 3.2.0.Final (so Kafka 4.0.0) with all 
> connectors, it works fine.
> I did other tests, and things got weirder and weirder. All tests were done 
> with *{{plugin.path=/kafka/connect}}* and *KC 4.1*
> My original tests were with this directory structure
>  
> {code:java}
> /kafka/connect
> |___ debezium-connector-postgres
> |___ debezium-connector-mongodb
> |___ debezium-connector-sqlserver{code}
>  
> In this case, each connector should be isolated from the others (having a 
> dedicated class loader). With this layout, the sharing between the connector and 
> SMT does not work on KC 4.1.
> Then I tried with
>  
> {code:java}
> /kafka/connect
> |___ debezium-connectors
>      |___ debezium-connector-postgres
>      |___ debezium-connector-mongodb
>      |___ debezium-connector-sqlserver{code}
>  
> So none of the connectors are isolated; they all share the same class loader. In 
> this case, there is no issue, and I'd say that this is expected.
> Then I tried with
>  
> {code:java}
> /kafka/connect
> |___ debezium-connectors
> |    |___ debezium-connector-postgres
> |    |___ debezium-connector-mongodb
> |___ debezium-connector-sqlserver{code}
>  
> where *{{postgres}}* and *{{mongodb}}* are not isolated (same classloader) 
> and *{{sqlserver}}* is isolated (different classloader), and in this case, it 
> still works. I expected this to fail as with the first setup.
> The SMT is in the *debezium-core* jar, and each connector has its own copy.
> So in each connector folder, there are:
> {code:java}
> debezium-api-3.3.0.Final.jar
> debezium-common-3.3.0.Final.jar
> debezium-connector-[connectorName]-3.3.0.Final.jar
> debezium-core-3.3.0.Final.jar
> debezium-openlineage-api-3.3.0.Final.jar{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
