I'm working with the Flink SQL client. The input data is in JSON format and
contains nested JSON objects.

I'm trying to query the nested JSON from the table and expect the output to
be nested JSON rather than a string.

I've built an environment file that defines the source table schema as:

>     format:
>       type: json
>       fail-on-missing-field: false
>       json-schema: >
>         {
>           type: 'object',
>           properties: {
>             'lon': {
>               type: 'string'
>             },
>             'rideTime': {
>               type: 'string'
>             },
>             'nested': {
>               type: 'object',
>               properties: {
>                 'inner': {
>                   type: 'string'
>                 },
>                 'nested1': {
>                   type: 'object',
>                   properties: {
>                     'inner1': {
>                       type: 'string'
>                     }
>                   }
>                 }
>               }
>             },
>             'name': {
>               type: 'string'
>             }
>           }
>         }
>       derive-schema: false
>     schema:
>       - name: 'lon'
>         type: VARCHAR
>       - name: 'rideTime'
>         type: VARCHAR
>       - name: 'nested'
>         type: ROW<`inner` STRING, `nested1` ROW<`inner1` STRING>>
>       - name: 'name'
>         type: VARCHAR


Sink table schema:

>     format:
>       type: json
>       fail-on-missing-field: false
>       derive-schema: true
>     schema:
>       - name: 'nested'
>         type: ROW<`inner` STRING>

I've been trying the following queries:
Flink SQL> insert into nestedSink select nested.`inner` as `nested.inner` from nestedJsonStream;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink [nestedSink] do not match.
Query result schema: [nested.inner: String]
TableSink schema:    [nested: Row]

Flink SQL> insert into nestedSink select nested from nestedJsonStream;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.ValidationException: Field types of query result and registered TableSink [nestedSink] do not match.
Query result schema: [nested: Row]
TableSink schema:    [nested: Row]

Flink SQL> insert into nestedSink select nested.`inner` as nested.`inner` from nestedJsonStream;
[INFO] Submitting SQL update statement to the cluster...
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 1, column 55.
Was expecting one of:
    <EOF>
    "EXCEPT" ...
    "FETCH" ...
    "FROM" ...
    "INTERSECT" ...
    "LIMIT" ...
    "OFFSET" ...
    "ORDER" ...
    "MINUS" ...
    "UNION" ...
    "," ...
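The following is a likely reading of these errors. It is an untested sketch, not a confirmed fix.

In the first query, the alias `` `nested.inner` `` is a single quoted identifier. It therefore produces a flat STRING column whose name contains a dot, not a field inside a ROW. The third query fails at parse time because the parser expects a simple alias after `as` and stops at the "." following `nested` (column 55). The second query most likely fails because the two ROW types have different fields: the source `nested` has `inner` and `nested1`, while the sink declares only `inner`. The error message prints both sides as plain `Row`, which hides that difference.

If that is the cause, declaring the sink's `nested` column with the same ROW type as the source should let `insert into nestedSink select nested from nestedJsonStream;` pass validation. The fragment below assumes the same environment-file layout as the original sink definition; the surrounding table keys are omitted here, as they were in the original.

```yaml
    format:
      type: json
      fail-on-missing-field: false
      derive-schema: true
    schema:
      # Assumption: repeat the source's full ROW type so that the query
      # result type and the sink type are identical, field for field.
      - name: 'nested'
        type: ROW<`inner` STRING, `nested1` ROW<`inner1` STRING>>
```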

Could you help me understand what's wrong with my schema or query?
I would also like to add new columns and nested columns.
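Here is a minimal sketch for adding columns, assuming the same environment-file format as above.

- Extra top-level columns become further entries under the sink's `schema`.
- Nested columns are declared as fields of a ROW type.

The sink validation in the error messages above appears to compare the query result and the sink column by column. The SELECT list therefore most likely needs to produce the same columns in the same order with the same types. For the fragment below, that query would be `insert into nestedSink select name, lon, nested from nestedJsonStream;`.

This sketch does not cover a nested row whose shape differs from the source's. In that case, the query would have to build that row itself.

```yaml
    schema:
      # New top-level columns, copied as-is from the source fields.
      - name: 'name'
        type: VARCHAR
      - name: 'lon'
        type: VARCHAR
      # Nested columns are fields of a ROW type; this mirrors the source
      # 'nested' object so that selecting `nested` matches it exactly.
      - name: 'nested'
        type: ROW<`inner` STRING, `nested1` ROW<`inner1` STRING>>
```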

Thanks
Srikanth
