Hi Vijayendra,

OutputFileConfig provides a builder for creating immutable objects
with a given 'prefix' and 'suffix'. The argument you pass to
'withPartPrefix' is evaluated only once, at the time that method is
called, which is why the datetime never refreshes. To get a dynamic
'prefix' or 'suffix', you could write your own custom implementation
of 'OutputFileConfig' that accepts a function for the 'prefix' or
'suffix' instead of a fixed string. I am attaching a sample
implementation. Please make sure that the function you pass is
serializable.
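
To illustrate the difference, here is a minimal sketch, independent of
Flink, of eager versus deferred evaluation. The names FixedConfig,
DynamicConfig and nextValue are made up for this example and are not
part of any Flink API; the counter simply stands in for the changing
timestamp.

```scala
import java.util.concurrent.atomic.AtomicInteger

object EagerVsDeferred {
  // Hypothetical stand-in for a value that changes over time (e.g. a timestamp).
  private val counter = new AtomicInteger(0)
  def nextValue(): String = s"part-${counter.incrementAndGet()}"

  // Eager: the String argument is computed once, at the call site.
  final class FixedConfig(prefix: String) {
    def getPartPrefix: String = prefix
  }

  // Deferred: the function is stored and invoked on every read.
  final class DynamicConfig(prefixFn: () => String) {
    def getPartPrefix: String = prefixFn()
  }

  def main(args: Array[String]): Unit = {
    val fixed = new FixedConfig(nextValue())           // nextValue() runs here, once
    val dynamic = new DynamicConfig(() => nextValue()) // nothing runs yet

    println(fixed.getPartPrefix)   // part-1
    println(fixed.getPartPrefix)   // part-1 (same value again)
    println(dynamic.getPartPrefix) // part-2
    println(dynamic.getPartPrefix) // part-3 (fresh value on each call)
  }
}
```

The fixed variant keeps returning the value computed at construction,
while the deferred variant produces a fresh value on each call.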


You can use it like this:

val outputFileConfig: OutputFileConfig = new DynamicOutputFileConfig()
  .withPartPrefixFunction(() =>
    ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC"))
      .format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))
  .withPartSuffixFunction(() => ".ext")


Regards,
Ravi

On Tue, Oct 13, 2020 at 6:05 AM Vijayendra Yadav <contact....@gmail.com>
wrote:

> Hi Team,
>
> I have tried to assign a dynamic prefix for the file name, which contains
> datetime components.
> The problem is that the job always takes the initial datetime from when it
> first starts and never refreshes it later.
> How can I get the dynamic current datetime in the filename at sink time?
>
> .withPartPrefix(ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))
>
>
>
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>
> val config = OutputFileConfig
>   .builder()
>   .withPartPrefix("prefix")
>   .withPartSuffix(".ext")
>   .build()
>
> val sink = StreamingFileSink
>   .forRowFormat(new Path(outputPath), new SimpleStringEncoder[String]("UTF-8"))
>   .withBucketAssigner(new KeyBucketAssigner())
>   .withRollingPolicy(OnCheckpointRollingPolicy.build())
>   .withOutputFileConfig(config)
>   .build()
>
>
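package com.example

import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig

/**
  * An OutputFileConfig whose prefix and suffix can be supplied as functions,
  * so they are re-evaluated each time the getters are called. Falls back to
  * the fixed strings when no function has been set.
  */
class DynamicOutputFileConfig(partPrefix: String, partSuffix: String)
  extends OutputFileConfig(partPrefix, partSuffix) with Serializable {

  // These functions must be serializable.
  private var partPrefixFunction: () => String = _
  private var partSuffixFunction: () => String = _

  def this() = this("", "")

  /**
    * The prefix for the part name.
    */
  override def getPartPrefix: String =
    if (partPrefixFunction == null) partPrefix else partPrefixFunction()

  /**
    * The suffix for the part name.
    */
  override def getPartSuffix: String =
    if (partSuffixFunction == null) partSuffix else partSuffixFunction()

  // Sets the prefix function and returns this config for chaining.
  def withPartPrefixFunction(fun: () => String): DynamicOutputFileConfig = {
    this.partPrefixFunction = fun
    this
  }

  // Sets the suffix function and returns this config for chaining.
  def withPartSuffixFunction(fun: () => String): DynamicOutputFileConfig = {
    this.partSuffixFunction = fun
    this
  }
}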
