Some data is silently lost in my Flink streaming job when state is restored
from a savepoint.

Do you have any debugging hints to find out where exactly the data gets
dropped?

My job gathers distinct values using a 24-hour window. It doesn't have any
custom state management.

When I cancel the job with a savepoint and restore from that savepoint, some
data is missed. It seems to lose only a small amount of data, and the event
time of the lost data is probably around the time of the savepoint. In other
words, the rest of the time window is not missed entirely – collection also
works correctly for (most of) the events that come in after restoring.

When the job processes a full 24-hour window without interruptions it
doesn't miss anything.

Usually the problem doesn't happen in test environments that have smaller
parallelism and smaller data volumes. But at production volumes the job
seems to consistently miss at least something on every restore.

This issue has happened consistently since the job was initially created.
It first ran on an older Flink 1.5-SNAPSHOT build, and it still happens on
both Flink 1.5.2 and 1.6.0.

I'm wondering if this could be, for example, some synchronization issue
between the Kafka consumer offsets and what has been written by the
BucketingSink?

1. Job content, simplified

        kafkaStream
                .flatMap(new ExtractFieldsFunction())
                .keyBy(new MapKeySelector(1, 2, 3, 4))
                .timeWindow(Time.days(1))
                .allowedLateness(allowedLateness)
                .sideOutputLateData(lateDataTag)
                .reduce(new DistinctFunction())
                .addSink(sink)
                // use a fixed number of output partitions
                .setParallelism(8);

/**
 * Usage: .keyBy("the", "distinct", "fields").reduce(new DistinctFunction())
 */
public class DistinctFunction implements ReduceFunction<java.util.Map<String, String>> {
    @Override
    public Map<String, String> reduce(Map<String, String> value1, Map<String, String> value2) {
        // Both inputs belong to the same key, so keeping either one of them
        // is enough to collect the distinct values per window.
        return value1;
    }
}
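
ExtractFieldsFunction and MapKeySelector are our own helpers and are not
shown above. Roughly, the key selector just builds a composite key from a
few fields of the extracted map; a simplified sketch (the lookup of fields
by index is only approximated here):

public class MapKeySelector implements KeySelector<Map<String, String>, String> {

    private final int[] fields;

    public MapKeySelector(int... fields) {
        this.fields = fields;
    }

    @Override
    public String getKey(Map<String, String> value) {
        // Build a deterministic composite key from the selected fields;
        // the "field" + index lookup is a placeholder for the real mapping.
        StringBuilder key = new StringBuilder();
        for (int field : fields) {
            key.append(value.get("field" + field)).append('|');
        }
        return key.toString();
    }
}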

2. State configuration

boolean enableIncrementalCheckpointing = true;
String statePath = "s3n://bucket/savepoints";
new RocksDBStateBackend(statePath, enableIncrementalCheckpointing);

Checkpointing Mode: Exactly Once
Interval: 1m 0s
Timeout: 10m 0s
Minimum Pause Between Checkpoints: 1m 0s
Maximum Concurrent Checkpoints: 1
Persist Checkpoints Externally: Enabled (retain on cancellation)
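
For completeness, those settings correspond roughly to the following setup
code (a simplified sketch; in the real job the values come from
configuration):

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// RocksDB state backend with incremental checkpointing, as above
env.setStateBackend(new RocksDBStateBackend(statePath, enableIncrementalCheckpointing));

// exactly-once checkpoints every minute, matching the listing above
env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(60_000);
env.getCheckpointConfig().setCheckpointTimeout(600_000);
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);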

3. BucketingSink configuration

We use BucketingSink; I don't think there's anything special here, apart
from the fact that we're writing to S3.

        String outputPath = "s3://bucket/output";
        BucketingSink<Map<String, String>> sink = new BucketingSink<Map<String, String>>(outputPath)
                .setBucketer(new ProcessdateBucketer())
                .setBatchSize(batchSize)
                .setInactiveBucketThreshold(inactiveBucketThreshold)
                .setInactiveBucketCheckInterval(inactiveBucketCheckInterval);
        sink.setWriter(new IdJsonWriter());

4. Kafka & event time

My Flink job reads the data from Kafka, using a
BoundedOutOfOrdernessTimestampExtractor on the Kafka consumer to
synchronize watermarks across all Kafka partitions. We also write late
data to a side output, but nothing is written there – if anything were, it
could explain missing data in the main output. (I'm also sure that our
late-data writing works, because we previously had some actual late data
which ended up there.)
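
For reference, the watermark assignment happens directly on the Kafka
consumer, and the late data is read from the side output after the windowed
reduce. The sketch below is simplified: the record type, the "timestamp"
field, the 10-second bound, and the windowedResult / lateDataSink names are
placeholders.

kafkaConsumer.assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Map<String, String>>(Time.seconds(10)) {
            @Override
            public long extractTimestamp(Map<String, String> element) {
                // placeholder: the real job reads its own timestamp field
                return Long.parseLong(element.get("timestamp"));
            }
        });

// lateDataTag as used by .sideOutputLateData(lateDataTag) in section 1;
// windowedResult is the stream returned by .reduce(new DistinctFunction()),
// and lateDataSink is a placeholder for the sink receiving late records
OutputTag<Map<String, String>> lateDataTag =
        new OutputTag<Map<String, String>>("late-data") {};
windowedResult.getSideOutput(lateDataTag).addSink(lateDataSink);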

5. allowedLateness

It may or may not be relevant that I have also enabled allowedLateness with
1-minute lateness on the 24-hour window.
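
In code that is simply the following (shown inline here; in the real job
the value comes from configuration):

Time allowedLateness = Time.minutes(1); // passed to .allowedLateness(allowedLateness) in section 1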

If that makes sense, I could try removing allowedLateness entirely, just to
rule out a Flink bug related to restoring state in combination with the
allowedLateness feature. After all, our data should arrive in good enough
order not to be late, given the max out-of-orderness used in the Kafka
consumer's timestamp extractor.

Thank you in advance!
