因为 "data" 是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。 1.12 中已经支持读取复杂结构为 string 类型了。
Best, Jark On Tue, 28 Jul 2020 at 15:36, air23 <wangfei23_...@163.com> wrote: > 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来 > > > { > "data":[ > { > "op_id":"97037138", > "order_id":"84172164" > } > ], > "database":"order_11", > "es":1595720375000, > "id":17469027, > "isDdl":false, > "mysqlType":{ > "op_id":"int(11)", > "order_id":"int(11)" > }, > "old":null, > "pkNames":[ > "op_id" > ], > "sql":"", > "sqlType":{ > "op_id":4, > "order_id":4 > }, > "table":"order_product", > "ts":1595720375837, > "type":"INSERT" > } > > > > > > > > > > > > > > > > > > 在 2020-07-28 14:44:35,"Jark Wu" <imj...@gmail.com> 写道: > >有kafka 中json 数据的样例不? > >有没有看过 TaskManager 中有没有异常 log 信息? > > > > > > > >On Tue, 28 Jul 2020 at 09:40, air23 <wangfei23_...@163.com> wrote: > > > >> 你好 测试代码如下 > >> > >> > >> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + > >> " `data` VARCHAR , " + > >> " `table` VARCHAR " + > >> ") WITH (" + > >> " 'connector' = 'kafka'," + > >> " 'topic' = 'source_databases'," + > >> " 'properties.bootstrap.servers' = '***'," + > >> " 'properties.group.id' = 'real1'," + > >> " 'format' = 'json'," + > >> " 'scan.startup.mode' = 'earliest-offset'" + > >> ")"; > >> public static void main(String[] args) throws Exception { > >> > >> > >> //bink table > >> StreamExecutionEnvironment bsEnv = > >> StreamExecutionEnvironment.getExecutionEnvironment(); > >> EnvironmentSettings bsSettings = > >> > EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); > >> StreamTableEnvironment bsTableEnv = > >> StreamTableEnvironment.create(bsEnv, bsSettings); > >> > >> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL); > >> > >> > >> tableResult.print(); > >> > >> Table table = bsTableEnv.sqlQuery("select * from kafkaTable"); > >> > >> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1); > >> > >> bsEnv.execute("aa"); > >> > >> } > >> > >> > >> > >> > >> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog > >> ,order_operation_time > >> ,inventory_batch_log > >> ,order_log > >> ,order_address_book > >> ,product_inventory > >> ,order_physical_relation > >> ,bil_business_attach > >> ,picking_detail > >> ,picking_detail > >> ,orders > >> > >> > >> > >> > >> 另外再问个问题。1.11版本 blink 不能datastream转table吗? > >> 看到例子都是useOldPlanner 来转table的。 > >> 致谢 > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> > >> 在 2020-07-27 19:44:10,"Jark Wu" <imj...@gmail.com> 写道: > >> >抱歉,还是没有看到附件。 > >> >如果是文本的话,你可以直接贴到邮件里。 > >> > > >> >On Mon, 27 Jul 2020 at 19:22, air23 <wangfei23_...@163.com> wrote: > >> > > >> >> 我再上传一次 > >> >> > >> >> 在2020年07月27日 18:55,Jark Wu <imj...@gmail.com> 写道: > >> >> > >> >> Hi, > >> >> 你的附件好像没有上传。 > >> >> > >> >> On Mon, 27 Jul 2020 at 18:17, air23 <wangfei23_...@163.com> wrote: > >> >> > >> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table > 不能取到data呢?* > >> >> > > >> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable > (\n" > >> + > >> >> > " `data` VARCHAR , " + > >> >> > " `table` VARCHAR " + > >> >> > ") WITH (" + > >> >> > " 'connector' = 'kafka'," + > >> >> > " 'topic' = 'order_source'," + > >> >> > " 'properties.bootstrap.servers' = '***'," + > >> >> > " 'properties.group.id' = 'real1'," + > >> >> > " 'format' = 'json'," + > >> >> > " 'scan.startup.mode' = 'earliest-offset'" + > >> >> > ")"; > >> >> > > >> >> > > >> >> > 具体见附件 有打印 > >> >> > > >> >> > > >> >> > > >> >> > > >> >> > > >> >> > >> >> > >> >