kaja78 opened a new issue #12880: URL: https://github.com/apache/pulsar/issues/12880
**Describe the bug** The topic configuration property of PulsarKafkaConnectSinkConfig is not passed to Kafka sink. Instead of it, the original Pulsar topic name is passed. In the KafkaConnectSink.toSinkRecord() method, the topic name is derived using: `final String topic = sourceRecord.getTopicName().orElse(topicName);` The topicName instance variable holds topic name from PulsarKafkaConnectSinkConfig. However I guess, there is always topicName present on sourceRecord, so topic property from configuration is never passed to Kafka sink. **Expected behaviour** Simply the topic from PulsarKafkaConnectSinkConfig should be always used in KafkaConnectSink.toSinkRecord(). `final String topic = topicName;` _Actually it would make sense to reverse the original logic => if the topic is not set in sink configuration file, then use pulsar topic name. However, the topic configuration property is currently required, so it just should be always used._ **Additional context** I identified this issue while testing https://github.com/datastax/snowflake-connector. It uses KafkaConnectSink as parent class. The target table name is derived from Kafka topic name in com.snowflake.kafka.connector.SnowflakeSinkConnector. Since I am not able to configure the topic name in sink configuration file, pulsar messages are stored into tables with ugly names following pattern: PERSISTENT___[tenant]_[namespace]_[topic]_[hash] . -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org