GitHub user tedyu commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r134142624 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java --- @@ -108,11 +139,63 @@ public BlobCache( this.numFetchRetries = 0; } + // Initializing the clean up task + this.cleanupTimer = new Timer(true); + + cleanupInterval = blobClientConfig.getLong(BlobServerOptions.CLEANUP_INTERVAL) * 1000; + this.cleanupTimer.schedule(this, cleanupInterval, cleanupInterval); + // Add shutdown hook to delete storage directory shutdownHook = BlobUtils.addShutdownHook(this, LOG); } /** + * Registers use of job-related BLOBs. + * <p> + * Using any other method to access BLOBs, e.g. {@link #getFile}, is only valid within calls + * to {@link #registerJob(JobID)} and {@link #releaseJob(JobID)}. + * + * @param jobId + * ID of the job this blob belongs to + * + * @see #releaseJob(JobID) + */ + public void registerJob(JobID jobId) { + synchronized (jobRefCounters) { + RefCount ref = jobRefCounters.get(jobId); + if (ref == null) { + ref = new RefCount(); + jobRefCounters.put(jobId, ref); + } + ++ref.references; + } + } + + /** + * Unregisters use of job-related BLOBs and allow them to be released. + * + * @param jobId + * ID of the job this blob belongs to + * + * @see #registerJob(JobID) + */ + public void releaseJob(JobID jobId) { + synchronized (jobRefCounters) { + RefCount ref = jobRefCounters.get(jobId); + + if (ref == null) { + LOG.warn("improper use of releaseJob() without a matching number of registerJob() calls"); --- End diff -- Including the jobId in this warning message would help with troubleshooting.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes to enable it, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---