Github user aljoscha commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4639#discussion_r140452104
  
    --- Diff: 
flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
 ---
    @@ -0,0 +1,230 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + * http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.core.fs;
    +
    +import org.apache.flink.core.testutils.OneShotLatch;
    +import org.apache.flink.util.AbstractCloseableRegistry;
    +
    +import org.junit.Assert;
    +import org.junit.Test;
    +
    +import java.io.Closeable;
    +import java.io.IOException;
    +import java.util.concurrent.atomic.AtomicInteger;
    +
    +import static org.mockito.Mockito.doAnswer;
    +import static org.mockito.Mockito.verify;
    +import static org.powermock.api.mockito.PowerMockito.spy;
    +
    +public abstract class AbstractCloseableRegistryTest<C extends Closeable, 
T> {
    +
    +   protected ProducerThread[] streamOpenThreads;
    +   protected AbstractCloseableRegistry<C, T> closeableRegistry;
    +   protected AtomicInteger unclosedCounter;
    +
    +   protected abstract C createCloseable();
    +
    +   protected abstract AbstractCloseableRegistry<C, T> createRegistry();
    +
    +   protected abstract ProducerThread<C, T> createProducerThread(
    +           AbstractCloseableRegistry<C, T> registry,
    +           AtomicInteger unclosedCounter,
    +           int maxStreams);
    +
    +   public void setup() {
    +           
Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
    +           this.closeableRegistry = createRegistry();
    +           this.unclosedCounter = new AtomicInteger(0);
    +           this.streamOpenThreads = new ProducerThread[10];
    +           for (int i = 0; i < streamOpenThreads.length; ++i) {
    +                   streamOpenThreads[i] = 
createProducerThread(closeableRegistry, unclosedCounter, Integer.MAX_VALUE);
    +           }
    +   }
    +
    +   protected void startThreads(int maxStreams) {
    +           for (ProducerThread t : streamOpenThreads) {
    +                   t.setMaxStreams(maxStreams);
    +                   t.start();
    +           }
    +   }
    +
    +   protected void joinThreads() throws InterruptedException {
    +           for (Thread t : streamOpenThreads) {
    +                   t.join();
    +           }
    +   }
    +
    +   @Test
    +   public void testClose() throws Exception {
    +
    +           setup();
    +           startThreads(Integer.MAX_VALUE);
    +
    +           for (int i = 0; i < 5; ++i) {
    +                   System.gc();
    +                   Thread.sleep(40);
    +           }
    +
    +           closeableRegistry.close();
    +
    +           joinThreads();
    +
    +           Assert.assertEquals(0, unclosedCounter.get());
    +           Assert.assertEquals(0, 
closeableRegistry.getNumberOfRegisteredCloseables());
    +
    +           final C testCloseable = spy(createCloseable());
    +
    +           try {
    +
    +                   closeableRegistry.registerClosable(testCloseable);
    +
    +                   Assert.fail("Closed registry should not accept 
closeables!");
    +
    +           } catch (IOException expected) {
    +                   //expected
    +           }
    +
    +           Assert.assertEquals(0, unclosedCounter.get());
    +           Assert.assertEquals(0, 
closeableRegistry.getNumberOfRegisteredCloseables());
    +           verify(testCloseable).close();
    +   }
    +
    +   @Test
    +   public void testNonBlockingClose() throws Exception {
    +           setup();
    +
    +           final OneShotLatch waitRegistryClosedLatch = new OneShotLatch();
    +           final OneShotLatch blockCloseLatch = new OneShotLatch();
    +
    +           final C spyCloseable = spy(createCloseable());
    +
    +           doAnswer(invocationOnMock -> {
    +                   invocationOnMock.callRealMethod();
    +                   waitRegistryClosedLatch.trigger();
    +                   blockCloseLatch.await();
    +                   return null;
    +           }).when(spyCloseable).close();
    +
    +           closeableRegistry.registerClosable(spyCloseable);
    +
    +           Assert.assertEquals(1, 
closeableRegistry.getNumberOfRegisteredCloseables());
    +
    +           Thread closer = new Thread(() -> {
    +                   try {
    +                           closeableRegistry.close();
    +                   } catch (IOException ignore) {
    +
    +                   }
    +           });
    +
    +           closer.start();
    +           waitRegistryClosedLatch.await();
    +
    +           final C testCloseable = spy(createCloseable());
    +
    +           try {
    +                   closeableRegistry.registerClosable(testCloseable);
    +                   Assert.fail("Closed registry should not accept 
closeables!");
    +           }catch (IOException ignore) {
    +           }
    +
    +           blockCloseLatch.trigger();
    +           closer.join();
    +
    +           verify(spyCloseable).close();
    +           verify(testCloseable).close();
    +           Assert.assertEquals(0, 
closeableRegistry.getNumberOfRegisteredCloseables());
    +   }
    +
    +   protected static abstract class ProducerThread<C extends Closeable, T> 
extends Thread {
    +
    +           protected final AbstractCloseableRegistry<C, T> registry;
    +           protected final AtomicInteger refCount;
    +           protected int maxStreams;
    +
    +           public ProducerThread(AbstractCloseableRegistry<C, T> registry, 
AtomicInteger refCount, int maxStreams) {
    +                   this.registry = registry;
    +                   this.refCount = refCount;
    +                   this.maxStreams = maxStreams;
    +           }
    +
    +           public int getMaxStreams() {
    +                   return maxStreams;
    +           }
    +
    +           public void setMaxStreams(int maxStreams) {
    +                   this.maxStreams = maxStreams;
    +           }
    +
    +           protected abstract void createAndRegisterStream() throws 
IOException;
    +
    +           @Override
    +           public void run() {
    +                   try {
    +                           while (maxStreams > 0) {
    +
    +                                   createAndRegisterStream();
    +
    +                                   try {
    +                                           Thread.sleep(2);
    +                                   } catch (InterruptedException ignored) 
{}
    +
    +                                   if (maxStreams != Integer.MAX_VALUE) {
    +                                           --maxStreams;
    --- End diff --
    
    nit: I always find it strange when a field that should be final is also 
used as a "counter". In those cases there should probably be a separate field 
`openStreams`, and you loop until `maxStreams == openStreams`.


---

Reply via email to