[ https://issues.apache.org/jira/browse/KAFKA-6895?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Allen Tang updated KAFKA-6895:
------------------------------
    Description: 
Although the connect-json library provides a converter called "JsonConverter", it restricts the domain of JSON payloads it can accept on the Sink Connector side when deserializing them into Kafka Connect datatypes. When reading byte arrays from Kafka, the JsonConverter expects its input to be a JSON envelope containing the fields "schema" and "payload"; otherwise it throws a DataException reporting:
 ??JsonConverter with schemas.enable requires "schema" and "payload" fields and 
may not contain additional fields. If you are trying to deserialize plain JSON 
data, set schemas.enable=false in your converter configuration.??
 (when schemas.enable is true) or
 ??JSON value converted to Kafka Connect must be in envelope containing schema??
 (when schemas.enable is false)
 For example, consider a JSON payload like:

_{ "c1": 10000, "c2": "bar", "create_ts": 1501834166000, "update_ts": 
1501834166000 }_

This payload will not work with Sink Connectors that require a schema to ingest data, for example when mapping Kafka Connect datatypes to JDBC datatypes. Instead, the data is expected to be structured like so:
 _{ "schema": \{ "type": "struct", "fields": [{ "type": "int32", "optional": 
true, "field": "c1" }, \{ "type": "string", "optional": true, "field": "c2" }, 
\{ "type": "int64", "optional": false, "name": 
"org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "create_ts" 
}, \{ "type": "int64", "optional": false, "name": 
"org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "update_ts" 
}], "optional": false, "name": "foobar" }, "payload": \{ "c1": 10000, "c2": 
"bar", "create_ts": 1501834166000, "update_ts": 1501834166000 } }_

The "schema" is a necessary component in order to dictate to the JsonConverter 
how to map the payload's JSON datatypes to Kafka Connect datatypes on the 
consumer side.
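
To make the current behavior concrete, here is a minimal sketch (not part of the original report) that exercises the existing JsonConverter API; the topic name and payloads are illustrative only:

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.errors.DataException;
import org.apache.kafka.connect.json.JsonConverter;

public class EnvelopeDemo {
    public static void main(String[] args) {
        JsonConverter converter = new JsonConverter();
        // "schemas.enable" defaults to true for the value converter; set it explicitly here.
        converter.configure(Map.of("schemas.enable", "true"), false /* isKey */);

        // Envelope form: deserializes into a SchemaAndValue with a real schema attached.
        byte[] envelope = "{ \"schema\": { \"type\": \"int32\", \"optional\": true }, \"payload\": 10000 }"
                .getBytes(StandardCharsets.UTF_8);
        SchemaAndValue ok = converter.toConnectData("my-topic", envelope);
        System.out.println(ok.schema() + " -> " + ok.value());

        // Plain form: rejected because the "schema"/"payload" wrapper is missing.
        byte[] plain = "{ \"c1\": 10000, \"c2\": \"bar\" }".getBytes(StandardCharsets.UTF_8);
        try {
            converter.toConnectData("my-topic", plain);
        } catch (DataException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
{code}

With schemas.enable=true, only the envelope form succeeds; the plain form produces the first DataException quoted above.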

 

Proposal: introduce a new configuration for the JsonConverter class called "schemas.infer.enable". When this flag is set to "false", the converter behaves exactly as it does today. When it is set to "true", the converter infers a schema from the contents of the JSON record and returns it as part of the SchemaAndValue object for Sink Connectors.
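
As a rough illustration of what such inference could produce, the helper below is a hypothetical sketch (inferSchema is not an existing Kafka API) that maps Jackson JSON node types onto Connect schemas:

{code:java}
import java.util.Iterator;
import java.util.Map;
import com.fasterxml.jackson.databind.JsonNode;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

public final class SchemaInferenceSketch {

    // Hypothetical: derive a Connect Schema from the shape of a parsed JSON record.
    // All inferred fields are optional, since plain JSON cannot express nullability.
    public static Schema inferSchema(JsonNode node) {
        switch (node.getNodeType()) {
            case BOOLEAN:
                return Schema.OPTIONAL_BOOLEAN_SCHEMA;
            case STRING:
                return Schema.OPTIONAL_STRING_SCHEMA;
            case NUMBER:
                // A bare JSON number cannot distinguish int32 from int64, so widen.
                return node.isIntegralNumber()
                        ? Schema.OPTIONAL_INT64_SCHEMA
                        : Schema.OPTIONAL_FLOAT64_SCHEMA;
            case OBJECT:
                SchemaBuilder struct = SchemaBuilder.struct().optional();
                Iterator<Map.Entry<String, JsonNode>> fields = node.fields();
                while (fields.hasNext()) {
                    Map.Entry<String, JsonNode> field = fields.next();
                    struct.field(field.getKey(), inferSchema(field.getValue()));
                }
                return struct.build();
            case ARRAY:
                // Assume homogeneous arrays and infer from the first element;
                // an empty array gives no evidence, so fall back to strings.
                Schema element = node.size() > 0
                        ? inferSchema(node.get(0))
                        : Schema.OPTIONAL_STRING_SCHEMA;
                return SchemaBuilder.array(element).optional().build();
            default:
                // NULL and other node types carry no usable type information.
                return null;
        }
    }
}
{code}

One inherent limitation worth noting: inference from a bare value cannot recover logical types, so the create_ts and update_ts fields above would infer as plain int64 rather than org.apache.kafka.connect.data.Timestamp.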



> Schema Inferencing for JsonConverter
> ------------------------------------
>
>                 Key: KAFKA-6895
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6895
>             Project: Kafka
>          Issue Type: New Feature
>          Components: KafkaConnect
>            Reporter: Allen Tang
>            Priority: Minor
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
