你好 使用的是<flink.version>1.11.1</flink.version>版本的
在 2020-07-28 16:02:30,"明启 孙" <374060...@qq.com> 写道: >你的flink什么版本 > >发送自 Windows 10 版邮件应用 > >发件人: air23 >发送时间: 2020年7月28日 15:36 >收件人: user-zh@flink.apache.org >主题: Re:Re: Re: 解析kafka的mysql binlog问题 > >格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来 > > >{ > "data":[ > { > "op_id":"97037138", > "order_id":"84172164" > } > ], > "database":"order_11", > "es":1595720375000, > "id":17469027, > "isDdl":false, > "mysqlType":{ > "op_id":"int(11)", > "order_id":"int(11)" > }, > "old":null, > "pkNames":[ > "op_id" > ], > "sql":"", > "sqlType":{ > "op_id":4, > "order_id":4 > }, > "table":"order_product", > "ts":1595720375837, > "type":"INSERT" >} > > > > > > > > > > > > > > > > > >在 2020-07-28 14:44:35,"Jark Wu" <imj...@gmail.com> 写道: >>有kafka 中json 数据的样例不? >>有没有看过 TaskManager 中有没有异常 log 信息? >> >> >> >>On Tue, 28 Jul 2020 at 09:40, air23 <wangfei23_...@163.com> wrote: >> >>> 你好 测试代码如下 >>> >>> >>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + >>> " `data` VARCHAR , " + >>> " `table` VARCHAR " + >>> ") WITH (" + >>> " 'connector' = 'kafka'," + >>> " 'topic' = 'source_databases'," + >>> " 'properties.bootstrap.servers' = '***'," + >>> " 'properties.group.id' = 'real1'," + >>> " 'format' = 'json'," + >>> " 'scan.startup.mode' = 'earliest-offset'" + >>> ")"; >>> public static void main(String[] args) throws Exception { >>> >>> >>> //bink table >>> StreamExecutionEnvironment bsEnv = >>> StreamExecutionEnvironment.getExecutionEnvironment(); >>> EnvironmentSettings bsSettings = >>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >>> StreamTableEnvironment bsTableEnv = >>> StreamTableEnvironment.create(bsEnv, bsSettings); >>> >>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL); >>> >>> >>> tableResult.print(); >>> >>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable"); >>> >>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1); >>> >>> bsEnv.execute("aa"); >>> >>> } >>> >>> >>> >>> >>> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog >>> ,order_operation_time >>> ,inventory_batch_log >>> ,order_log >>> ,order_address_book >>> ,product_inventory >>> ,order_physical_relation >>> ,bil_business_attach >>> ,picking_detail >>> ,picking_detail >>> ,orders >>> >>> >>> >>> >>> 另外再问个问题。1.11版本 blink 不能datastream转table吗? >>> 看到例子都是useOldPlanner 来转table的。 >>> 致谢 >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> 在 2020-07-27 19:44:10,"Jark Wu" <imj...@gmail.com> 写道: >>> >抱歉,还是没有看到附件。 >>> >如果是文本的话,你可以直接贴到邮件里。 >>> > >>> >On Mon, 27 Jul 2020 at 19:22, air23 <wangfei23_...@163.com> wrote: >>> > >>> >> 我再上传一次 >>> >> >>> >> 在2020年07月27日 18:55,Jark Wu <imj...@gmail.com> 写道: >>> >> >>> >> Hi, >>> >> 你的附件好像没有上传。 >>> >> >>> >> On Mon, 27 Jul 2020 at 18:17, air23 <wangfei23_...@163.com> wrote: >>> >> >>> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?* >>> >> > >>> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" >>> + >>> >> > " `data` VARCHAR , " + >>> >> > " `table` VARCHAR " + >>> >> > ") WITH (" + >>> >> > " 'connector' = 'kafka'," + >>> >> > " 'topic' = 'order_source'," + >>> >> > " 'properties.bootstrap.servers' = '***'," + >>> >> > " 'properties.group.id' = 'real1'," + >>> >> > " 'format' = 'json'," + >>> >> > " 'scan.startup.mode' = 'earliest-offset'" + >>> >> > ")"; >>> >> > >>> >> > >>> >> > 具体见附件 有打印 >>> >> > >>> >> > >>> >> > >>> >> > >>> >> > >>> >> >>> >> >>> >