On Tue, Oct 10, 2017 at 8:31 AM, Jehan Bruggeman <jehanbrugge...@gmail.com> wrote:
> Hello,
>
> I'm trying to use a custom converter with Kafka Connect and I cannot seem
> to get it right. I'm hoping someone has experience with this and could help
> me figure it out !
>
>
> Initial situation
> ================
>
> - my custom converter's class path is 'custom.CustomStringConverter'.
>
> - to avoid any mistakes, my custom converter is currently just a copy/paste
> of the pre-existing StringConverter (of course, this will change when I
> get it to work).
> https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
>
> - I have a kafka connect cluster of 3 nodes. The nodes are running
> confluent's official docker images ( confluentinc/cp-kafka-connect:3.3.0 ).
>
> - Each node is configured to load a jar with my converter in it (using a
> docker volume).
>

Can you explain this in more detail? Make sure that you add the JAR to the
classpath.

>
>
> What happens ?
> ================
>
> When the connectors start, they correctly load the jars and find the custom
> converter. Indeed, this is what I see in the logs :
>
> [2017-10-10 13:06:46,274] INFO Registered loader:
> PluginClassLoader{pluginLocation=file:/opt/custom-connectors/custom-converter-1.0-SNAPSHOT.jar}
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
> [2017-10-10 13:06:46,274] INFO Added plugin 'custom.CustomStringConverter'
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
> [...]
> [2017-10-10 13:07:43,454] INFO Added aliases 'CustomStringConverter' and
> 'CustomString' to plugin 'custom.CustomStringConverter'
> (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
>
> I then POST a JSON config to one of the connector nodes to create my
> connector :
>
> {
>   "name": "hdfsSinkCustom",
>   "config": {
>     "topics": "yellow",
>     "tasks.max": "1",
>     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
>     "value.converter": "custom.CustomStringConverter",
>     "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
>     "hdfs.url": "hdfs://hdfs-namenode:8020/hdfs-sink",
>     "topics.dir": "yellow_storage",
>     "flush.size": "1",
>     "rotate.interval.ms": "1000"
>   }
> }
>
> And receive the following reply :
>
> {
>   "error_code": 400,
>   "message": "Connector configuration is invalid and contains the following 1 error(s):\nInvalid value custom.CustomStringConverter for configuration value.converter: Class custom.CustomStringConverter could not be found.\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
> }
>
> ================
>
> If I try running Kafka Connect standalone, the error message is the same.
>
> Has anybody faced this already ? What am I missing ?
>
> Many thanks to anybody reading this !
>
> Jehan
>
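
As a quick way to test the classpath suggestion above, here is a minimal
sketch of a standalone check. It assumes you can start a JVM with the same
classpath as the Connect worker. ClasspathCheck and isOnClasspath are
hypothetical names for illustration, not part of Kafka Connect. The sketch
only asks the application class loader whether it can see the class by name;
it says nothing about what Connect's plugin class loaders can see.

```java
// Hypothetical helper for illustration; not part of Kafka Connect.
final class ClasspathCheck {

    /**
     * Returns true if the named class can be loaded by the class loader
     * that loaded this helper (normally the application class loader).
     */
    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate and load the class,
            // do not run its static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Either the class itself is missing, or something it depends on
            // (for example an interface it implements) could not be loaded.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the converter class named in the original message.
        String name = args.length > 0 ? args[0] : "custom.CustomStringConverter";
        String verdict = isOnClasspath(name) ? " is" : " is NOT";
        System.out.println(name + verdict + " visible on the classpath");
    }
}
```

If this reports the class as not visible when run with the worker's
classpath, the JAR is probably not being picked up the way you expect.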