Hi!

filesystem sink 的文件数量与 sink 并发数有关。如果数据量不大可以考虑在 sink DDL 的 with 参数里加入
'sink.parallelism' = '1' 设置 sink 并发度为 1。

陈卓宇 <2572805...@qq.com.invalid> 于2021年11月11日周四 下午4:50写道:

> 问题描述:写到本地产生了8个part-817677cc-2f4b-464a-bf9e-11957bcf9c76-0-0 ...的文件
> 请问,我只想写成一个csv文件,如果关闭这种文件分区
>
>
>
> Flink SQL:
> String tw_smart_tag="CREATE TABLE tw_smart_tag (\n" +
>         "  id STRING,\n" +
>         "  tag_code STRING,\n"+
>         "  parent_id STRING,\n"+
>         "  name STRING,\n"+
>         "  type STRING,\n"+
>         "  tag_type STRING,\n"+
>         "  data_type STRING,\n"+
>         "  status STRING,\n"+
>         "  valid_status STRING,\n"+
>         "  opr_status STRING,\n"+
>         "  online STRING,\n"+
>         "  opr_type STRING,\n"+
>         "  opr_time STRING,\n"+
>         "  invalid_time STRING,\n"+
>         "  auth_type STRING,\n"+
>         "  remark STRING,\n"+
>         "  sort STRING,\n"+
>         "  batch_no STRING,\n"+
>         "  created_by STRING,\n"+
>         "  created_time STRING,\n"+
>         "  updated_by STRING,\n"+
>         "  updated_time STRING\n"+
>         ") WITH (\n" +
>         "  'connector' = 'filesystem',           -- 必选: 指定连接器类型\n" +
>         "  'path' =
> 'hdfs://ark1:8020//tmp/usertag/20211029/db_31abd9593e9983ec/metadata/tw_smart_tag.csv',
> -- 必选: 指向目录的路径\n" +
>         "  'format' = 'csv'                   -- 必选: 文件系统连接器需要指定格式,请查阅 表格式
> 部分以获取更多细节\n" +
>         ")\n";
>
> String tw_smart_tag_detail="CREATE TABLE tw_smart_tag_detail (\n" +
>         "  id STRING,\n" +
>         "  tag_id STRING,\n"+
>         "  code STRING,\n"+
>         "  name STRING,\n"+
>         "  content STRING,\n"+
>         "  status STRING,\n"+
>         "  created_by STRING,\n"+
>         "  created_time STRING,\n"+
>         "  updated_by STRING,\n"+
>         "  updated_time STRING\n"+
>         ") WITH (\n" +
>         "  'connector' = 'filesystem',           -- 必选: 指定连接器类型\n" +
>         "  'path' =
> 'hdfs://ark1:8020//tmp/usertag/20211029/db_31abd9593e9983ec/metadata/tw_smart_tag_detail.csv',
> -- 必选: 指向目录的路径\n" +
>         "  'format' = 'csv'                   -- 必选: 文件系统连接器需要指定格式,请查阅 表格式
> 部分以获取更多细节\n" +
>         ")\n";
>
> //导出到本地
> String loaclhostFile="CREATE TABLE loaclhost_File (\n" +
>         "  id STRING,\n" +
>         "  tag_code STRING,\n"+
>         "  name STRING,\n"+
>         "  data_type STRING,\n"+
>         "  detailID STRING,\n"+
>         "  tag_id STRING,\n"+
>         "  detailName STRING\n"+
>         ") WITH (\n" +
>         "  'connector' = 'filesystem',           -- 必选: 指定连接器类型\n" +
>         "  'path' = 'hdfs://ark1:8020//tmp/usertag/20211029/data/',  --
> 必选: 指向目录的路径\n" +
>         "  'format' = 'csv',                   -- 必选: 文件系统连接器需要指定格式,请查阅
> 表格式 部分以获取更多细节\n" +
>         ")\n";
>
> String joinSQL = "insert into loaclhost_File\n" +
>         "SELECT tw_smart_tag.id AS id,\n" +
>         "tw_smart_tag.tag_code AS tag_code,\n" +
>         "tw_smart_tag.name AS name,\n" +
>         "tw_smart_tag.data_type AS data_type,\n" +
>         "tw_smart_tag_detail.id AS detailID,\n" +
>         "tw_smart_tag_detail.tag_id AS tag_id,\n" +
>         "tw_smart_tag_detail.name AS detailName\n" +
>                 "FROM tw_smart_tag INNER JOIN tw_smart_tag_detail ON
> tw_smart_tag.id = tw_smart_tag_detail.tag_id";
> // tw_smart_tag.id = tw_smart_tag_detail.tag_id
> tenv.executeSql(tw_smart_tag).print();
> tenv.executeSql(tw_smart_tag_detail).print();
> tenv.executeSql(loaclhostFile).print();
> tenv.executeSql(joinSQL).print();
>
> 陈卓宇
>
>
> &nbsp;

回复