[ https://issues.apache.org/jira/browse/FLINK-7057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16130197#comment-16130197 ]
ASF GitHub Bot commented on FLINK-7057: --------------------------------------- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/4238#discussion_r133670563 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobCacheCleanupTest.java --- @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.blob; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.junit.Ignore; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.io.File; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import static org.junit.Assert.assertEquals; + +/** + * A few tests for the deferred ref-counting based cleanup inside the {@link BlobCache}. + */ +public class BlobCacheCleanupTest { + + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + /** + * Tests that {@link BlobCache} cleans up after calling {@link BlobCache#releaseJob(JobID)}. + */ + @Test + public void testJobCleanup() throws IOException, InterruptedException { + + JobID jobId = new JobID(); + List<BlobKey> keys = new ArrayList<BlobKey>(); + BlobServer server = null; + BlobCache cache = null; + + final byte[] buf = new byte[128]; + + try { + Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + config.setLong(BlobServerOptions.CLEANUP_INTERVAL, 1L); + + server = new BlobServer(config, new VoidBlobStore()); + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + BlobClient bc = new BlobClient(serverAddress, config); + cache = new BlobCache(serverAddress, config, new VoidBlobStore()); + + keys.add(bc.put(jobId, buf)); + buf[0] += 1; + keys.add(bc.put(jobId, buf)); + + bc.close(); + + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(0, jobId, cache); + + // register once + cache.registerJob(jobId); + + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(0, jobId, cache); + + for (BlobKey key : keys) { + cache.getFile(jobId, key); + } + + // register again (let's say, from another thread or so) + cache.registerJob(jobId); + for (BlobKey key : keys) { + cache.getFile(jobId, key); + } + + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); + + // after releasing once, nothing should change + cache.releaseJob(jobId); + + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); + + // after releasing the second time, the job is up for deferred cleanup + cache.releaseJob(jobId); + + // because we cannot guarantee that there are not thread races in the build system, we + // loop for a certain while until the references disappear + { + long deadline = System.currentTimeMillis() + 30_000L; + do { + Thread.sleep(100); + } + while (checkFilesExist(jobId, keys, cache, false) != 0 && + System.currentTimeMillis() < deadline); + } + + // the blob cache should no longer contain the files + // this fails if we exited via a timeout + checkFileCountForJob(0, jobId, cache); + // server should be unaffected + checkFileCountForJob(2, jobId, server); + } + finally { + if (cache != null) { + cache.close(); + } + + if (server != null) { + server.close(); + } + // now everything should be cleaned up + checkFileCountForJob(0, jobId, server); + } + } + + /** + * Tests that {@link BlobCache} cleans up after calling {@link BlobCache#releaseJob(JobID)} + * but only after preserving the file for a bit longer. + */ + @Test + @Ignore("manual test due to stalling: ensures a BLOB is retained first and only deleted after the (long) timeout ") + public void testJobDeferredCleanup() throws IOException, InterruptedException { + // file should be deleted between 5 and 10s after last job release + long cleanupInterval = 5L; + + JobID jobId = new JobID(); + List<BlobKey> keys = new ArrayList<BlobKey>(); + BlobServer server = null; + BlobCache cache = null; + + final byte[] buf = new byte[128]; + + try { + Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + temporaryFolder.newFolder().getAbsolutePath()); + config.setLong(BlobServerOptions.CLEANUP_INTERVAL, cleanupInterval); + + server = new BlobServer(config, new VoidBlobStore()); + InetSocketAddress serverAddress = new InetSocketAddress("localhost", server.getPort()); + BlobClient bc = new BlobClient(serverAddress, config); + cache = new BlobCache(serverAddress, config, new VoidBlobStore()); + + keys.add(bc.put(jobId, buf)); + buf[0] += 1; + keys.add(bc.put(jobId, buf)); + + bc.close(); + + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(0, jobId, cache); + + // register once + cache.registerJob(jobId); + + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(0, jobId, cache); + + for (BlobKey key : keys) { + cache.getFile(jobId, key); + } + + // register again (let's say, from another thread or so) + cache.registerJob(jobId); + for (BlobKey key : keys) { + cache.getFile(jobId, key); + } + + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); + + // after releasing once, nothing should change + cache.releaseJob(jobId); + + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, server); + checkFileCountForJob(2, jobId, cache); + + // after releasing the second time, the job is up for deferred cleanup + cache.releaseJob(jobId); + + // files should still be accessible for now + assertEquals(2, checkFilesExist(jobId, keys, cache, true)); + checkFileCountForJob(2, jobId, cache); + + Thread.sleep(cleanupInterval / 5); --- End diff -- We could, for example, extend BlobCache to introduce synchronization hooks. > move BLOB ref-counting from LibraryCacheManager to BlobCache > ------------------------------------------------------------ > > Key: FLINK-7057 > URL: https://issues.apache.org/jira/browse/FLINK-7057 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, Network > Affects Versions: 1.4.0 > Reporter: Nico Kruber > Assignee: Nico Kruber > > Currently, the {{LibraryCacheManager}} is doing some ref-counting for JAR > files managed by it. Instead, we want the {{BlobCache}} to do that itself for > all job-related BLOBs. Also, we do not want to operate on a per-{{BlobKey}} > level but rather per job. Therefore, the cleanup process should be adapted, > too. -- This message was sent by Atlassian JIRA (v6.4.14#64029)