你好 收到。谢谢。 因为这个topic 是有很多表的binlog的。所以解析成array 也是不行的。长度和字段都不一致
另外想请教下 1.11 版本  datastream 可以转换为 blink的table吗。 看到例子 好像都是 useOldPlanner 来转的,
但是我想用blink 写入到hive。 我这边需求 就是通过binlog - flink - hive。 不同的mysql表写入到不同hive表。

















在 2020-07-28 16:02:18,"Jark Wu" <imj...@gmail.com> 写道:
>因为 "data" 是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。
>1.12 中已经支持读取复杂结构为 string 类型了。
>
>Best,
>Jark
>
>On Tue, 28 Jul 2020 at 15:36, air23 <wangfei23_...@163.com> wrote:
>
>> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>>
>>
>> {
>>     "data":[
>>         {
>>             "op_id":"97037138",
>>             "order_id":"84172164"
>>         }
>>     ],
>>     "database":"order_11",
>>     "es":1595720375000,
>>     "id":17469027,
>>     "isDdl":false,
>>     "mysqlType":{
>>         "op_id":"int(11)",
>>         "order_id":"int(11)"
>>     },
>>     "old":null,
>>     "pkNames":[
>>         "op_id"
>>     ],
>>     "sql":"",
>>     "sqlType":{
>>         "op_id":4,
>>         "order_id":4
>>     },
>>     "table":"order_product",
>>     "ts":1595720375837,
>>     "type":"INSERT"
>> }
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-28 14:44:35,"Jark Wu" <imj...@gmail.com> 写道:
>> >有kafka 中json 数据的样例不?
>> >有没有看过 TaskManager 中有没有异常 log 信息?
>> >
>> >
>> >
>> >On Tue, 28 Jul 2020 at 09:40, air23 <wangfei23_...@163.com> wrote:
>> >
>> >> 你好 测试代码如下
>> >>
>> >>
>> >> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>> >> " `data` VARCHAR , " +
>> >> " `table` VARCHAR " +
>> >> ") WITH (" +
>> >> " 'connector' = 'kafka'," +
>> >> " 'topic' = 'source_databases'," +
>> >> " 'properties.bootstrap.servers' = '***'," +
>> >> " 'properties.group.id' = 'real1'," +
>> >> " 'format' = 'json'," +
>> >> " 'scan.startup.mode' = 'earliest-offset'" +
>> >> ")";
>> >> public static void main(String[] args) throws Exception {
>> >>
>> >>
>> >> //bink table
>> >> StreamExecutionEnvironment bsEnv =
>> >> StreamExecutionEnvironment.getExecutionEnvironment();
>> >>     EnvironmentSettings bsSettings =
>> >>
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>> >>     StreamTableEnvironment bsTableEnv =
>> >> StreamTableEnvironment.create(bsEnv, bsSettings);
>> >>
>> >>     TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>> >>
>> >>
>> >> tableResult.print();
>> >>
>> >>     Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>> >>
>> >> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>> >>
>> >> bsEnv.execute("aa");
>> >>
>> >> }
>> >>
>> >>
>> >>
>> >>
>> >> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
>> >> ,order_operation_time
>> >> ,inventory_batch_log
>> >> ,order_log
>> >> ,order_address_book
>> >> ,product_inventory
>> >> ,order_physical_relation
>> >> ,bil_business_attach
>> >> ,picking_detail
>> >> ,picking_detail
>> >> ,orders
>> >>
>> >>
>> >>
>> >>
>> >> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>> >> 看到例子都是useOldPlanner 来转table的。
>> >> 致谢
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >>
>> >> 在 2020-07-27 19:44:10,"Jark Wu" <imj...@gmail.com> 写道:
>> >> >抱歉,还是没有看到附件。
>> >> >如果是文本的话,你可以直接贴到邮件里。
>> >> >
>> >> >On Mon, 27 Jul 2020 at 19:22, air23 <wangfei23_...@163.com> wrote:
>> >> >
>> >> >> 我再上传一次
>> >> >>
>> >> >> 在2020年07月27日 18:55,Jark Wu <imj...@gmail.com> 写道:
>> >> >>
>> >> >> Hi,
>> >> >> 你的附件好像没有上传。
>> >> >>
>> >> >> On Mon, 27 Jul 2020 at 18:17, air23 <wangfei23_...@163.com> wrote:
>> >> >>
>> >> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table
>> 不能取到data呢?*
>> >> >> >
>> >> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable
>> (\n"
>> >> +
>> >> >> >         " `data` VARCHAR , " +
>> >> >> >         " `table` VARCHAR " +
>> >> >> >         ") WITH (" +
>> >> >> >         " 'connector' = 'kafka'," +
>> >> >> >         " 'topic' = 'order_source'," +
>> >> >> >         " 'properties.bootstrap.servers' = '***'," +
>> >> >> >         " 'properties.group.id' = 'real1'," +
>> >> >> >         " 'format' = 'json'," +
>> >> >> >         " 'scan.startup.mode' = 'earliest-offset'" +
>> >> >> >         ")";
>> >> >> >
>> >> >> >
>> >> >> > 具体见附件 有打印
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >> >
>> >> >>
>> >> >>
>> >>
>>

回复