Hi everyone,
I'm trying to send events from a Kafka topic to Cloud Pub/Sub using the sink connector.
This is the sink JSON configuration I've used:
{
"name":"CPSConnector",
"config":{
"connector.class":"com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
"tasks.max":"1",
"topics":"STREAM-CUSTOMER-ACCOUNTS",
"cps.topic":"test",
"cps.project":"test-dev",
"maxBufferSize":"10"
}
}
and this is the converter configuration I have in the properties file
(basically the defaults):
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
but I'm getting this trace in the Connect log:
"org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka
Connect data failed due to serialization error: \n\tat
org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:304)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)\n\tat
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
java.lang.Thread.run(Thread.java:748)\nCaused by:
org.apache.kafka.common.errors.SerializationException:
com.fasterxml.jackson.core.JsonParseException: Unrecognized token
'ac0e69cb': was expecting ('true', 'false' or 'null')\n at [Source:
(byte[])\"ac0e69cb-5cab-4134-90c1-91ac70ce8b11\"; line: 1, column:
10]\nCaused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized
token 'ac0e69cb': was expecting ('true', 'false' or 'null')\n at [Source:
(byte[])\"ac0e69cb-5cab-4134-90c1-91ac70ce8b11\"; line: 1, column:
10]\n\tat
com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1798)\n\tat
com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:673)\n\tat
com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3527)\n\tat
com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2622)\n\tat
com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826)\n\tat
com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723)\n\tat
com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4030)\n\tat
com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2559)\n\tat
org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)\n\tat
org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:302)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:453)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:287)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:198)\n\tat
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:166)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)\n\tat
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)\n\tat
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)\n\tat
java.util.concurrent.FutureTask.run(FutureTask.java:266)\n\tat
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)\n\tat
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)\n\tat
java.lang.Thread.run(Thread.java:748)\n"
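As far as I can tell from the "Source: (byte[])..." part of the trace, the bytes that fail to parse are the bare UUID ac0e69cb-5cab-4134-90c1-91ac70ce8b11, which is not a valid JSON document on its own. Here is a minimal sketch that reproduces the same kind of failure outside Connect. Python's json module is only a stand-in for the Jackson parser that JsonConverter uses, is_valid_json is a helper name I made up, and I'm assuming the quotes around the UUID in the trace come from log formatting rather than from the payload itself:

```python
import json

# Bytes copied from the "Source: (byte[])..." part of the trace.
# Assumption: the surrounding quotes in the trace are log formatting,
# not part of the actual record bytes.
raw = b"ac0e69cb-5cab-4134-90c1-91ac70ce8b11"


def is_valid_json(payload: bytes) -> bool:
    """Return True if the payload parses as a JSON document."""
    try:
        json.loads(payload.decode("utf-8"))
        return True
    except json.JSONDecodeError:
        return False


print(is_valid_json(raw))                # False: a bare UUID is not JSON
print(is_valid_json(b'"' + raw + b'"'))  # True: the same UUID as a JSON string
```

If that assumption holds, whichever side of the record (key or value) carries the plain UUID cannot go through JsonConverter, and something like org.apache.kafka.connect.storage.StringConverter for that side might be worth trying, but I have not confirmed this.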
I'm not sure what configuration is needed to be able to send the events.
Thanks