Hi Dhurandar,
As I understand it, what you need is to be notified when a file has been
written successfully (committed) on the S3 file system. However, there is
currently no public listener API for this; there is an issue tracking it [1].
With the current version, one possible approach is to use reflection to
access some internal state of StreamingFileSink to get the committed files.
Overall, you may need to implement a customized StreamingFileSink and override
the notifyCheckpointComplete method, which is where the new S3 files get
committed and become visible:
class CustomizedStreamingFileSink extends StreamingFileSink {
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        // 1. Use reflection to get the list of files that will be committed
        //    in this call. The path to walk is:
        //      StreamingFileSink -> (StreamingFileSinkHelper, if 1.11 is used)
        //        -> Buckets -> activeBuckets (there will be multiple Buckets)
        //        -> (for each Bucket) pendingFileRecoverablesPerCheckpoint
        //    The pending files to commit this time are then available via
        //    pendingFileRecoverablesPerCheckpoint.headMap(checkpointId, true) [2].
        //    The S3 object names can be read via
        //      (PendingFileRecoverable, if 1.11 is used) -> CommitRecoverable
        //        (which must be an S3Recoverable) -> objectName.

        // 2. Commit the files as usual.
        super.notifyCheckpointComplete(checkpointId);

        // 3. Write the meta info for the S3 objects recorded in step 1.
    }
}
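To make step 1 a bit more concrete, here is a minimal, self-contained sketch of the two mechanics involved: reading a private field via reflection, and selecting the entries up to a checkpoint with headMap(checkpointId, true). FakeBucket, readField and pendingUpTo are hypothetical names made up for illustration; FakeBucket only mimics the shape of the field and holds plain object names instead of the real recoverable objects. The real internal classes differ between Flink versions, so please check the field names and types against the version you run.

```java
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;

final class ReflectionSketch {

    // Reads a (possibly private) field by name, searching up the class hierarchy.
    static Object readField(Object target, String fieldName) {
        for (Class<?> c = target.getClass(); c != null; c = c.getSuperclass()) {
            try {
                Field f = c.getDeclaredField(fieldName);
                f.setAccessible(true);
                return f.get(target);
            } catch (NoSuchFieldException e) {
                // Not declared on this class; try the superclass.
            } catch (IllegalAccessException e) {
                throw new IllegalStateException("Cannot read field " + fieldName, e);
            }
        }
        throw new IllegalStateException("No such field: " + fieldName);
    }

    // Hypothetical stand-in for a bucket. It is NOT the Flink class; it only
    // has a private map keyed by checkpoint id, like the field named above.
    static final class FakeBucket {
        private final NavigableMap<Long, List<String>> pendingFileRecoverablesPerCheckpoint =
                new TreeMap<>();

        FakeBucket add(long checkpointId, String objectName) {
            pendingFileRecoverablesPerCheckpoint
                    .computeIfAbsent(checkpointId, k -> new ArrayList<>())
                    .add(objectName);
            return this;
        }
    }

    // Collects everything pending for checkpoints <= checkpointId (inclusive).
    @SuppressWarnings("unchecked")
    static List<String> pendingUpTo(Object bucket, long checkpointId) {
        NavigableMap<Long, List<String>> pending = (NavigableMap<Long, List<String>>)
                readField(bucket, "pendingFileRecoverablesPerCheckpoint");
        List<String> result = new ArrayList<>();
        for (List<String> files : pending.headMap(checkpointId, true).values()) {
            result.addAll(files);
        }
        return result;
    }
}
```

Note that the list has to be collected before calling super.notifyCheckpointComplete, since the entries are expected to be gone from the map once the commit has happened.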
A single file may get committed multiple times, so the action that writes the
meta info must also be able to handle repeated writes.
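One way to handle the repeated writes is to make the meta info update idempotent, so that applying it a second time for the same object is a no-op. The sketch below is only an illustration of that idea: IdempotentMetadataWriter is a hypothetical class, and the in-memory map stands in for the object store (a real job would call the S3 client at that point instead).

```java
import java.util.HashMap;
import java.util.Map;

final class IdempotentMetadataWriter {

    // Hypothetical stand-in for the object store's metadata; assumption for
    // illustration only. A real implementation would talk to S3 here.
    private final Map<String, Map<String, String>> store = new HashMap<>();
    private int writes = 0;

    // Applies the metadata to the object. Returns true if a write was issued,
    // false if the object already carried exactly this metadata.
    synchronized boolean apply(String objectName, Map<String, String> metadata) {
        if (metadata.equals(store.get(objectName))) {
            return false; // Same commit seen again: nothing to do.
        }
        store.put(objectName, new HashMap<>(metadata));
        writes++;
        return true;
    }

    // Number of writes actually issued so far.
    synchronized int writeCount() {
        return writes;
    }
}
```

With this shape, calling apply twice with the same object name and metadata issues only one write, so a repeated commit notification does no harm.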
Another possible approach is to use a separate source operator that
periodically scans the S3 file system to detect newly added files and modify
their metadata. There should be a built-in source function,
ContinuousFileMonitoringFunction [3], for this kind of work, and I think it
could be modified or reused for scanning the files.
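The core of such a scanner is just a diff between the current listing and what has been reported before. Here is a minimal sketch of that part only; NewObjectDetector is a hypothetical class, the listing is passed in as a plain collection of object names, and this is not a reproduction of how ContinuousFileMonitoringFunction keeps track of its progress.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

final class NewObjectDetector {

    // Names already reported by an earlier scan. This grows without bound in
    // this sketch; a real job would need to prune it or keep it in state.
    private final Set<String> seen = new HashSet<>();

    // Returns the names in the listing that no earlier call has reported,
    // in the order they appear in the listing.
    List<String> detectNew(Collection<String> currentListing) {
        List<String> fresh = new ArrayList<>();
        for (String name : currentListing) {
            if (seen.add(name)) { // add() returns false if already present
                fresh.add(name);
            }
        }
        return fresh;
    }
}
```

Each periodic scan would pass the current listing of the output path to detectNew and then update the metadata of the returned objects. In-progress or temporary files would have to be filtered out of the listing first.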
Best,
Yun
[1] https://issues.apache.org/jira/browse/FLINK-17900
[2]
https://github.com/apache/flink/blob/a5527e3b2ff4abea2ff8fa05cb755561549be06a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java#L268
[3]
https://github.com/apache/flink/blob/master/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java
------------------------------------------------------------------
Sender:dhurandar S<[email protected]>
Date:2020/06/20 03:19:38
Recipient:user<[email protected]>; Flink Dev<[email protected]>
Subject:adding s3 object metadata while using StreamFileSink
We are creating files in S3 and we want to update the S3 object metadata with
some security-related information for governance purposes.
Right now Apache Flink totally abstracts how and when S3 object gets created in
the system.
Is there a way that we can pass the S3 object metadata and update it for the
object created.
If not,
How can we know when Apache Flink has created an S3 file. Deterministically.
Since once its created in S3 we can write Java code after that to add those
metadata information?
--
Thank you and regards,
Dhurandar