Repository: qpid-proton Updated Branches: refs/heads/proton-j-reactor 3c3799238 -> 2d3cefc45
PROTON-881: make unittests check for Java I/O object leaks. Implements something similar to pn_io so that the various Java I/O resources are created in a single place. Reactor unit tests are parameterized to run once with the reactor implementation returned from Proton.reactor() and once with a unit-test extension of ReactorImpl that checks that all Java I/O resources used by the test case are closed. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2d3cefc4 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2d3cefc4 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2d3cefc4 Branch: refs/heads/proton-j-reactor Commit: 2d3cefc4562e2246a9007d500cf2885ad9b8adc6 Parents: 3c37992 Author: Adrian Preston <[email protected]> Authored: Mon May 18 23:07:22 2015 +0100 Committer: Adrian Preston <[email protected]> Committed: Mon May 18 23:07:22 2015 +0100 ---------------------------------------------------------------------- .../org/apache/qpid/proton/reactor/Reactor.java | 6 - .../qpid/proton/reactor/impl/AcceptorImpl.java | 8 +- .../org/apache/qpid/proton/reactor/impl/IO.java | 44 +++++++ .../qpid/proton/reactor/impl/IOHandler.java | 8 +- .../apache/qpid/proton/reactor/impl/IOImpl.java | 52 ++++++++ .../qpid/proton/reactor/impl/ReactorImpl.java | 15 ++- .../qpid/proton/reactor/impl/SelectorImpl.java | 6 +- .../apache/qpid/proton/reactor/ReactorTest.java | 63 ++++++++-- .../proton/reactor/impl/AcceptorImplTest.java | 32 ++--- .../proton/reactor/impl/LeakTestReactor.java | 118 +++++++++++++++++++ 10 files changed, 301 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3cefc4/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java ---------------------------------------------------------------------- diff --git 
a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java index 5756b34..d6cca72 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Reactor.java @@ -56,12 +56,6 @@ public interface Reactor { public void setHandler(Handler handler); - // XXX: The C reactor has a pn_reactor_io() function. The closest Java equivalent - // would be a factory for creating SocketChannel's, ServerSocketChannelsm and Selectors. - // This seems like overkill unless there's a use for this in unit testing, or the - // Reactor needs to be integrated with an exotic Java environment which provides its - // own networking implementation. - public Set<ReactorChild> children(); public Collector collector(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3cefc4/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java index 7300b50..12006ad 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/AcceptorImpl.java @@ -90,14 +90,8 @@ public class AcceptorImpl implements Acceptor { private final Selectable sel; - // Split out from AcceptorImpl to make it easier for unittests to mock this class - // without having to open an actual port. 
- protected ServerSocketChannel openServerSocket() throws IOException { - return ServerSocketChannel.open(); - } - protected AcceptorImpl(Reactor reactor, String host, int port, Handler handler) throws IOException { - ServerSocketChannel ssc = openServerSocket(); + ServerSocketChannel ssc = ((ReactorImpl)reactor).getIO().serverSocketChannel(); ssc.bind(new InetSocketAddress(host, port)); sel = ((ReactorImpl)reactor).selectable(this); sel.setChannel(ssc); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3cefc4/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IO.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IO.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IO.java new file mode 100644 index 0000000..1028ae8 --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IO.java @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.qpid.proton.reactor.impl; + +import java.io.IOException; +import java.nio.channels.Pipe; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +// Java equivalent to pn_io. +// This is, currently, in the reactor.impl package because it is not +// used elsewhere in the proton-j codebase. Instead it is present to +// facilitate mocking of various Java I/O related resources so that +// the unit tests can check for leaks. +public interface IO { + + Pipe pipe() throws IOException; + + Selector selector() throws IOException; + + ServerSocketChannel serverSocketChannel() throws IOException; + + SocketChannel socketChannel() throws IOException; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3cefc4/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java index ed14628..f810742 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOHandler.java @@ -102,7 +102,7 @@ public class IOHandler extends BaseHandler { Transport transport = event.getConnection().getTransport(); Socket socket = null; // In this case, 'null' is the proton-j equivalent of PN_INVALID_SOCKET try { - SocketChannel socketChannel = SocketChannel.open(); + SocketChannel socketChannel = ((ReactorImpl)reactor).getIO().socketChannel(); socketChannel.connect(new InetSocketAddress(hostname, port)); socket = socketChannel.socket(); } catch(IOException ioException) { @@ -166,8 +166,6 @@ public class IOHandler extends BaseHandler { Transport transport = selectable.getTransport(); int capacity = transport.capacity(); if (capacity > 0) { - // TODO: we shouldn't 
be doing this cast. Instead - selectable should return an - // object with 1) a getter for the SelectableChannel, 2) read/write methods. SocketChannel socketChannel = (SocketChannel)selectable.getChannel(); try { int n = socketChannel.read(transport.tail()); @@ -200,7 +198,7 @@ public class IOHandler extends BaseHandler { Transport transport = selectable.getTransport(); int pending = transport.pending(); if (pending > 0) { - SocketChannel channel = (SocketChannel)selectable.getChannel(); // TODO: can't rely on this cast always working! + SocketChannel channel = (SocketChannel)selectable.getChannel(); try { int n = channel.write(transport.head()); if (n < 0) { @@ -299,7 +297,7 @@ public class IOHandler extends BaseHandler { ReactorImpl reactor = (ReactorImpl)event.getReactor(); Selector selector = reactor.getSelector(); if (selector == null) { - selector = new SelectorImpl(); + selector = new SelectorImpl(reactor.getIO()); reactor.setSelector(selector); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3cefc4/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOImpl.java new file mode 100644 index 0000000..6376b16 --- /dev/null +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/IOImpl.java @@ -0,0 +1,52 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.proton.reactor.impl; + +import java.io.IOException; +import java.nio.channels.Pipe; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; + +public class IOImpl implements IO { + + @Override + public Pipe pipe() throws IOException { + return Pipe.open(); + } + + @Override + public Selector selector() throws IOException { + return Selector.open(); + } + + @Override + public ServerSocketChannel serverSocketChannel() throws IOException { + return ServerSocketChannel.open(); + } + + @Override + public SocketChannel socketChannel() throws IOException { + return SocketChannel.open(); + } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3cefc4/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java index e48bd3b..db20601 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/ReactorImpl.java @@ -63,6 +63,7 @@ public class ReactorImpl implements Reactor, Extendable { private final Pipe wakeup; private Selector selector; private Record attachments; + private final IO io; @Override public long mark() { @@ -75,18 +76,23 @@ public class ReactorImpl implements Reactor, Extendable { return now; 
} - public ReactorImpl() throws IOException { + protected ReactorImpl(IO io) throws IOException { collector = (CollectorImpl)Proton.collector(); global = new IOHandler(); handler = new BaseHandler(); children = new HashSet<ReactorChild>(); selectables = 0; timer = new Timer(collector); - wakeup = Pipe.open(); + this.io = io; + wakeup = this.io.pipe(); mark(); attachments = new RecordImpl(); } + public ReactorImpl() throws IOException { + this(new IOImpl()); + } + @Override public void free() { if (wakeup.source().isOpen()) { @@ -113,6 +119,7 @@ public class ReactorImpl implements Reactor, Extendable { } } + @Override public Record attachments() { return attachments; } @@ -412,4 +419,8 @@ public class ReactorImpl implements Reactor, Extendable { public Acceptor acceptor(String host, int port, Handler handler) throws IOException { return new AcceptorImpl(this, host, port, handler); } + + public IO getIO() { + return io; + } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3cefc4/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java index 28ea1ae..1145158 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/SelectorImpl.java @@ -30,7 +30,7 @@ import java.util.Iterator; import org.apache.qpid.proton.reactor.Selectable; import org.apache.qpid.proton.reactor.Selector; -public class SelectorImpl implements Selector { +class SelectorImpl implements Selector { private final java.nio.channels.Selector selector; private final HashSet<Selectable> selectables = new HashSet<Selectable>(); @@ -39,8 +39,8 @@ public class SelectorImpl implements Selector { private final HashSet<Selectable> expired = new 
HashSet<Selectable>(); private final HashSet<Selectable> error = new HashSet<Selectable>(); - public SelectorImpl() throws IOException { - selector = java.nio.channels.Selector.open(); + protected SelectorImpl(IO io) throws IOException { + selector = io.selector(); } @Override http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3cefc4/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java index 76fc632..c784849 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/ReactorTest.java @@ -29,6 +29,8 @@ import static org.junit.Assert.assertTrue; import java.io.IOException; import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.engine.BaseHandler; @@ -40,10 +42,61 @@ import org.apache.qpid.proton.engine.Handler; import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.reactor.impl.AcceptorImpl; +import org.apache.qpid.proton.reactor.impl.LeakTestReactor; +import org.junit.After; +import org.junit.Before; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +@RunWith(Parameterized.class) public class ReactorTest { + public ReactorFactory reactorFactory; + private Reactor reactor; + + private static interface ReactorFactory { + Reactor newReactor() throws IOException; + } + + // Parameterize the tests, and run them once with a reactor obtained by calling + // 'Proton.reactor()' and once with the LeakTestReactor. 
+ @Parameters + public static Collection<ReactorFactory[]> data() throws IOException { + ReactorFactory classicReactor = new ReactorFactory() { + @Override public Reactor newReactor() throws IOException { + return Proton.reactor(); + } + }; + ReactorFactory newLeakDetection = new ReactorFactory() { + @Override public Reactor newReactor() throws IOException { + return new LeakTestReactor(); + } + }; + return Arrays.asList(new ReactorFactory[][]{{classicReactor}, {newLeakDetection}}); + } + + public ReactorTest(ReactorFactory reactorFactory) { + this.reactorFactory = reactorFactory; + } + + @Before + public void before() throws IOException { + reactor = reactorFactory.newReactor(); + } + + private void checkForLeaks() { + if (reactor instanceof LeakTestReactor) { + ((LeakTestReactor)reactor).assertNoLeaks(); + } + } + + @After + public void after() { + checkForLeaks(); + } + /** * Tests that creating a reactor and running it: * <ul> @@ -54,7 +107,6 @@ public class ReactorTest { */ @Test public void runEmpty() throws IOException { - Reactor reactor = Proton.reactor(); assertNotNull(reactor); reactor.run(); reactor.free(); @@ -81,7 +133,6 @@ public class ReactorTest { */ @Test public void handlerRun() throws IOException { - Reactor reactor = Proton.reactor(); Handler handler = reactor.getHandler(); assertNotNull(handler); TestHandler testHandler = new TestHandler(); @@ -104,7 +155,6 @@ public class ReactorTest { */ @Test public void connection() throws IOException { - Reactor reactor = Proton.reactor(); TestHandler connectionHandler = new TestHandler(); Connection connection = reactor.connection(connectionHandler); assertNotNull(connection); @@ -129,7 +179,6 @@ public class ReactorTest { */ @Test public void acceptor() throws IOException { - Reactor reactor = Proton.reactor(); final Acceptor acceptor = reactor.acceptor("127.0.0.1", 0); assertNotNull(acceptor); assertTrue("acceptor should be one of the reactor's children", reactor.children().contains(acceptor)); @@ 
-176,8 +225,6 @@ public class ReactorTest { */ @Test public void connect() throws IOException { - Reactor reactor = Proton.reactor(); - ServerHandler sh = new ServerHandler(); Acceptor acceptor = reactor.acceptor("127.0.0.1", 0, sh); final int listeningPort = ((AcceptorImpl)acceptor).getPortNumber(); @@ -288,7 +335,7 @@ public class ReactorTest { } private void transfer(int count, int window) throws IOException { - Reactor reactor = Proton.reactor(); + reactor = reactorFactory.newReactor(); ServerHandler sh = new ServerHandler(); Acceptor acceptor = reactor.acceptor("127.0.0.1", 0, sh); sh.setAcceptor(acceptor); @@ -305,6 +352,7 @@ public class ReactorTest { reactor.run(); reactor.free(); assertEquals("Did not receive the expected number of messages", count, snk.received); + checkForLeaks(); } @Test @@ -326,7 +374,6 @@ public class ReactorTest { @Test public void schedule() throws IOException { - Reactor reactor = Proton.reactor(); TestHandler reactorHandler = new TestHandler(); reactor.getHandler().add(reactorHandler); TestHandler taskHandler = new TestHandler(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3cefc4/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java index 85750ea..9ac0538 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/AcceptorImplTest.java @@ -24,8 +24,6 @@ package org.apache.qpid.proton.reactor.impl; import java.io.IOException; import java.nio.channels.ServerSocketChannel; -import org.apache.qpid.proton.engine.Handler; -import org.apache.qpid.proton.reactor.Reactor; import org.apache.qpid.proton.reactor.ReactorChild; import 
org.apache.qpid.proton.reactor.Selectable.Callback; import org.junit.Test; @@ -44,21 +42,18 @@ public class AcceptorImplTest { final SelectableImpl selectable = new SelectableImpl(); selectable.onError(mockCallback); ReactorImpl mockReactor = Mockito.mock(ReactorImpl.class); - Mockito.when(mockReactor.selectable(Mockito.any(ReactorChild.class))).thenReturn(selectable); - class MockAcceptorImpl extends AcceptorImpl { - - protected MockAcceptorImpl(Reactor reactor, String host, int port, Handler handler) throws IOException { - super(reactor, host, port, handler); - } - + class MockIO extends IOImpl { @Override - protected ServerSocketChannel openServerSocket() throws IOException { + public ServerSocketChannel serverSocketChannel() throws IOException { ServerSocketChannel result = Mockito.mock(ServerSocketChannel.class); Mockito.when(result.accept()).thenThrow(new IOException()); return result; } } - new MockAcceptorImpl(mockReactor, "host", 1234, null); + IO mockIO = new MockIO(); + Mockito.when(mockReactor.getIO()).thenReturn(mockIO); + Mockito.when(mockReactor.selectable(Mockito.any(ReactorChild.class))).thenReturn(selectable); + new AcceptorImpl(mockReactor, "host", 1234, null); selectable.readable(); Mockito.verify(mockCallback).run(selectable); } @@ -75,21 +70,18 @@ public class AcceptorImplTest { final SelectableImpl selectable = new SelectableImpl(); selectable.onError(mockCallback); ReactorImpl mockReactor = Mockito.mock(ReactorImpl.class); - Mockito.when(mockReactor.selectable(Mockito.any(ReactorChild.class))).thenReturn(selectable); - class MockAcceptorImpl extends AcceptorImpl { - - protected MockAcceptorImpl(Reactor reactor, String host, int port, Handler handler) throws IOException { - super(reactor, host, port, handler); - } - + class MockIO extends IOImpl { @Override - protected ServerSocketChannel openServerSocket() throws IOException { + public ServerSocketChannel serverSocketChannel() throws IOException { ServerSocketChannel result = 
Mockito.mock(ServerSocketChannel.class); Mockito.when(result.accept()).thenReturn(null); return result; } } - new MockAcceptorImpl(mockReactor, "host", 1234, null); + IO mockIO = new MockIO(); + Mockito.when(mockReactor.getIO()).thenReturn(mockIO); + Mockito.when(mockReactor.selectable(Mockito.any(ReactorChild.class))).thenReturn(selectable); + new AcceptorImpl(mockReactor, "host", 1234, null); selectable.readable(); } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d3cefc4/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/LeakTestReactor.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/LeakTestReactor.java b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/LeakTestReactor.java new file mode 100644 index 0000000..d06ec0a --- /dev/null +++ b/proton-j/src/test/java/org/apache/qpid/proton/reactor/impl/LeakTestReactor.java @@ -0,0 +1,118 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.qpid.proton.reactor.impl; + +import java.io.IOException; +import java.nio.channels.Pipe; +import java.nio.channels.Pipe.SinkChannel; +import java.nio.channels.Pipe.SourceChannel; +import java.nio.channels.Selector; +import java.nio.channels.ServerSocketChannel; +import java.nio.channels.SocketChannel; +import java.util.HashMap; +import java.util.Map.Entry; + +import junit.framework.AssertionFailedError; + +// Extends the Reactor to substitute a unit-test implementation of the +// IO class. This detects, and reports, situations where the reactor code +// fails to close one of the Java I/O related resources that it has created. +public class LeakTestReactor extends ReactorImpl { + + private static class TestIO implements IO { + + private final HashMap<Object, Exception> resources = new HashMap<Object, Exception>(); + + @Override + public Pipe pipe() throws IOException { + Pipe pipe = Pipe.open(); + resources.put(pipe.source(), new Exception()); + resources.put(pipe.sink(), new Exception()); + return pipe; + } + + @Override + public Selector selector() throws IOException { + Selector selector = Selector.open(); + resources.put(selector, new Exception()); + return selector; + + } + + @Override + public ServerSocketChannel serverSocketChannel() throws IOException { + ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); + resources.put(serverSocketChannel, new Exception()); + return serverSocketChannel; + } + + @Override + public SocketChannel socketChannel() throws IOException { + SocketChannel socketChannel = SocketChannel.open(); + resources.put(socketChannel, new Exception()); + return socketChannel; + } + + private boolean isOpen(Object resource) { + if (resource instanceof SourceChannel) { + return ((SourceChannel)resource).isOpen(); + } else if (resource instanceof SinkChannel) { + return ((SinkChannel)resource).isOpen(); + } else if (resource instanceof Selector) { + return ((Selector)resource).isOpen(); + } else if 
(resource instanceof ServerSocketChannel) { + return ((ServerSocketChannel)resource).isOpen(); + } else if (resource instanceof SocketChannel) { + return ((SocketChannel)resource).isOpen(); + } else { + throw new AssertionFailedError("Don't know how to check if this type is open: " + resource.getClass()); + } + } + + protected void assertNoLeaks() throws AssertionFailedError { + boolean fail = false; + for (Entry<Object, Exception> entry : resources.entrySet()) { + if (isOpen(entry.getKey())) { + System.out.println("Leaked an instance of '" + entry.getKey() + "' from:"); + entry.getValue().printStackTrace(System.out); + fail = true; + } + } + if (fail) { + throw new AssertionFailedError("Resources leaked"); + } + resources.clear(); + } + } + + private final TestIO testIO; + + public LeakTestReactor() throws IOException { + super(new TestIO()); + testIO = (TestIO)getIO(); + } + + public void assertNoLeaks() throws AssertionFailedError { + testIO.assertNoLeaks(); + } + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
