kasparjarek opened a new issue, #30: URL: https://github.com/apache/pulsar-connectors/issues/30
# Issue

The Pulsar Kafka Connect adaptor uses the same converters for both data and offset storage. When `AvroConverter` is used, the adaptor uses `MockSchemaRegistryClient`:

- Schema IDs are stored only in the in-memory mock.
- After a restart, offset deserialization fails with "Error retrieving Avro value schema for id 1 ... Subject Not Found; error code: 40401".
- The connector loses its offset tracking.

Why is this not an issue for data messages that use Avro with the mock Schema Registry? For the data topic, the schema is published with the `__AVRO_READ_OFFSET__` property, which instructs clients to skip the first X bytes ([code link](https://github.com/apache/pulsar-connectors/blob/314b9defb6888804e3c3ec75085b015b7b16ae1b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/schema/KafkaSchemaWrappedSchema.java#L44)). However, the client for offsets is hardcoded to use `Schema.BYTES`, so no such schema is published to the offset topic ([code link](https://github.com/apache/pulsar-connectors/blob/314b9defb6888804e3c3ec75085b015b7b16ae1b/kafka-connect-adaptor/src/main/java/org/apache/pulsar/io/kafka/connect/PulsarOffsetBackingStore.java#L157)).

# How Kafka Connect handles offset converters

Kafka Connect used to have dedicated configuration properties for the offset converters, `internal.key.converter` and `internal.value.converter`. Those were removed and replaced with hardcoded JSON converters with `schemas.enable` set to `false`. Here is a quote from the upgrade notes for version 2.0.0 ([doc link](https://kafka.apache.org/30/getting-started/upgrade/#notable-changes-in-200)):

> In earlier releases, Connect’s worker configuration required the internal.key.converter and internal.value.converter properties. In 2.0, these are [no longer required](https://cwiki.apache.org/confluence/x/AZQ7B) and default to the JSON converter.
> You may safely remove these properties from your Connect standalone and distributed worker configurations:
>
> - `internal.key.converter=org.apache.kafka.connect.json.JsonConverter`
> - `internal.key.converter.schemas.enable=false`
> - `internal.value.converter=org.apache.kafka.connect.json.JsonConverter`
> - `internal.value.converter.schemas.enable=false`

# Fix

The adaptor should create separate converters for offsets instead of reusing the data converters, the same way Kafka Connect does.

**Backward-compatible fix**

- For older versions of the Pulsar connectors, to keep the behavior backward compatible, I propose adding new optional configuration properties. When they are set, dedicated offset converters are created, which allows using `JsonConverter` for offsets (no Schema Registry dependency) while keeping `AvroConverter` for data:
  - `offset.storage.topic.key.converter`
  - `offset.storage.topic.value.converter`

**For a new major version**, I propose hardcoding the offset converters to `JsonConverter` with `schemas.enable=false`, consistent with Kafka Connect.

I am happy to prepare a fix. Should I open a PR for the "major version" fix in this repo and another for the backward-compatible fix in the https://github.com/apache/pulsar repo?

---

Related issue: https://github.com/apache/pulsar-connectors/issues/29
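Why the offset topic breaks while the data topic does not can be illustrated with the registry framing. The sketch below assumes the standard Confluent wire format (one magic byte `0x0`, then a 4-byte big-endian schema ID, then the Avro payload), so the header is assumed to be 5 bytes here; `RegistryFraming` and its methods are hypothetical helpers, not adaptor code, and the payload is a stand-in string rather than real Avro. A reader that honours a read offset such as `__AVRO_READ_OFFSET__` can simply skip the header, whereas bytes read back from the offset topic presumably go through `AvroConverter`, which must look up the embedded schema ID in the registry, and a fresh `MockSchemaRegistryClient` no longer knows that ID after a restart.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical illustration of Confluent-style framing (assumption, not Pulsar adaptor code).
class RegistryFraming {
    static final byte MAGIC_BYTE = 0x0;
    static final int HEADER_SIZE = 5; // 1 magic byte + 4-byte schema ID

    /** Prepends the registry header to an already-serialized payload. */
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(HEADER_SIZE + payload.length)
                .put(MAGIC_BYTE)
                .putInt(schemaId) // ByteBuffer writes big-endian by default
                .put(payload)
                .array();
    }

    /** Extracts the schema ID that a registry-backed deserializer would have to resolve. */
    static int schemaId(byte[] framed) {
        ByteBuffer buf = ByteBuffer.wrap(framed);
        if (buf.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buf.getInt();
    }

    /** What a reader honouring a read offset does: drop the header, keep the payload. */
    static byte[] stripHeader(byte[] framed) {
        return Arrays.copyOfRange(framed, HEADER_SIZE, framed.length);
    }

    public static void main(String[] args) {
        byte[] payload = "offset-record".getBytes(StandardCharsets.UTF_8); // stand-in for Avro bytes
        byte[] framed = frame(1, payload);
        System.out.println(schemaId(framed)); // prints 1
        System.out.println(new String(stripHeader(framed), StandardCharsets.UTF_8)); // prints offset-record
    }
}
```

Only the schema ID travels with the message; the schema itself lives in the registry, which is exactly what the in-memory mock loses on restart.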
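The two proposed fixes differ only in how the offset converter is chosen. The following is a minimal sketch of that decision, assuming the property names proposed in this issue (`offset.storage.topic.key.converter`, `offset.storage.topic.value.converter`, which do not exist yet) and assuming that a dedicated `JsonConverter` gets `schemas.enable=false` to mirror Kafka Connect. `OffsetConverterConfig`, `Resolved`, `backwardCompatible` and `majorVersion` are hypothetical names; only plain maps and strings are used so the logic stands on its own.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: deciding which converter the adaptor uses for offsets.
final class OffsetConverterConfig {
    // Proposed in this issue; not existing adaptor properties.
    static final String OFFSET_KEY_CONVERTER = "offset.storage.topic.key.converter";
    static final String OFFSET_VALUE_CONVERTER = "offset.storage.topic.value.converter";
    // Standard Kafka Connect data-converter properties.
    static final String KEY_CONVERTER = "key.converter";
    static final String VALUE_CONVERTER = "value.converter";
    static final String JSON_CONVERTER = "org.apache.kafka.connect.json.JsonConverter";

    /** Converter class to instantiate plus the settings to hand to Converter.configure(configs, isKey). */
    static final class Resolved {
        final String converterClass;
        final Map<String, String> configs;
        final boolean sharedWithData; // true: reuse the data converter instance (current behaviour)

        Resolved(String converterClass, Map<String, String> configs, boolean sharedWithData) {
            this.converterClass = converterClass;
            this.configs = configs;
            this.sharedWithData = sharedWithData;
        }
    }

    /** Backward-compatible proposal: a dedicated offset converter only when the new property is set. */
    static Resolved backwardCompatible(Map<String, String> adaptorConfig, boolean isKey) {
        String offsetClass = adaptorConfig.get(isKey ? OFFSET_KEY_CONVERTER : OFFSET_VALUE_CONVERTER);
        if (offsetClass == null) {
            // Property absent: keep today's behaviour and share the data converter.
            String dataClass = adaptorConfig.get(isKey ? KEY_CONVERTER : VALUE_CONVERTER);
            return new Resolved(dataClass, Map.of(), true);
        }
        Map<String, String> configs = new HashMap<>();
        if (JSON_CONVERTER.equals(offsetClass)) {
            // Assumption: disable the JSON schema envelope, as Kafka Connect's internal converters do.
            configs.put("schemas.enable", "false");
        }
        return new Resolved(offsetClass, configs, false);
    }

    /** Major-version proposal: offsets always use schemaless JSON, like Kafka Connect. */
    static Resolved majorVersion() {
        return new Resolved(JSON_CONVERTER, Map.of("schemas.enable", "false"), false);
    }

    public static void main(String[] args) {
        Map<String, String> cfg = new HashMap<>();
        cfg.put(VALUE_CONVERTER, "io.confluent.connect.avro.AvroConverter");
        System.out.println(backwardCompatible(cfg, false).sharedWithData); // prints true

        cfg.put(OFFSET_VALUE_CONVERTER, JSON_CONVERTER);
        Resolved r = backwardCompatible(cfg, false);
        // prints org.apache.kafka.connect.json.JsonConverter {schemas.enable=false}
        System.out.println(r.converterClass + " " + r.configs);
    }
}
```

In the adaptor itself, the resolved class would be instantiated and configured through Kafka Connect's `Converter.configure(Map<String, ?> configs, boolean isKey)`, leaving the data converters (for example `AvroConverter`) untouched. Keeping the fallback path identical to today's behaviour is what makes the first option safe for existing deployments.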
