你好 使用的是<flink.version>1.11.1</flink.version>版本的 



















在 2020-07-28 16:02:30,"明启 孙" <374060...@qq.com> 写道:
>你的flink什么版本
>
>发送自 Windows 10 版邮件应用
>
>发件人: air23
>发送时间: 2020年7月28日 15:36
>收件人: user-zh@flink.apache.org
>主题: Re:Re: Re: 解析kafka的mysql binlog问题
>
>格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>
>
>{
>    "data":[
>        {
>            "op_id":"97037138",
>            "order_id":"84172164"
>        }
>    ],
>    "database":"order_11",
>    "es":1595720375000,
>    "id":17469027,
>    "isDdl":false,
>    "mysqlType":{
>        "op_id":"int(11)",
>        "order_id":"int(11)"
>    },
>    "old":null,
>    "pkNames":[
>        "op_id"
>    ],
>    "sql":"",
>    "sqlType":{
>        "op_id":4,
>        "order_id":4
>    },
>    "table":"order_product",
>    "ts":1595720375837,
>    "type":"INSERT"
>}
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-07-28 14:44:35,"Jark Wu" <imj...@gmail.com> 写道:
>>有kafka 中json 数据的样例不?
>>有没有看过 TaskManager 中有没有异常 log 信息?
>>
>>
>>
>>On Tue, 28 Jul 2020 at 09:40, air23 <wangfei23_...@163.com> wrote:
>>
>>> 你好 测试代码如下
>>>
>>>
>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>>> " `data` VARCHAR , " +
>>> " `table` VARCHAR " +
>>> ") WITH (" +
>>> " 'connector' = 'kafka'," +
>>> " 'topic' = 'source_databases'," +
>>> " 'properties.bootstrap.servers' = '***'," +
>>> " 'properties.group.id' = 'real1'," +
>>> " 'format' = 'json'," +
>>> " 'scan.startup.mode' = 'earliest-offset'" +
>>> ")";
>>> public static void main(String[] args) throws Exception {
>>>
>>>
>>> //bink table
>>> StreamExecutionEnvironment bsEnv =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>     EnvironmentSettings bsSettings =
>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>     StreamTableEnvironment bsTableEnv =
>>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>>
>>>     TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>>
>>>
>>> tableResult.print();
>>>
>>>     Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>>
>>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>>
>>> bsEnv.execute("aa");
>>>
>>> }
>>>
>>>
>>>
>>>
>>> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
>>> ,order_operation_time
>>> ,inventory_batch_log
>>> ,order_log
>>> ,order_address_book
>>> ,product_inventory
>>> ,order_physical_relation
>>> ,bil_business_attach
>>> ,picking_detail
>>> ,picking_detail
>>> ,orders
>>>
>>>
>>>
>>>
>>> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>>> 看到例子都是useOldPlanner 来转table的。
>>> 致谢
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>> 在 2020-07-27 19:44:10,"Jark Wu" <imj...@gmail.com> 写道:
>>> >抱歉,还是没有看到附件。
>>> >如果是文本的话,你可以直接贴到邮件里。
>>> >
>>> >On Mon, 27 Jul 2020 at 19:22, air23 <wangfei23_...@163.com> wrote:
>>> >
>>> >> 我再上传一次
>>> >>
>>> >> 在2020年07月27日 18:55,Jark Wu <imj...@gmail.com> 写道:
>>> >>
>>> >> Hi,
>>> >> 你的附件好像没有上传。
>>> >>
>>> >> On Mon, 27 Jul 2020 at 18:17, air23 <wangfei23_...@163.com> wrote:
>>> >>
>>> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>>> >> >
>>> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
>>> +
>>> >> >         " `data` VARCHAR , " +
>>> >> >         " `table` VARCHAR " +
>>> >> >         ") WITH (" +
>>> >> >         " 'connector' = 'kafka'," +
>>> >> >         " 'topic' = 'order_source'," +
>>> >> >         " 'properties.bootstrap.servers' = '***'," +
>>> >> >         " 'properties.group.id' = 'real1'," +
>>> >> >         " 'format' = 'json'," +
>>> >> >         " 'scan.startup.mode' = 'earliest-offset'" +
>>> >> >         ")";
>>> >> >
>>> >> >
>>> >> > 具体见附件 有打印
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >> >
>>> >>
>>> >>
>>>
>

回复