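Hello, received, thank you.

This topic carries the binlogs of many tables, so parsing `data` as an array does not work either: the array length and the fields differ from table to table.

I also have another question. In 1.11, can a DataStream be converted to a Table with the Blink planner? The examples I have seen all seem to use useOldPlanner for the conversion, but I want to use Blink to write to Hive.

My requirement is binlog -> Flink -> Hive, with different MySQL tables written to different Hive tables.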
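One possible workaround for the mixed-table topic, offered only as a rough sketch and not something confirmed in this thread: read each Kafka record as a plain string and pull out `table` and the raw `data` text by hand, so records can be routed per table before any schema is applied. The class name `CanalEnvelope` and both method names are hypothetical. The sketch uses only the JDK and assumes that the top-level `"data"` key comes first in the record (as in the sample below) and that no column is itself named `table`; a real job would more likely use a proper JSON parser.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical helper: extracts the table name and the raw "data" text from one canal JSON record. */
class CanalEnvelope {

    // Matches "table":"name". Assumption: the first match is the top-level field
    // and table names contain no escaped quotes.
    private static final Pattern TABLE = Pattern.compile("\"table\"\\s*:\\s*\"([^\"]*)\"");

    /** Returns the value of the "table" field, or null if it is absent. */
    static String extractTable(String json) {
        Matcher m = TABLE.matcher(json);
        return m.find() ? m.group(1) : null;
    }

    /** Returns the raw text of the "data" array, brackets included, or null if it is absent or not an array. */
    static String extractRawData(String json) {
        int key = json.indexOf("\"data\"");
        if (key < 0) return null;
        int i = json.indexOf(':', key + 6);
        if (i < 0) return null;
        i++;
        while (i < json.length() && Character.isWhitespace(json.charAt(i))) i++;
        if (i >= json.length() || json.charAt(i) != '[') return null; // e.g. "data":null
        int start = i;
        int depth = 0;
        boolean inString = false;
        for (; i < json.length(); i++) {
            char c = json.charAt(i);
            if (inString) {
                if (c == '\\') i++;                 // skip the escaped character
                else if (c == '"') inString = false;
            } else if (c == '"') {
                inString = true;
            } else if (c == '[') {
                depth++;
            } else if (c == ']' && --depth == 0) {
                return json.substring(start, i + 1); // matching close bracket found
            }
        }
        return null; // unbalanced brackets
    }

    public static void main(String[] args) {
        String record = "{\"data\":[{\"op_id\":\"97037138\",\"order_id\":\"84172164\"}],"
                + "\"database\":\"order_11\",\"table\":\"order_product\",\"type\":\"INSERT\"}";
        System.out.println(extractTable(record) + " -> " + extractRawData(record));
    }
}
```

The extracted table name could then serve as the routing key, with the raw `data` text parsed separately for each target Hive table.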
On 2020-07-28 16:02:18, "Jark Wu" <imj...@gmail.com> wrote:
>Because "data" is a complex structure rather than a plain string, this is not supported as of 1.11.
>1.12 already supports reading a complex structure as a string type.
>
>Best,
>Jark
>
>On Tue, 28 Jul 2020 at 15:36, air23 <wangfei23_...@163.com> wrote:
>
>> The format is as follows. It is the binlog format produced by canal. "data" holds an array, and it cannot be read as a string; every other field can be read.
>>
>> {
>>     "data":[
>>         {
>>             "op_id":"97037138",
>>             "order_id":"84172164"
>>         }
>>     ],
>>     "database":"order_11",
>>     "es":1595720375000,
>>     "id":17469027,
>>     "isDdl":false,
>>     "mysqlType":{
>>         "op_id":"int(11)",
>>         "order_id":"int(11)"
>>     },
>>     "old":null,
>>     "pkNames":[
>>         "op_id"
>>     ],
>>     "sql":"",
>>     "sqlType":{
>>         "op_id":4,
>>         "order_id":4
>>     },
>>     "table":"order_product",
>>     "ts":1595720375837,
>>     "type":"INSERT"
>> }
>>
>> On 2020-07-28 14:44:35, "Jark Wu" <imj...@gmail.com> wrote:
>> >Do you have a sample of the JSON data in Kafka?
>> >Have you checked whether the TaskManager logs contain any exceptions?
>> >
>> >On Tue, 28 Jul 2020 at 09:40, air23 <wangfei23_...@163.com> wrote:
>> >
>> >> Hello, the test code is as follows:
>> >>
>> >> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>> >>         " `data` VARCHAR , " +
>> >>         " `table` VARCHAR " +
>> >>         ") WITH (" +
>> >>         " 'connector' = 'kafka'," +
>> >>         " 'topic' = 'source_databases'," +
>> >>         " 'properties.bootstrap.servers' = '***'," +
>> >>         " 'properties.group.id' = 'real1'," +
>> >>         " 'format' = 'json'," +
>> >>         " 'scan.startup.mode' = 'earliest-offset'" +
>> >>         ")";
>> >> public static void main(String[] args) throws Exception {
>> >>
>> >>     // Blink table
>> >>     StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
>> >>     EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> >>     StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
>> >>
>> >>     TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>> >>
>> >>     tableResult.print();
>> >>
>> >>     Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>> >>
>> >>     bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>> >>
>> >>     bsEnv.execute("aa");
>> >>
>> >> }
>> >>
>> >> The output is as follows; data is always empty. The data format is the MySQL binlog parsed by canal.
>> >> ,order_operation_time
>> >> ,inventory_batch_log
>> >> ,order_log
>> >> ,order_address_book
>> >> ,product_inventory
>> >> ,order_physical_relation
>> >> ,bil_business_attach
>> >> ,picking_detail
>> >> ,picking_detail
>> >> ,orders
>> >>
>> >> One more question: in 1.11, can the Blink planner not convert a DataStream to a Table?
>> >> The examples I have seen all use useOldPlanner to convert to a Table.
>> >> Thanks.
>> >>
>> >> On 2020-07-27 19:44:10, "Jark Wu" <imj...@gmail.com> wrote:
>> >> >Sorry, I still do not see the attachment.
>> >> >If it is text, you can paste it directly into the email.
>> >> >
>> >> >On Mon, 27 Jul 2020 at 19:22, air23 <wangfei23_...@163.com> wrote:
>> >> >
>> >> >> I will upload it again.
>> >> >>
>> >> >> On 2020-07-27 18:55, Jark Wu <imj...@gmail.com> wrote:
>> >> >>
>> >> >> Hi,
>> >> >> Your attachment does not seem to have been uploaded.
>> >> >>
>> >> >> On Mon, 27 Jul 2020 at 18:17, air23 <wangfei23_...@163.com> wrote:
>> >> >>
>> >> >> > *Hello. This is my parsing SQL. I want to read the data and table fields of the binlog. Why can I read table but not data?*
>> >> >> >
>> >> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>> >> >> >         " `data` VARCHAR , " +
>> >> >> >         " `table` VARCHAR " +
>> >> >> >         ") WITH (" +
>> >> >> >         " 'connector' = 'kafka'," +
>> >> >> >         " 'topic' = 'order_source'," +
>> >> >> >         " 'properties.bootstrap.servers' = '***'," +
>> >> >> >         " 'properties.group.id' = 'real1'," +
>> >> >> >         " 'format' = 'json'," +
>> >> >> >         " 'scan.startup.mode' = 'earliest-offset'" +
>> >> >> >         ")";
>> >> >> >
>> >> >> > See the attachment for details; it includes the printed output.