Re: Re: Re: Re: Problem parsing MySQL binlog from Kafka
Hi, received, thank you. Since this topic carries the binlogs of many different tables, parsing `data` as an array does not work either: the array length and fields differ from table to table. One more question: in version 1.11, can a DataStream be converted to a Blink-planner Table? The examples I have seen all seem to use useOldPlanner for the conversion, but I want to use Blink to write into Hive. My requirement is binlog -> Flink -> Hive, with different MySQL tables written into different Hive tables.

On 2020-07-28 16:02:18, "Jark Wu" wrote:
>Because "data" is a complex structure rather than a plain string, this is not supported as of 1.11. In 1.12, reading a complex structure as a string type is already supported.
>
>Best,
>Jark
>
>On Tue, 28 Jul 2020 at 15:36, air23 wrote:
>
>> The format is as follows. It is a canal-parsed binlog, and `data` holds an array, so it cannot be read out as a string, while all the other fields can:
>>
>> {
>>   "data":[
>>     {
>>       "op_id":"97037138",
>>       "order_id":"84172164"
>>     }
>>   ],
>>   "database":"order_11",
>>   "es":1595720375000,
>>   "id":17469027,
>>   "isDdl":false,
>>   "mysqlType":{
>>     "op_id":"int(11)",
>>     "order_id":"int(11)"
>>   },
>>   "old":null,
>>   "pkNames":[
>>     "op_id"
>>   ],
>>   "sql":"",
>>   "sqlType":{
>>     "op_id":4,
>>     "order_id":4
>>   },
>>   "table":"order_product",
>>   "ts":1595720375837,
>>   "type":"INSERT"
>> }
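For the binlog -> Flink -> Hive routing described above, one possible sketch in Flink SQL is to filter on the canal envelope's `table` field and write one INSERT per target table. The catalog and database names (hive_catalog, ods), the column lists, and the assumption that `data` is declared with its nested row type rather than VARCHAR are all hypothetical, not from this thread; a registered Hive catalog is also assumed:

```sql
-- Hypothetical sketch: fan one multi-table binlog topic out to per-table
-- Hive sinks by filtering on the canal `table` field.
-- hive_catalog / ods and the column lists are assumptions, not from the thread.
INSERT INTO hive_catalog.ods.order_product
SELECT `data`[1].op_id, `data`[1].order_id
FROM kafkaTable
WHERE `table` = 'order_product';

INSERT INTO hive_catalog.ods.order_log
SELECT `data`[1].op_id, `data`[1].order_id
FROM kafkaTable
WHERE `table` = 'order_log';
```

In 1.11, several INSERT statements like these can be grouped and submitted as a single job via TableEnvironment.createStatementSet(), if that fits the setup.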
Re: Reply: Re: Re: Problem parsing MySQL binlog from Kafka
Hi, I am using version 1.11.1.

On 2020-07-28 16:02:30, "明启 孙" <374060...@qq.com> wrote:
>Which Flink version are you on?
>
>Sent from Mail for Windows 10
Re: Re: Re: Problem parsing MySQL binlog from Kafka
Because "data" is a complex structure, not a plain string, this is not supported as of 1.11. In 1.12, reading a complex structure as a string type is already supported.

Best,
Jark

On Tue, 28 Jul 2020 at 15:36, air23 wrote:

> The format is as follows. It is a canal-parsed binlog, and `data` holds an array, so it cannot be read out as a string, while all the other fields can.
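For 1.11, a possible workaround when every message in a topic shares one schema (which, per the discussion above, is not the case for this multi-table topic) is to declare `data` with its actual nested type instead of VARCHAR. This is only a sketch based on the JSON sample in this thread:

```sql
CREATE TABLE kafkaTable (
  `data` ARRAY<ROW<op_id STRING, order_id STRING>>,
  `table` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'source_databases',
  'properties.bootstrap.servers' = '***',
  'properties.group.id' = 'real1',
  'format' = 'json',
  'scan.startup.mode' = 'earliest-offset'
);

-- Elements can then be addressed with 1-based array indexing, e.g.:
-- SELECT `data`[1].op_id, `table` FROM kafkaTable;
```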
Reply: Re: Re: Problem parsing MySQL binlog from Kafka
Which Flink version are you on?

Sent from Mail for Windows 10

From: air23
Sent: 2020-07-28 15:36
To: user-zh@flink.apache.org
Subject: Re: Re: Re: Problem parsing MySQL binlog from Kafka

The format is as follows. It is a canal-parsed binlog, and `data` holds an array, so it cannot be read out as a string, while all the other fields can.
Re: Re: Problem parsing MySQL binlog from Kafka
Do you have a sample of the JSON data in Kafka?
Have you checked whether there are any exception messages in the TaskManager logs?

On Tue, 28 Jul 2020 at 09:40, air23 wrote:

> Hi, the test code is as follows:
>
> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>         " `data` VARCHAR , " +
>         " `table` VARCHAR " +
>         ") WITH (" +
>         " 'connector' = 'kafka'," +
>         " 'topic' = 'source_databases'," +
>         " 'properties.bootstrap.servers' = '***'," +
>         " 'properties.group.id' = 'real1'," +
>         " 'format' = 'json'," +
>         " 'scan.startup.mode' = 'earliest-offset'" +
>         ")";
>
> public static void main(String[] args) throws Exception {
>     // Blink table environment
>     StreamExecutionEnvironment bsEnv =
>             StreamExecutionEnvironment.getExecutionEnvironment();
>     EnvironmentSettings bsSettings =
>             EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>     StreamTableEnvironment bsTableEnv =
>             StreamTableEnvironment.create(bsEnv, bsSettings);
>
>     TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>     tableResult.print();
>
>     Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>     bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>
>     bsEnv.execute("aa");
> }
>
> The output is as follows (`data` is always empty); the data format is canal-parsed MySQL binlog:
>
> ,order_operation_time
> ,inventory_batch_log
> ,order_log
> ,order_address_book
> ,product_inventory
> ,order_physical_relation
> ,bil_business_attach
> ,picking_detail
> ,picking_detail
> ,orders
>
> One more question: in version 1.11, can't Blink convert a DataStream to a Table?
> The examples I have seen all use useOldPlanner to do the conversion.
> Thanks.
>
> On 2020-07-27 19:44:10, "Jark Wu" wrote:
> >Sorry, I still don't see the attachment.
> >If it is plain text, you can paste it directly into the email.
> >
> >On Mon, 27 Jul 2020 at 19:22, air23 wrote:
> >
> >> I am uploading it again.
> >>
> >> On 2020-07-27 18:55, Jark Wu wrote:
> >>
> >> Hi,
> >> Your attachment does not seem to have been uploaded.
> >>
> >> On Mon, 27 Jul 2020 at 18:17, air23 wrote:
> >>
> >> > *Hello. This is the SQL I use for parsing. I want to read the binlog's `data` and `table` fields. Why can I read `table` but not `data`?*
> >> >
> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> >> >         " `data` VARCHAR , " +
> >> >         " `table` VARCHAR " +
> >> >         ") WITH (" +
> >> >         " 'connector' = 'kafka'," +
> >> >         " 'topic' = 'order_source'," +
> >> >         " 'properties.bootstrap.servers' = '***'," +
> >> >         " 'properties.group.id' = 'real1'," +
> >> >         " 'format' = 'json'," +
> >> >         " 'scan.startup.mode' = 'earliest-offset'" +
> >> >         ")";
> >> >
> >> > See the attachment for details; it includes the printed output.
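Since the payload is produced by canal, it may also be worth noting that Flink 1.11 ships a dedicated 'canal-json' format, which interprets the envelope (data/old/type) and emits the payload rows directly as a changelog. A sketch, assuming a topic that carries only this one table's binlog:

```sql
-- Sketch using the built-in canal-json format (available since Flink 1.11):
-- the payload columns are declared directly; the envelope is handled by the format.
CREATE TABLE order_product (
  op_id INT,
  order_id INT
) WITH (
  'connector' = 'kafka',
  'topic' = 'order_source',
  'properties.bootstrap.servers' = '***',
  'properties.group.id' = 'real1',
  'format' = 'canal-json',
  'scan.startup.mode' = 'earliest-offset'
);
```

Because the topic discussed in this thread mixes many tables, this still requires splitting the stream upstream, e.g. one Kafka topic per MySQL table.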