Maciej Bryński created FLINK-19786:
--------------------------------------

             Summary: Flink doesn't set proper nullability for Logical types 
for Confluent Avro Serialization
                 Key: FLINK-19786
                 URL: https://issues.apache.org/jira/browse/FLINK-19786
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.12.0
            Reporter: Maciej Bryński


When Flink registers a schema in the Schema Registry, nullability is not set 
correctly for logical types.
Example with nullable fields. Table:


{code:sql}
create table `test_logical_null` (
        `string_field` STRING,
        `timestamp_field` TIMESTAMP(3)
) WITH (
  'connector' = 'kafka', 
  'topic' = 'test-logical-null', 
  'properties.bootstrap.servers' = 'localhost:9092', 
  'properties.group.id' = 'test12345', 
  'scan.startup.mode' = 'earliest-offset', 
  'format' = 'avro-confluent', -- Must be set to 'avro-confluent' to configure this format.
  'avro-confluent.schema-registry.url' = 'http://localhost:8081', -- URL to connect to Confluent Schema Registry
  'avro-confluent.schema-registry.subject' = 'test-logical-null' -- Subject name to write to the Schema Registry service; required for sinks
)
{code}

Schema:


{code:json}
{
  "type": "record",
  "name": "record",
  "fields": [
    {
      "name": "string_field",
      "type": [
        "string",
        "null"
      ]
    },
    {
      "name": "timestamp_field",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}
{code}

For NOT NULL fields:

{code:sql}
create table `test_logical_notnull` (
        `string_field` STRING NOT NULL,
        `timestamp_field` TIMESTAMP(3) NOT NULL
) WITH (
  'connector' = 'kafka', 
  'topic' = 'test-logical-notnull', 
  'properties.bootstrap.servers' = 'localhost:9092', 
  'properties.group.id' = 'test12345', 
  'scan.startup.mode' = 'earliest-offset', 
  'format' = 'avro-confluent', -- Must be set to 'avro-confluent' to configure this format.
  'avro-confluent.schema-registry.url' = 'http://localhost:8081', -- URL to connect to Confluent Schema Registry
  'avro-confluent.schema-registry.subject' = 'test-logical-notnull-value' -- Subject name to write to the Schema Registry service; required for sinks
);
{code}
Schema:

{code:json}
{
  "type": "record",
  "name": "record",
  "fields": [
    {
      "name": "string_field",
      "type": "string"
    },
    {
      "name": "timestamp_field",
      "type": {
        "type": "long",
        "logicalType": "timestamp-millis"
      }
    }
  ]
}
{code}
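As the schemas show, string_field is correctly registered as a union with null 
when the column is nullable. timestamp_field, however, has no union with null 
in either example, so the nullable TIMESTAMP(3) column is registered with the 
same non-nullable type as the NOT NULL one.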
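
The check can be reproduced outside Flink with a short script. The following is only a sketch: it parses the schema registered for the nullable table (copied from above) and reports which fields are unions containing "null". The helper names `is_nullable` and `make_nullable` are hypothetical, and the union order in the expected type is assumed to follow the one shown for string_field.

```python
import json

# Schema registered for `test_logical_null`, copied from the report above.
OBSERVED_SCHEMA = json.loads("""
{
  "type": "record",
  "name": "record",
  "fields": [
    {"name": "string_field", "type": ["string", "null"]},
    {"name": "timestamp_field",
     "type": {"type": "long", "logicalType": "timestamp-millis"}}
  ]
}
""")


def is_nullable(avro_type):
    """Return True if the Avro type is a union that lists "null" as a branch."""
    # Simplification: only the string form "null" is recognised, which is
    # the form used in the schemas above.
    return isinstance(avro_type, list) and "null" in avro_type


def make_nullable(avro_type):
    """Wrap a type in a union with "null" unless it already allows null."""
    if is_nullable(avro_type):
        return avro_type
    if isinstance(avro_type, list):
        return avro_type + ["null"]
    return [avro_type, "null"]


# Both columns are declared nullable in the table, so both should be True.
nullability = {f["name"]: is_nullable(f["type"]) for f in OBSERVED_SCHEMA["fields"]}
print(nullability)  # {'string_field': True, 'timestamp_field': False}

# Type one would expect for a nullable TIMESTAMP(3) column (assumed ordering).
expected_timestamp_type = make_nullable(OBSERVED_SCHEMA["fields"][1]["type"])
print(json.dumps(expected_timestamp_type, indent=2))
```

Running the same check against the schema of test_logical_notnull should report False for both fields, which is the correct result for NOT NULL columns.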


