因为 "data" 是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。
1.12 中已经支持读取复杂结构为 string 类型了。

Best,
Jark

On Tue, 28 Jul 2020 at 15:36, air23 <wangfei23_...@163.com> wrote:

> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>
>
> {
>     "data":[
>         {
>             "op_id":"97037138",
>             "order_id":"84172164"
>         }
>     ],
>     "database":"order_11",
>     "es":1595720375000,
>     "id":17469027,
>     "isDdl":false,
>     "mysqlType":{
>         "op_id":"int(11)",
>         "order_id":"int(11)"
>     },
>     "old":null,
>     "pkNames":[
>         "op_id"
>     ],
>     "sql":"",
>     "sqlType":{
>         "op_id":4,
>         "order_id":4
>     },
>     "table":"order_product",
>     "ts":1595720375837,
>     "type":"INSERT"
> }
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-28 14:44:35,"Jark Wu" <imj...@gmail.com> 写道:
> >有kafka 中json 数据的样例不?
> >有没有看过 TaskManager 中有没有异常 log 信息?
> >
> >
> >
> >On Tue, 28 Jul 2020 at 09:40, air23 <wangfei23_...@163.com> wrote:
> >
> >> 你好 测试代码如下
> >>
> >>
> >> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> >> " `data` VARCHAR , " +
> >> " `table` VARCHAR " +
> >> ") WITH (" +
> >> " 'connector' = 'kafka'," +
> >> " 'topic' = 'source_databases'," +
> >> " 'properties.bootstrap.servers' = '***'," +
> >> " 'properties.group.id' = 'real1'," +
> >> " 'format' = 'json'," +
> >> " 'scan.startup.mode' = 'earliest-offset'" +
> >> ")";
> >> public static void main(String[] args) throws Exception {
> >>
> >>
> >> //bink table
> >> StreamExecutionEnvironment bsEnv =
> >> StreamExecutionEnvironment.getExecutionEnvironment();
> >>     EnvironmentSettings bsSettings =
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> >>     StreamTableEnvironment bsTableEnv =
> >> StreamTableEnvironment.create(bsEnv, bsSettings);
> >>
> >>     TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
> >>
> >>
> >> tableResult.print();
> >>
> >>     Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
> >>
> >> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
> >>
> >> bsEnv.execute("aa");
> >>
> >> }
> >>
> >>
> >>
> >>
> >> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
> >> ,order_operation_time
> >> ,inventory_batch_log
> >> ,order_log
> >> ,order_address_book
> >> ,product_inventory
> >> ,order_physical_relation
> >> ,bil_business_attach
> >> ,picking_detail
> >> ,picking_detail
> >> ,orders
> >>
> >>
> >>
> >>
> >> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
> >> 看到例子都是useOldPlanner 来转table的。
> >> 致谢
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-07-27 19:44:10,"Jark Wu" <imj...@gmail.com> 写道:
> >> >抱歉,还是没有看到附件。
> >> >如果是文本的话,你可以直接贴到邮件里。
> >> >
> >> >On Mon, 27 Jul 2020 at 19:22, air23 <wangfei23_...@163.com> wrote:
> >> >
> >> >> 我再上传一次
> >> >>
> >> >> 在2020年07月27日 18:55,Jark Wu <imj...@gmail.com> 写道:
> >> >>
> >> >> Hi,
> >> >> 你的附件好像没有上传。
> >> >>
> >> >> On Mon, 27 Jul 2020 at 18:17, air23 <wangfei23_...@163.com> wrote:
> >> >>
> >> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table
> 不能取到data呢?*
> >> >> >
> >> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable
> (\n"
> >> +
> >> >> >         " `data` VARCHAR , " +
> >> >> >         " `table` VARCHAR " +
> >> >> >         ") WITH (" +
> >> >> >         " 'connector' = 'kafka'," +
> >> >> >         " 'topic' = 'order_source'," +
> >> >> >         " 'properties.bootstrap.servers' = '***'," +
> >> >> >         " 'properties.group.id' = 'real1'," +
> >> >> >         " 'format' = 'json'," +
> >> >> >         " 'scan.startup.mode' = 'earliest-offset'" +
> >> >> >         ")";
> >> >> >
> >> >> >
> >> >> > 具体见附件 有打印
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >>
> >> >>
> >>
>

回复