Hi! filesystem sink 的文件数量与 sink 并发数有关。如果数据量不大可以考虑在 sink DDL 的 with 参数里加入 'sink.parallelism' = '1' 设置 sink 并发度为 1。
陈卓宇 <2572805...@qq.com.invalid> 于2021年11月11日周四 下午4:50写道: > 问题描述:写到本地产生了8个part-817677cc-2f4b-464a-bf9e-11957bcf9c76-0-0 ...的文件 > 请问,我只想写成一个csv文件,如果关闭这种文件分区 > > > > Flink SQL: > String tw_smart_tag="CREATE TABLE tw_smart_tag (\n" + > " id STRING,\n" + > " tag_code STRING,\n"+ > " parent_id STRING,\n"+ > " name STRING,\n"+ > " type STRING,\n"+ > " tag_type STRING,\n"+ > " data_type STRING,\n"+ > " status STRING,\n"+ > " valid_status STRING,\n"+ > " opr_status STRING,\n"+ > " online STRING,\n"+ > " opr_type STRING,\n"+ > " opr_time STRING,\n"+ > " invalid_time STRING,\n"+ > " auth_type STRING,\n"+ > " remark STRING,\n"+ > " sort STRING,\n"+ > " batch_no STRING,\n"+ > " created_by STRING,\n"+ > " created_time STRING,\n"+ > " updated_by STRING,\n"+ > " updated_time STRING\n"+ > ") WITH (\n" + > " 'connector' = 'filesystem', -- 必选: 指定连接器类型\n" + > " 'path' = > 'hdfs://ark1:8020//tmp/usertag/20211029/db_31abd9593e9983ec/metadata/tw_smart_tag.csv', > -- 必选: 指向目录的路径\n" + > " 'format' = 'csv' -- 必选: 文件系统连接器需要指定格式,请查阅 表格式 > 部分以获取更多细节\n" + > ")\n"; > > String tw_smart_tag_detail="CREATE TABLE tw_smart_tag_detail (\n" + > " id STRING,\n" + > " tag_id STRING,\n"+ > " code STRING,\n"+ > " name STRING,\n"+ > " content STRING,\n"+ > " status STRING,\n"+ > " created_by STRING,\n"+ > " created_time STRING,\n"+ > " updated_by STRING,\n"+ > " updated_time STRING\n"+ > ") WITH (\n" + > " 'connector' = 'filesystem', -- 必选: 指定连接器类型\n" + > " 'path' = > 'hdfs://ark1:8020//tmp/usertag/20211029/db_31abd9593e9983ec/metadata/tw_smart_tag_detail.csv', > -- 必选: 指向目录的路径\n" + > " 'format' = 'csv' -- 必选: 文件系统连接器需要指定格式,请查阅 表格式 > 部分以获取更多细节\n" + > ")\n"; > > //导出到本地 > String loaclhostFile="CREATE TABLE loaclhost_File (\n" + > " id STRING,\n" + > " tag_code STRING,\n"+ > " name STRING,\n"+ > " data_type STRING,\n"+ > " detailID STRING,\n"+ > " tag_id STRING,\n"+ > " detailName STRING\n"+ > ") WITH (\n" + > " 'connector' = 'filesystem', -- 必选: 指定连接器类型\n" + > " 'path' = 'hdfs://ark1:8020//tmp/usertag/20211029/data/', -- > 必选: 指向目录的路径\n" + > " 'format' = 'csv', -- 必选: 文件系统连接器需要指定格式,请查阅 > 表格式 部分以获取更多细节\n" + > ")\n"; > > String joinSQL = "insert into loaclhost_File\n" + > "SELECT tw_smart_tag.id AS id,\n" + > "tw_smart_tag.tag_code AS tag_code,\n" + > "tw_smart_tag.name AS name,\n" + > "tw_smart_tag.data_type AS data_type,\n" + > "tw_smart_tag_detail.id AS detailID,\n" + > "tw_smart_tag_detail.tag_id AS tag_id,\n" + > "tw_smart_tag_detail.name AS detailName\n" + > "FROM tw_smart_tag INNER JOIN tw_smart_tag_detail ON > tw_smart_tag.id = tw_smart_tag_detail.tag_id"; > // tw_smart_tag.id = tw_smart_tag_detail.tag_id > tenv.executeSql(tw_smart_tag).print(); > tenv.executeSql(tw_smart_tag_detail).print(); > tenv.executeSql(loaclhostFile).print(); > tenv.executeSql(joinSQL).print(); > > 陈卓宇 > > >