I have found a way??
  select row(msg.f1) from table.

 

 ------------------ ???????? ------------------
  ??????: "????????"<2463...@qq.com>;
 ????????: 2019??9??4??(??????) ????10:57
 ??????: 
"????????"<2463...@qq.com>;"JingsongLee"<lzljs3620...@aliyun.com>;"user"<user@flink.apache.org>;
 
 ????: ??????question

 

 I want to output the query results to kafka, json format is as follows??
 {
 "id": "123",
 "serial": "6b0c2d26",
 "msg": {
  "f1": "5677"
 }
}
  How to define the format and schema of kafka sink??
 thanks!

 

 ------------------ ???????? ------------------
  ??????: "????????"<2463...@qq.com>;
 ????????: 2019??9??3??(??????) ????8:18
 ??????: "JingsongLee"<lzljs3620...@aliyun.com>;"user"<user@flink.apache.org>;
 
 ????: ??????question

 

thank you??  
  Let me try??

 

 ------------------ ???????? ------------------
  ??????: "JingsongLee"<lzljs3620...@aliyun.com>;
 ????????: 2019??9??3??(??????) ????7:53
 ??????: "????????"<2463...@qq.com>;"user"<user@flink.apache.org>;
 
 ????: Re:question

 

should be schema.field(??msg??, Types.ROW(...))?  And you should select msg.f1 
from table.
  

 Best
 Jingsong Lee
 

 


 ???????????? iPhone??
   ------------------Original Mail ------------------
 From:???????? <2463...@qq.com>
 Date:2019-09-03 09:22:41
 Recipient:user <user@flink.apache.org>
 Subject:question

  How do you do:
 My problem is flink table format and table schema mapping.
 The input data is similar to the following json format??
 { "id": "123", "serial": "6b0c2d26", "msg": { "f1": "5677" } } The format code 
for TableSource is as follows: new Json().schema(Types.ROW(new String[] { "id", 
"serial", "msg" }, new TypeInformation << ? > [] { Types.STRING(), 
Types.STRING(), Types.ROW(new String[] { "f1" }, new TypeInformation << ? > [] 
{ Types.STRING() }) }));
 

 The schema part of TableSource is as follows:
 Schema schema = new Schema(); schema.field("id", Types.STRING()); 
schema.field("serial", Types.STRING());
 

 I don't know how to define the f1 field of msg in the schema. I tried 
schema.field("f1", Types.STRING()) before; but I will report an error. What is 
the correct method?
 The following SQL can be run correctly:
 select id,serial,f1 from table; 
 

 My flink version is 1.8.1,use flink table & SQL API
 

 thanks;

Reply via email to