Hi,

Flink SQL JSON format supports nested formats like the schema that you posted. Maybe the renaming with `from()` works not as expected. Did you try it without the `from()` where schema fields are equal to JSON fields?

Alternatively, you could also define the schema only and use the `deriveSchema()` mode of the format.

Btw there is a big bug in the JSON format that could affect how rows are parsed (https://issues.apache.org/jira/browse/FLINK-11727).

Maybe it is worth it to write your own format and perform the JSON parsing logic how you would like it.

Regards,
Timo

Am 04.03.19 um 08:38 schrieb 杨光:
Hi,
i am trying the flink sql api to read json formate data from kafka topic.
My json schema is a nested json like this
{
  "type": "object",
  "properties": {
    "table": {
      "type": "string"
    },
    "str2": {
      "type": "string"
    },
    "obj1": {
      "type": "object",
      "properties": {
        "rkey": {
          "type": "string"
        },
        "val": {
          "type": "string"
        },
        "lastTime": {
          "type": "number"
        }
      },
      "required": ["lastTime", "rkey", "val"]
    },
    "obj2": {
      "type": "object",
      "properties": {
        "val": {
          "type": "string"
        },
        "lastTime": {
          "type": "number"
        }
      },
      "required": ["lastTime", "val"]
    }
  },
  "required": ["table", "str2", "obj1", "obj2"]
}

i define a table sechema like this.

Schema schemaDesc1 = new Schema()
        .......
        .field("tablestr", Types.STRING).from("table")
        .......
        .field("rkey", Types.STRING).from("rkey");


when i run a debug case ,i got error about the "rkey" field (the file in the nest obj1) " SQL validation failed. Table field 'rkey' was resolved to TableSource return type field 'rkey', but field 'rkey' was not found in the return type Row".

My question is :does the org.apache.flink.table.descriptors.Json format support nested json schema? If does ,how can i set the right format or schema ? If not ,then how can i apply flink sql api on nested json data source.


Reply via email to