Hello Everyone,

I'd like to start a discussion on FLINK-25672 [1], which I've recently
been assigned. Before drafting a FLIP I want to lay out the design space
and get the community's input on which direction (or combination) to
pursue.


The FileSource's ContinuousFileSplitEnumerator keeps an unbounded
HashSet<Path> of every file ever processed. The set is held in JobManager
coordinator state and serialized into PendingSplitsCheckpoint on every
checkpoint. For long-running streaming jobs over large datasets, this set
grows without bound; it is also documented as a current limitation in the
FileSystem connector docs [2].

Two independent production reports establish the pattern:

  - Cliff R. (FLINK-25672 comment, Jan 2023): 16 GB JM heap pool
    exhausted in an HA Kubernetes setup, dominated by un-evictable file
    path history.

  - Nickel F. (FLINK-25672 comment, Apr 2025): JM OOM, fixed in their
    fork with TTL-based eviction.


Both reporters implemented their own fixes because the project hasn't
shipped one yet.



I am currently weighing two viable options. Both are opt-in (default
behavior unchanged), keep the serializer's V1 deserialization branch
intact for savepoint compatibility, and bump
PendingSplitsCheckpointSerializer to V2.
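
To make the compatibility story concrete, a rough sketch of the version
dispatch in PendingSplitsCheckpointSerializer (the V1 branch already
exists; deserializeV2 and the V2 state layout are hypothetical):

  @Override
  public PendingSplitsCheckpoint<FileSourceSplit> deserialize(
          int version, byte[] serialized) throws IOException {
      switch (version) {
          case 1:
              // Existing branch, kept so old savepoints keep restoring.
              return deserializeV1(serialized);
          case 2:
              // New branch: bounded state layout (hypothetical).
              return deserializeV2(serialized);
          default:
              throw new IOException("Unknown checkpoint version: " + version);
      }
  }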

Option A: watermark + offset

Replace the unbounded HashSet<Path> with two pieces of state:

  - maxProcessedTime: the highest mtime ever processed.
  - processedFilesInOffsetWindow: a HashSet<Path> of paths whose mtime
    falls within [maxProcessedTime - offset, maxProcessedTime].

Dedup decision: a file is new if mtime >= maxProcessedTime - offset AND
the path isn't in the windowed set. After each discovery batch, advance
the watermark and evict anything older than (maxProcessedTime - offset).
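
A minimal sketch of that bookkeeping (all names hypothetical, not
existing Flink API; note that eviction needs each entry's mtime, so the
windowed set is shown as a Map<Path, Long> rather than a bare HashSet):

  import org.apache.flink.core.fs.Path;
  import java.time.Duration;
  import java.util.HashMap;
  import java.util.Map;

  // Hypothetical sketch of the enumerator-side dedup state.
  class WindowedFileTracker {
      private final long offsetMs;            // out-of-order tolerance
      private long maxProcessedTime = 0L;     // highest mtime processed so far
      private final Map<Path, Long> windowed = new HashMap<>(); // path -> mtime

      WindowedFileTracker(Duration offset) {
          this.offsetMs = offset.toMillis();
      }

      // Returns true if the file is new and should be enumerated.
      boolean markIfNew(Path path, long mtime) {
          if (mtime < maxProcessedTime - offsetMs) {
              return false; // below the boundary: silently dropped (see cons)
          }
          return windowed.putIfAbsent(path, mtime) == null;
      }

      // After each discovery batch: advance the watermark, evict old entries.
      void advanceAndEvict() {
          for (long t : windowed.values()) {
              maxProcessedTime = Math.max(maxProcessedTime, t);
          }
          long cutoff = maxProcessedTime - offsetMs;
          windowed.values().removeIf(t -> t < cutoff);
      }
  }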

User-facing API (sketch):

  FileSource.forRecordStreamFormat(format, path)
      .monitorContinuously(Duration.ofSeconds(30))
      .outOfOrderTolerance(Duration.ofMinutes(5))
      .build();

Pros:
  - State bounded by the offset window. For typical workloads that's
    a few KB to a few MB.
  - No external prerequisite. Works on any filesystem.
  - FileSourceSplit already carries fileModificationTime, so no
    FileSystem API changes are needed.

Cons:
  - Files with mtime older than (maxProcessedTime - offset) are
    silently dropped. This breaks the following workloads:
      * Historical backfills.
      * Application-level mtime overrides (mtime set to event time
        rather than wall clock).
  - The offset must be sized to absorb the expected mtime-vs-arrival
    variance, including scenarios such as S3 multipart upload duration
    and replication lag.

I believe some mitigations can be included, such as (sketched below):
  - A metric counting files skipped because their mtime fell below the
    boundary, making the silent-drop failure mode observable to the end
    user.
  - A configurable late-file policy (skip / process / side-output),
    mirroring how Flink handles late records.
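
For the metric, the wiring inside the enumerator could be as small as
this fragment (the metric name is illustrative; context is the
enumerator's SplitEnumeratorContext):

  import org.apache.flink.metrics.Counter;

  // Registered once when the enumerator starts.
  Counter lateFilesSkipped = context.metricGroup().counter("lateFilesSkipped");

  // ... in the discovery loop, when a file's mtime falls below the boundary:
  lateFilesSkipped.inc();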

Option B: TTL-based eviction

Keep a path set, but tag each entry with the timestamp at which it was
processed and evict entries older than a configurable retention.

State: LinkedHashMap<Path, Long> processedPathsByTime. Insertion order
matches processing order, so eviction can pop entries from the head and
stop at the first one still within retention, which is amortized O(1)
per insertion.
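
A minimal sketch of that eviction (names hypothetical, not existing
Flink API):

  import org.apache.flink.core.fs.Path;
  import java.time.Duration;
  import java.util.Iterator;
  import java.util.LinkedHashMap;
  import java.util.Map;

  // Hypothetical sketch of TTL-based dedup state.
  class TtlFileTracker {
      private final long retentionMs;
      // Insertion order == processing order: oldest entries sit at the head.
      private final LinkedHashMap<Path, Long> processedPathsByTime =
              new LinkedHashMap<>();

      TtlFileTracker(Duration retention) {
          this.retentionMs = retention.toMillis();
      }

      // Returns true if the path was not seen within the retention window.
      boolean markIfNew(Path path, long nowMs) {
          evictExpired(nowMs);
          return processedPathsByTime.putIfAbsent(path, nowMs) == null;
      }

      // Pop expired entries from the head; stop at the first fresh one.
      private void evictExpired(long nowMs) {
          Iterator<Map.Entry<Path, Long>> it =
                  processedPathsByTime.entrySet().iterator();
          while (it.hasNext()) {
              if (nowMs - it.next().getValue() < retentionMs) {
                  break; // everything after this entry is newer
              }
              it.remove();
          }
      }
  }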

User-facing API (sketch):

  FileSource.forRecordStreamFormat(format, path)
      .monitorContinuously(Duration.ofSeconds(30))
      .withProcessedPathsRetention(Duration.ofDays(7))
      .build();

Pros:
  - No assumption about mtime semantics. Backfills, replication lag,
    and application-level mtime overrides all work correctly because
    dedup is by path, not by timestamp.
  - Conceptually simple: the same data structure, just with eviction.
  - Already proven in production in one reporter's fork [3].

Cons:
  - Requires an external retention guarantee from the user (such as an
    S3 lifecycle policy). If storage retention is longer than the
    configured TTL, files reappear and are silently reprocessed.
  - State is bounded by retention duration rather than by a short offset
    window. A 7-day retention on a busy data lake can still mean tens of
    millions of paths: better than unbounded, but a much larger
    footprint than Option A's.

Both options involve tradeoffs.

A third option is to ship both behind explicit builder methods, with the
unbounded mode as default. But that conclusion should come out of this
discussion, not go into it.




My questions for this dev list:

  1. Single mechanism or both? If one, which one?
  2. For Option A: what is a reasonable default offset? My instinct is
     5 minutes, enough to absorb multipart uploads and replication lag,
     but I have no strong basis for that number.
  3. For Option B: when measuring how old a path entry is for eviction,
     should we use the JobManager's wall-clock time or Flink's processing
     time? My instinct is wall clock, since users will set the TTL to
     match their external storage retention.
  4. Is it worth introducing a `processedPathsStateSize` metric so users
     can observe state size regardless of which mode they choose?
  5. Does the scope of this change warrant a FLIP, or is a
     well-documented pull request sufficient given the additive, opt-in
     nature of the change?


Regards,
Sophia

[1] https://issues.apache.org/jira/browse/FLINK-25672
[2] https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/filesystem/#current-limitations
[3] https://issues.apache.org/jira/browse/FLINK-25672?focusedCommentId=17941577&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17941577
