[ 
https://issues.apache.org/jira/browse/FLINK-25672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18078314#comment-18078314
 ] 

Sophia Izokun commented on FLINK-25672:
---------------------------------------

[~martijnvisser] I wanted to get your input and provide an update.

I have been looking at this and see two viable design directions. Both 
would be opt-in (default behavior unchanged), keep V1 deserialization intact, 
and change `PendingSplitsCheckpointSerializer` to V2.

Option a: watermark and offset

Replace the unbounded `HashSet<Path>` with `maxProcessedTime` (the highest 
mtime processed) plus a small set of paths whose mtime falls within 
`[maxProcessedTime - offset, maxProcessedTime]`. For deduplication, a file is 
considered new if `mtime >= maxProcessedTime - offset` and its path isn't in 
the windowed set.
 * New state: KB-MB regardless of job lifetime.
 * No external prerequisite.
 * Failure mode: files whose mtime is older than `maxProcessedTime - offset` 
are silently dropped (historical backfills, app-level mtime overrides). 
    - In my opinion this can be mitigated with a metric for skipped files and 
a configurable late-file policy.
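
To make the option a rule concrete, here is a minimal sketch of the dedup check. It is illustrative only and not the proposed Flink code: the class `WatermarkDedup`, its methods, and the skipped-file counter are hypothetical names, and paths are plain `String`s with mtimes as epoch millis so the example stays self-contained.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of option a; not part of the Flink API. */
final class WatermarkDedup {
    private final long offsetMillis;
    // Highest mtime processed so far; Long.MIN_VALUE means "nothing seen yet".
    private long maxProcessedTime = Long.MIN_VALUE;
    // path -> mtime, only for files inside [maxProcessedTime - offset, maxProcessedTime].
    private final Map<String, Long> windowed = new HashMap<>();
    // Assumed mitigation: count files dropped for being older than the window.
    private long skippedLateFiles = 0;

    WatermarkDedup(long offsetMillis) {
        if (offsetMillis < 0) {
            throw new IllegalArgumentException("offset must be non-negative");
        }
        this.offsetMillis = offsetMillis;
    }

    /** Returns true if the file is new and should be processed; records it if so. */
    boolean offer(String path, long mtime) {
        boolean seenAny = maxProcessedTime != Long.MIN_VALUE;
        if (seenAny && mtime < maxProcessedTime - offsetMillis) {
            skippedLateFiles++; // older than the window: silently dropped
            return false;
        }
        if (windowed.containsKey(path)) {
            return false; // already processed within the window
        }
        windowed.put(path, mtime);
        if (mtime > maxProcessedTime) {
            maxProcessedTime = mtime;
            long lowerBound = maxProcessedTime - offsetMillis;
            // Prune entries that fell out of the window after the watermark advanced.
            windowed.values().removeIf(t -> t < lowerBound);
        }
        return true;
    }

    long skippedLateFiles() {
        return skippedLateFiles;
    }

    int windowSize() {
        return windowed.size();
    }
}
```

The state is `maxProcessedTime` plus the windowed map, so its size depends on how many files land inside one offset window rather than on job lifetime. The `skippedLateFiles` counter is where a metric for the failure mode above could hook in.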

Option b: TTL eviction

Pretty much similar to what Nickel proposed above.

Tag each path entry with its processed-at time and evict entries on a 
configurable retention, using a `LinkedHashMap<Path, Long>`.
 * New state: bounded by the retention window (still potentially MB-GB on 
busy datalakes).
 * No assumption about mtime semantics, so backfills work correctly.
 * Failure mode: requires user's external retention (S3 lifecycle, ...) to 
outlast the configured TTL, otherwise files silently reprocess.
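
A minimal sketch of the option b bookkeeping follows. Again this is illustrative only: `TtlDedup` and its methods are hypothetical names, paths are plain `String`s, and the caller passes in the current time, which is assumed to be non-decreasing across calls.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of option b; not part of the Flink API. */
final class TtlDedup {
    private final long retentionMillis;
    // path -> processed-at time. Insertion order equals processed-at order
    // as long as the supplied clock never goes backwards (assumption).
    private final LinkedHashMap<String, Long> processedAt = new LinkedHashMap<>();

    TtlDedup(long retentionMillis) {
        if (retentionMillis <= 0) {
            throw new IllegalArgumentException("retention must be positive");
        }
        this.retentionMillis = retentionMillis;
    }

    /** Returns true if the path is not currently remembered; records it if so. */
    boolean offer(String path, long nowMillis) {
        evictExpired(nowMillis);
        if (processedAt.containsKey(path)) {
            return false; // processed within the retention window
        }
        processedAt.put(path, nowMillis);
        return true;
    }

    /** Drops entries whose age has reached the retention. */
    void evictExpired(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = processedAt.entrySet().iterator();
        while (it.hasNext()) {
            long age = nowMillis - it.next().getValue();
            if (age < retentionMillis) {
                break; // oldest entry is still live, so all later ones are too
            }
            it.remove();
        }
    }

    int size() {
        return processedAt.size();
    }
}
```

Because eviction only walks the head of the map, the cost per call is proportional to the number of expired entries. The sketch also shows the failure mode: once an entry is evicted, a file that is still listed under the same path is accepted again.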

I think shipping both behind explicit builder methods is also on the table.

I've sent a [DISCUSS] thread to dev@flink with the full writeup. A few specific 
questions I'd value your input on:
 * Does the scope warrant a FLIP, or is an additive opt-in pull request 
sufficient?
 * Single mechanism or both?

> FileSource enumerator remembers paths of all already processed files which 
> can result in large state
> ----------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-25672
>                 URL: https://issues.apache.org/jira/browse/FLINK-25672
>             Project: Flink
>          Issue Type: Improvement
>          Components: Connectors / FileSystem
>            Reporter: Martijn Visser
>            Assignee: Sophia Izokun
>            Priority: Major
>
> As mentioned in the Filesystem documentation, for Unbounded File Sources, the 
> {{FileEnumerator}} currently remembers paths of all already processed files, 
> which is a state that can in some cases grow rather large. 
> We should look into possibilities to reduce this. We could look into adding a 
> compressed form of tracking already processed files (for example by keeping 
> modification timestamps lower boundaries).
> When fixed, this should also be reflected in the documentation, as mentioned 
> in https://github.com/apache/flink/pull/18288#discussion_r785707311



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
