On Tue, Oct 10, 2017 at 8:31 AM, Jehan Bruggeman <jehanbrugge...@gmail.com>
wrote:

> Hello,
>
> I'm trying to use a custom converter with Kafka Connect and I cannot seem
> to get it right. I'm hoping someone has experience with this and could help
> me figure it out!
>
>
> Initial situation
> ================
>
> - my custom converter's fully qualified class name is
> 'custom.CustomStringConverter'.
>
> - to avoid any mistakes, my custom converter is currently just a copy/paste
> of the pre-existing StringConverter (of course, this will change once I
> get it to work).
> https://github.com/apache/kafka/blob/trunk/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
>
> - I have a Kafka Connect cluster of 3 nodes. The nodes are running
> Confluent's official Docker image (confluentinc/cp-kafka-connect:3.3.0).
>
> - Each node is configured to load a jar with my converter in it (using a
> docker volume).
>

Can you explain this in more detail? Make sure that you add the JAR to the
classpath.
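
As a rough way to check this, a small standalone class like the sketch below
tries to load the converter by name from the plain JVM classpath. The class
and method names (ConverterClasspathCheck, isOnClasspath) are made up for
illustration and are not part of Kafka Connect. It only tells you whether the
class is visible on the classpath you pass to it, not how the Connect worker
itself resolves plugins.

```java
// Hypothetical diagnostic helper, not part of Kafka Connect.
class ConverterClasspathCheck {

    // Returns true if the named class can be loaded from this JVM's classpath.
    // The class is loaded without running its static initializers.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false,
                    ConverterClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // ClassNotFoundException: the class itself is not on the classpath.
            // LinkageError (e.g. NoClassDefFoundError): the class was found, but
            // something it depends on, such as the Connect API, is missing.
            return false;
        }
    }

    public static void main(String[] args) {
        // Defaults to the converter name used in this thread.
        String name = args.length > 0 ? args[0] : "custom.CustomStringConverter";
        System.out.println(name + " on classpath: " + isOnClasspath(name));
    }
}
```

You would run it with the same classpath you expect the worker to use, for
example with /opt/custom-connectors/custom-converter-1.0-SNAPSHOT.jar plus the
Kafka Connect API JAR (wherever that lives in your image) passed via -cp. If
it reports false there, the JAR is not on the classpath.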


>
>
>
> What happens?
> ================
>
> When the Connect nodes start, they correctly load the JAR and find the
> custom converter. Indeed, this is what I see in the logs:
>
> [2017-10-10 13:06:46,274] INFO Registered loader: PluginClassLoader{pluginLocation=file:/opt/custom-connectors/custom-converter-1.0-SNAPSHOT.jar} (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:199)
> [2017-10-10 13:06:46,274] INFO Added plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:132)
> [...]
> [2017-10-10 13:07:43,454] INFO Added aliases 'CustomStringConverter' and 'CustomString' to plugin 'custom.CustomStringConverter' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader:293)
>
> I then POST a JSON config to one of the Connect nodes to create my
> connector:
>
> {
>   "name": "hdfsSinkCustom",
>   "config": {
>     "topics": "yellow",
>     "tasks.max": "1",
>     "key.converter": "org.apache.kafka.connect.storage.StringConverter",
>     "value.converter": "custom.CustomStringConverter",
>     "connector.class": "io.confluent.connect.hdfs.HdfsSinkConnector",
>     "hdfs.url": "hdfs://hdfs-namenode:8020/hdfs-sink",
>     "topics.dir": "yellow_storage",
>     "flush.size": "1",
>     "rotate.interval.ms": "1000"
>   }
> }
>
> And receive the following reply:
>
> {
>    "error_code": 400,
>    "message": "Connector configuration is invalid and contains the following 1 error(s):\nInvalid value custom.CustomStringConverter for configuration value.converter: Class custom.CustomStringConverter could not be found.\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"
> }
>
> ================
>
> If I try running Kafka Connect in standalone mode, the error message is the
> same.
>
> Has anybody faced this already? What am I missing?
>
> Many thanks to anybody reading this!
>
> Jehan
>
