This is an automated email from the ASF dual-hosted git repository.

clebertsuconic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git


The following commit(s) were added to refs/heads/master by this push:
     new 59c9669  ARTEMIS-2257 Synchronize SharedEventLoopGroup 
shutdownGracefully
     new 6536b42  This closes #2558
59c9669 is described below

commit 59c966902c3d568f10da4f89b0a7778e4351fbdb
Author: yang wei <wy96...@gmail.com>
AuthorDate: Thu Feb 21 15:40:10 2019 +0800

    ARTEMIS-2257 Synchronize SharedEventLoopGroup shutdownGracefully
---
 .../remoting/impl/netty/SharedEventLoopGroup.java  | 42 +++++-----
 .../impl/netty/SharedEventLoopGroupTest.java       | 97 ++++++++++++++++++++++
 2 files changed, 119 insertions(+), 20 deletions(-)

diff --git 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java
 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java
index a6f935c..24c393c 100644
--- 
a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java
+++ 
b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroup.java
@@ -84,30 +84,32 @@ public class SharedEventLoopGroup extends 
DelegatingEventLoopGroup {
 
    @Override
    public Future<?> shutdownGracefully(final long l, final long l2, final 
TimeUnit timeUnit) {
-      if (channelFactoryCount.decrementAndGet() == 0) {
-         shutdown.compareAndSet(null, next().scheduleAtFixedRate(new 
Runnable() {
-            @Override
-            public void run() {
-               synchronized (SharedEventLoopGroup.class) {
-                  if (shutdown.get() != null) {
-                     Future<?> future = 
SharedEventLoopGroup.super.shutdownGracefully(l, l2, timeUnit);
-                     future.addListener(new FutureListener<Object>() {
-                        @Override
-                        public void operationComplete(Future<Object> future) 
throws Exception {
-                           if (future.isSuccess()) {
-                              terminationPromise.setSuccess(null);
-                           } else {
-                              terminationPromise.setFailure(future.cause());
+      synchronized (SharedEventLoopGroup.class) {
+         if (channelFactoryCount.decrementAndGet() == 0) {
+            shutdown.compareAndSet(null, next().scheduleAtFixedRate(new 
Runnable() {
+               @Override
+               public void run() {
+                  synchronized (SharedEventLoopGroup.class) {
+                     if (shutdown.get() != null) {
+                        Future<?> future = 
SharedEventLoopGroup.super.shutdownGracefully(l, l2, timeUnit);
+                        future.addListener(new FutureListener<Object>() {
+                           @Override
+                           public void operationComplete(Future<Object> 
future) throws Exception {
+                              if (future.isSuccess()) {
+                                 terminationPromise.setSuccess(null);
+                              } else {
+                                 terminationPromise.setFailure(future.cause());
+                              }
                            }
-                        }
-                     });
-                     instance = null;
+                        });
+                        instance = null;
+                     }
                   }
                }
-            }
 
-         }, 10, 10, TimeUnit.SECONDS));
+            }, 10, 10, TimeUnit.SECONDS));
+         }
+         return terminationPromise;
       }
-      return terminationPromise;
    }
 }
diff --git 
a/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroupTest.java
 
b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroupTest.java
new file mode 100644
index 0000000..bb5cbb7
--- /dev/null
+++ 
b/artemis-core-client/src/test/java/org/apache/activemq/artemis/core/remoting/impl/netty/SharedEventLoopGroupTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.core.remoting.impl.netty;
+
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.netty.channel.EventLoop;
+import io.netty.channel.nio.NioEventLoopGroup;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class SharedEventLoopGroupTest {
+   @Test
+   public void testSharedEventLoopGroupCreateOnShutdown() throws 
InterruptedException {
+      final CustomNioEventLoopGroup customNioEventLoopGroup = new 
CustomNioEventLoopGroup();
+      final CyclicBarrier barrier = new CyclicBarrier(2);
+
+      AtomicReference<SharedEventLoopGroup> sharedEventLoopGroup1 = new 
AtomicReference<>();
+      Thread t1 = new Thread(new Runnable() {
+         @Override
+         public void run() {
+            
sharedEventLoopGroup1.set(SharedEventLoopGroup.getInstance((threadFactory -> 
customNioEventLoopGroup)));
+            customNioEventLoopGroup.setCyclicBarrier(barrier);
+            sharedEventLoopGroup1.get().shutdownGracefully();
+            customNioEventLoopGroup.setCyclicBarrier(null);
+         }
+      });
+      t1.start();
+
+      AtomicReference<SharedEventLoopGroup> sharedEventLoopGroup2 = new 
AtomicReference<>();
+      Thread t2 = new Thread(new Runnable() {
+         @Override
+         public void run() {
+            try {
+               barrier.await();
+               
sharedEventLoopGroup2.set(SharedEventLoopGroup.getInstance((threadFactory -> 
new NioEventLoopGroup(2, threadFactory))));
+            } catch (InterruptedException e) {
+               e.printStackTrace();
+            } catch (BrokenBarrierException e) {
+               e.printStackTrace();
+            }
+         }
+      });
+      t2.start();
+
+      t1.join();
+      t2.join();
+      assertTrue(sharedEventLoopGroup1.get() == sharedEventLoopGroup2.get());
+
+      Thread.sleep(11000);
+      assertFalse(sharedEventLoopGroup2.get().isShuttingDown());
+   }
+
+   private static class CustomNioEventLoopGroup extends NioEventLoopGroup {
+      private CyclicBarrier barrier;
+
+      public void setCyclicBarrier(CyclicBarrier barrier) {
+         this.barrier = barrier;
+      }
+
+      // Here we wait 500ms to imitate the race condition referred to by ARTEMIS-2257
+      @Override
+      public EventLoop next() {
+         if (barrier != null) {
+            try {
+               // Wait for thread t2 to arrive
+               barrier.await();
+               // Give thread t2 time to call SharedEventLoopGroup.getInstance()
+               Thread.sleep(500);
+            } catch (InterruptedException e) {
+               e.printStackTrace();
+            } catch (BrokenBarrierException e) {
+               e.printStackTrace();
+            }
+         }
+         return super.next();
+      }
+   }
+}
\ No newline at end of file

Reply via email to