I am new to Flink and doing a POC with it, using it to read data from a Kafka
topic and store it in files on a server. I am using FileSink to write the files;
it creates the date-and-time-wise directory structure, but no log files are
getting created inside it.
When I run the program it creates the directory structure below, but no log
files are stored there:
/flink/testlogs/2021-12-08--07
/flink/testlogs/2021-12-08--06
I want a new log file to be written every 15 minutes. Below is the code:
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple6;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

// Read raw log lines from the Kafka topic (p holds the consumer properties)
DataStream<String> kafkaTopicData = env.addSource(
        new FlinkKafkaConsumer<String>("MyTopic", new SimpleStringSchema(), p));

// Name the part files prefix-<subtask>-<counter>.ext
OutputFileConfig config = OutputFileConfig
        .builder()
        .withPartPrefix("prefix")
        .withPartSuffix(".ext")
        .build();

// Parse each line into a Tuple6 of log fields
DataStream<Tuple6<String, String, String, String, String, Integer>> newStream =
        kafkaTopicData.map(new LogParser());

// Row-format sink that rolls to a new part file every 15 minutes,
// after 5 minutes of inactivity, or when a part reaches 1 GB
final FileSink<Tuple6<String, String, String, String, String, Integer>> sink =
        FileSink.forRowFormat(
                new Path("/flink/testlogs"),
                new SimpleStringEncoder<Tuple6<String, String, String, String, String, Integer>>("UTF-8"))
        .withRollingPolicy(DefaultRollingPolicy.builder()
                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                .withMaxPartSize(1024 * 1024 * 1024)
                .build())
        .withOutputFileConfig(config)
        .build();

newStream.sinkTo(sink);
env.execute("DataReader");
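Here env is the StreamExecutionEnvironment and p the Kafka consumer Properties, created roughly as below (a sketch; the broker address and group id are placeholders, not the actual values):

import java.util.Properties;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

Properties p = new Properties();
p.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
p.setProperty("group.id", "log-reader");              // placeholder group id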
LogParser is a MapFunction that returns a Tuple6.
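A minimal sketch of its shape (the delimiter and fields here are assumptions, not the real parsing logic):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.tuple.Tuple6;

public class LogParser
        implements MapFunction<String, Tuple6<String, String, String, String, String, Integer>> {

    @Override
    public Tuple6<String, String, String, String, String, Integer> map(String line) {
        // Hypothetical format: pipe-delimited date|time|host|level|message|status
        String[] f = line.split("\\|");
        return Tuple6.of(f[0], f[1], f[2], f[3], f[4], Integer.parseInt(f[5]));
    }
}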
Regards,