This is an automated email from the ASF dual-hosted git repository.

tibordigana pushed a commit to branch faster-queue
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git


The following commit(s) were added to refs/heads/faster-queue by this push:
     new 78339d3  custom QueueSynchronizer instead of ReentrantLock+Semaphore+CyclicBarrier
78339d3 is described below

commit 78339d3c7ff19bdb29d8761b720d88d65fd840ba
Author: tibordigana <tibordig...@apache.org>
AuthorDate: Tue Jul 14 02:22:31 2020 +0200

    custom QueueSynchronizer instead of ReentrantLock+Semaphore+CyclicBarrier
---
 .../output/ThreadedStreamConsumer.java             |  50 ++++----
 .../output/ThreadedStreamConsumerTest.java         | 129 +++++++++++++++++++--
 2 files changed, 147 insertions(+), 32 deletions(-)

diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
index 34d3a31..5e1968f 100644
--- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
+++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
@@ -28,40 +28,34 @@ import javax.annotation.Nonnull;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Condition;
 import java.util.concurrent.locks.ReentrantLock;
 
-import static java.util.concurrent.TimeUnit.SECONDS;
-
 /**
  * Knows how to reconstruct *all* the state transmitted over stdout by the forked process.
  *
  * @author Kristian Rosenvold
  */
 public final class ThreadedStreamConsumer
-        implements EventHandler<Event>, Closeable
+    implements EventHandler<Event>, Closeable
 {
     private static final int QUEUE_MAX_ITEMS = 10_000;
     private static final Event END_ITEM = new FinalEvent();
 
     private final ConcurrentLinkedDeque<Event> queue = new 
ConcurrentLinkedDeque<>();
-
     private final ReentrantLock queueLock = new ReentrantLock();
-
     private final Condition queueCondition = queueLock.newCondition();
-
+    private final Semaphore producerBarrier = new Semaphore( 0 );
     private final AtomicInteger queueSize = new AtomicInteger();
-
     private final AtomicBoolean stop = new AtomicBoolean();
-
     private final Thread thread;
-
     private final Pumper pumper;
 
     final class Pumper
-            implements Runnable
+        implements Runnable
     {
         private final EventHandler<Event> target;
 
@@ -86,9 +80,10 @@ public final class ThreadedStreamConsumer
         @Override
         public void run()
         {
+            queueLock.lock();
+            //todo CyclicBarrier here
             try
             {
-                queueLock.lock();
                 while ( !stop.get() || !queue.isEmpty() )
                 {
                     try
@@ -97,7 +92,8 @@ public final class ThreadedStreamConsumer
 
                         if ( item == null )
                         {
-                            queueCondition.await( 1L, SECONDS );
+                            queueCondition.await();
+                            producerBarrier.release();
                             continue;
                         }
                         else
@@ -145,6 +141,7 @@ public final class ThreadedStreamConsumer
     @Override
     public void handleEvent( @Nonnull Event event )
     {
+        //todo CyclicBarrier here
         if ( stop.get() )
         {
             return;
@@ -156,17 +153,27 @@ public final class ThreadedStreamConsumer
         }
 
         int count = queueSize.get();
-        if ( count == 0 || count >= QUEUE_MAX_ITEMS )
+        boolean min = count == 0;
+        boolean max = count >= QUEUE_MAX_ITEMS;
+        if ( min || max )
         {
+            queueLock.lock();
             try
             {
-                queueLock.lock();
-                updateAndNotifyReader( event );
+                queueSize.incrementAndGet();
+                queue.addLast( event );
+                producerBarrier.drainPermits();
+                queueCondition.signal();
             }
             finally
             {
                 queueLock.unlock();
             }
+
+            if ( max )
+            {
+                producerBarrier.acquireUninterruptibly();
+            }
         }
         else
         {
@@ -175,29 +182,24 @@ public final class ThreadedStreamConsumer
         }
     }
 
-    private void updateAndNotifyReader( @Nonnull Event event )
-    {
-        queueSize.incrementAndGet();
-        queue.addLast( event );
-        queueCondition.signal();
-    }
-
     @Override
     public void close()
-            throws IOException
+        throws IOException
     {
         if ( stop.compareAndSet( false, true ) )
         {
             queue.addLast( END_ITEM );
+            queueLock.lock();
             try
             {
-                queueLock.lock();
                 queueCondition.signal();
             }
             finally
             {
                 queueLock.unlock();
             }
+
+            producerBarrier.release( 2 );
         }
 
         if ( pumper.hasErrors() )
diff --git 
a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
 
b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
index 54491a4..83b6c50 100644
--- a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
+++ b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
@@ -22,13 +22,18 @@ package org.apache.maven.plugin.surefire.booterclient.output;
 import org.apache.maven.surefire.api.event.Event;
 import org.apache.maven.surefire.api.event.StandardStreamOutWithNewLineEvent;
 import org.apache.maven.surefire.extensions.EventHandler;
+import org.junit.Ignore;
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
-
+import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
 
 import static org.apache.maven.surefire.api.report.RunMode.NORMAL_RUN;
+import static org.fest.assertions.Assertions.assertThat;
 
 /**
  *
@@ -37,38 +42,146 @@ import static org.apache.maven.surefire.api.report.RunMode.NORMAL_RUN;
 public class ThreadedStreamConsumerTest
 {
     @Test
+    @Ignore
     public void test() throws Exception
     {
+        final CountDownLatch countDown = new CountDownLatch( 100_000 );
         EventHandler<Event> handler = new EventHandler<Event>()
         {
-            private int i;
+            private final AtomicInteger i = new AtomicInteger();
 
             @Override
             public void handleEvent( @Nonnull Event event )
             {
+                //System.out.println(i.get());
+                countDown.countDown();
                 try
                 {
-                    System.out.println( Thread.currentThread() );
-                    if ( i++ % 5000 == 0 )
+                    if ( i.incrementAndGet() % 11_000 == 0 )
                     {
-                        TimeUnit.MILLISECONDS.sleep( 500L );
+                        TimeUnit.MILLISECONDS.sleep( 10L );
                     }
                 }
                 catch ( InterruptedException e )
                 {
-                    e.printStackTrace();
+                    throw new IllegalStateException( e );
                 }
             }
         };
 
         ThreadedStreamConsumer streamConsumer = new ThreadedStreamConsumer( 
handler );
 
-        for ( int i = 0; i < 11_000; i++ )
+        for ( int i = 0; i < 100_000; i++ )
         {
-            System.out.println( i );
             streamConsumer.handleEvent( new StandardStreamOutWithNewLineEvent( 
NORMAL_RUN, "" ) );
         }
 
+        assertThat( countDown.await( 10L, TimeUnit.MINUTES ) ).isTrue();
+
         streamConsumer.close();
     }
+
+    @Test
+    public void test3() throws Exception
+    {
+        QueueSynchronizer<String> sync = new QueueSynchronizer<>( 2 );
+        sync.pushNext( "1" );
+        sync.pushNext( "2" );
+        //sync.pushNext( "3" );
+        String s1 = sync.awaitNext();
+        String s2 = sync.awaitNext();
+        //String s3 = sync.awaitNext();
+    }
+
+    static class QueueSynchronizer<T>
+    {
+        private final AtomicInteger queueSize = new AtomicInteger();
+
+        QueueSynchronizer( int max )
+        {
+            this.max = max;
+        }
+
+        private class SyncT1 extends AbstractQueuedSynchronizer
+        {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            protected int tryAcquireShared( int arg )
+            {
+                return queueSize.get() == 0 ? -1 : 1;
+            }
+
+            @Override
+            protected boolean tryReleaseShared( int arg )
+            {
+                return true;
+            }
+
+            void waitIfZero() throws InterruptedException
+            {
+                acquireSharedInterruptibly( 1 );
+            }
+
+            void release()
+            {
+                releaseShared( 0 );
+            }
+        }
+
+        private class SyncT2 extends AbstractQueuedSynchronizer
+        {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            protected int tryAcquireShared( int arg )
+            {
+                return queueSize.get() < max ? 1 : -1;
+            }
+
+            @Override
+            protected boolean tryReleaseShared( int arg )
+            {
+                return true;
+            }
+
+            void awaitMax()
+            {
+                acquireShared( 1 );
+            }
+
+            void tryRelease()
+            {
+                if ( queueSize.get() == 0 )
+                {
+                    releaseShared( 0 );
+                }
+            }
+        }
+
+        private final SyncT1 t1 = new SyncT1();
+        private final SyncT2 t2 = new SyncT2();
+        private final ConcurrentLinkedDeque<T> queue = new 
ConcurrentLinkedDeque<>();
+        private final int max;
+
+        void pushNext( T t )
+        {
+            queue.addLast( t );
+            int previousCount = queueSize.get();
+            t2.awaitMax();
+            queueSize.incrementAndGet();
+            if ( previousCount == 0 )
+            {
+                t1.release();
+            }
+        }
+
+        T awaitNext() throws InterruptedException
+        {
+            t2.tryRelease();
+            t1.waitIfZero();
+            queueSize.decrementAndGet();
+            return queue.pollFirst();
+        }
+    }
 }

Reply via email to