Hi Yadav,

What Flink version are you using? `getPartPrefix` and `getPartSufix`
methods were not public before 1.10.1/1.11.0, which might be causing
this problem for you. Other than that, if you are already using Flink
1.10.1 (or newer), maybe please double check what class are you extending?
The example provided by Ravi seems to be working for me.

Piotrek

wt., 13 paź 2020 o 19:02 Vijayendra Yadav <contact....@gmail.com>
napisał(a):

> *Thanks Ravi. I got following Error:*
> [ERROR] DynamicOutputFileConfig.scala:21: error: method getPartPrefix
> overrides nothing
> [ERROR]   override def getPartPrefix: String = if(partPrefixFunction ==
> null) partPrefix else partPrefixFunction.apply()
> [ERROR]                ^
> [ERROR] DynamicOutputFileConfig.scala:26: error: method getPartSuffix
> overrides nothing
> [ERROR]   override def getPartSuffix: String = if(partSuffixFunction ==
> null) partSuffix else partSuffixFunction.apply()
>
>
>
> On Tue, Oct 13, 2020 at 7:29 AM Ravi Bhushan Ratnakar <
> ravibhushanratna...@gmail.com> wrote:
>
>> Hi Vijayendra,
>>
>> OutputFileConfig provides a builder method to create immutable objects with 
>> given 'prefix' and 'suffix'. The parameter which you are passing to 
>> '*withPartPrefix*' will only be evaluated at the time of calling this method 
>> '*withPartPrefix*'. So if you want to achieve a dynamic 'prefix' or 'suffix' 
>> then you may try to have your own custom implementation of 
>> 'OutputFileConfig' which could provide a way to set function definition for 
>> 'prefix' or 'suffix'. For the same, I am attaching you a sample 
>> implementation. Kindly make sure that the function definition which you are 
>> passing is serializable.
>>
>>
>> Use like this
>>
>> val outputFileConfig:OutputFileConfig = new DynamicOutputFileConfig()
>>   
>> .withPartPrefixFunction(()=>ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))
>>   .withPartSuffixFunction(()=> ".ext")
>>
>>
>> Regards,
>> Ravi
>>
>> On Tue, Oct 13, 2020 at 6:05 AM Vijayendra Yadav <contact....@gmail.com>
>> wrote:
>>
>>> Hi Team,
>>>
>>> I have tried to assign a dynamic prefix for file name, which contains
>>> datetime components.
>>> *The Problem is Job always takes initial datetime when job first starts
>>> and never refreshes later. *
>>> *How can I get dynamic current datetime in filename at sink time ?*
>>>
>>> *.withPartPrefix
>>> (ZonedDateTime.now.withZoneSameInstant(ZoneId.of("UTC")).format(DateTimeFormatter.ofPattern("yyyy-MM-dd-HH-mm-ss-SSS")))*
>>>
>>>
>>>
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/connectors/streamfile_sink.html
>>>
>>> val config = OutputFileConfig
>>>  .builder() .withPartPrefix("prefix")
>>>  .withPartSuffix(".ext")
>>>  .build()
>>>             val sink = StreamingFileSink
>>>  .forRowFormat(new Path(outputPath), new 
>>> SimpleStringEncoder[String]("UTF-8"))
>>>  .withBucketAssigner(new KeyBucketAssigner())
>>>  .withRollingPolicy(OnCheckpointRollingPolicy.build()) 
>>> .withOutputFileConfig(config)
>>>  .build()
>>>
>>>

Reply via email to