This is an automated email from the ASF dual-hosted git repository.

tibordigana pushed a commit to branch faster-queue
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git


The following commit(s) were added to refs/heads/faster-queue by this push:
     new 2515839  finished ThreadedStreamConsumer and the test
2515839 is described below

commit 2515839106580f7c0ad81661ed29fd684c636acc
Author: tibordigana <tibordig...@apache.org>
AuthorDate: Fri Jul 17 02:43:15 2020 +0200

    finished ThreadedStreamConsumer and the test
---
 .../output/ThreadedStreamConsumer.java             | 239 +++++++++++++--------
 .../output/ThreadedStreamConsumerTest.java         | 189 ++++------------
 .../org/apache/maven/surefire/JUnit4SuiteTest.java |   2 +
 3 files changed, 198 insertions(+), 232 deletions(-)

diff --git 
a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
 
b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
index 5e1968f..1114948 100644
--- 
a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
+++ 
b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java
@@ -20,7 +20,6 @@ package org.apache.maven.plugin.surefire.booterclient.output;
  */
 
 import org.apache.maven.surefire.api.event.Event;
-import org.apache.maven.surefire.api.util.internal.DaemonThreadFactory;
 import org.apache.maven.surefire.extensions.EventHandler;
 import org.apache.maven.surefire.shared.utils.cli.StreamConsumer;
 
@@ -28,14 +27,19 @@ import javax.annotation.Nonnull;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedDeque;
-import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Condition;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+import static 
org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread;
 
 /**
- * Knows how to reconstruct *all* the state transmitted over stdout by the 
forked process.
+ * Knows how to reconstruct *all* the state transmitted over Channel by the 
forked process.
+ * <br>
+ * After applying the performance improvements with {@link QueueSynchronizer} 
the throughput reaches
+ * 6.33 million messages per second
+ * (158 nanoseconds per message, 5 million messages within 0.79 seconds - see 
the test ThreadedStreamConsumerTest)
+ * on CPU i5 Dual Core 2.6 GHz and Oracle JDK 11.
  *
  * @author Kristian Rosenvold
  */
@@ -45,13 +49,9 @@ public final class ThreadedStreamConsumer
     private static final int QUEUE_MAX_ITEMS = 10_000;
     private static final Event END_ITEM = new FinalEvent();
 
-    private final ConcurrentLinkedDeque<Event> queue = new 
ConcurrentLinkedDeque<>();
-    private final ReentrantLock queueLock = new ReentrantLock();
-    private final Condition queueCondition = queueLock.newCondition();
-    private final Semaphore producerBarrier = new Semaphore( 0 );
-    private final AtomicInteger queueSize = new AtomicInteger();
+    private final QueueSynchronizer<Event> synchronizer = new 
QueueSynchronizer<>( QUEUE_MAX_ITEMS, END_ITEM );
     private final AtomicBoolean stop = new AtomicBoolean();
-    private final Thread thread;
+    private final AtomicBoolean isAlive = new AtomicBoolean( true );
     private final Pumper pumper;
 
     final class Pumper
@@ -80,44 +80,26 @@ public final class ThreadedStreamConsumer
         @Override
         public void run()
         {
-            queueLock.lock();
-            //todo CyclicBarrier here
-            try
+            while ( !stop.get() || !synchronizer.isEmptyQueue() )
             {
-                while ( !stop.get() || !queue.isEmpty() )
+                try
                 {
-                    try
-                    {
-                        Event item = queue.pollFirst();
-
-                        if ( item == null )
-                        {
-                            queueCondition.await();
-                            producerBarrier.release();
-                            continue;
-                        }
-                        else
-                        {
-                            queueSize.decrementAndGet();
-                        }
-
-                        if ( shouldStopQueueing( item ) )
-                        {
-                            break;
-                        }
-
-                        target.handleEvent( item );
-                    }
-                    catch ( Throwable t )
+                    Event item = synchronizer.awaitNext();
+
+                    if ( shouldStopQueueing( item ) )
                     {
-                        errors.addException( t );
+                        break;
                     }
+
+                    target.handleEvent( item );
+                }
+                catch ( Throwable t )
+                {
+                    errors.addException( t );
                 }
             }
-            finally
-            {
-                queueLock.unlock();
-            }
+
+            isAlive.set( false );
         }
 
         boolean hasErrors()
@@ -134,52 +116,26 @@ public final class ThreadedStreamConsumer
     public ThreadedStreamConsumer( EventHandler<Event> target )
     {
         pumper = new Pumper( target );
-        thread = DaemonThreadFactory.newDaemonThread( pumper, 
"ThreadedStreamConsumer" );
+        Thread thread = newDaemonThread( pumper, "ThreadedStreamConsumer" );
         thread.start();
     }
 
     @Override
     public void handleEvent( @Nonnull Event event )
     {
-        //todo CyclicBarrier here
         if ( stop.get() )
         {
             return;
         }
-        else if ( !thread.isAlive() )
+        // Do NOT call Thread.isAlive() here - it is slow.
+        // It increases the run time from 790 millis to 1250 millis for 5 
million messages.
+        else if ( !isAlive.get() )
         {
-            queue.clear();
+            synchronizer.clearQueue();
             return;
         }
 
-        int count = queueSize.get();
-        boolean min = count == 0;
-        boolean max = count >= QUEUE_MAX_ITEMS;
-        if ( min || max )
-        {
-            queueLock.lock();
-            try
-            {
-                queueSize.incrementAndGet();
-                queue.addLast( event );
-                producerBarrier.drainPermits();
-                queueCondition.signal();
-            }
-            finally
-            {
-                queueLock.unlock();
-            }
-
-            if ( max )
-            {
-                producerBarrier.acquireUninterruptibly();
-            }
-        }
-        else
-        {
-            queueSize.incrementAndGet();
-            queue.addLast( event );
-        }
+        synchronizer.pushNext( event );
     }
 
     @Override
@@ -188,18 +144,7 @@ public final class ThreadedStreamConsumer
     {
         if ( stop.compareAndSet( false, true ) )
         {
-            queue.addLast( END_ITEM );
-            queueLock.lock();
-            try
-            {
-                queueCondition.signal();
-            }
-            finally
-            {
-                queueLock.unlock();
-            }
-
-            producerBarrier.release( 2 );
+            synchronizer.markStopped();
         }
 
         if ( pumper.hasErrors() )
@@ -214,7 +159,7 @@ public final class ThreadedStreamConsumer
      * @param item    element from <code>items</code>
      * @return {@code true} if tail of the queue
      */
-    private boolean shouldStopQueueing( Event item )
+    private static boolean shouldStopQueueing( Event item )
     {
         return item == END_ITEM;
     }
@@ -271,4 +216,122 @@ public final class ThreadedStreamConsumer
             return false;
         }
     }
+
+    /**
+     * This synchronization helper mostly avoids locking.
+     * If the queue size has reached zero or {@code maxQueueSize} then the 
threads are blocked (parked until the other side releases them).
+     * The thread instance T1 is the reader (see the class "Pumper") and T2 is the 
writer (see the method "handleEvent").
+     *
+     * @param <T> element type in the queue
+     */
+    static class QueueSynchronizer<T>
+    {
+        private final SyncT1 t1 = new SyncT1();
+        private final SyncT2 t2 = new SyncT2();
+        private final ConcurrentLinkedDeque<T> queue = new 
ConcurrentLinkedDeque<>();
+        private final AtomicInteger queueSize = new AtomicInteger();
+        private final int maxQueueSize;
+        private final T stopItemMarker;
+
+        QueueSynchronizer( int maxQueueSize, T stopItemMarker )
+        {
+            this.maxQueueSize = maxQueueSize;
+            this.stopItemMarker = stopItemMarker;
+        }
+
+        private class SyncT1 extends AbstractQueuedSynchronizer
+        {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            protected int tryAcquireShared( int arg )
+            {
+                return queueSize.get() == 0 ? -1 : 1;
+            }
+
+            @Override
+            protected boolean tryReleaseShared( int arg )
+            {
+                return true;
+            }
+
+            void waitIfZero() throws InterruptedException
+            {
+                acquireSharedInterruptibly( 1 );
+            }
+
+            void release()
+            {
+                releaseShared( 0 );
+            }
+        }
+
+        private class SyncT2 extends AbstractQueuedSynchronizer
+        {
+            private static final long serialVersionUID = 1L;
+
+            @Override
+            protected int tryAcquireShared( int arg )
+            {
+                return queueSize.get() < maxQueueSize ? 1 : -1;
+            }
+
+            @Override
+            protected boolean tryReleaseShared( int arg )
+            {
+                return true;
+            }
+
+            void awaitMax()
+            {
+                acquireShared( 1 );
+            }
+
+            void tryRelease()
+            {
+                if ( queueSize.get() == 0 )
+                {
+                    releaseShared( 0 );
+                }
+            }
+        }
+
+        void markStopped()
+        {
+            addNext( stopItemMarker );
+        }
+
+        void pushNext( T t )
+        {
+            t2.awaitMax();
+            addNext( t );
+        }
+
+        T awaitNext() throws InterruptedException
+        {
+            t2.tryRelease();
+            t1.waitIfZero();
+            queueSize.decrementAndGet();
+            return queue.pollFirst();
+        }
+
+        boolean isEmptyQueue()
+        {
+            return queue.isEmpty();
+        }
+
+        void clearQueue()
+        {
+            queue.clear();
+        }
+
+        private void addNext( T t )
+        {
+            queue.addLast( t );
+            if ( queueSize.getAndIncrement() == 0 )
+            {
+                t1.release();
+            }
+        }
+    }
 }
diff --git 
a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
 
b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
index 415aab1..a859c76 100644
--- 
a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
+++ 
b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java
@@ -19,19 +19,18 @@ package 
org.apache.maven.plugin.surefire.booterclient.output;
  * under the License.
  */
 
+import 
org.apache.maven.plugin.surefire.booterclient.output.ThreadedStreamConsumer.QueueSynchronizer;
 import org.apache.maven.surefire.api.event.Event;
 import org.apache.maven.surefire.api.event.StandardStreamOutWithNewLineEvent;
 import org.apache.maven.surefire.extensions.EventHandler;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
-import java.util.concurrent.ConcurrentLinkedDeque;
+import java.util.concurrent.Callable;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+import java.util.concurrent.FutureTask;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.maven.surefire.api.report.RunMode.NORMAL_RUN;
 import static org.fest.assertions.Assertions.assertThat;
 
@@ -42,11 +41,10 @@ import static org.fest.assertions.Assertions.assertThat;
 public class ThreadedStreamConsumerTest
 {
     @Test
-    public void test5() throws Exception
+    public void testQueueSynchronizer() throws Exception
     {
         final CountDownLatch countDown = new CountDownLatch( 5_000_000 );
-        final QueueSynchronizer<String> sync = new QueueSynchronizer<>(  
5_000_000 );
-        final AtomicInteger idx = new AtomicInteger();
+        final QueueSynchronizer<Integer> sync = new QueueSynchronizer<>(  8 * 
1024, null );
 
         Thread t = new Thread()
         {
@@ -55,21 +53,10 @@ public class ThreadedStreamConsumerTest
             {
                 while ( true )
                 {
-                    if (sync.queueSize.get() == 0){
-                        //System.out.println("zero at " + idx.get());
-                    }
                     try
                     {
-                        String s = sync.awaitNext();
-                        if (s == null){
-                            System.out.println(s);
-                        }
-                        //System.out.println( i.get() + " " + s );
+                        sync.awaitNext();
                         countDown.countDown();
-                        if ( idx.incrementAndGet() % 11_000 == 0 )
-                        {
-                            //TimeUnit.MILLISECONDS.sleep( 10L );
-                        }
                     }
                     catch ( InterruptedException e )
                     {
@@ -78,64 +65,59 @@ public class ThreadedStreamConsumerTest
                 }
             }
         };
+        t.setDaemon( true );
         t.start();
 
+        SECONDS.sleep( 1 );
         System.gc();
-        TimeUnit.SECONDS.sleep( 2 );
+        SECONDS.sleep( 2 );
 
         long t1 = System.currentTimeMillis();
 
         for ( int i = 0; i < 5_000_000; i++ )
         {
-            sync.pushNext( i + "" );
+            sync.pushNext( i );
         }
-        assertThat( countDown.await( 10L, TimeUnit.MINUTES ) ).isTrue();
-        long t2 = System.currentTimeMillis();
-        System.out.println( ( t2 - t1 ) + " millis" );
 
-        TimeUnit.SECONDS.sleep( 2 );
+        assertThat( countDown.await( 3L, SECONDS ) )
+            .isTrue();
 
-        System.out.println( idx.get() );
-        System.out.println("countDown " + countDown.getCount());
-        System.out.println("queue size " + sync.queue.size());
-        System.out.println("queue size " + sync.queueSize.get());
+        long t2 = System.currentTimeMillis();
+        System.out.println( ( t2 - t1 ) + " millis in testQueueSynchronizer()" 
);
     }
 
     @Test
-    public void test() throws Exception
+    public void testThreadedStreamConsumer() throws Exception
     {
-        final CountDownLatch countDown = new CountDownLatch( 1000_000 );
+        final CountDownLatch countDown = new CountDownLatch( 5_000_000 );
         EventHandler<Event> handler = new EventHandler<Event>()
         {
-            private final AtomicInteger i = new AtomicInteger();
-
             @Override
             public void handleEvent( @Nonnull Event event )
             {
-                //System.out.println(i.get());
                 countDown.countDown();
-                try
-                {
-                    if ( i.incrementAndGet() % 11_000 == 0 )
-                    {
-                        TimeUnit.MILLISECONDS.sleep( 10L );
-                    }
-                }
-                catch ( InterruptedException e )
-                {
-                    throw new IllegalStateException( e );
-                }
             }
         };
 
         ThreadedStreamConsumer streamConsumer = new ThreadedStreamConsumer( 
handler );
 
-        for ( int i = 0; i < 1000_000; i++ )
+        SECONDS.sleep( 1 );
+        System.gc();
+        SECONDS.sleep( 2 );
+
+        long t1 = System.currentTimeMillis();
+
+        Event event = new StandardStreamOutWithNewLineEvent( NORMAL_RUN, "" );
+        for ( int i = 0; i < 5_000_000; i++ )
         {
-            streamConsumer.handleEvent( new StandardStreamOutWithNewLineEvent( 
NORMAL_RUN, "" ) );
+            streamConsumer.handleEvent( event );
         }
 
-        assertThat( countDown.await( 10L, TimeUnit.MINUTES ) ).isTrue();
+        assertThat( countDown.await( 3L, SECONDS ) )
+            .isTrue();
+
+        long t2 = System.currentTimeMillis();
+        System.out.println( ( t2 - t1 ) + " millis in 
testThreadedStreamConsumer()" );
 
         streamConsumer.close();
     }
@@ -143,108 +125,27 @@ public class ThreadedStreamConsumerTest
     @Test
     public void test3() throws Exception
     {
-        QueueSynchronizer<String> sync = new QueueSynchronizer<>( 2 );
+        final QueueSynchronizer<String> sync = new QueueSynchronizer<>( 2, 
null );
         sync.pushNext( "1" );
         sync.pushNext( "2" );
-        //sync.pushNext( "3" );
         String s1 = sync.awaitNext();
         String s2 = sync.awaitNext();
-        //String s3 = sync.awaitNext();
-    }
-
-    static class QueueSynchronizer<T>
-    {
-        private final AtomicInteger queueSize = new AtomicInteger();
-
-        QueueSynchronizer( int max )
-        {
-            this.max = max;
-        }
-
-        private class SyncT1 extends AbstractQueuedSynchronizer
-        {
-            private static final long serialVersionUID = 1L;
-
-            @Override
-            protected int tryAcquireShared( int arg )
-            {
-                return queueSize.get() == 0 ? -1 : 1;
-            }
-
-            @Override
-            protected boolean tryReleaseShared( int arg )
-            {
-                return true;
-            }
-
-            void waitIfZero() throws InterruptedException
-            {
-                acquireSharedInterruptibly( 1 );
-            }
-
-            void release()
-            {
-                releaseShared( 0 );
-            }
-        }
-
-        private class SyncT2 extends AbstractQueuedSynchronizer
+        assertThat( s1 ).isEqualTo( "1" );
+        assertThat( s2 ).isEqualTo( "2" );
+        FutureTask<Void> future = new FutureTask<>( new Callable<Void>()
         {
-            private static final long serialVersionUID = 1L;
-
             @Override
-            protected int tryAcquireShared( int arg )
-            {
-                return queueSize.get() < max ? 1 : -1;
-            }
-
-            @Override
-            protected boolean tryReleaseShared( int arg )
-            {
-                return true;
-            }
-
-            void awaitMax()
-            {
-                acquireShared( 1 );
-            }
-
-            void tryRelease()
+            public Void call() throws Exception
             {
-                if ( queueSize.get() == 0 )
-                {
-                    releaseShared( 0 );
-                }
+                sync.awaitNext();
+                return null;
             }
-        }
-
-        private final SyncT1 t1 = new SyncT1();
-        private final SyncT2 t2 = new SyncT2();
-        private final ConcurrentLinkedDeque<T> queue = new 
ConcurrentLinkedDeque<>();
-        private final int max;
-
-        void pushNext( T t )
-        {
-            t2.awaitMax();
-            int previousCount = queueSize.get();
-            if ( previousCount == 0 )
-            {
-                t1.release();
-            }
-            queue.addLast( t );
-            previousCount = queueSize.getAndIncrement();
-            if ( previousCount == 0 )
-            {
-                t1.release();
-            }
-        }
-
-        T awaitNext() throws InterruptedException
-        {
-            t2.tryRelease();
-            t1.waitIfZero();
-            queueSize.decrementAndGet();
-            return queue.pollFirst();
-        }
+        } );
+        Thread t = new Thread( future );
+        t.setDaemon( true );
+        t.start();
+        SECONDS.sleep( 3L );
+        assertThat( t.getState() )
+            .isEqualTo( Thread.State.WAITING );
     }
 }
diff --git 
a/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java
 
b/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java
index 36425ed..9770f8a 100644
--- 
a/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java
+++ 
b/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java
@@ -41,6 +41,7 @@ import 
org.apache.maven.plugin.surefire.booterclient.ModularClasspathForkConfigu
 import 
org.apache.maven.plugin.surefire.booterclient.lazytestprovider.TestLessInputStreamBuilderTest;
 import 
org.apache.maven.plugin.surefire.booterclient.lazytestprovider.TestProvidingInputStreamTest;
 import org.apache.maven.plugin.surefire.booterclient.output.ForkClientTest;
+import 
org.apache.maven.plugin.surefire.booterclient.output.ThreadedStreamConsumerTest;
 import org.apache.maven.plugin.surefire.extensions.ConsoleOutputReporterTest;
 import org.apache.maven.plugin.surefire.extensions.E2ETest;
 import 
org.apache.maven.plugin.surefire.extensions.ForkedProcessEventNotifierTest;
@@ -112,6 +113,7 @@ public class JUnit4SuiteTest extends TestCase
         suite.addTest( new JUnit4TestAdapter( ForkChannelTest.class ) );
         suite.addTest( new JUnit4TestAdapter( StreamFeederTest.class ) );
         suite.addTest( new JUnit4TestAdapter( E2ETest.class ) );
+        suite.addTest( new JUnit4TestAdapter( ThreadedStreamConsumerTest.class 
) );
         return suite;
     }
 }

Reply via email to