This is an automated email from the ASF dual-hosted git repository. clebertsuconic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push: new 59c9669 ARTEMIS-2257 Synchronize SharedEventLoopGroup shutdownGracefully new 6536b42 This closes #2558 59c9669 is described below commit 59c966902c3d568f10da4f89b0a7778e4351fbdb Author: yang wei <wy96...@gmail.com> AuthorDate: Thu Feb 21 15:40:10 2019 +0800 ARTEMIS-2257 Synchronize SharedEventLoopGroup shutdownGracefully --- .../remoting/impl/netty/SharedEventLoopGroup.java | 42 +++++----- .../impl/netty/SharedEventLoopGroupTest.java | 97 ++++++++++++++++++++++ 2 files changed, 119 insertions(+), 20 deletions(-) diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java index a6f935c..24c393c 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java @@ -84,30 +84,32 @@ public class SharedEventLoopGroup extends DelegatingEventLoopGroup { @Override public Future<?> shutdownGracefully(final long l, final long l2, final TimeUnit timeUnit) { - if (channelFactoryCount.decrementAndGet() == 0) { - shutdown.compareAndSet(null, next().scheduleAtFixedRate(new Runnable() { - @Override - public void run() { - synchronized (SharedEventLoopGroup.class) { - if (shutdown.get() != null) { - Future<?> future = SharedEventLoopGroup.super.shutdownGracefully(l, l2, timeUnit); - future.addListener(new FutureListener<Object>() { - @Override - public void operationComplete(Future<Object> future) throws Exception { - if (future.isSuccess()) { - terminationPromise.setSuccess(null); - } else { - terminationPromise.setFailure(future.cause()); + synchronized (SharedEventLoopGroup.class) { + if (channelFactoryCount.decrementAndGet() == 0) { + shutdown.compareAndSet(null, next().scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + synchronized (SharedEventLoopGroup.class) { + if (shutdown.get() != null) { + Future<?> future = SharedEventLoopGroup.super.shutdownGracefully(l, l2, timeUnit); + future.addListener(new FutureListener<Object>() { + @Override + public void operationComplete(Future<Object> future) throws Exception { + if (future.isSuccess()) { + terminationPromise.setSuccess(null); + } else { + terminationPromise.setFailure(future.cause()); + } } - } - }); - instance = null; + }); + instance = null; + } } } - } - }, 10, 10, TimeUnit.SECONDS)); + }, 10, 10, TimeUnit.SECONDS)); + } + return terminationPromise; } - return terminationPromise; } } diff --git a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroupTest.java b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroupTest.java new file mode 100644 index 0000000..bb5cbb7 --- /dev/null +++ b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroupTest.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.remoting.impl.netty; + +import java.util.concurrent.BrokenBarrierException; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.atomic.AtomicReference; + +import io.netty.channel.EventLoop; +import io.netty.channel.nio.NioEventLoopGroup; +import org.junit.Test; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class SharedEventLoopGroupTest { + @Test + public void testSharedEventLoopGroupCreateOnShutdown() throws InterruptedException { + final CustomNioEventLoopGroup customNioEventLoopGroup = new CustomNioEventLoopGroup(); + final CyclicBarrier barrier = new CyclicBarrier(2); + + AtomicReference<SharedEventLoopGroup> sharedEventLoopGroup1 = new AtomicReference<>(); + Thread t1 = new Thread(new Runnable() { + @Override + public void run() { + sharedEventLoopGroup1.set(SharedEventLoopGroup.getInstance((threadFactory -> customNioEventLoopGroup))); + customNioEventLoopGroup.setCyclicBarrier(barrier); + sharedEventLoopGroup1.get().shutdownGracefully(); + customNioEventLoopGroup.setCyclicBarrier(null); + } + }); + t1.start(); + + AtomicReference<SharedEventLoopGroup> sharedEventLoopGroup2 = new AtomicReference<>(); + Thread t2 = new Thread(new Runnable() { + @Override + public void run() { + try { + barrier.await(); + sharedEventLoopGroup2.set(SharedEventLoopGroup.getInstance((threadFactory -> new NioEventLoopGroup(2, threadFactory)))); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + } + }); + t2.start(); + + t1.join(); + t2.join(); + assertTrue(sharedEventLoopGroup1.get() == sharedEventLoopGroup2.get()); + + Thread.sleep(11000); + assertFalse(sharedEventLoopGroup2.get().isShuttingDown()); + } + + private static class CustomNioEventLoopGroup extends NioEventLoopGroup { + private CyclicBarrier barrier; + + public void setCyclicBarrier(CyclicBarrier barrier) { + this.barrier = barrier; + } + + // Here we wait 1sec to imitate the race condition referred by ARTEMIS-2257 + @Override + public EventLoop next() { + if (barrier != null) { + try { + // Wait for thread t2 arriving + barrier.await(); + // Wait until thread t2 calling SharedEventLoopGroup.getInstance() + Thread.sleep(500); + } catch (InterruptedException e) { + e.printStackTrace(); + } catch (BrokenBarrierException e) { + e.printStackTrace(); + } + } + return super.next(); + } + } +} \ No newline at end of file