Hi Flink Community,
I hope this email finds you well. I am currently in the process of
migrating my Flink application from version 1.12.7 to 1.17.2 and have
encountered a behavior issue with the FileSource while reading data from an
S3 bucket.
In the previous version (1.12.7), I was utilizing the readFile method with
the TextInputFormat to continuously monitor the S3 bucket for any updates
or new files added at a specified time interval. The code snippet for this
was as follows:
    streamExecutionEnvironment
        .readFile(
            new TextInputFormat(new Path("s3://my-s3-path")),
            "s3://my-s3-path",
            FileProcessingMode.PROCESS_CONTINUOUSLY,
            10000)
        .setParallelism(1);
Now, after migrating to Flink 1.17.2, I have switched to using the
FileSource for continuous monitoring. The code snippet for this is as
follows:
    FileSource<String> fileSource = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://my-s3-path"))
        .monitorContinuously(Duration.ofMillis(10000))
        .build();

    streamExecutionEnvironment
        .fromSource(fileSource, WatermarkStrategy.noWatermarks(), "filesource")
        .uid("filesource")
        .setParallelism(1);
This setup successfully detects new files added to the S3 bucket, but it
seems to miss changes made to existing files. I am unsure whether this is
expected behavior in Flink 1.17.2 or whether there is a configuration detail
I am overlooking.
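To make the difference I am seeing concrete, here is a minimal sketch in plain Java (no Flink dependencies) of two ways a monitor could decide whether a file needs processing. All class and method names are hypothetical. I am only assuming that the new FileSource remembers paths it has already processed, while the old readFile approach also compared modification times; I have not confirmed this against the Flink source.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical illustration only; this is not Flink code.
final class FileChangeTracking {

    // Remembers paths only: a file is reported once, however often it changes.
    static final class PathOnlyTracker {
        private final Set<String> seenPaths = new HashSet<>();

        boolean shouldProcess(String path, long modificationTime) {
            // Set.add returns false when the path was already recorded,
            // so the modification time is never consulted.
            return seenPaths.add(path);
        }
    }

    // Remembers the last modification time per path: a newer timestamp
    // causes the file to be reported again.
    static final class ModificationTimeTracker {
        private final Map<String, Long> lastModified = new HashMap<>();

        boolean shouldProcess(String path, long modificationTime) {
            Long previous = lastModified.get(path);
            if (previous != null && previous >= modificationTime) {
                return false; // unchanged since the last scan
            }
            lastModified.put(path, modificationTime);
            return true;
        }
    }

    public static void main(String[] args) {
        PathOnlyTracker pathOnly = new PathOnlyTracker();
        ModificationTimeTracker byTime = new ModificationTimeTracker();
        String file = "s3://my-s3-path/a.txt"; // made-up object key

        // First scan: the file is new to both trackers.
        System.out.println(pathOnly.shouldProcess(file, 100L));
        System.out.println(byTime.shouldProcess(file, 100L));

        // Second scan: same path, newer modification time.
        System.out.println(pathOnly.shouldProcess(file, 200L));
        System.out.println(byTime.shouldProcess(file, 200L));
    }
}
```

On the second scan the path-only tracker ignores the modified file, while the timestamp-based tracker reports it again. The first matches what I observe with FileSource, and the second matches what I saw with readFile in 1.12.7.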
Any guidance or suggestions on resolving this issue would be greatly
appreciated.
Thanks,
Prasanna