kaja78 opened a new issue #12880:
URL: https://github.com/apache/pulsar/issues/12880


   **Describe the bug**
   The topic configuration property of PulsarKafkaConnectSinkConfig is not 
passed to Kafka sink. Instead of it, the original Pulsar topic name is passed.
   
   In the KafkaConnectSink.toSinkRecord() method, the topic name is derived 
using:
   
   `final String topic = sourceRecord.getTopicName().orElse(topicName);`
   
   The topicName instance variable holds topic name from 
PulsarKafkaConnectSinkConfig. However I guess, there is always topicName 
present on sourceRecord, so topic property from configuration is never passed 
to Kafka sink.
   
   **Expected behaviour**
   Simply the topic from PulsarKafkaConnectSinkConfig should be always used in 
KafkaConnectSink.toSinkRecord().
   `final String topic = topicName;`
   
   _Actually it would make sense to reverse the original logic => if the topic 
is not set in sink configuration file, then use pulsar topic name. However, the 
topic configuration property is currently required, so it just should be always 
used._
   
   **Additional context**
   I identified this issue while testing 
https://github.com/datastax/snowflake-connector. It uses KafkaConnectSink as 
parent class.
   The target table name is derived from Kafka topic name in 
com.snowflake.kafka.connector.SnowflakeSinkConnector. Since I am not able to 
configure the topic name in sink configuration file, pulsar messages are stored 
into tables with ugly names following pattern: 
PERSISTENT___[tenant]_[namespace]_[topic]_[hash] .
   
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@pulsar.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to