[
https://issues.apache.org/jira/browse/FLINK-25672?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18078314#comment-18078314
]
Sophia Izokun commented on FLINK-25672:
---------------------------------------
[~martijnvisser] I wanted to give you an update and get your input.
I have been looking at this, and there are two viable design directions. Both
would be opt-in (default behavior unchanged), keep V1 deserialization intact,
and bump `PendingSplitsCheckpointSerializer` to V2.
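Here is a minimal, Flink-free sketch of what "bump to V2 but keep V1 deserialization intact" could look like. It assumes the V2 payload is Option A's watermark plus the path set. The class and method names (`VersionedProcessedPathsSerde`, `serializeV1`) and the byte layout are hypothetical. The real `PendingSplitsCheckpointSerializer` implements Flink's `SimpleVersionedSerializer`, also writes the pending splits, and uses Flink's `Path` rather than `String`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch: V1 stores only the processed paths, V2 adds a watermark. */
final class VersionedProcessedPathsSerde {
    static final int V1 = 1;
    static final int V2 = 2;

    /** Restored enumerator state; Long.MIN_VALUE means "no watermark yet". */
    static final class State {
        final long maxProcessedTime;
        final Set<String> paths;

        State(long maxProcessedTime, Set<String> paths) {
            this.maxProcessedTime = maxProcessedTime;
            this.paths = paths;
        }
    }

    /** New checkpoints are always written in the V2 layout: watermark, then paths. */
    static byte[] serialize(State state) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(state.maxProcessedTime);
            writePaths(out, state.paths);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** The V1 layout (paths only), kept here only to produce "old" checkpoints. */
    static byte[] serializeV1(Set<String> paths) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            writePaths(out, paths);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Dispatches on the version stored with the checkpoint, so V1 state stays readable. */
    static State deserialize(int version, byte[] serialized) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            switch (version) {
                case V1:
                    // V1 never stored a watermark, so only the path set is restored.
                    return new State(Long.MIN_VALUE, readPaths(in));
                case V2:
                    long watermark = in.readLong();
                    return new State(watermark, readPaths(in));
                default:
                    throw new IllegalArgumentException("Unknown serializer version: " + version);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void writePaths(DataOutputStream out, Set<String> paths) throws IOException {
        out.writeInt(paths.size());
        for (String path : paths) {
            out.writeUTF(path);
        }
    }

    private static Set<String> readPaths(DataInputStream in) throws IOException {
        int count = in.readInt();
        Set<String> paths = new HashSet<>();
        for (int i = 0; i < count; i++) {
            paths.add(in.readUTF());
        }
        return paths;
    }
}
```

Restoring a V1 checkpoint yields `Long.MIN_VALUE` as the watermark ("no watermark yet"). The first enumeration after an upgrade therefore still deduplicates using only the restored path set.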
Option A: watermark and offset
Replace the unbounded `HashSet<Path>` with `maxProcessedTime` (the highest mtime
processed so far) plus a small set of the paths whose mtime falls within
`[maxProcessedTime - offset, maxProcessedTime]`. For deduplication, a file is
considered new if `mtime >= maxProcessedTime - offset` AND its path is not in
the windowed set.
* New state size: KB to MB, regardless of job lifetime.
* No external prerequisites.
* Failure mode: files whose mtime is older than `maxProcessedTime - offset` are
silently dropped (historical backfills, application-level mtime overrides).
- In my opinion this can be mitigated with a metric for skipped files and a
configurable late-file policy.
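The Option A deduplication rule can be sketched as a standalone class under a few assumptions: paths are plain `String`s, times are epoch milliseconds, and the windowed set is kept as a path-to-mtime map so entries can be pruned once they fall below `maxProcessedTime - offset`. `WatermarkFileTracker` and its methods are hypothetical names, not existing Flink classes.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch of Option A: a watermark (highest mtime processed) plus the paths
 * whose mtime lies in [maxProcessedTime - offset, maxProcessedTime].
 */
final class WatermarkFileTracker {
    private final long offsetMillis;
    private long maxProcessedTime = Long.MIN_VALUE; // no file processed yet
    private final Map<String, Long> windowed = new HashMap<>(); // path -> mtime

    WatermarkFileTracker(long offsetMillis) {
        this.offsetMillis = offsetMillis;
    }

    /** Lower edge of the window; avoids underflow before the first file is processed. */
    private long lowerBound() {
        return maxProcessedTime == Long.MIN_VALUE ? Long.MIN_VALUE : maxProcessedTime - offsetMillis;
    }

    /** A file is new if mtime >= maxProcessedTime - offset AND its path is not in the window. */
    boolean isNew(String path, long mtime) {
        return mtime >= lowerBound() && !windowed.containsKey(path);
    }

    /** Records a processed file, advances the watermark, and prunes paths below the window. */
    void markProcessed(String path, long mtime) {
        maxProcessedTime = Math.max(maxProcessedTime, mtime);
        windowed.put(path, mtime);
        long bound = lowerBound();
        windowed.values().removeIf(t -> t < bound);
    }

    long maxProcessedTime() {
        return maxProcessedTime;
    }

    int windowedSize() {
        return windowed.size();
    }
}
```

For simplicity, pruning here scans the whole window on every call; a real implementation would probably prune once per enumeration cycle. With an offset of 10 and the watermark at 100, a newly discovered file with mtime 85 is rejected by `isNew`. That is the silent-drop failure mode described above.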
Option B: TTL eviction
Pretty much similar to what Nickel proposed above: tag each path entry with its
processed-at time and evict entries after a configurable retention period,
stored as a `LinkedHashMap<Path, Long>`.
* New state size: bounded by the retention window (still potentially MB to GB on
busy data lakes).
* No assumptions about mtime semantics, so backfills work correctly.
* Failure mode: the user's external retention (e.g. S3 lifecycle rules) must
outlast the configured TTL; otherwise files are silently reprocessed.
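Option B can be sketched with an insertion-ordered `LinkedHashMap`. The sketch assumes processed-at timestamps come from a single non-decreasing clock in the enumerator, so the oldest entries are always at the head of the map. `TtlFileTracker` and its methods are hypothetical, and `String` stands in for Flink's `Path`.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical sketch of Option B: each processed path is tagged with its processed-at time
 * and evicted once it is older than the configured retention.
 */
final class TtlFileTracker {
    private final long retentionMillis;
    // Insertion order == processing order, so the oldest entries sit at the head.
    private final LinkedHashMap<String, Long> processedAt = new LinkedHashMap<>();

    TtlFileTracker(long retentionMillis) {
        this.retentionMillis = retentionMillis;
    }

    boolean isNew(String path) {
        return !processedAt.containsKey(path);
    }

    void markProcessed(String path, long nowMillis) {
        // put() on an existing key keeps its old position, so remove first to move it to the tail.
        processedAt.remove(path);
        processedAt.put(path, nowMillis);
    }

    /** Evicts entries processed more than retentionMillis before nowMillis. */
    void evictExpired(long nowMillis) {
        Iterator<Map.Entry<String, Long>> it = processedAt.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<String, Long> entry = it.next();
            if (nowMillis - entry.getValue() <= retentionMillis) {
                break; // every later entry was processed no earlier than this one
            }
            it.remove();
        }
    }

    int size() {
        return processedAt.size();
    }
}
```

Because eviction stops at the first unexpired entry, each call costs time proportional to the number of evicted entries. Once an entry is evicted, `isNew` returns true for that path again. That is why the external retention has to outlast the TTL.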
I think shipping both behind explicit builder methods is also on the table.
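If both mechanisms ship, the opt-in could look roughly like the builder fragment below. Every name here (`FileSourceTrackingOptions`, `processedFilesWatermark`, `processedFilesTtl`) is hypothetical and not part of Flink's `FileSource` builder. The sketch only shows the default staying unchanged and each mode being selected explicitly.

```java
import java.time.Duration;
import java.util.Objects;

/** Hypothetical opt-in options; none of these names exist in Flink's FileSource today. */
final class FileSourceTrackingOptions {
    enum Mode { ALL_PATHS, WATERMARK_OFFSET, TTL }

    final Mode mode;
    final Duration duration; // offset or retention; null for ALL_PATHS

    private FileSourceTrackingOptions(Mode mode, Duration duration) {
        this.mode = mode;
        this.duration = duration;
    }

    static final class Builder {
        private Mode mode = Mode.ALL_PATHS; // default: current behavior, every path is remembered
        private Duration duration;

        /** Option A: watermark plus a window of the given offset. */
        Builder processedFilesWatermark(Duration offset) {
            return select(Mode.WATERMARK_OFFSET, offset);
        }

        /** Option B: evict remembered paths after the given retention. */
        Builder processedFilesTtl(Duration retention) {
            return select(Mode.TTL, retention);
        }

        private Builder select(Mode newMode, Duration value) {
            Objects.requireNonNull(value, "duration");
            if (value.isNegative() || value.isZero()) {
                throw new IllegalArgumentException("Duration must be positive: " + value);
            }
            if (mode != Mode.ALL_PATHS) {
                throw new IllegalStateException("Tracking mode already set to " + mode);
            }
            mode = newMode;
            duration = value;
            return this;
        }

        FileSourceTrackingOptions build() {
            return new FileSourceTrackingOptions(mode, duration);
        }
    }
}
```

This sketch makes the two modes mutually exclusive. That is a choice made for illustration, not something the proposal settles.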
I've sent a [DISCUSS] thread to dev@flink with the full writeup. A few specific
questions I'd value your input on:
* Does the scope warrant a FLIP, or is an additive opt-in pull request
sufficient?
* Single mechanism or both?
> FileSource enumerator remembers paths of all already processed files which
> can result in large state
> ----------------------------------------------------------------------------------------------------
>
> Key: FLINK-25672
> URL: https://issues.apache.org/jira/browse/FLINK-25672
> Project: Flink
> Issue Type: Improvement
> Components: Connectors / FileSystem
> Reporter: Martijn Visser
> Assignee: Sophia Izokun
> Priority: Major
>
> As mentioned in the Filesystem documentation, for Unbounded File Sources, the
> {{FileEnumerator}} currently remembers paths of all already processed files,
> which is state that can in some cases grow rather large.
> We should look into possibilities to reduce this. We could look into adding a
> compressed form of tracking already processed files (for example, by keeping
> lower bounds on modification timestamps).
> When fixed, this should also be reflected in the documentation, as mentioned
> in https://github.com/apache/flink/pull/18288#discussion_r785707311