你好。
我猜测 是有可能是这个问题。但是我这个topic是 读取的一个库的binlog。有很多表   所以ARRAY<ROW< op_id STRING, 
order_id STRING>>
这种 里面 不是固定的
 所以我想用datastream 解析 然后在根据表不同 解析成不同的table。但是发现blinkplaner 好像不可以datastream 
转换为table。或者是我没有发现这个例子
谢谢

















在 2020-07-28 16:05:55,"admin" <17626017...@163.com> 写道:
>data格式不是string,可以定义为ARRAY<ROW< op_id STRING, order_id STRING>>
>
>> 2020年7月28日 下午3:35,air23 <wangfei23_...@163.com> 写道:
>> 
>> 格式如下 是canal解析的binlog 格式 data下面是一个数组样式。就是用string取不出来 其他都可以取的出来
>> 
>> 
>> {
>>    "data":[
>>        {
>>            "op_id":"97037138",
>>            "order_id":"84172164"
>>        }
>>    ],
>>    "database":"order_11",
>>    "es":1595720375000,
>>    "id":17469027,
>>    "isDdl":false,
>>    "mysqlType":{
>>        "op_id":"int(11)",
>>        "order_id":"int(11)"
>>    },
>>    "old":null,
>>    "pkNames":[
>>        "op_id"
>>    ],
>>    "sql":"",
>>    "sqlType":{
>>        "op_id":4,
>>        "order_id":4
>>    },
>>    "table":"order_product",
>>    "ts":1595720375837,
>>    "type":"INSERT"
>> }
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 在 2020-07-28 14:44:35,"Jark Wu" <imj...@gmail.com> 写道:
>>> 有kafka 中json 数据的样例不?
>>> 有没有看过 TaskManager 中有没有异常 log 信息?
>>> 
>>> 
>>> 
>>> On Tue, 28 Jul 2020 at 09:40, air23 <wangfei23_...@163.com> wrote:
>>> 
>>>> 你好 测试代码如下
>>>> 
>>>> 
>>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
>>>> " `data` VARCHAR , " +
>>>> " `table` VARCHAR " +
>>>> ") WITH (" +
>>>> " 'connector' = 'kafka'," +
>>>> " 'topic' = 'source_databases'," +
>>>> " 'properties.bootstrap.servers' = '***'," +
>>>> " 'properties.group.id' = 'real1'," +
>>>> " 'format' = 'json'," +
>>>> " 'scan.startup.mode' = 'earliest-offset'" +
>>>> ")";
>>>> public static void main(String[] args) throws Exception {
>>>> 
>>>> 
>>>> //bink table
>>>> StreamExecutionEnvironment bsEnv =
>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>    EnvironmentSettings bsSettings =
>>>> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>>>>    StreamTableEnvironment bsTableEnv =
>>>> StreamTableEnvironment.create(bsEnv, bsSettings);
>>>> 
>>>>    TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>>>> 
>>>> 
>>>> tableResult.print();
>>>> 
>>>>    Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>>>> 
>>>> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>>>> 
>>>> bsEnv.execute("aa");
>>>> 
>>>> }
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
>>>> ,order_operation_time
>>>> ,inventory_batch_log
>>>> ,order_log
>>>> ,order_address_book
>>>> ,product_inventory
>>>> ,order_physical_relation
>>>> ,bil_business_attach
>>>> ,picking_detail
>>>> ,picking_detail
>>>> ,orders
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
>>>> 看到例子都是useOldPlanner 来转table的。
>>>> 致谢
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 
>>>> 在 2020-07-27 19:44:10,"Jark Wu" <imj...@gmail.com> 写道:
>>>>> 抱歉,还是没有看到附件。
>>>>> 如果是文本的话,你可以直接贴到邮件里。
>>>>> 
>>>>> On Mon, 27 Jul 2020 at 19:22, air23 <wangfei23_...@163.com> wrote:
>>>>> 
>>>>>> 我再上传一次
>>>>>> 
>>>>>> 在2020年07月27日 18:55,Jark Wu <imj...@gmail.com> 写道:
>>>>>> 
>>>>>> Hi,
>>>>>> 你的附件好像没有上传。
>>>>>> 
>>>>>> On Mon, 27 Jul 2020 at 18:17, air23 <wangfei23_...@163.com> wrote:
>>>>>> 
>>>>>>> *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
>>>>>>> 
>>>>>>> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
>>>> +
>>>>>>>        " `data` VARCHAR , " +
>>>>>>>        " `table` VARCHAR " +
>>>>>>>        ") WITH (" +
>>>>>>>        " 'connector' = 'kafka'," +
>>>>>>>        " 'topic' = 'order_source'," +
>>>>>>>        " 'properties.bootstrap.servers' = '***'," +
>>>>>>>        " 'properties.group.id' = 'real1'," +
>>>>>>>        " 'format' = 'json'," +
>>>>>>>        " 'scan.startup.mode' = 'earliest-offset'" +
>>>>>>>        ")";
>>>>>>> 
>>>>>>> 
>>>>>>> 具体见附件 有打印
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>> 

回复