[
https://issues.apache.org/jira/browse/FLINK-29926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17630327#comment-17630327
]
Martijn Visser commented on FLINK-29926:
----------------------------------------
bq. FileProcessingMode.PROCESS_CONTINUOUSLY mode operator fails to detect any
new changes that has happened on the same file in the directory.
To be honest, if an existing file gets changed, I'm not sure that would be
picked up by the FileSource. The JavaDoc
https://nightlies.apache.org/flink/flink-docs-master/api/java/org/apache/flink/connector/file/src/AbstractFileSource.AbstractFileSourceBuilder.html#monitorContinuously-java.time.Duration-
mentions:
bq. This makes the source a "continuous streaming" source that keeps running,
monitoring for new files, and reads these files when they appear and are
discovered by the monitoring.
So my understanding is that the FileSource checks for new files, not for
changes to files that have already been processed.
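
To illustrate that reading of the JavaDoc, here is a minimal plain-Java sketch
of "new files only" discovery. It is a toy model, not Flink code: the class
NewFilesOnlyMonitorSketch and its discover() method are hypothetical names, and
the assumption that discovery is keyed on the path alone is mine, not something
the JavaDoc states.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model (hypothetical, not Flink's implementation): discovery is keyed on
// the file path only, so a file rewritten in place is never reported twice.
final class NewFilesOnlyMonitorSketch {
    private final Set<String> seenPaths = new HashSet<>();

    /** Returns the paths in this directory listing that were not seen before. */
    public List<String> discover(List<String> listing) {
        List<String> fresh = new ArrayList<>();
        for (String path : listing) {
            // Set.add returns false when the path is already known.
            if (seenPaths.add(path)) {
                fresh.add(path);
            }
        }
        return fresh;
    }

    public static void main(String[] args) {
        NewFilesOnlyMonitorSketch monitor = new NewFilesOnlyMonitorSketch();
        // First scan: the file is new, so it is reported.
        System.out.println(monitor.discover(Arrays.asList("configs/a.json")));
        // Second scan after a.json was modified in place: same path, nothing new.
        System.out.println(monitor.discover(Arrays.asList("configs/a.json")));
        // Third scan with an additional file: only the new path is reported.
        System.out.println(
                monitor.discover(Arrays.asList("configs/a.json", "configs/b.json")));
    }
}
```

Under this model, an in-place change to an already processed file would only
be picked up if it were written under a new file name.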
> File source continuous monitoring mode ignoring files during savepoint
> upgrade mode
> -----------------------------------------------------------------------------------
>
> Key: FLINK-29926
> URL: https://issues.apache.org/jira/browse/FLINK-29926
> Project: Flink
> Issue Type: Bug
> Reporter: Avinash
> Priority: Critical
> Labels: Flink, ReadFile
>
> During a stateful application upgrade using the Flink Kubernetes operator, the
> StreamExecutionEnvironment.readFile() operator in
> FileProcessingMode.PROCESS_CONTINUOUSLY mode fails to detect any new
> changes made to the same file in the directory.
>
> *Background*: We currently have a fresh deployment of the application using
> the Kubernetes operator, with savepoint as the upgrade mode and checkpointing
> enabled.
> The env.readFile() operator in FileProcessingMode.PROCESS_CONTINUOUSLY mode
> starts continuously monitoring the directory (S3 prefix) for any changes and
> also checkpoints at the configured interval.
> {noformat}
> 2022-11-07 10:47:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Opened ContinuousFileMonitoringFunction (taskIdx= 0) for path:
> s3://test-app/configs
> ...
> ...
> 2022-11-07 10:47:13.996 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy
> Source Thread - Source: Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Ignoring s3://test-app/configs/control-event-config.json, with mod time=
> 1667817365000 and global mod time= 1667817365000
> ...
> ...
> 2022-11-07 10:51:40.896 UTC taskmanager-1-1 [priority='DEBUG' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - ContinuousFileMonitoringFunction checkpointed 1667817365000.{noformat}
> Now we try to upgrade the application using the Kubernetes operator. As a
> result, the application tries to take a savepoint using the suspend
> mechanism "cancel with savepoint".
> This calls the cancel method, which in turn sets
> globalModificationTime = Long.MAX_VALUE, and then the savepoint is taken.
> {noformat}
> 2022-11-07 10:54:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy
> Source Thread - Source: Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Ignoring s3://test-app/configs/control-event-config.json, with mod time=
> 1667817365000 and global mod time= 1667817365000
> ...
> 2022-11-07 10:55:12.899 UTC taskmanager-1-1 [priority='DEBUG' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - ContinuousFileMonitoringFunction checkpointed 9223372036854775807
> ....
> 2022-11-07 10:55:13.090 UTC taskmanager-1-1 [priority='DEBUG' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Closed File Monitoring Source for path: s3://test-app/{noformat}
> As a result, globalModificationTime changes from 1667817365000 to
> Long.MAX_VALUE (9223372036854775807) and is stored in the savepoint state.
> Once the application restarts with the new changes, the env.readFile()
> operator restores the previous state, in which globalModificationTime =
> Long.MAX_VALUE, and starts ignoring any changes made to the file after the
> upgrade.
> {noformat}
> 2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='INFO' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Restoring state for the ContinuousFileMonitoringFunction
> ....
> 2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - ContinuousFileMonitoringFunction retrieved a global mod time of
> 9223372036854775807
> ....
> 2022-11-07 11:00:13.577 UTC taskmanager-1-1 [priority='DEBUG' thread='Source:
> Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Opened ContinuousFileMonitoringFunction (taskIdx= 0) for path:
> s3://test-app/configs
> ....
> 2022-11-07 11:00:13.996 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy
> Source Thread - Source: Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Ignoring s3://test-app/configs/control-event-config.json, with mod time=
> 1667821399000 and global mod time= 9223372036854775807
> ...
> ...
> 2022-11-07 11:01:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy
> Source Thread - Source: Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Ignoring s3://test-app/configs/control-event-config.json, with mod time=
> 1667821399000 and global mod time= 9223372036854775807
> ...
> 2022-11-07 11:02:12.897 UTC taskmanager-1-1 [priority='DEBUG' thread='Legacy
> Source Thread - Source: Custom File Source (1/1)#0']
> org.apache.flink.streaming.api.functions.source.ContinuousFileMonitoringFunction@
> - Ignoring s3://test-app/configs/control-event-config.json, with mod time=
> 1667821399000 and global mod time= 9223372036854775807{noformat}
> Cause: The above issue seems to be due to the reassignment of
> globalModificationTime to Long.MAX_VALUE during cancel:
> [https://github.com/apache/flink/blob/a77747892b1724fa5ec388c2b0fe519db32664e9/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java#L389]
>
>
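
The sequence reported in the description (checkpoint, cancel with savepoint,
restore) can be reproduced with a small plain-Java model. This is a sketch under
assumptions, not the actual ContinuousFileMonitoringFunction: the class
ModTimeMonitorSketch and its methods offer(), cancel() and snapshotState() are
hypothetical names, and the "ignore if mod time <= global mod time" rule is
inferred from the log lines quoted above.

```java
// Toy model of the behaviour described in the issue (hypothetical names,
// not Flink's actual class).
final class ModTimeMonitorSketch {
    private long globalModificationTime;

    /** Starts from a restored value, or Long.MIN_VALUE on a fresh start. */
    public ModTimeMonitorSketch(long restoredModificationTime) {
        this.globalModificationTime = restoredModificationTime;
    }

    /** Forwards a file only if it is strictly newer than the tracked time. */
    public boolean offer(long fileModTime) {
        if (fileModTime <= globalModificationTime) {
            return false; // corresponds to the "Ignoring ..." log lines
        }
        globalModificationTime = fileModTime;
        return true;
    }

    /** Models the reported cancel behaviour: the tracked time is overwritten. */
    public void cancel() {
        globalModificationTime = Long.MAX_VALUE;
    }

    /** Models what ends up in the checkpoint or savepoint. */
    public long snapshotState() {
        return globalModificationTime;
    }

    public static void main(String[] args) {
        ModTimeMonitorSketch beforeUpgrade = new ModTimeMonitorSketch(Long.MIN_VALUE);
        // First sighting of the file is forwarded, the unchanged file is ignored.
        System.out.println(beforeUpgrade.offer(1667817365000L));
        System.out.println(beforeUpgrade.offer(1667817365000L));

        // Cancel with savepoint: cancel runs first, then the state is captured.
        beforeUpgrade.cancel();
        long savepoint = beforeUpgrade.snapshotState();
        System.out.println(savepoint == Long.MAX_VALUE);

        // After the upgrade, the newer file (mod time 1667821399000) is ignored.
        ModTimeMonitorSketch afterUpgrade = new ModTimeMonitorSketch(savepoint);
        System.out.println(afterUpgrade.offer(1667821399000L));
    }
}
```

In this model, no file modification time can exceed Long.MAX_VALUE, so every
later change is ignored once that value has been restored from the savepoint.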