Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/4639#discussion_r140452104 --- Diff: flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java --- @@ -0,0 +1,230 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.core.fs; + +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.util.AbstractCloseableRegistry; + +import org.junit.Assert; +import org.junit.Test; + +import java.io.Closeable; +import java.io.IOException; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.spy; + +public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> { + + protected ProducerThread[] streamOpenThreads; + protected AbstractCloseableRegistry<C, T> closeableRegistry; + protected AtomicInteger unclosedCounter; + + protected abstract C createCloseable(); + + protected abstract AbstractCloseableRegistry<C, T> createRegistry(); + + protected abstract ProducerThread<C, T> createProducerThread( + AbstractCloseableRegistry<C, T> registry, + AtomicInteger unclosedCounter, + int maxStreams); + + public void setup() { + Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning()); + this.closeableRegistry = createRegistry(); + this.unclosedCounter = new AtomicInteger(0); + this.streamOpenThreads = new ProducerThread[10]; + for (int i = 0; i < streamOpenThreads.length; ++i) { + streamOpenThreads[i] = createProducerThread(closeableRegistry, unclosedCounter, Integer.MAX_VALUE); + } + } + + protected void startThreads(int maxStreams) { + for (ProducerThread t : streamOpenThreads) { + t.setMaxStreams(maxStreams); + t.start(); + } + } + + protected void joinThreads() throws InterruptedException { + for (Thread t : streamOpenThreads) { + t.join(); + } + } + + @Test + public void testClose() throws Exception { + + setup(); + startThreads(Integer.MAX_VALUE); + + for (int i = 0; i < 5; ++i) { + System.gc(); + Thread.sleep(40); + } + + closeableRegistry.close(); + + joinThreads(); + + Assert.assertEquals(0, unclosedCounter.get()); + Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables()); + + final C testCloseable = spy(createCloseable()); + + try { + + closeableRegistry.registerClosable(testCloseable); + + Assert.fail("Closed registry should not accept closeables!"); + + } catch (IOException expected) { + //expected + } + + Assert.assertEquals(0, unclosedCounter.get()); + Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables()); + verify(testCloseable).close(); + } + + @Test + public void testNonBlockingClose() throws Exception { + setup(); + + final OneShotLatch waitRegistryClosedLatch = new OneShotLatch(); + final OneShotLatch blockCloseLatch = new OneShotLatch(); + + final C spyCloseable = spy(createCloseable()); + + doAnswer(invocationOnMock -> { + invocationOnMock.callRealMethod(); + waitRegistryClosedLatch.trigger(); + blockCloseLatch.await(); + return null; + }).when(spyCloseable).close(); + + closeableRegistry.registerClosable(spyCloseable); + + Assert.assertEquals(1, closeableRegistry.getNumberOfRegisteredCloseables()); + + Thread closer = new Thread(() -> { + try { + closeableRegistry.close(); + } catch (IOException ignore) { + + } + }); + + closer.start(); + waitRegistryClosedLatch.await(); + + final C testCloseable = spy(createCloseable()); + + try { + closeableRegistry.registerClosable(testCloseable); + Assert.fail("Closed registry should not accept closeables!"); + }catch (IOException ignore) { + } + + blockCloseLatch.trigger(); + closer.join(); + + verify(spyCloseable).close(); + verify(testCloseable).close(); + Assert.assertEquals(0, closeableRegistry.getNumberOfRegisteredCloseables()); + } + + protected static abstract class ProducerThread<C extends Closeable, T> extends Thread { + + protected final AbstractCloseableRegistry<C, T> registry; + protected final AtomicInteger refCount; + protected int maxStreams; + + public ProducerThread(AbstractCloseableRegistry<C, T> registry, AtomicInteger refCount, int maxStreams) { + this.registry = registry; + this.refCount = refCount; + this.maxStreams = maxStreams; + } + + public int getMaxStreams() { + return maxStreams; + } + + public void setMaxStreams(int maxStreams) { + this.maxStreams = maxStreams; + } + + protected abstract void createAndRegisterStream() throws IOException; + + @Override + public void run() { + try { + while (maxStreams > 0) { + + createAndRegisterStream(); + + try { + Thread.sleep(2); + } catch (InterruptedException ignored) {} + + if (maxStreams != Integer.MAX_VALUE) { + --maxStreams; --- End diff -- nit: I always find it strange when a field that should be a final is also used as a "counter". In those cases there should probably be a field `numStream` and you loop until `maxStreams == openStreams`.
---