我接入了一个 RocketMQ 的流作为输入。

 DataStream<Tuple3<Integer, String, String>> ds = env.addSource(new 
RocketMQSource(
........
                        System.out.println(res);
                        return res;
                    }
                });


        tableEnv.registerDataStream("t_pick_task", ds, "pick_task_id, 
pick_list_no, sku_code");

        TableSink<Row> csvSink = new CsvTableSink("D:\\data\\flink",",");
        String[] fieldNames = {"num"};
        TypeInformation[] fieldTypes = {Types.INT};
        tableEnv.registerTableSink("RubberOrders", fieldNames, fieldTypes, 
csvSink);
        tableEnv.sqlUpdate(
                "INSERT INTO RubberOrders SELECT pick_task_id FROM 
t_pick_task");



wangl...@geekplus.com.cn
 
Sender: Alec Chen
Send Time: 2019-08-08 21:01
Receiver: user-zh
Subject: Re: CsvTableSink 目录没有写入具体的数据
完整代码发一下
 
wangl...@geekplus.com.cn <wangl...@geekplus.com.cn> 于2019年8月8日周四 下午7:37写道:
 
>
> 我按官网上的
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/sql.html#specifying-a-query
> 例子写的代码
> 但运行后 CsvTableSink 指定的目录只生成了空文件,没有具体的内容,这是为什么呢?
>
>
>
> wangl...@geekplus.com.cn
>

回复