[ 
https://issues.apache.org/jira/browse/KAFKA-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16928550#comment-16928550
 ] 

Oliver Weiler commented on KAFKA-6895:
--------------------------------------

Hi, how is this supposed to work with timestamps? From my understanding, this 
information is impossible to deduce from the payload alone.

> Schema Inferencing for JsonConverter
> ------------------------------------
>
>                 Key: KAFKA-6895
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6895
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>            Reporter: Allen Tang
>            Priority: Minor
>
> Though there does exist a converter in the connect-json library called 
> "JsonConverter", there are limitations as to the domain of JSON payloads this 
> converter is compatible with on the Sink Connector side when serializing them 
> into Kafka Connect datatypes; When reading byte arrays from Kafka, the 
> JsonConverter expects its inputs to be a JSON envelope that contains the 
> fields "schema" and "payload", otherwise it'll throw a DataException 
> reporting:
>  ??JsonConverter with schemas.enable requires "schema" and "payload" fields 
> and may not contain additional fields. If you are trying to deserialize plain 
> JSON data, set schemas.enable=false in your converter configuration.??
>  (when schemas.enable is true) or
>  ??JSON value converted to Kafka Connect must be in envelope containing 
> schema??
>  (when schemas.enable is false)
>  For example, if your JSON payload looks something on the order of:
> { "c1": 10000, "c2": "bar", "create_ts": 1501834166000, "update_ts": 
> 1501834166000 }
> This will not be compatible for Sink Connectors that require the schema for 
> data ingest when mapping from Kafka Connect datatypes to, for example, JDBC 
> datatypes. Rather, that data is expected to be structured like so:
> { "schema": \{ "type": "struct", "fields": [{ "type": "int32", "optional": 
> true, "field": "c1" }, \{ "type": "string", "optional": true, "field": "c2" 
> }, \{ "type": "int64", "optional": false, "name": 
> "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "create_ts" 
> }, \{ "type": "int64", "optional": false, "name": 
> "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "update_ts" 
> }], "optional": false, "name": "foobar" }, "payload": \{ "c1": 10000, "c2": 
> "bar", "create_ts": 1501834166000, "update_ts": 1501834166000 } }
> The "schema" is a necessary component in order to dictate to the 
> JsonConverter how to map the payload's JSON datatypes to Kafka Connect 
> datatypes on the consumer side.
> Introduce a new configuration for the JsonConverter class called 
> "schemas.infer.enable". When this flag is set to "false", the existing 
> behavior is exhibited. When it's set to "true", infer the schema from the 
> contents of the JSON record, and return that as part of the SchemaAndValue 
> object for Sink Connectors.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)

Reply via email to