[ https://issues.apache.org/jira/browse/KAFKA-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16928550#comment-16928550 ]
Oliver Weiler commented on KAFKA-6895:
--------------------------------------

Hi, how is this supposed to work with timestamps? From my understanding, this
information is impossible to deduce from the payload alone.

> Schema Inferencing for JsonConverter
> ------------------------------------
>
>                 Key: KAFKA-6895
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6895
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>            Reporter: Allen Tang
>            Priority: Minor
>
> Though there does exist a converter in the connect-json library called
> "JsonConverter", there are limitations as to the domain of JSON payloads this
> converter is compatible with on the Sink Connector side when serializing them
> into Kafka Connect datatypes; When reading byte arrays from Kafka, the
> JsonConverter expects its inputs to be a JSON envelope that contains the
> fields "schema" and "payload", otherwise it'll throw a DataException
> reporting:
> ??JsonConverter with schemas.enable requires "schema" and "payload" fields
> and may not contain additional fields. If you are trying to deserialize plain
> JSON data, set schemas.enable=false in your converter configuration.??
> (when schemas.enable is true) or
> ??JSON value converted to Kafka Connect must be in envelope containing
> schema??
> (when schemas.enable is false)
> For example, if your JSON payload looks something on the order of:
> { "c1": 10000, "c2": "bar", "create_ts": 1501834166000,
>   "update_ts": 1501834166000 }
> This will not be compatible for Sink Connectors that require the schema for
> data ingest when mapping from Kafka Connect datatypes to, for example, JDBC
> datatypes.
> Rather, that data is expected to be structured like so:
> { "schema": { "type": "struct", "fields": [
>     { "type": "int32", "optional": true, "field": "c1" },
>     { "type": "string", "optional": true, "field": "c2" },
>     { "type": "int64", "optional": false,
>       "name": "org.apache.kafka.connect.data.Timestamp", "version": 1,
>       "field": "create_ts" },
>     { "type": "int64", "optional": false,
>       "name": "org.apache.kafka.connect.data.Timestamp", "version": 1,
>       "field": "update_ts" }],
>     "optional": false, "name": "foobar" },
>   "payload": { "c1": 10000, "c2": "bar", "create_ts": 1501834166000,
>     "update_ts": 1501834166000 } }
> The "schema" is a necessary component in order to dictate to the
> JsonConverter how to map the payload's JSON datatypes to Kafka Connect
> datatypes on the consumer side.
> Introduce a new configuration for the JsonConverter class called
> "schemas.infer.enable". When this flag is set to "false", the existing
> behavior is exhibited. When it's set to "true", infer the schema from the
> contents of the JSON record, and return that as part of the SchemaAndValue
> object for Sink Connectors.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
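
To make the timestamp question above concrete, here is a minimal sketch of what payload-only inference might look like. It is not the JsonConverter implementation and not the design proposed in the issue: the function name infer_schema and its type-mapping rules (every integer becomes int64, every field is optional) are assumptions chosen for illustration, written in Python only to keep the example self-contained.

```python
import json


def infer_schema(value, field=None):
    """Map a decoded JSON value to a Connect-style schema dict.

    Hypothetical rules for illustration only; they are not taken from
    Kafka Connect or from the proposal in KAFKA-6895.
    """
    if isinstance(value, bool):  # test bool first: bool is a subclass of int
        schema = {"type": "boolean"}
    elif isinstance(value, int):
        schema = {"type": "int64"}  # assumption: widest integer type
    elif isinstance(value, float):
        schema = {"type": "double"}
    elif isinstance(value, str):
        schema = {"type": "string"}
    elif isinstance(value, dict):
        schema = {
            "type": "struct",
            "fields": [infer_schema(v, k) for k, v in value.items()],
        }
    elif isinstance(value, list):
        if not value:
            raise ValueError("cannot infer an element type from an empty array")
        schema = {"type": "array", "items": infer_schema(value[0])}
    else:
        raise ValueError(f"cannot infer a type from {value!r}")
    schema["optional"] = True  # assumption: nothing in the payload says otherwise
    if field is not None:
        schema["field"] = field
    return schema


# The plain payload from the issue description.
payload = json.loads(
    '{ "c1": 10000, "c2": "bar", '
    '"create_ts": 1501834166000, "update_ts": 1501834166000 }'
)
envelope = {"schema": infer_schema(payload), "payload": payload}

# Look up the inferred schema of the timestamp field.
create_ts = next(
    f for f in envelope["schema"]["fields"] if f["field"] == "create_ts"
)
print(json.dumps(create_ts))
```

Under these assumed rules, create_ts is inferred as a plain int64 with no "name" entry, so the org.apache.kafka.connect.data.Timestamp logical type from the hand-written schema is lost. Likewise c1 comes out as int64 rather than int32. Recovering either would need information from outside the payload.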