Hi Vijayendra,

OutputFileConfig provides a builder that creates immutable objects with a given 'prefix' and 'suffix'. The argument you pass to 'withPartPrefix' is evaluated only once, at the moment 'withPartPrefix' is called, so the resulting string never changes afterwards. If you want a dynamic 'prefix' or 'suffix', you can try writing your own custom implementation of 'OutputFileConfig' that accepts a function for the 'prefix' or 'suffix' instead of a fixed string. I am attaching a sample implementation. Please make sure that the function you pass is serializable.
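To make the timing difference concrete, here is a minimal sketch that does not use Flink at all. EagerConfig and LazyConfig are hypothetical stand-ins I made up for illustration, not Flink classes, and a counter replaces the clock so the result is deterministic. It only shows that a String argument is computed once at the call site, while a function argument is re-run on every read.

```scala
// Hypothetical stand-ins (not Flink classes); they only illustrate evaluation timing.

// Receives an already computed String, like withPartPrefix(...).
final class EagerConfig(prefix: String) {
  def getPartPrefix: String = prefix
}

// Receives a function and calls it on every read.
final class LazyConfig(prefixFn: () => String) {
  def getPartPrefix: String = prefixFn()
}

object EvaluationDemo {
  def main(args: Array[String]): Unit = {
    // A counter stands in for the current datetime.
    var counter = 0
    def next(): String = { counter += 1; s"part-$counter" }

    val eager = new EagerConfig(next())          // next() runs once, right here
    val deferred = new LazyConfig(() => next())  // next() runs on each getPartPrefix call

    println(eager.getPartPrefix)     // part-1
    println(eager.getPartPrefix)     // part-1 (value was fixed at construction)
    println(deferred.getPartPrefix)  // part-2
    println(deferred.getPartPrefix)  // part-3
  }
}
```

The first case corresponds to what you are seeing with the datetime prefix: the value was formatted once when the job was built.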
Use the attached DynamicOutputFileConfig like this:

val outputFileConfig: OutputFileConfig = new DynamicOutputFileConfig()
  .withPartPrefixFunction(() => ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))
  .withPartSuffixFunction(() => ".ext")

Regards,
Ravi

On Tue, Oct 13, 2020 at 6:05 AM Vijayendra Yadav <contact....@gmail.com> wrote:

> Hi Team,
>
> I have tried to assign a dynamic prefix for file name, which contains
> datetime components.
> *The Problem is Job always takes initial datetime when job first starts
> and never refreshes later.*
> *How can I get dynamic current datetime in filename at sink time ?*
>
> *.withPartPrefix(ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))*
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>
> val config = OutputFileConfig
>   .builder()
>   .withPartPrefix("prefix")
>   .withPartSuffix(".ext")
>   .build()
>
> val sink = StreamingFileSink
>   .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
>   .withBucketAssigner(new KeyBucketAssigner())
>   .withRollingPolicy(OnCheckpointRollingPolicy.build())
>   .withOutputFileConfig(config)
>   .build()
>
package com.example

import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig

class DynamicOutputFileConfig(partPrefix: String, partSuffix: String)
  extends OutputFileConfig(partPrefix, partSuffix) with Serializable {

  private var partPrefixFunction: () => String = _
  private var partSuffixFunction: () => String = _

  def this() = this("", "")

  override def getPartPrefix: String =
    if (partPrefixFunction == null) partPrefix
    else partPrefixFunction.apply()

  /**
   * The suffix for the part name.
   */
  override def getPartSuffix: String =
    if (partSuffixFunction == null) partSuffix
    else partSuffixFunction.apply()

  def withPartPrefixFunction(fun: () => String): DynamicOutputFileConfig = {
    this.partPrefixFunction = fun
    this
  }

  def withPartSuffixFunction(fun: () => String): DynamicOutputFileConfig = {
    this.partSuffixFunction = fun
    this
  }
}