Re:Re: Re: Re: 解析kafka的mysql binlog问题

2020-07-28 文章 air23
你好 收到。谢谢。 因为这个topic 是有很多表的binlog的。所以解析成array 也是不行的。长度和字段都不一致
另外想请教下 1.11 版本  datastream 可以转换为 blink的table吗。 看到例子 好像都是 useOldPlanner 来转的,
但是我想用blink 写入到hive。 我这边需求 就是通过binlog - flink - hive。 不同的mysql表写入到不同hive表。

















在 2020-07-28 16:02:18,"Jark Wu"  写道:
>因为 "data" 是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。
>1.12 中已经支持读取复杂结构为 string 类型了。
>
>Best,
>Jark
>
>On Tue, 28 Jul 2020 at 15:36, air23  wrote:
>
>> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>>
>>
>> {
>> "data":[
>> {
>> "op_id":"97037138",
>> "order_id":"84172164"
>> }
>> ],
>> "database":"order_11",
>> "es":1595720375000,
>> "id":17469027,
>> "isDdl":false,
>> "mysqlType":{
>> "op_id":"int(11)",
>> "order_id":"int(11)"
>> },
>> "old":null,
>> "pkNames":[
>> "op_id"
>> ],
>> "sql":"",
>> "sqlType":{
>> "op_id":4,
>> "order_id":4
>> },
>> "table":"order_product",
>> "ts":1595720375837,
>> "type":"INSERT"
>> }
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-28 14:44:35,"Jark Wu"  写道:
>> >有kafka 中json 数据的样例不?
>> >有没有看过 TaskManager 中有没有异常 log 信息?
>> >
>> >
>> >
>> >On Tue, 28 Jul 2020 at 09:40, air23  wrote:
>> >
>> >> 你好 测试代码如下
>> >>
>> >>
>> >> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>> >> " `data` VARCHAR , " +
>> >> " `table` VARCHAR " +
>> >> ") WITH (" +
>> >> " 'connector' = 'kafka'," +
>> >> " 'topic' = 'source_databases'," +
>> >> " 'properties.bootstrap.servers' = '***'," +
>> >> " 'properties.group.id' = 'real1'," +
>> >> " 'format' = 'json'," +
>> >> " 'scan.startup.mode' = 'earliest-offset'" +
>> >> ")";
>> >> public static void main(String[] args) throws Exception {
>> >>
>> >>
>> >> //bink table
>> >> StreamExecutionEnvironment bsEnv =
>> >> StreamExecutionEnvironment.getExecutionEnvironment();
>> >> EnvironmentSettings bsSettings =
>> >>
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> >> StreamTableEnvironment bsTableEnv =
>> >> StreamTableEnvironment.create(bsEnv, bsSettings);
>> >>
>> >> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>> >>
>> >>
>> >> tableResult.print();
>> >>
>> >> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>> >>
>> >> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>> >>
>> >> bsEnv.execute("aa");
>> >>
>> >> }
>> >>
>> >>
>> >>
>> >>
>> >> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
>> >> ,order_operation_time
>> >> ,inventory_batch_log
>> >> ,order_log
>> >> ,order_address_book
>> >> ,product_inventory
>> >> ,order_physical_relation
>> >> ,bil_business_attach
>> >> ,picking_detail
>> >> ,picking_detail
>> >> ,orders
>> >>
>> >>
>> >>
>> >>
>> >> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>> >> 看到例子都是useOldPlanner 来转table的。
>> >> 致谢
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
>> >> >抱歉,还是没有看到附件。
>> >> >如果是文本的话,你可以直接贴到邮件里。
>> >> >
>> >> >On Mon, 27 Jul 2020 at 19:22, air23  wrote:
>> >> >
>> >> >> 我再上传一次
>> >> >>
>> >> >> 在2020年07月27日 18:55,Jark Wu  写道:
>> >> >>
>> >> >> Hi,
>> >> >> 你的附件好像没有上传。
>> >> >>
>> >> >> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
>> >> >>
>> >> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table
>> 不能取到data呢?*
>> >> >> >
>> >> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable
>> (\n"
>> >> +
>> >> >> > " `data` VARCHAR , " +
>> >> >> > " `table` VARCHAR " +
>> >> >> > ") WITH (" +
>> >> >> > " 'connector' = 'kafka'," +
>> >> >> > " 'topic' = 'order_source'," +
>> >> >> > " 'properties.bootstrap.servers' = '***'," +
>> >> >> > " 'properties.group.id' = 'real1'," +
>> >> >> > " 'format' = 'json'," +
>> >> >> > " 'scan.startup.mode' = 'earliest-offset'" +
>> >> >> > ")";
>> >> >> >
>> >> >> >
>> >> >> > 具体见附件 有打印
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >>
>> >> >>
>> >>
>>


Re:回复: Re: Re: 解析kafka的mysql binlog问题

2020-07-28 文章 air23
你好 使用的是1.11.1版本的 



















在 2020-07-28 16:02:30,"明启 孙" <374060...@qq.com> 写道:
>你的flink什么版本
>
>发送自 Windows 10 版邮件应用
>
>发件人: air23
>发送时间: 2020年7月28日 15:36
>收件人: user-zh@flink.apache.org
>主题: Re:Re: Re: 解析kafka的mysql binlog问题
>
>格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>
>
>{
>"data":[
>{
>"op_id":"97037138",
>"order_id":"84172164"
>}
>],
>"database":"order_11",
>"es":1595720375000,
>"id":17469027,
>"isDdl":false,
>"mysqlType":{
>"op_id":"int(11)",
>"order_id":"int(11)"
>},
>"old":null,
>"pkNames":[
>"op_id"
>],
>"sql":"",
>"sqlType":{
>"op_id":4,
>"order_id":4
>},
>"table":"order_product",
>"ts":1595720375837,
>"type":"INSERT"
>}
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-07-28 14:44:35,"Jark Wu"  写道:
>>有kafka 中json 数据的样例不?
>>有没有看过 TaskManager 中有没有异常 log 信息?
>>
>>
>>
>>On Tue, 28 Jul 2020 at 09:40, air23  wrote:
>>
>>> 你好 测试代码如下
>>>
>>>
>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>>> " `data` VARCHAR , " +
>>> " `table` VARCHAR " +
>>> ") WITH (" +
>>> " 'connector' = 'kafka'," +
>>> " 'topic' = 'source_databases'," +
>>> " 'properties.bootstrap.servers' = '***'," +
>>> " 'properties.group.id' = 'real1'," +
>>> " 'format' = 'json'," +
>>> " 'scan.startup.mode' = 'earliest-offset'" +
>>> ")";
>>> public static void main(String[] args) throws Exception {
>>>
>>>
>>> //bink table
>>> StreamExecutionEnvironment bsEnv =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> EnvironmentSettings bsSettings =
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>> StreamTableEnvironment bsTableEnv =
>>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>>
>>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>>
>>>
>>> tableResult.print();
>>>
>>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>>
>>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>>
>>> bsEnv.execute("aa");
>>>
>>> }
>>>
>>>
>>>
>>>
>>> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
>>> ,order_operation_time
>>> ,inventory_batch_log
>>> ,order_log
>>> ,order_address_book
>>> ,product_inventory
>>> ,order_physical_relation
>>> ,bil_business_attach
>>> ,picking_detail
>>> ,picking_detail
>>> ,orders
>>>
>>>
>>>
>>>
>>> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>>> 看到例子都是useOldPlanner 来转table的。
>>> 致谢
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
>>> >抱歉,还是没有看到附件。
>>> >如果是文本的话,你可以直接贴到邮件里。
>>> >
>>> >On Mon, 27 Jul 2020 at 19:22, air23  wrote:
>>> >
>>> >> 我再上传一次
>>> >>
>>> >> 在2020年07月27日 18:55,Jark Wu  写道:
>>> >>
>>> >> Hi,
>>> >> 你的附件好像没有上传。
>>> >>
>>> >> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
>>> >>
>>> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>>> >> >
>>> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
>>> +
>>> >> > " `data` VARCHAR , " +
>>> >> > " `table` VARCHAR " +
>>> >> > ") WITH (" +
>>> >> > " 'connector' = 'kafka'," +
>>> >> > " 'topic' = 'order_source'," +
>>> >> > " 'properties.bootstrap.servers' = '***'," +
>>> >> > " 'properties.group.id' = 'real1'," +
>>> >> > " 'format' = 'json'," +
>>> >> > " 'scan.startup.mode' = 'earliest-offset'" +
>>> >> > ")";
>>> >> >
>>> >> >
>>> >> > 具体见附件 有打印
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >>
>>> >>
>>>
>


Re: Re: Re: 解析kafka的mysql binlog问题

2020-07-28 文章 Jark Wu
因为 "data" 是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。
1.12 中已经支持读取复杂结构为 string 类型了。

Best,
Jark

On Tue, 28 Jul 2020 at 15:36, air23  wrote:

> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>
>
> {
> "data":[
> {
> "op_id":"97037138",
> "order_id":"84172164"
> }
> ],
> "database":"order_11",
> "es":1595720375000,
> "id":17469027,
> "isDdl":false,
> "mysqlType":{
> "op_id":"int(11)",
> "order_id":"int(11)"
> },
> "old":null,
> "pkNames":[
> "op_id"
> ],
> "sql":"",
> "sqlType":{
> "op_id":4,
> "order_id":4
> },
> "table":"order_product",
> "ts":1595720375837,
> "type":"INSERT"
> }
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-28 14:44:35,"Jark Wu"  写道:
> >有kafka 中json 数据的样例不?
> >有没有看过 TaskManager 中有没有异常 log 信息?
> >
> >
> >
> >On Tue, 28 Jul 2020 at 09:40, air23  wrote:
> >
> >> 你好 测试代码如下
> >>
> >>
> >> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> >> " `data` VARCHAR , " +
> >> " `table` VARCHAR " +
> >> ") WITH (" +
> >> " 'connector' = 'kafka'," +
> >> " 'topic' = 'source_databases'," +
> >> " 'properties.bootstrap.servers' = '***'," +
> >> " 'properties.group.id' = 'real1'," +
> >> " 'format' = 'json'," +
> >> " 'scan.startup.mode' = 'earliest-offset'" +
> >> ")";
> >> public static void main(String[] args) throws Exception {
> >>
> >>
> >> //bink table
> >> StreamExecutionEnvironment bsEnv =
> >> StreamExecutionEnvironment.getExecutionEnvironment();
> >> EnvironmentSettings bsSettings =
> >>
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> >> StreamTableEnvironment bsTableEnv =
> >> StreamTableEnvironment.create(bsEnv, bsSettings);
> >>
> >> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
> >>
> >>
> >> tableResult.print();
> >>
> >> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
> >>
> >> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
> >>
> >> bsEnv.execute("aa");
> >>
> >> }
> >>
> >>
> >>
> >>
> >> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
> >> ,order_operation_time
> >> ,inventory_batch_log
> >> ,order_log
> >> ,order_address_book
> >> ,product_inventory
> >> ,order_physical_relation
> >> ,bil_business_attach
> >> ,picking_detail
> >> ,picking_detail
> >> ,orders
> >>
> >>
> >>
> >>
> >> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
> >> 看到例子都是useOldPlanner 来转table的。
> >> 致谢
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >>
> >> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
> >> >抱歉,还是没有看到附件。
> >> >如果是文本的话,你可以直接贴到邮件里。
> >> >
> >> >On Mon, 27 Jul 2020 at 19:22, air23  wrote:
> >> >
> >> >> 我再上传一次
> >> >>
> >> >> 在2020年07月27日 18:55,Jark Wu  写道:
> >> >>
> >> >> Hi,
> >> >> 你的附件好像没有上传。
> >> >>
> >> >> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
> >> >>
> >> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table
> 不能取到data呢?*
> >> >> >
> >> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable
> (\n"
> >> +
> >> >> > " `data` VARCHAR , " +
> >> >> > " `table` VARCHAR " +
> >> >> > ") WITH (" +
> >> >> > " 'connector' = 'kafka'," +
> >> >> > " 'topic' = 'order_source'," +
> >> >> > " 'properties.bootstrap.servers' = '***'," +
> >> >> > " 'properties.group.id' = 'real1'," +
> >> >> > " 'format' = 'json'," +
> >> >> > " 'scan.startup.mode' = 'earliest-offset'" +
> >> >> > ")";
> >> >> >
> >> >> >
> >> >> > 具体见附件 有打印
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >> >
> >> >>
> >> >>
> >>
>


回复: Re: Re: 解析kafka的mysql binlog问题

2020-07-28 文章 明启 孙
你的flink什么版本

发送自 Windows 10 版邮件应用

发件人: air23
发送时间: 2020年7月28日 15:36
收件人: user-zh@flink.apache.org
主题: Re:Re: Re: 解析kafka的mysql binlog问题

格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来


{
"data":[
{
"op_id":"97037138",
"order_id":"84172164"
}
],
"database":"order_11",
"es":1595720375000,
"id":17469027,
"isDdl":false,
"mysqlType":{
"op_id":"int(11)",
"order_id":"int(11)"
},
"old":null,
"pkNames":[
"op_id"
],
"sql":"",
"sqlType":{
"op_id":4,
"order_id":4
},
"table":"order_product",
"ts":1595720375837,
"type":"INSERT"
}

















在 2020-07-28 14:44:35,"Jark Wu"  写道:
>有kafka 中json 数据的样例不?
>有没有看过 TaskManager 中有没有异常 log 信息?
>
>
>
>On Tue, 28 Jul 2020 at 09:40, air23  wrote:
>
>> 你好 测试代码如下
>>
>>
>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>> " `data` VARCHAR , " +
>> " `table` VARCHAR " +
>> ") WITH (" +
>> " 'connector' = 'kafka'," +
>> " 'topic' = 'source_databases'," +
>> " 'properties.bootstrap.servers' = '***'," +
>> " 'properties.group.id' = 'real1'," +
>> " 'format' = 'json'," +
>> " 'scan.startup.mode' = 'earliest-offset'" +
>> ")";
>> public static void main(String[] args) throws Exception {
>>
>>
>> //bink table
>> StreamExecutionEnvironment bsEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> EnvironmentSettings bsSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> StreamTableEnvironment bsTableEnv =
>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>
>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>
>>
>> tableResult.print();
>>
>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>
>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>
>> bsEnv.execute("aa");
>>
>> }
>>
>>
>>
>>
>> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
>> ,order_operation_time
>> ,inventory_batch_log
>> ,order_log
>> ,order_address_book
>> ,product_inventory
>> ,order_physical_relation
>> ,bil_business_attach
>> ,picking_detail
>> ,picking_detail
>> ,orders
>>
>>
>>
>>
>> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>> 看到例子都是useOldPlanner 来转table的。
>> 致谢
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
>> >抱歉,还是没有看到附件。
>> >如果是文本的话,你可以直接贴到邮件里。
>> >
>> >On Mon, 27 Jul 2020 at 19:22, air23  wrote:
>> >
>> >> 我再上传一次
>> >>
>> >> 在2020年07月27日 18:55,Jark Wu  写道:
>> >>
>> >> Hi,
>> >> 你的附件好像没有上传。
>> >>
>> >> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
>> >>
>> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>> >> >
>> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
>> +
>> >> > " `data` VARCHAR , " +
>> >> > " `table` VARCHAR " +
>> >> > ") WITH (" +
>> >> > " 'connector' = 'kafka'," +
>> >> > " 'topic' = 'order_source'," +
>> >> > " 'properties.bootstrap.servers' = '***'," +
>> >> > " 'properties.group.id' = 'real1'," +
>> >> > " 'format' = 'json'," +
>> >> > " 'scan.startup.mode' = 'earliest-offset'" +
>> >> > ")";
>> >> >
>> >> >
>> >> > 具体见附件 有打印
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >>
>> >>
>>



Re: Re: 解析kafka的mysql binlog问题

2020-07-27 文章 Jark Wu
有kafka 中json 数据的样例不?
有没有看过 TaskManager 中有没有异常 log 信息?



On Tue, 28 Jul 2020 at 09:40, air23  wrote:

> 你好 测试代码如下
>
>
> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> " `data` VARCHAR , " +
> " `table` VARCHAR " +
> ") WITH (" +
> " 'connector' = 'kafka'," +
> " 'topic' = 'source_databases'," +
> " 'properties.bootstrap.servers' = '***'," +
> " 'properties.group.id' = 'real1'," +
> " 'format' = 'json'," +
> " 'scan.startup.mode' = 'earliest-offset'" +
> ")";
> public static void main(String[] args) throws Exception {
>
>
> //bink table
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
> EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
> StreamTableEnvironment bsTableEnv =
> StreamTableEnvironment.create(bsEnv, bsSettings);
>
> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>
>
> tableResult.print();
>
> Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>
> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>
> bsEnv.execute("aa");
>
> }
>
>
>
>
> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
> ,order_operation_time
> ,inventory_batch_log
> ,order_log
> ,order_address_book
> ,product_inventory
> ,order_physical_relation
> ,bil_business_attach
> ,picking_detail
> ,picking_detail
> ,orders
>
>
>
>
> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
> 看到例子都是useOldPlanner 来转table的。
> 致谢
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-27 19:44:10,"Jark Wu"  写道:
> >抱歉,还是没有看到附件。
> >如果是文本的话,你可以直接贴到邮件里。
> >
> >On Mon, 27 Jul 2020 at 19:22, air23  wrote:
> >
> >> 我再上传一次
> >>
> >> 在2020年07月27日 18:55,Jark Wu  写道:
> >>
> >> Hi,
> >> 你的附件好像没有上传。
> >>
> >> On Mon, 27 Jul 2020 at 18:17, air23  wrote:
> >>
> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
> >> >
> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
> +
> >> > " `data` VARCHAR , " +
> >> > " `table` VARCHAR " +
> >> > ") WITH (" +
> >> > " 'connector' = 'kafka'," +
> >> > " 'topic' = 'order_source'," +
> >> > " 'properties.bootstrap.servers' = '***'," +
> >> > " 'properties.group.id' = 'real1'," +
> >> > " 'format' = 'json'," +
> >> > " 'scan.startup.mode' = 'earliest-offset'" +
> >> > ")";
> >> >
> >> >
> >> > 具体见附件 有打印
> >> >
> >> >
> >> >
> >> >
> >> >
> >>
> >>
>