谢谢回复
简单实现了一下BucketAssigner,可以实现需求
@Override
public String getBucketId(Map element, Context context)
{
if(context.timestamp() - context.currentProcessingTime() < 0) {
return "dt="+context.timestamp();
}
Hi,Jacob
官网有这么一段:`我们可以在格式构建器上调用 .withBucketAssigner(assigner) 来自定义 BucketAssigner
`
链接:
https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/connectors/file_sink.html#%E6%A1%B6%E5%88%86%E9%85%8D
【现状如下】
Flink Job消费kafka消息,每半个小时将消费到的消息进行一系列聚合操作(flink 窗口聚合),然后写入一个orc文件。
据了解,flink写orc的桶分配策略[1],有两种:
一种是基于时间,即按时间为目录创建orc文件。[test/realtime/ : 为根目录]
test/realtime/
└── 2021-03-23--07
├── part-0-0.orc
├── part-0-1.orc
└── 2021-03-23--08
├── part-0-0.orc
├── part-0-1.orc