Hi all,
I’ve got a Flink job that uses Kinesis as the source and an S3 FileSink as the sink. The sink
rolls its part files at checkpoints, and checkpointing itself is configured as
EXACTLY_ONCE. While the job is running everything looks fine: a new batch of files
appears on S3 every minute (the checkpoint interval is 60s).
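For reference, checkpointing is set up roughly like this (a sketch rather than the exact code; env is the StreamExecutionEnvironment):

// 60s interval, exactly-once mode
env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);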
The problem occurs when I stop the job with a savepoint. The job produces a
savepoint that contains the Kinesis offsets, but new files are still uploaded to S3
containing records that go beyond the offsets captured in the savepoint.
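The job is stopped roughly like this (standard Flink CLI; the savepoint path and job id are placeholders):

flink stop --savepointPath <savepoint-dir> <job-id>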
I don’t think this is the intended behavior, since it breaks the exactly-once guarantee.
Has anyone run into this kind of behavior?
This is how the sink is defined:
FileSink.forBulkFormat(new Path(basePath), new ParquetWriterFactory<>(parquetBuilder))
        .withBucketAssigner(new DateTimeBucketAssigner<>(dateTimeFormat))
        .withOutputFileConfig(OutputFileConfig.builder()
                // random prefix, generated once when the sink is built,
                // to avoid part-file name clashes across runs
                .withPartPrefix(String.format("part-%s", UUID.randomUUID()))
                // getExtension() yields e.g. ".gz", so the suffix becomes ".gz.parquet"
                .withPartSuffix(String.format("%s.parquet", compressionCodecName.getExtension()))
                .build())
        .build();
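(As far as I understand, bulk formats like Parquet can only roll on checkpoints, so this sink gets the default OnCheckpointRollingPolicy, which matches the one-batch-of-files-per-checkpoint pattern described above.)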
Thanks,
Vararu Vadim.