[ 
https://issues.apache.org/jira/browse/FLINK-9592?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-9592:
----------------------------------
    Labels: auto-deprioritized-major auto-unassigned pull-request-available 
stale-minor  (was: auto-deprioritized-major auto-unassigned 
pull-request-available)

I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help 
the community manage its development. I see this issue has been marked as 
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated 
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is 
still Minor, please either assign yourself or give an update. Afterwards, 
please remove the label or in 7 days the issue will be deprioritized.


> Notify on moving file into pending/ final state
> -----------------------------------------------
>
>                 Key: FLINK-9592
>                 URL: https://issues.apache.org/jira/browse/FLINK-9592
>             Project: Flink
>          Issue Type: New Feature
>          Components: Connectors / FileSystem
>            Reporter: Rinat Sharipov
>            Priority: Minor
>              Labels: auto-deprioritized-major, auto-unassigned, 
> pull-request-available, stale-minor
>
> Hi mates, I have a proposal regarding the functionality of BucketingSink.
>  
> While implementing one of our tasks, we needed to create a meta-file 
> containing the path and additional information about each file created by 
> BucketingSink once that file has been moved to its final place.
> Unfortunately, this behaviour is currently not available to us. 
>  
> We’ve implemented our own sink that lets us register notifiers to be called 
> when a file’s state changes, but the current API doesn’t allow us to add such 
> behaviour through inheritance ...
>  
> It seems that such functionality could be useful and could become part of 
> the BucketingSink API.
> What do you think, should I make a PR?
> Sincerely yours,
>  *Rinat Sharipov*
> Software Engineer at 1DMP CORE Team
>  
> email: [r.shari...@cleverdata.ru|mailto:a.totma...@cleverdata.ru]
> mobile: +7 (925) 416-37-26
> Clever{color:#4f8f00}DATA{color}
> make your data clever
>  
> ------------------------------------------------------------------------------------------------------------------------
>  
> Hi,
> I see that this could be a useful feature. What exactly is preventing you 
> from inheriting from BucketingSink right now? Maybe it would be enough to 
> make BucketingSink easier to extend.
> One thing that could collide with such a feature is that Kostas is currently 
> working on a larger BucketingSink rework/refactor. 
> Piotrek
> ________________________________________________________________________
>  
> Hi guys, thx for your reply. 
> The following code references apply to the *release-1.5.0 tag, 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class*
>  
> For now, BucketingSink has the following file lifecycle.
>  
> Moving files from the opened state to the pending state:
>  # on each item (*method* *invoke:434* *line*), we check that a suitable 
> bucket exists and contains an opened file; if there is no opened file, we 
> create one and write the item to it
>  # on each item (*method* *invoke:434* *line*), we check that the opened 
> file doesn’t exceed the limits; if the limits are exceeded, we close it and 
> move it into the pending state using *closeCurrentPartFile:568 line - private method*
>  # on each timer request (*onProcessingTime:482 line*), we check whether 
> items haven't been added to the opened file for longer than the specified 
> period of time; if so, we close it using the same private method 
> *closeCurrentPartFile:588 line*
>  
> So the only way we have is to call our hook from *closeCurrentPartFile*, 
> which is private, so we copy-pasted the current impl and injected our logic 
> there
>  
>  
> Files move from the pending state into the final state during the 
> checkpointing lifecycle, in *notifyCheckpointComplete:657 line*, which is 
> public and contains a lot of logic, including discovery of files in pending 
> state, synchronization of state access and its modification, etc … 
>  
> So we couldn’t simply override it, or call the super method and add some 
> logic afterwards, because when the current impl changes the state of files, 
> it removes them from state, and we have no way of knowing for which files 
> the state has been changed.
>  
> To solve this problem, we've created the following interface:
>  
> /**
>  * The {@code FileStateChangeCallback} is used to perform any additional 
>  * operations when {@link BucketingSink} moves a file from one state to 
>  * another. For more information about state management of 
>  * {@code BucketingSink}, look through its official documentation.
>  */
> public interface FileStateChangeCallback extends Serializable {
>
>   /**
>    * Used to perform any additional operations related to moving a file 
>    * into the next state.
>    *
>    * @param fs provides access for working with the file system
>    * @param path path to the file moved into the next state
>    *
>    * @throws IOException if something went wrong while performing any 
>    * operations with the file system
>    */
>   void call(FileSystem fs, Path path) throws IOException;
> }
> We have also added the ability to register these callbacks in our 
> BucketingSink impl in the following manner:
>  
> public BucketingSink<T> registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {...}
> public BucketingSink<T> registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) {...}
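
As an illustration of the registration approach described above, here is a minimal, dependency-free sketch. It is not the reporter's implementation and not Flink code: `CallbackSinkSketch` and `CallbackSinkDemo` are hypothetical names, the callback takes a `java.nio.file.Path` only (the proposed `FileSystem fs` argument is dropped so the sketch runs without Flink or Hadoop on the classpath), and the two state-transition methods merely borrow the names of the BucketingSink methods mentioned in the description.

```java
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified stand-in for the proposed callback (assumption: no FileSystem argument).
interface FileStateChangeCallback extends Serializable {
    void call(Path path) throws IOException;
}

// Hypothetical sink that tracks file states only; it writes no data.
class CallbackSinkSketch {
    private final List<FileStateChangeCallback> onPending = new ArrayList<>();
    private final List<FileStateChangeCallback> onFinal = new ArrayList<>();
    private final List<Path> pendingFiles = new ArrayList<>();

    CallbackSinkSketch registerOnPendingStateChangeCallback(FileStateChangeCallback... callbacks) {
        onPending.addAll(Arrays.asList(callbacks));
        return this;
    }

    CallbackSinkSketch registerOnFinalStateChangeCallback(FileStateChangeCallback... callbacks) {
        onFinal.addAll(Arrays.asList(callbacks));
        return this;
    }

    // Plays the role of closeCurrentPartFile: opened -> pending.
    void closeCurrentPartFile(Path openedFile) throws IOException {
        pendingFiles.add(openedFile);
        for (FileStateChangeCallback cb : onPending) {
            cb.call(openedFile);
        }
    }

    // Plays the role of notifyCheckpointComplete: pending -> final.
    // Callbacks fire before the files are dropped from the tracked state.
    void notifyCheckpointComplete() throws IOException {
        for (Path file : pendingFiles) {
            for (FileStateChangeCallback cb : onFinal) {
                cb.call(file);
            }
        }
        pendingFiles.clear();
    }
}

class CallbackSinkDemo {
    // Registers one callback per transition and records the order of events.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        CallbackSinkSketch sink = new CallbackSinkSketch()
            .registerOnPendingStateChangeCallback(p -> events.add("pending:" + p.getFileName()))
            .registerOnFinalStateChangeCallback(p -> events.add("final:" + p.getFileName()));
        try {
            sink.closeCurrentPartFile(Paths.get("bucket", "part-0-0"));
            sink.notifyCheckpointComplete();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The point of the sketch is that the sink itself invokes the callbacks while it still knows which files are changing state, which is the information an overriding subclass cannot recover after the fact.
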
>  
> I’m ready to discuss the best way to implement such hooks in the core impl, 
> or any other improvements that would let us add such functionality to our 
> extension using the public API instead of copy-pasting the source code.
>  
> Thx for your help, mates =)
>  
> Sincerely yours,
>  *Rinat Sharipov*
> ________________________________________________________________________
>  
> Hi,
>  
> A couple of things:
>  
> 1. Please create a Jira ticket with this proposal, so we can move the 
> discussion off the user mailing list.
>  
> I haven’t thought it through, so take my comments with a grain of salt, 
> however:
>  
> 2. If we were to go with such a callback, I would prefer to have one 
> BucketStateChangeCallback, with methods like `onInProgressToPending(…)`, 
> `onPendingToFinal`, `onPendingToCancelled(…)`, etc., as opposed to having one 
> interface passed three or four times for different purposes.
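
A single-interface variant along the lines of point 2 might look like the sketch below. The method names come from the comment above; everything else is an assumption made for illustration: the `String path` parameter (the original leaves the parameter lists elided), the use of no-op `default` methods, and the hypothetical `MetaFileCallback` and `BucketCallbackDemo` classes.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// One callback type covering all transitions. Default no-op methods let an
// implementer override only the transitions it cares about.
// Assumption: each method receives the file path as a String.
interface BucketStateChangeCallback extends Serializable {
    default void onInProgressToPending(String path) {}
    default void onPendingToFinal(String path) {}
    default void onPendingToCancelled(String path) {}
}

// Hypothetical implementer: records a meta-file name for every finalized file.
class MetaFileCallback implements BucketStateChangeCallback {
    final List<String> metaEntries = new ArrayList<>();

    @Override
    public void onPendingToFinal(String path) {
        metaEntries.add(path + ".meta");
    }
}

class BucketCallbackDemo {
    static List<String> run() {
        MetaFileCallback callback = new MetaFileCallback();
        callback.onInProgressToPending("bucket/part-0-0"); // inherited no-op
        callback.onPendingToFinal("bucket/part-0-0");      // records the meta entry
        return callback.metaEntries;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Compared with registering the same functional interface several times, this keeps one registration point and makes each transition explicit in the method name.
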
>  
> 3. Another thing I had in mind is that BucketingSink could be rewritten to 
> extend TwoPhaseCommitSinkFunction. In that case, with 
>  
> public class BucketingSink2 extends TwoPhaseCommitSinkFunction<???>
>  
> the user could add their own hooks by overriding the following methods:
>  
> BucketingSink2#beginTransaction, BucketingSink2#preCommit, 
> BucketingSink2#commit, BucketingSink2#abort. For example:
>  
> public class MyBucketingSink extends BucketingSink2 {
>   @Override
>   protected void commit(??? txn) {
>     super.commit(txn);
>     // My hook on moving file from pending to commit state
>   }
> }
>  
> Alternatively, we could implement support for the aforementioned callbacks 
> in TwoPhaseCommitSinkFunction and provide this feature to 
> Kafka/Pravega/BucketingSink at once.
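
The override-based approach from point 3 can be tried out without Flink using the stand-in classes below. This is only a sketch: `TwoPhaseSinkSketch` is NOT Flink's `TwoPhaseCommitSinkFunction` (it only mirrors the four method names listed above), and `FileTxn`, `BucketingSink2Sketch`, `MyBucketingSinkSketch`, and `runOneCheckpoint` are hypothetical names. The transaction type left open as `???` in the comment is assumed here to be a list of part-file names.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in base class mirroring the begin/preCommit/commit/abort lifecycle.
abstract class TwoPhaseSinkSketch<TXN> {
    protected abstract TXN beginTransaction();
    protected abstract void preCommit(TXN txn);
    protected abstract void commit(TXN txn);
    protected abstract void abort(TXN txn);

    // Drives one successful checkpoint cycle (no failure handling in this sketch).
    final TXN runOneCheckpoint() {
        TXN txn = beginTransaction();
        preCommit(txn);
        commit(txn);
        return txn;
    }
}

// Assumed transaction type: the part files covered by one checkpoint.
class FileTxn {
    final List<String> files = new ArrayList<>();
    String state = "in-progress";
}

class BucketingSink2Sketch extends TwoPhaseSinkSketch<FileTxn> {
    @Override
    protected FileTxn beginTransaction() {
        FileTxn txn = new FileTxn();
        txn.files.add("part-0-0");
        return txn;
    }

    @Override
    protected void preCommit(FileTxn txn) { txn.state = "pending"; }

    @Override
    protected void commit(FileTxn txn) { txn.state = "final"; }

    @Override
    protected void abort(FileTxn txn) { txn.state = "cancelled"; }
}

class MyBucketingSinkSketch extends BucketingSink2Sketch {
    final List<String> notified = new ArrayList<>();

    @Override
    protected void commit(FileTxn txn) {
        super.commit(txn);
        // Hook: the transaction still lists the files that just became final.
        notified.addAll(txn.files);
    }
}

class TwoPhaseHookDemo {
    public static void main(String[] args) {
        MyBucketingSinkSketch sink = new MyBucketingSinkSketch();
        FileTxn txn = sink.runOneCheckpoint();
        System.out.println(txn.state + " " + sink.notified);
    }
}
```

Because the transaction object is passed to `commit`, the overriding hook can see exactly which files changed state, which is what the current `notifyCheckpointComplete` does not expose to subclasses.
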
>  
> Piotrek
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)
