你好。 我猜测 是有可能是这个问题。但是我这个topic是 读取的一个库的binlog。有很多表 所以ARRAY<ROW< op_id STRING, order_id STRING>> 这种 里面 不是固定的 所以我想用datastream 解析 然后在根据表不同 解析成不同的table。但是发现blinkplaner 好像不可以datastream 转换为table。或者是我没有发现这个例子 谢谢
在 2020-07-28 16:05:55,"admin" <17626017...@163.com> 写道: >data格式不是string,可以定义为ARRAY<ROW< op_id STRING, order_id STRING>> > >> 2020年7月28日 下午3:35,air23 <wangfei23_...@163.com> 写道: >> >> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来 >> >> >> { >> "data":[ >> { >> "op_id":"97037138", >> "order_id":"84172164" >> } >> ], >> "database":"order_11", >> "es":1595720375000, >> "id":17469027, >> "isDdl":false, >> "mysqlType":{ >> "op_id":"int(11)", >> "order_id":"int(11)" >> }, >> "old":null, >> "pkNames":[ >> "op_id" >> ], >> "sql":"", >> "sqlType":{ >> "op_id":4, >> "order_id":4 >> }, >> "table":"order_product", >> "ts":1595720375837, >> "type":"INSERT" >> } >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2020-07-28 14:44:35,"Jark Wu" <imj...@gmail.com> 写道: >>> 有kafka 中json 数据的样例不? >>> 有没有看过 TaskManager 中有没有异常 log 信息? >>> >>> >>> >>> On Tue, 28 Jul 2020 at 09:40, air23 <wangfei23_...@163.com> wrote: >>> >>>> 你好 测试代码如下 >>>> >>>> >>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" + >>>> " `data` VARCHAR , " + >>>> " `table` VARCHAR " + >>>> ") WITH (" + >>>> " 'connector' = 'kafka'," + >>>> " 'topic' = 'source_databases'," + >>>> " 'properties.bootstrap.servers' = '***'," + >>>> " 'properties.group.id' = 'real1'," + >>>> " 'format' = 'json'," + >>>> " 'scan.startup.mode' = 'earliest-offset'" + >>>> ")"; >>>> public static void main(String[] args) throws Exception { >>>> >>>> >>>> //bink table >>>> StreamExecutionEnvironment bsEnv = >>>> StreamExecutionEnvironment.getExecutionEnvironment(); >>>> EnvironmentSettings bsSettings = >>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build(); >>>> StreamTableEnvironment bsTableEnv = >>>> StreamTableEnvironment.create(bsEnv, bsSettings); >>>> >>>> TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL); >>>> >>>> >>>> tableResult.print(); >>>> >>>> Table table = bsTableEnv.sqlQuery("select * from kafkaTable"); >>>> >>>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1); >>>> >>>> bsEnv.execute("aa"); >>>> >>>> } >>>> >>>> >>>> >>>> >>>> 输出结果如下 data都是空的。数据格式为canal解析的mysql binlog >>>> ,order_operation_time >>>> ,inventory_batch_log >>>> ,order_log >>>> ,order_address_book >>>> ,product_inventory >>>> ,order_physical_relation >>>> ,bil_business_attach >>>> ,picking_detail >>>> ,picking_detail >>>> ,orders >>>> >>>> >>>> >>>> >>>> 另外再问个问题。1.11版本 blink 不能datastream转table吗? >>>> 看到例子都是useOldPlanner 来转table的。 >>>> 致谢 >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> >>>> 在 2020-07-27 19:44:10,"Jark Wu" <imj...@gmail.com> 写道: >>>>> 抱歉,还是没有看到附件。 >>>>> 如果是文本的话,你可以直接贴到邮件里。 >>>>> >>>>> On Mon, 27 Jul 2020 at 19:22, air23 <wangfei23_...@163.com> wrote: >>>>> >>>>>> 我再上传一次 >>>>>> >>>>>> 在2020年07月27日 18:55,Jark Wu <imj...@gmail.com> 写道: >>>>>> >>>>>> Hi, >>>>>> 你的附件好像没有上传。 >>>>>> >>>>>> On Mon, 27 Jul 2020 at 18:17, air23 <wangfei23_...@163.com> wrote: >>>>>> >>>>>>> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?* >>>>>>> >>>>>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" >>>> + >>>>>>> " `data` VARCHAR , " + >>>>>>> " `table` VARCHAR " + >>>>>>> ") WITH (" + >>>>>>> " 'connector' = 'kafka'," + >>>>>>> " 'topic' = 'order_source'," + >>>>>>> " 'properties.bootstrap.servers' = '***'," + >>>>>>> " 'properties.group.id' = 'real1'," + >>>>>>> " 'format' = 'json'," + >>>>>>> " 'scan.startup.mode' = 'earliest-offset'" + >>>>>>> ")"; >>>>>>> >>>>>>> >>>>>>> 具体见附件 有打印 >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>>> >>>>>> >>>>>> >>>>