Repository: flink
Updated Branches:
  refs/heads/master 1ebd44a63 -> 5af463a9c


[FLINK-7524] Remove potentially blocking behaviour from 
AbstractCloseableRegistry.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0073204b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0073204b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0073204b

Branch: refs/heads/master
Commit: 0073204b257860ad104cde29d3795b3c633f4759
Parents: 1ebd44a
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Mon Sep 4 12:30:41 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Mon Sep 25 16:04:06 2017 +0200

----------------------------------------------------------------------
 .../apache/flink/core/fs/CloseableRegistry.java |   9 +-
 .../core/fs/SafetyNetCloseableRegistry.java     |  35 ++-
 .../flink/util/AbstractCloseableRegistry.java   |  92 ++++++--
 .../apache/flink/util/WrappingProxyUtil.java    |   8 +-
 .../core/fs/AbstractCloseableRegistryTest.java  | 223 +++++++++++++++++++
 .../flink/core/fs/CloseableRegistryTest.java    |  59 +++++
 .../core/fs/SafetyNetCloseableRegistryTest.java | 211 +++++-------------
 ...StateSnapshotContextSynchronousImplTest.java |   4 +-
 8 files changed, 449 insertions(+), 192 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
index 0d4ea0c..29f363c 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/CloseableRegistry.java
@@ -21,8 +21,9 @@ package org.apache.flink.core.fs;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.util.AbstractCloseableRegistry;
 
+import javax.annotation.Nonnull;
+
 import java.io.Closeable;
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -39,16 +40,16 @@ public class CloseableRegistry extends 
AbstractCloseableRegistry<Closeable, Obje
        private static final Object DUMMY = new Object();
 
        public CloseableRegistry() {
-               super(new HashMap<Closeable, Object>());
+               super(new HashMap<>());
        }
 
        @Override
-       protected void doRegister(Closeable closeable, Map<Closeable, Object> 
closeableMap) throws IOException {
+       protected void doRegister(@Nonnull Closeable closeable, @Nonnull 
Map<Closeable, Object> closeableMap) {
                closeableMap.put(closeable, DUMMY);
        }
 
        @Override
-       protected void doUnRegister(Closeable closeable, Map<Closeable, Object> 
closeableMap) {
+       protected void doUnRegister(@Nonnull Closeable closeable, @Nonnull 
Map<Closeable, Object> closeableMap) {
                closeableMap.remove(closeable);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
index 8b28fa2..6097334 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetCloseableRegistry.java
@@ -27,7 +27,8 @@ import org.apache.flink.util.WrappingProxyUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.concurrent.GuardedBy;
+import javax.annotation.Nonnull;
+
 import java.io.Closeable;
 import java.io.IOException;
 import java.lang.ref.PhantomReference;
@@ -53,19 +54,17 @@ public class SafetyNetCloseableRegistry extends
 
        private static final Logger LOG = 
LoggerFactory.getLogger(SafetyNetCloseableRegistry.class);
 
-       /** Lock for accessing reaper thread and registry count */
+       /** Lock for atomic modifications to reaper thread and registry count */
        private static final Object REAPER_THREAD_LOCK = new Object();
 
        /** Singleton reaper thread takes care of all registries in VM */
-       @GuardedBy("REAPER_THREAD_LOCK")
        private static CloseableReaperThread REAPER_THREAD = null;
 
        /** Global count of all instances of SafetyNetCloseableRegistry */
-       @GuardedBy("REAPER_THREAD_LOCK")
        private static int GLOBAL_SAFETY_NET_REGISTRY_COUNT = 0;
 
-       public SafetyNetCloseableRegistry() {
-               super(new IdentityHashMap<Closeable, 
PhantomDelegatingCloseableRef>());
+       SafetyNetCloseableRegistry() {
+               super(new IdentityHashMap<>());
 
                synchronized (REAPER_THREAD_LOCK) {
                        if (0 == GLOBAL_SAFETY_NET_REGISTRY_COUNT) {
@@ -79,8 +78,8 @@ public class SafetyNetCloseableRegistry extends
 
        @Override
        protected void doRegister(
-                       WrappingProxyCloseable<? extends Closeable> 
wrappingProxyCloseable,
-                       Map<Closeable, PhantomDelegatingCloseableRef> 
closeableMap) throws IOException {
+                       @Nonnull WrappingProxyCloseable<? extends Closeable> 
wrappingProxyCloseable,
+                       @Nonnull Map<Closeable, PhantomDelegatingCloseableRef> 
closeableMap) {
 
                assert Thread.holdsLock(getSynchronizationLock());
 
@@ -100,8 +99,8 @@ public class SafetyNetCloseableRegistry extends
 
        @Override
        protected void doUnRegister(
-                       WrappingProxyCloseable<? extends Closeable> closeable,
-                       Map<Closeable, PhantomDelegatingCloseableRef> 
closeableMap) {
+               @Nonnull WrappingProxyCloseable<? extends Closeable> closeable,
+               @Nonnull Map<Closeable, PhantomDelegatingCloseableRef> 
closeableMap) {
 
                assert Thread.holdsLock(getSynchronizationLock());
 
@@ -131,7 +130,7 @@ public class SafetyNetCloseableRegistry extends
        }
 
        @VisibleForTesting
-       public static boolean isReaperThreadRunning() {
+       static boolean isReaperThreadRunning() {
                synchronized (REAPER_THREAD_LOCK) {
                        return null != REAPER_THREAD && REAPER_THREAD.isAlive();
                }
@@ -148,10 +147,10 @@ public class SafetyNetCloseableRegistry extends
                private final SafetyNetCloseableRegistry closeableRegistry;
                private final String debugString;
 
-               public PhantomDelegatingCloseableRef(
-                               WrappingProxyCloseable<? extends Closeable> 
referent,
-                               SafetyNetCloseableRegistry closeableRegistry,
-                               ReferenceQueue<? super WrappingProxyCloseable<? 
extends Closeable>> q) {
+               PhantomDelegatingCloseableRef(
+                       WrappingProxyCloseable<? extends Closeable> referent,
+                       SafetyNetCloseableRegistry closeableRegistry,
+                       ReferenceQueue<? super WrappingProxyCloseable<? extends 
Closeable>> q) {
 
                        super(referent, q);
                        this.innerCloseable = 
Preconditions.checkNotNull(WrappingProxyUtil.stripProxy(referent));
@@ -159,15 +158,13 @@ public class SafetyNetCloseableRegistry extends
                        this.debugString = referent.toString();
                }
 
-               public String getDebugString() {
+               String getDebugString() {
                        return debugString;
                }
 
                @Override
                public void close() throws IOException {
-                       synchronized 
(closeableRegistry.getSynchronizationLock()) {
-                               
closeableRegistry.closeableToRef.remove(innerCloseable);
-                       }
+                       
closeableRegistry.removeCloseableInternal(innerCloseable);
                        innerCloseable.close();
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java 
b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
index f949779..4527b5e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
+++ 
b/flink-core/src/main/java/org/apache/flink/util/AbstractCloseableRegistry.java
@@ -19,9 +19,15 @@
 package org.apache.flink.util;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+
+import javax.annotation.Nonnull;
+import javax.annotation.concurrent.GuardedBy;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Map;
 
 /**
@@ -38,11 +44,20 @@ import java.util.Map;
 @Internal
 public abstract class AbstractCloseableRegistry<C extends Closeable, T> 
implements Closeable {
 
-       protected final Map<Closeable, T> closeableToRef;
+       /** Lock that guards state of this registry. **/
+       private final Object lock;
+
+       /** Map from tracked Closeables to some associated meta data. */
+       @GuardedBy("lock")
+       private final Map<Closeable, T> closeableToRef;
+
+       /** Indicates if this registry is closed. */
+       @GuardedBy("lock")
        private boolean closed;
 
-       public AbstractCloseableRegistry(Map<Closeable, T> closeableToRef) {
-               this.closeableToRef = closeableToRef;
+       public AbstractCloseableRegistry(@Nonnull Map<Closeable, T> 
closeableToRef) {
+               this.lock = new Object();
+               this.closeableToRef = 
Preconditions.checkNotNull(closeableToRef);
                this.closed = false;
        }
 
@@ -51,7 +66,6 @@ public abstract class AbstractCloseableRegistry<C extends 
Closeable, T> implemen
         * {@link IllegalStateException} and closes the passed {@link 
Closeable}.
         *
         * @param closeable Closeable tor register
-        * 
         * @throws IOException exception when the registry was closed before
         */
        public final void registerClosable(C closeable) throws IOException {
@@ -61,13 +75,14 @@ public abstract class AbstractCloseableRegistry<C extends 
Closeable, T> implemen
                }
 
                synchronized (getSynchronizationLock()) {
-                       if (closed) {
-                               IOUtils.closeQuietly(closeable);
-                               throw new IOException("Cannot register 
Closeable, registry is already closed. Closing argument.");
+                       if (!closed) {
+                               doRegister(closeable, closeableToRef);
+                               return;
                        }
-
-                       doRegister(closeable, closeableToRef);
                }
+
+               IOUtils.closeQuietly(closeable);
+               throw new IOException("Cannot register Closeable, registry is 
already closed. Closing argument.");
        }
 
        /**
@@ -88,18 +103,22 @@ public abstract class AbstractCloseableRegistry<C extends 
Closeable, T> implemen
 
        @Override
        public void close() throws IOException {
+               Collection<Closeable> toCloseCopy;
+
                synchronized (getSynchronizationLock()) {
 
                        if (closed) {
                                return;
                        }
 
-                       IOUtils.closeAllQuietly(closeableToRef.keySet());
+                       closed = true;
 
-                       closeableToRef.clear();
+                       toCloseCopy = new ArrayList<>(closeableToRef.keySet());
 
-                       closed = true;
+                       closeableToRef.clear();
                }
+
+               IOUtils.closeAllQuietly(toCloseCopy);
        }
 
        public boolean isClosed() {
@@ -108,11 +127,54 @@ public abstract class AbstractCloseableRegistry<C extends 
Closeable, T> implemen
                }
        }
 
+       /**
+        * Does the actual registration of the closeable with the registry map. 
This should not do any long running or
+        * potentially blocking operations as is is executed under the 
registry's lock.
+        */
+       protected abstract void doRegister(@Nonnull C closeable, @Nonnull 
Map<Closeable, T> closeableMap);
+
+       /**
+        * Does the actual un-registration of the closeable from the registry 
map. This should not do any long running or
+        * potentially blocking operations as is is executed under the 
registry's lock.
+        */
+       protected abstract void doUnRegister(@Nonnull C closeable, @Nonnull 
Map<Closeable, T> closeableMap);
+
+       /**
+        * Returns the lock on which manipulations to members closeableToRef 
and closeable must be synchronized.
+        */
        protected final Object getSynchronizationLock() {
-               return closeableToRef;
+               return lock;
        }
 
-       protected abstract void doUnRegister(C closeable, Map<Closeable, T> 
closeableMap);
+       /**
+        * Adds a mapping to the registry map, respecting locking.
+        */
+       protected final void addCloseableInternal(Closeable closeable, T 
metaData) {
+               synchronized (getSynchronizationLock()) {
+                       closeableToRef.put(closeable, metaData);
+               }
+       }
 
-       protected abstract void doRegister(C closeable, Map<Closeable, T> 
closeableMap) throws IOException;
+       /**
+        * Removes a mapping from the registry map, respecting locking.
+        */
+       protected final void removeCloseableInternal(Closeable closeable) {
+               synchronized (getSynchronizationLock()) {
+                       closeableToRef.remove(closeable);
+               }
+       }
+
+       @VisibleForTesting
+       public final int getNumberOfRegisteredCloseables() {
+               synchronized (getSynchronizationLock()) {
+                       return closeableToRef.size();
+               }
+       }
+
+       @VisibleForTesting
+       public final boolean isCloseableRegistered(Closeable c) {
+               synchronized (getSynchronizationLock()) {
+                       return closeableToRef.containsKey(c);
+               }
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java 
b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
index 6a79913..7493d76 100644
--- a/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
+++ b/flink-core/src/main/java/org/apache/flink/util/WrappingProxyUtil.java
@@ -27,10 +27,16 @@ public final class WrappingProxyUtil {
                throw new AssertionError();
        }
 
+       @SuppressWarnings("unchecked")
        public static <T> T stripProxy(T object) {
-               while (object instanceof WrappingProxy) {
+
+               T previous = null;
+
+               while (object instanceof WrappingProxy && previous != object) {
+                       previous = object;
                        object = ((WrappingProxy<T>) 
object).getWrappedDelegate();
                }
+
                return object;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
 
b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
new file mode 100644
index 0000000..41b69c8
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/core/fs/AbstractCloseableRegistryTest.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.util.AbstractCloseableRegistry;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+public abstract class AbstractCloseableRegistryTest<C extends Closeable, T> {
+
+       protected ProducerThread[] streamOpenThreads;
+       protected AbstractCloseableRegistry<C, T> closeableRegistry;
+       protected AtomicInteger unclosedCounter;
+
+       protected abstract C createCloseable();
+
+       protected abstract AbstractCloseableRegistry<C, T> createRegistry();
+
+       protected abstract ProducerThread<C, T> createProducerThread(
+               AbstractCloseableRegistry<C, T> registry,
+               AtomicInteger unclosedCounter,
+               int maxStreams);
+
+       public void setup(int maxStreams) {
+               
Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
+               this.closeableRegistry = createRegistry();
+               this.unclosedCounter = new AtomicInteger(0);
+               this.streamOpenThreads = new ProducerThread[10];
+               for (int i = 0; i < streamOpenThreads.length; ++i) {
+                       streamOpenThreads[i] = 
createProducerThread(closeableRegistry, unclosedCounter, maxStreams);
+               }
+       }
+
+       protected void startThreads() {
+               for (ProducerThread t : streamOpenThreads) {
+                       t.start();
+               }
+       }
+
+       protected void joinThreads() throws InterruptedException {
+               for (Thread t : streamOpenThreads) {
+                       t.join();
+               }
+       }
+
+       @Test
+       public void testClose() throws Exception {
+
+               setup(Integer.MAX_VALUE);
+               startThreads();
+
+               for (int i = 0; i < 5; ++i) {
+                       System.gc();
+                       Thread.sleep(40);
+               }
+
+               closeableRegistry.close();
+
+               joinThreads();
+
+               Assert.assertEquals(0, unclosedCounter.get());
+               Assert.assertEquals(0, 
closeableRegistry.getNumberOfRegisteredCloseables());
+
+               final C testCloseable = spy(createCloseable());
+
+               try {
+
+                       closeableRegistry.registerClosable(testCloseable);
+
+                       Assert.fail("Closed registry should not accept 
closeables!");
+
+               } catch (IOException expected) {
+                       //expected
+               }
+
+               Assert.assertEquals(0, unclosedCounter.get());
+               Assert.assertEquals(0, 
closeableRegistry.getNumberOfRegisteredCloseables());
+               verify(testCloseable).close();
+       }
+
+       @Test
+       public void testNonBlockingClose() throws Exception {
+               setup(Integer.MAX_VALUE);
+
+               final OneShotLatch waitRegistryClosedLatch = new OneShotLatch();
+               final OneShotLatch blockCloseLatch = new OneShotLatch();
+
+               final C spyCloseable = spy(createCloseable());
+
+               doAnswer(invocationOnMock -> {
+                       invocationOnMock.callRealMethod();
+                       waitRegistryClosedLatch.trigger();
+                       blockCloseLatch.await();
+                       return null;
+               }).when(spyCloseable).close();
+
+               closeableRegistry.registerClosable(spyCloseable);
+
+               Assert.assertEquals(1, 
closeableRegistry.getNumberOfRegisteredCloseables());
+
+               Thread closer = new Thread(() -> {
+                       try {
+                               closeableRegistry.close();
+                       } catch (IOException ignore) {
+
+                       }
+               });
+
+               closer.start();
+               waitRegistryClosedLatch.await();
+
+               final C testCloseable = spy(createCloseable());
+
+               try {
+                       closeableRegistry.registerClosable(testCloseable);
+                       Assert.fail("Closed registry should not accept 
closeables!");
+               }catch (IOException ignore) {
+               }
+
+               blockCloseLatch.trigger();
+               closer.join();
+
+               verify(spyCloseable).close();
+               verify(testCloseable).close();
+               Assert.assertEquals(0, 
closeableRegistry.getNumberOfRegisteredCloseables());
+       }
+
+       protected static abstract class ProducerThread<C extends Closeable, T> 
extends Thread {
+
+               protected final AbstractCloseableRegistry<C, T> registry;
+               protected final AtomicInteger refCount;
+               protected final int maxStreams;
+               protected int numStreams;
+
+               public ProducerThread(AbstractCloseableRegistry<C, T> registry, 
AtomicInteger refCount, int maxStreams) {
+                       this.registry = registry;
+                       this.refCount = refCount;
+                       this.maxStreams = maxStreams;
+                       this.numStreams = 0;
+               }
+
+               protected abstract void createAndRegisterStream() throws 
IOException;
+
+               @Override
+               public void run() {
+                       try {
+                               while (numStreams < maxStreams) {
+
+                                       createAndRegisterStream();
+
+                                       try {
+                                               Thread.sleep(2);
+                                       } catch (InterruptedException ignored) 
{}
+
+                                       if (maxStreams != Integer.MAX_VALUE) {
+                                               ++numStreams;
+                                       }
+                               }
+                       } catch (Exception ex) {
+                               // ignored
+                       }
+               }
+       }
+
+       protected static final class TestStream extends FSDataInputStream {
+
+               protected AtomicInteger refCount;
+
+               public TestStream(AtomicInteger refCount) {
+                       this.refCount = refCount;
+                       refCount.incrementAndGet();
+               }
+
+               @Override
+               public void seek(long desired) throws IOException {
+
+               }
+
+               @Override
+               public long getPos() throws IOException {
+                       return 0;
+               }
+
+               @Override
+               public int read() throws IOException {
+                       return 0;
+               }
+
+               @Override
+               public synchronized void close() throws IOException {
+                       if (refCount != null) {
+                               refCount.decrementAndGet();
+                               refCount = null;
+                       }
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java 
b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
new file mode 100644
index 0000000..eb8d1f4
--- /dev/null
+++ 
b/flink-core/src/test/java/org/apache/flink/core/fs/CloseableRegistryTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.util.AbstractCloseableRegistry;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class CloseableRegistryTest extends 
AbstractCloseableRegistryTest<Closeable, Object> {
+
+       @Override
+       protected Closeable createCloseable() {
+               return new Closeable() {
+                       @Override
+                       public void close() throws IOException {
+
+                       }
+               };
+       }
+
+       @Override
+       protected AbstractCloseableRegistry<Closeable, Object> createRegistry() 
{
+
+               return new CloseableRegistry();
+       }
+
+       @Override
+       protected ProducerThread<Closeable, Object> createProducerThread(
+               AbstractCloseableRegistry<Closeable, Object> registry,
+               AtomicInteger unclosedCounter,
+               int maxStreams) {
+
+               return new ProducerThread<Closeable, Object>(registry, 
unclosedCounter, maxStreams) {
+                       @Override
+                       protected void createAndRegisterStream() throws 
IOException {
+                               TestStream testStream = new 
TestStream(unclosedCounter);
+                               registry.registerClosable(testStream);
+                       }
+               };
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
----------------------------------------------------------------------
diff --git 
a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
 
b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
index 05ee894..4ceda50 100644
--- 
a/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
+++ 
b/flink-core/src/test/java/org/apache/flink/core/fs/SafetyNetCloseableRegistryTest.java
@@ -19,7 +19,9 @@
 package org.apache.flink.core.fs;
 
 import org.apache.flink.core.testutils.CheckedThread;
+import org.apache.flink.util.AbstractCloseableRegistry;
 import org.apache.flink.util.ExceptionUtils;
+
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Rule;
@@ -30,41 +32,71 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.atomic.AtomicInteger;
 
-public class SafetyNetCloseableRegistryTest {
+public class SafetyNetCloseableRegistryTest
+       extends AbstractCloseableRegistryTest<WrappingProxyCloseable<? extends 
Closeable>,
+       SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> {
 
        @Rule
        public final TemporaryFolder tmpFolder = new TemporaryFolder();
 
-       private ProducerThread[] streamOpenThreads;
-       private SafetyNetCloseableRegistry closeableRegistry;
-       private AtomicInteger unclosedCounter;
+       @Override
+       protected WrappingProxyCloseable<? extends Closeable> createCloseable() 
{
+               return new WrappingProxyCloseable<Closeable>() {
 
-       public void setup() {
-               
Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
-               this.closeableRegistry = new SafetyNetCloseableRegistry();
-               this.unclosedCounter = new AtomicInteger(0);
-               this.streamOpenThreads = new ProducerThread[10];
-               for (int i = 0; i < streamOpenThreads.length; ++i) {
-                       streamOpenThreads[i] = new 
ProducerThread(closeableRegistry, unclosedCounter, Integer.MAX_VALUE);
-               }
+                       @Override
+                       public void close() throws IOException {
+
+                       }
+
+                       @Override
+                       public Closeable getWrappedDelegate() {
+                               return this;
+                       }
+               };
        }
 
-       @After
-       public void tearDown() {
-               
Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
+       @Override
+       protected AbstractCloseableRegistry<
+               WrappingProxyCloseable<? extends Closeable>,
+               SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> 
createRegistry() {
+
+               return new SafetyNetCloseableRegistry();
        }
 
-       private void startThreads(int maxStreams) {
-               for (ProducerThread t : streamOpenThreads) {
-                       t.setMaxStreams(maxStreams);
-                       t.start();
-               }
+       @Override
+       protected AbstractCloseableRegistryTest.ProducerThread<
+               WrappingProxyCloseable<? extends Closeable>,
+               SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> 
createProducerThread(
+               AbstractCloseableRegistry<
+                       WrappingProxyCloseable<? extends Closeable>,
+                       
SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef> registry,
+               AtomicInteger unclosedCounter,
+               int maxStreams) {
+
+               return new AbstractCloseableRegistryTest.ProducerThread
+                       <WrappingProxyCloseable<? extends Closeable>,
+                               
SafetyNetCloseableRegistry.PhantomDelegatingCloseableRef>(registry, 
unclosedCounter, maxStreams) {
+
+                       int count = 0;
+
+                       @Override
+                       protected void createAndRegisterStream() throws 
IOException {
+                               String debug = Thread.currentThread().getName() 
+ " " + count;
+                               TestStream testStream = new 
TestStream(refCount);
+
+                               // this method automatically registers the 
stream with the given registry.
+                               @SuppressWarnings("unused")
+                               ClosingFSDataInputStream pis = 
ClosingFSDataInputStream.wrapSafe(
+                                       testStream, 
(SafetyNetCloseableRegistry) registry,
+                                       debug); //reference dies here
+                               ++count;
+                       }
+               };
        }
 
-       private void joinThreads() throws InterruptedException {
-               for (Thread t : streamOpenThreads) {
-                       t.join();
-               }
+       @After
+       public void tearDown() {
+               
Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
        }
 
        @Test
@@ -133,51 +165,9 @@ public class SafetyNetCloseableRegistryTest {
        }
 
        @Test
-       public void testClose() throws Exception {
-
-               setup();
-               startThreads(Integer.MAX_VALUE);
-
-               for (int i = 0; i < 5; ++i) {
-                       System.gc();
-                       Thread.sleep(40);
-               }
-
-               closeableRegistry.close();
-
-               joinThreads();
-
-               Assert.assertEquals(0, unclosedCounter.get());
-
-               try {
-
-                       WrappingProxyCloseable<Closeable> testCloseable = new 
WrappingProxyCloseable<Closeable>() {
-                               @Override
-                               public Closeable getWrappedDelegate() {
-                                       return this;
-                               }
-
-                               @Override
-                               public void close() throws IOException {
-                                       unclosedCounter.incrementAndGet();
-                               }
-                       };
-
-                       closeableRegistry.registerClosable(testCloseable);
-
-                       Assert.fail("Closed registry should not accept 
closeables!");
-
-               } catch (IOException expected) {
-                       //expected
-               }
-
-               Assert.assertEquals(1, unclosedCounter.get());
-       }
-
-       @Test
        public void testSafetyNetClose() throws Exception {
-               setup();
-               startThreads(20);
+               setup(20);
+               startThreads();
 
                joinThreads();
 
@@ -194,95 +184,14 @@ public class SafetyNetCloseableRegistryTest {
        public void testReaperThreadSpawnAndStop() throws Exception {
                
Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
 
-               try (SafetyNetCloseableRegistry r1 = new 
SafetyNetCloseableRegistry()) {
+               try (SafetyNetCloseableRegistry ignored = new 
SafetyNetCloseableRegistry()) {
                        
Assert.assertTrue(SafetyNetCloseableRegistry.isReaperThreadRunning());
 
-                       try (SafetyNetCloseableRegistry r2 = new 
SafetyNetCloseableRegistry()) {
+                       try (SafetyNetCloseableRegistry ignored2 = new 
SafetyNetCloseableRegistry()) {
                                
Assert.assertTrue(SafetyNetCloseableRegistry.isReaperThreadRunning());
                        }
                        
Assert.assertTrue(SafetyNetCloseableRegistry.isReaperThreadRunning());
                }
                
Assert.assertFalse(SafetyNetCloseableRegistry.isReaperThreadRunning());
        }
-
-       
//------------------------------------------------------------------------------------------------------------------
-
-       private static final class ProducerThread extends Thread {
-
-               private final SafetyNetCloseableRegistry registry;
-               private final AtomicInteger refCount;
-               private int maxStreams;
-
-               public ProducerThread(SafetyNetCloseableRegistry registry, 
AtomicInteger refCount, int maxStreams) {
-                       this.registry = registry;
-                       this.refCount = refCount;
-                       this.maxStreams = maxStreams;
-               }
-
-               public int getMaxStreams() {
-                       return maxStreams;
-               }
-
-               public void setMaxStreams(int maxStreams) {
-                       this.maxStreams = maxStreams;
-               }
-
-               @Override
-               public void run() {
-                       try {
-                               int count = 0;
-                               while (maxStreams > 0) {
-                                       String debug = 
Thread.currentThread().getName() + " " + count;
-                                       TestStream testStream = new 
TestStream(refCount);
-                                       refCount.incrementAndGet();
-
-                                       @SuppressWarnings("unused")
-                                       ClosingFSDataInputStream pis = 
ClosingFSDataInputStream.wrapSafe(testStream, registry, debug); //reference 
dies here
-
-                                       try {
-                                               Thread.sleep(2);
-                                       } catch (InterruptedException ignored) 
{}
-
-                                       if (maxStreams != Integer.MAX_VALUE) {
-                                               --maxStreams;
-                                       }
-                                       ++count;
-                               }
-                       } catch (Exception ex) {
-                               // ignored
-                       }
-               }
-       }
-
-       private static final class TestStream extends FSDataInputStream {
-
-               private AtomicInteger refCount;
-
-               public TestStream(AtomicInteger refCount) {
-                       this.refCount = refCount;
-               }
-
-               @Override
-               public void seek(long desired) throws IOException {
-
-               }
-
-               @Override
-               public long getPos() throws IOException {
-                       return 0;
-               }
-
-               @Override
-               public int read() throws IOException {
-                       return 0;
-               }
-
-               @Override
-               public void close() throws IOException {
-                       if (refCount != null) {
-                               refCount.decrementAndGet();
-                               refCount = null;
-                       }
-               }
-       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/0073204b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
index 099f1f9..8934591 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StateSnapshotContextSynchronousImplTest.java
@@ -120,11 +120,11 @@ public class StateSnapshotContextSynchronousImplTest 
extends TestLogger {
 
        static final class InsightCloseableRegistry extends CloseableRegistry {
                public int size() {
-                       return closeableToRef.size();
+                       return getNumberOfRegisteredCloseables();
                }
 
                public boolean contains(Closeable closeable) {
-                       return closeableToRef.containsKey(closeable);
+                       return isCloseableRegistered(closeable);
                }
        }
 }

Reply via email to