[ https://issues.apache.org/jira/browse/KAFKA-10387?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Arjun Satish updated KAFKA-10387:
---------------------------------
    Description: 
Let's say we try to create a connector with the following config:

{code:java}
{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "tasks.max": "1",
  "database.hostname": "localhost",
  "database.port": 32843,
  "database.user": "test",
  "database.password": "test",
  "database.dbname": "test",
  "database.server.name": "tcpsql",
  "table.whitelist": "public.numerics",
  "transforms": "abc",
  "transforms.abc.type": "io.debezium.transforms.ExtractNewRecordState",
  "topic.creation.default.partitions": "1",
  "topic.creation.default.replication.factor": "1"
}
{code}


This fails with the following error in the Connect worker:

{code:java}
[2020-08-11 02:47:05,908] ERROR Failed to start task deb-0 (org.apache.kafka.connect.runtime.Worker:560)
org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.config.ConfigException: Unknown configuration 'transforms.abc.type'
        at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:296)
        at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:605)
        at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:555)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1251)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1700(DistributedHerder.java:127)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$10.call(DistributedHerder.java:1266)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$10.call(DistributedHerder.java:1262)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
        at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.kafka.common.config.ConfigException: Unknown configuration 'transforms.abc.type'
        at org.apache.kafka.common.config.AbstractConfig.get(AbstractConfig.java:159)
        at org.apache.kafka.connect.runtime.SourceConnectorConfig$EnrichedSourceConnectorConfig.get(SourceConnectorConfig.java:57)
        at org.apache.kafka.connect.runtime.SourceConnectorConfig.get(SourceConnectorConfig.java:141)
        at org.apache.kafka.common.config.AbstractConfig.getClass(AbstractConfig.java:216)
        at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:281)
        ... 10 more
{code}

Connector creation works fine if we remove the {{topic.creation.*}} properties above.

Not entirely sure, but it looks like the piece of code that might need a fix is here, as it does not add {{transforms.*}} configs to the returned ConfigDef instances:
https://github.com/apache/kafka/blob/2.6.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java#L94
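
To illustrate the suspected mechanism, here is a minimal toy model. It is not Kafka code: {{ToyConfig}}, {{enrich}}, {{baseKeys}} and {{sampleProps}} are hypothetical names made up for this sketch, and it assumes the guess above is right, i.e. that the enriched source connector config only defines the base keys plus the {{topic.creation.*}} keys, so any lookup of a {{transforms.*}} key is rejected as unknown.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Toy model only: none of these types are Kafka classes.
final class EnrichedConfigSketch {

    /** Hypothetical stand-in for a config that only answers for keys it has defined. */
    static final class ToyConfig {
        private final Set<String> definedKeys;
        private final Map<String, String> props;

        ToyConfig(Set<String> definedKeys, Map<String, String> props) {
            this.definedKeys = definedKeys;
            this.props = props;
        }

        String get(String key) {
            if (!definedKeys.contains(key)) {
                throw new IllegalArgumentException("Unknown configuration '" + key + "'");
            }
            return props.get(key);
        }
    }

    /** Keys that every connector config defines in this toy model. */
    static Set<String> baseKeys() {
        Set<String> keys = new HashSet<>();
        keys.add("connector.class");
        keys.add("tasks.max");
        keys.add("transforms");
        return keys;
    }

    /** Defines the base keys plus every property that starts with one of the prefixes. */
    static ToyConfig enrich(Map<String, String> props, String... prefixes) {
        Set<String> keys = baseKeys();
        for (String name : props.keySet()) {
            for (String prefix : prefixes) {
                if (name.startsWith(prefix)) {
                    keys.add(name);
                }
            }
        }
        return new ToyConfig(keys, props);
    }

    /** A trimmed-down copy of the connector config from this report. */
    static Map<String, String> sampleProps() {
        Map<String, String> props = new HashMap<>();
        props.put("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        props.put("tasks.max", "1");
        props.put("transforms", "abc");
        props.put("transforms.abc.type", "io.debezium.transforms.ExtractNewRecordState");
        props.put("topic.creation.default.partitions", "1");
        props.put("topic.creation.default.replication.factor", "1");
        return props;
    }

    public static void main(String[] args) {
        Map<String, String> props = sampleProps();

        // Suspected behavior: only topic.creation.* keys are added to the definition.
        ToyConfig broken = enrich(props, "topic.creation.");
        try {
            broken.get("transforms.abc.type");
        } catch (IllegalArgumentException e) {
            System.out.println("broken: " + e.getMessage());
        }

        // Possible direction for a fix: also add the transforms.* keys.
        ToyConfig fixed = enrich(props, "topic.creation.", "transforms.");
        System.out.println("fixed: " + fixed.get("transforms.abc.type"));
    }
}
```

In this model the first lookup fails with the same "Unknown configuration" message as in the stack trace, and the second one succeeds once the {{transforms.*}} keys are part of the definition. Whether the real fix belongs in SourceConnectorConfig or elsewhere is still an open question.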

  was:
Let's say we try to create a connector with the following config:

{code:java}
{
  "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
  "tasks.max": "1",
  "database.hostname": "localhost",
  "database.port": 32843,
  "database.user": "test",
  "database.password": "test",
  "database.dbname": "test",
  "database.server.name": "tcpsql",
  "table.whitelist": "public.numerics",
  "transforms": "abc",
  "transforms.abc.type": "io.debezium.transforms.ExtractNewRecordState",
  "topic.creation.default.partitions": "1",
  "topic.creation.default.replication.factor": "1"
}
{code}


This fails with the following error in the Connect worker:

{code:java}
[2020-08-11 02:47:05,908] ERROR Failed to start task deb-0 (org.apache.kafka.connect.runtime.Worker:560)
org.apache.kafka.connect.errors.ConnectException: org.apache.kafka.common.config.ConfigException: Unknown configuration 'transforms.abc.type'
        at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:296)
        at org.apache.kafka.connect.runtime.Worker.buildWorkerTask(Worker.java:605)
        at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:555)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1251)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1700(DistributedHerder.java:127)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$10.call(DistributedHerder.java:1266)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$10.call(DistributedHerder.java:1262)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:630)
        at java.base/java.lang.Thread.run(Thread.java:832)
Caused by: org.apache.kafka.common.config.ConfigException: Unknown configuration 'transforms.abc.type'
        at org.apache.kafka.common.config.AbstractConfig.get(AbstractConfig.java:159)
        at org.apache.kafka.connect.runtime.SourceConnectorConfig$EnrichedSourceConnectorConfig.get(SourceConnectorConfig.java:57)
        at org.apache.kafka.connect.runtime.SourceConnectorConfig.get(SourceConnectorConfig.java:141)
        at org.apache.kafka.common.config.AbstractConfig.getClass(AbstractConfig.java:216)
        at org.apache.kafka.connect.runtime.ConnectorConfig.transformations(ConnectorConfig.java:281)
        ... 10 more
{code}

Connector creation works fine if we remove the topic.creation.* properties above.

Not entirely sure, but it looks like the piece of code that might need a fix is here, as it does not add transforms.* configs when creating EnrichedConnectorConfig instances:
https://github.com/apache/kafka/blob/2.6.0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceConnectorConfig.java#L94


> Cannot include SMT configs with source connector that include 
> topic.creation.* properties
> -----------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10387
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10387
>             Project: Kafka
>          Issue Type: Bug
>          Components: KafkaConnect
>    Affects Versions: 2.6.0
>            Reporter: Arjun Satish
>            Priority: Major



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
