有kafka 中json 数据的样例不?
有没有看过 TaskManager 中有没有异常 log 信息?



On Tue, 28 Jul 2020 at 09:40, air23 <wangfei23_...@163.com> wrote:

> 你好 测试代码如下
>
>
> private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n" +
> " `data` VARCHAR , " +
> " `table` VARCHAR " +
> ") WITH (" +
> " 'connector' = 'kafka'," +
> " 'topic' = 'source_databases'," +
> " 'properties.bootstrap.servers' = '***'," +
> " 'properties.group.id' = 'real1'," +
> " 'format' = 'json'," +
> " 'scan.startup.mode' = 'earliest-offset'" +
> ")";
> public static void main(String[] args) throws Exception {
>
>
> //bink table
> StreamExecutionEnvironment bsEnv =
> StreamExecutionEnvironment.getExecutionEnvironment();
>     EnvironmentSettings bsSettings =
> EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
>     StreamTableEnvironment bsTableEnv =
> StreamTableEnvironment.create(bsEnv, bsSettings);
>
>     TableResult tableResult = bsTableEnv.executeSql(KAFKA_SQL);
>
>
> tableResult.print();
>
>     Table table = bsTableEnv.sqlQuery("select * from kafkaTable");
>
> bsTableEnv.toAppendStream(table, Row.class).print().setParallelism(1);
>
> bsEnv.execute("aa");
>
> }
>
>
>
>
> 输出结果如下  data都是空的。数据格式为canal解析的mysql binlog
> ,order_operation_time
> ,inventory_batch_log
> ,order_log
> ,order_address_book
> ,product_inventory
> ,order_physical_relation
> ,bil_business_attach
> ,picking_detail
> ,picking_detail
> ,orders
>
>
>
>
> 另外再问个问题。1.11版本 blink 不能datastream转table吗?
> 看到例子都是useOldPlanner 来转table的。
> 致谢
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2020-07-27 19:44:10,"Jark Wu" <imj...@gmail.com> 写道:
> >抱歉,还是没有看到附件。
> >如果是文本的话,你可以直接贴到邮件里。
> >
> >On Mon, 27 Jul 2020 at 19:22, air23 <wangfei23_...@163.com> wrote:
> >
> >> 我再上传一次
> >>
> >> 在2020年07月27日 18:55,Jark Wu <imj...@gmail.com> 写道:
> >>
> >> Hi,
> >> 你的附件好像没有上传。
> >>
> >> On Mon, 27 Jul 2020 at 18:17, air23 <wangfei23_...@163.com> wrote:
> >>
> >> > *你好。这个是我的解析sql。我想读取binlog的 data数据和table 数据。 为什么可以取到table 不能取到data呢?*
> >> >
> >> > private static final String KAFKA_SQL = "CREATE TABLE kafkaTable (\n"
> +
> >> >         " `data` VARCHAR , " +
> >> >         " `table` VARCHAR " +
> >> >         ") WITH (" +
> >> >         " 'connector' = 'kafka'," +
> >> >         " 'topic' = 'order_source'," +
> >> >         " 'properties.bootstrap.servers' = '***'," +
> >> >         " 'properties.group.id' = 'real1'," +
> >> >         " 'format' = 'json'," +
> >> >         " 'scan.startup.mode' = 'earliest-offset'" +
> >> >         ")";
> >> >
> >> >
> >> > 具体见附件 有打印
> >> >
> >> >
> >> >
> >> >
> >> >
> >>
> >>
>

Reply via email to