直接转成string1.11版本还不支持,会在1.12修复,参考jira[1]
[1]https://issues.apache.org/jira/browse/FLINK-18002 <https://issues.apache.org/jira/browse/FLINK-18002> > 2020年7月28日 下午5:20,air23 <wangfei23_...@163.com> 写道: > > 你好 收到。谢谢。 因为这个topic 是有很多表的binlog的。所以解析成array 也是不行的。长度和字段都不一致 > 另外想请教下 1.11 版本 datastream 可以转换为 blink的table吗。 看到例子 好像都是 useOldPlanner 来转的, > 但是我想用blink 写入到hive。 我这边需求 就是通过binlog - flink - hive。 不同的mysql表写入到不同hive表。 > > > > > > > > > > > > > > > > > > 在 2020-07-28 16:02:18,"Jark Wu" <imj...@gmail.com> 写道: >> 因为 "data" 是一个复杂结构,不是单纯的 string 结构。所以1.11为止,这个功能还不支持。 >> 1.12 中已经支持读取复杂结构为 string 类型了。 >> >> Best, >> Jark >> >> On Tue, 28 Jul 2020 at 15:36, air23 <wangfei23_...@163.com> wrote: >> >>> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来 >>> >>> >>> { >>> "data":[ >>> { >>> "op_id":"97037138", >>> "order_id":"84172164" >>> } >>> ], >>> "database":"order_11", >>> "es":1595720375000, >>> "id":17469027, >>> "isDdl":false, >>> "mysqlType":{ >>> "op_id":"int(11)", >>> "order_id":"int(11)" >>> }, >>> "old":null, >>> "pkNames":[ >>> "op_id" >>> ], >>> "sql":"", >>> "sqlType":{ >>> "op_id":4, >>> "order_id":4 >>> }, >>> "table":"order_product", >>> "ts":1595720375837, >>> "type":"INSERT" >>> } >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> >>> 在 2020-07-28 14:44:35,"Jark Wu" <imj...@gmail.com> 写道: >>>> 有kafka 中json 数据的样例不? >>>> 有没有看过 TaskManager 中有没有异常 log 信息? >>>> >>>> >>>> >>>> On Tue, 28 Jul 2020 at 09:40, air23 <wangfei23_...@163.com> wrote: >>>> >>>>> 你好 测试代码如下 >>>>> >>>>> >>>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + >>>>> " `data` VARCHAR , " + >>>>> " `table` VARCHAR " + >>>>> ") WITH (" + >>>>> " 'connector' = 'kafka'," + >>>>> " 'topic' = 'source_databases'," + >>>>> " 'properties.bootstrap.servers' = '***'," + >>>>> " 'properties.group.id' = 'real1'," + >>>>> " 'format' = 'json'," + >>>>> " 'scan.startup.mode' = 'earliest-offset'" + >>>>> ")"; >>>>> public static void main(String[] args) throws Exception { >>>>> >>>>> >>>>> //bink table >>>>> StreamExecutionEnvironment bsEnv = >>>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>>> EnvironmentSettings bsSettings = >>>>> >>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >>>>> StreamTableEnvironment bsTableEnv = >>>>> StreamTableEnvironment.create(bsEnv, bsSettings); >>>>> >>>>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL); >>>>> >>>>> >>>>> tableResult.print(); >>>>> >>>>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable"); >>>>> >>>>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1); >>>>> >>>>> bsEnv.execute("aa"); >>>>> >>>>> } >>>>> >>>>> >>>>> >>>>> >>>>> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog >>>>> ,order_operation_time >>>>> ,inventory_batch_log >>>>> ,order_log >>>>> ,order_address_book >>>>> ,product_inventory >>>>> ,order_physical_relation >>>>> ,bil_business_attach >>>>> ,picking_detail >>>>> ,picking_detail >>>>> ,orders >>>>> >>>>> >>>>> >>>>> >>>>> 另外再问个问题。1.11版本 blink 不能datastream转table吗? >>>>> 看到例子都是useOldPlanner 来转table的。 >>>>> 致谢 >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> >>>>> 在 2020-07-27 19:44:10,"Jark Wu" <imj...@gmail.com> 写道: >>>>>> 抱歉,还是没有看到附件。 >>>>>> 如果是文本的话,你可以直接贴到邮件里。 >>>>>> >>>>>> On Mon, 27 Jul 2020 at 19:22, air23 <wangfei23_...@163.com> wrote: >>>>>> >>>>>>> 我再上传一次 >>>>>>> >>>>>>> 在2020年07月27日 18:55,Jark Wu <imj...@gmail.com> 写道: >>>>>>> >>>>>>> Hi, >>>>>>> 你的附件好像没有上传。 >>>>>>> >>>>>>> On Mon, 27 Jul 2020 at 18:17, air23 <wangfei23_...@163.com> wrote: >>>>>>> >>>>>>>> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table >>> 不能取到data呢?* >>>>>>>> >>>>>>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable >>> (\n" >>>>> + >>>>>>>> " `data` VARCHAR , " + >>>>>>>> " `table` VARCHAR " + >>>>>>>> ") WITH (" + >>>>>>>> " 'connector' = 'kafka'," + >>>>>>>> " 'topic' = 'order_source'," + >>>>>>>> " 'properties.bootstrap.servers' = '***'," + >>>>>>>> " 'properties.group.id' = 'real1'," + >>>>>>>> " 'format' = 'json'," + >>>>>>>> " 'scan.startup.mode' = 'earliest-offset'" + >>>>>>>> ")"; >>>>>>>> >>>>>>>> >>>>>>>> 具体见附件 有打印 >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>>> >>>>>>> >>>>>>> >>>>> >>>