格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来


{
    "data":[
        {
            "op_id":"97037138",
            "order_id":"84172164"
        }
    ],
    "database":"order_11",
    "es":1595720375000,
    "id":17469027,
    "isDdl":false,
    "mysqlType":{
        "op_id":"int(11)",
        "order_id":"int(11)"
    },
    "old":null,
    "pkNames":[
        "op_id"
    ],
    "sql":"",
    "sqlType":{
        "op_id":4,
        "order_id":4
    },
    "table":"order_product",
    "ts":1595720375837,
    "type":"INSERT"
}

















在 2020-07-28 14:44:35,"Jark Wu" <imj...@gmail.com> 写道:
>有kafka 中json 数据的样例不?
>有没有看过 TaskManager 中有没有异常 log 信息?
>
>
>
>On Tue, 28 Jul 2020 at 09:40, air23 <wangfei23_...@163.com> wrote:
>
>> 你好 测试代码如下
>>
>>
>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>> " `data` VARCHAR , " +
>> " `table` VARCHAR " +
>> ") WITH (" +
>> " 'connector' = 'kafka'," +
>> " 'topic' = 'source_databases'," +
>> " 'properties.bootstrap.servers' = '***'," +
>> " 'properties.group.id' = 'real1'," +
>> " 'format' = 'json'," +
>> " 'scan.startup.mode' = 'earliest-offset'" +
>> ")";
>> public static void main(String[] args) throws Exception {
>>
>>
>> //bink table
>> StreamExecutionEnvironment bsEnv =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>     EnvironmentSettings bsSettings =
>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>     StreamTableEnvironment bsTableEnv =
>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>
>>     TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>
>>
>> tableResult.print();
>>
>>     Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>
>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>
>> bsEnv.execute("aa");
>>
>> }
>>
>>
>>
>>
>> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
>> ,order_operation_time
>> ,inventory_batch_log
>> ,order_log
>> ,order_address_book
>> ,product_inventory
>> ,order_physical_relation
>> ,bil_business_attach
>> ,picking_detail
>> ,picking_detail
>> ,orders
>>
>>
>>
>>
>> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>> 看到例子都是useOldPlanner 来转table的。
>> 致谢
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2020-07-27 19:44:10,"Jark Wu" <imj...@gmail.com> 写道:
>> >抱歉,还是没有看到附件。
>> >如果是文本的话,你可以直接贴到邮件里。
>> >
>> >On Mon, 27 Jul 2020 at 19:22, air23 <wangfei23_...@163.com> wrote:
>> >
>> >> 我再上传一次
>> >>
>> >> 在2020年07月27日 18:55,Jark Wu <imj...@gmail.com> 写道:
>> >>
>> >> Hi,
>> >> 你的附件好像没有上传。
>> >>
>> >> On Mon, 27 Jul 2020 at 18:17, air23 <wangfei23_...@163.com> wrote:
>> >>
>> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>> >> >
>> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
>> +
>> >> >         " `data` VARCHAR , " +
>> >> >         " `table` VARCHAR " +
>> >> >         ") WITH (" +
>> >> >         " 'connector' = 'kafka'," +
>> >> >         " 'topic' = 'order_source'," +
>> >> >         " 'properties.bootstrap.servers' = '***'," +
>> >> >         " 'properties.group.id' = 'real1'," +
>> >> >         " 'format' = 'json'," +
>> >> >         " 'scan.startup.mode' = 'earliest-offset'" +
>> >> >         ")";
>> >> >
>> >> >
>> >> > 具体见附件 有打印
>> >> >
>> >> >
>> >> >
>> >> >
>> >> >
>> >>
>> >>
>>

回复