I used the following approach a long time ago for writing into partitions in
HDFS (others may have better solutions), and I'm not sure whether the
interfaces have changed since, so please double-check:
val baseDir = HadoopClient.resolve(basePath, env)
datum.apply("darwin.write.hadoop.parquet." + postfix,
  FileIO.writeDynamic[String, GenericRecord]()
    .by(recordPartition.partitionFunc)
    .withDestinationCoder(StringUtf8Coder.of())
    .via(DarwinParquetIO.sink(...))
    .to(baseDir)
    ...
    .withNaming((partitionFolder: String) =>
      relativeFileNaming(
        StaticValueProvider.of[String](baseDir + Path.SEPARATOR + partitionFolder),
        fileNaming))
    ...
where partitionFunc has the signature val partitionFunc: T => String. A good
practice is to auto-switch: partition by an event-time field taken from the
record value when using event-time windows, and fall back to processing time
otherwise.
partitionFunc can also combine multiple partition columns into nested
subdirectories using your file system's path separator, e.g. on S3.
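
To make this concrete for the hourly case you asked about, here is a minimal
self-contained sketch (not the Darwin code above) that routes each record into
an hourly subfolder with FileIO.writeDynamic. Everything specific in it is
illustrative: it assumes plain-text records whose first CSV field is an
epoch-millis event timestamp, a made-up baseDir, and TextIO.sink() instead of
a Parquet sink; it also assumes the input PCollection is already windowed
(e.g. hourly fixed windows), which streaming file writes require. Swap in your
own sink, paths, and timestamp extraction.

import java.time.format.DateTimeFormatter
import java.time.{Instant, ZoneOffset}

import org.apache.beam.sdk.coders.StringUtf8Coder
import org.apache.beam.sdk.io.{FileIO, TextIO}
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider
import org.apache.beam.sdk.transforms.SerializableFunction
import org.apache.beam.sdk.values.PCollection

object HourlyPartitionedWrite {

  // Hypothetical base output directory; replace with your own path.
  val baseDir: String = "s3://my-bucket/output"

  // Assumption: the first CSV field of each record is an epoch-millis event timestamp.
  // Maps a record to its hourly partition folder, e.g. "2021/03/03/17".
  val hourlyPartition: SerializableFunction[String, String] =
    new SerializableFunction[String, String] {
      override def apply(record: String): String = {
        val eventMillis = record.split(",")(0).toLong
        DateTimeFormatter
          .ofPattern("yyyy/MM/dd/HH")
          .withZone(ZoneOffset.UTC)
          .format(Instant.ofEpochMilli(eventMillis))
      }
    }

  def write(lines: PCollection[String]): Unit = {
    // `lines` is assumed to be windowed already (e.g. hourly fixed windows).
    lines.apply(
      "write.hourly.partitions",
      FileIO
        .writeDynamic[String, String]()            // destination (hour folder) and record are both Strings here
        .by(hourlyPartition)                       // route each record to its hourly folder
        .withDestinationCoder(StringUtf8Coder.of())
        .via(TextIO.sink())                        // plain-text sink; substitute your Parquet/Avro sink
        .to(baseDir)
        .withNaming(new SerializableFunction[String, FileIO.Write.FileNaming] {
          // Mirror the relativeFileNaming pattern above so files land under <baseDir>/<hour>/.
          override def apply(hour: String): FileIO.Write.FileNaming =
            FileIO.Write.relativeFileNaming(
              StaticValueProvider.of(baseDir + "/" + hour),
              FileIO.Write.defaultNaming("part", ".txt"))
        })
        .withNumShards(1)                          // an explicit shard count is needed for unbounded input
    )
  }
}

The withNaming function here follows the same relativeFileNaming pattern as
the snippet above; defaultNaming only controls the per-file prefix/suffix
within each hourly folder.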
On Wed, Mar 3, 2021 at 5:36 PM Tao Li <[email protected]> wrote:
> Hi Beam community,
>
> I have a streaming app that writes every hour’s data to a folder named after
> that hour. With Flink (for example), we can leverage the “Bucketing File
> Sink”:
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/connectors/filesystem_sink.html
>
> However, I am not seeing that Beam FileIO’s writeDynamic API supports
> specifying different output paths for different groups:
> https://beam.apache.org/releases/javadoc/2.28.0/index.html?org/apache/beam/sdk/io/FileIO.html
>
> It seems that writeDynamic() only supports specifying a different naming
> strategy.
>
> How can I specify different hourly output paths for hourly data with Beam
> writeDynamic? Please advise. Thanks!
>
--
Yours Sincerely
Kobe Feng