This is an automated email from the ASF dual-hosted git repository. tibordigana pushed a commit to branch faster-queue in repository https://gitbox.apache.org/repos/asf/maven-surefire.git
The following commit(s) were added to refs/heads/faster-queue by this push: new 2515839 finished ThreadedStreamConsumer and the test 2515839 is described below commit 2515839106580f7c0ad81661ed29fd684c636acc Author: tibordigana <tibordig...@apache.org> AuthorDate: Fri Jul 17 02:43:15 2020 +0200 finished ThreadedStreamConsumer and the test --- .../output/ThreadedStreamConsumer.java | 239 +++++++++++++-------- .../output/ThreadedStreamConsumerTest.java | 189 ++++------------ .../org/apache/maven/surefire/JUnit4SuiteTest.java | 2 + 3 files changed, 198 insertions(+), 232 deletions(-) diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java index 5e1968f..1114948 100644 --- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java +++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java @@ -20,7 +20,6 @@ package org.apache.maven.plugin.surefire.booterclient.output; */ import org.apache.maven.surefire.api.event.Event; -import org.apache.maven.surefire.api.util.internal.DaemonThreadFactory; import org.apache.maven.surefire.extensions.EventHandler; import org.apache.maven.surefire.shared.utils.cli.StreamConsumer; @@ -28,14 +27,19 @@ import javax.annotation.Nonnull; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.ConcurrentLinkedDeque; -import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.Condition; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; + +import static org.apache.maven.surefire.api.util.internal.DaemonThreadFactory.newDaemonThread; /** - * Knows how to reconstruct *all* the state transmitted over stdout by the forked process. + * Knows how to reconstruct *all* the state transmitted over Channel by the forked process. + * <br> + * After applying the performance improvements with {@link QueueSynchronizer} the throughput becomes + * 6.33 mega messages per second + * (158 nano seconds per message, 5 million messages within 0.79 seconds - see the test ThreadedStreamConsumerTest) + * on CPU i5 Dual Core 2.6 GHz and Oracle JDK 11. * * @author Kristian Rosenvold */ @@ -45,13 +49,9 @@ public final class ThreadedStreamConsumer private static final int QUEUE_MAX_ITEMS = 10_000; private static final Event END_ITEM = new FinalEvent(); - private final ConcurrentLinkedDeque<Event> queue = new ConcurrentLinkedDeque<>(); - private final ReentrantLock queueLock = new ReentrantLock(); - private final Condition queueCondition = queueLock.newCondition(); - private final Semaphore producerBarrier = new Semaphore( 0 ); - private final AtomicInteger queueSize = new AtomicInteger(); + private final QueueSynchronizer<Event> synchronizer = new QueueSynchronizer<>( QUEUE_MAX_ITEMS, END_ITEM ); private final AtomicBoolean stop = new AtomicBoolean(); - private final Thread thread; + private final AtomicBoolean isAlive = new AtomicBoolean( true ); private final Pumper pumper; final class Pumper @@ -80,44 +80,26 @@ public final class ThreadedStreamConsumer @Override public void run() { - queueLock.lock(); - //todo CyclicBarrier here - try + while ( !stop.get() || !synchronizer.isEmptyQueue() ) { - while ( !stop.get() || !queue.isEmpty() ) + try { - try - { - Event item = queue.pollFirst(); - - if ( item == null ) - { - queueCondition.await(); - producerBarrier.release(); - continue; - } - else - { - queueSize.decrementAndGet(); - } - - if ( shouldStopQueueing( item ) ) - { - break; - } - - target.handleEvent( item ); - } - catch ( Throwable t ) + Event item = synchronizer.awaitNext(); + + if ( shouldStopQueueing( item ) ) { - errors.addException( t ); + break; } + + target.handleEvent( item ); + } + catch ( Throwable t ) + { + errors.addException( t ); } } - finally - { - queueLock.unlock(); - } + + isAlive.set( false ); } boolean hasErrors() @@ -134,52 +116,26 @@ public final class ThreadedStreamConsumer public ThreadedStreamConsumer( EventHandler<Event> target ) { pumper = new Pumper( target ); - thread = DaemonThreadFactory.newDaemonThread( pumper, "ThreadedStreamConsumer" ); + Thread thread = newDaemonThread( pumper, "ThreadedStreamConsumer" ); thread.start(); } @Override public void handleEvent( @Nonnull Event event ) { - //todo CyclicBarrier here if ( stop.get() ) { return; } - else if ( !thread.isAlive() ) + // Do NOT call Thread.isAlive() - slow. + // It makes worse performance from 790 millis to 1250 millis for 5 million messages. + else if ( !isAlive.get() ) { - queue.clear(); + synchronizer.clearQueue(); return; } - int count = queueSize.get(); - boolean min = count == 0; - boolean max = count >= QUEUE_MAX_ITEMS; - if ( min || max ) - { - queueLock.lock(); - try - { - queueSize.incrementAndGet(); - queue.addLast( event ); - producerBarrier.drainPermits(); - queueCondition.signal(); - } - finally - { - queueLock.unlock(); - } - - if ( max ) - { - producerBarrier.acquireUninterruptibly(); - } - } - else - { - queueSize.incrementAndGet(); - queue.addLast( event ); - } + synchronizer.pushNext( event ); } @Override @@ -188,18 +144,7 @@ public final class ThreadedStreamConsumer { if ( stop.compareAndSet( false, true ) ) { - queue.addLast( END_ITEM ); - queueLock.lock(); - try - { - queueCondition.signal(); - } - finally - { - queueLock.unlock(); - } - - producerBarrier.release( 2 ); + synchronizer.markStopped(); } if ( pumper.hasErrors() ) @@ -214,7 +159,7 @@ public final class ThreadedStreamConsumer * @param item element from <code>items</code> * @return {@code true} if tail of the queue */ - private boolean shouldStopQueueing( Event item ) + private static boolean shouldStopQueueing( Event item ) { return item == END_ITEM; } @@ -271,4 +216,122 @@ public final class ThreadedStreamConsumer return false; } } + + /** + * This synchronization helper mostly avoids the locks. + * If the queue size has reached zero or {@code maxQueueSize} then the threads are locked (parked/unparked). + * The thread instance T1 is reader (see the class "Pumper") and T2 is the writer (see the method "handleEvent"). + * + * @param <T> element type in the queue + */ + static class QueueSynchronizer<T> + { + private final SyncT1 t1 = new SyncT1(); + private final SyncT2 t2 = new SyncT2(); + private final ConcurrentLinkedDeque<T> queue = new ConcurrentLinkedDeque<>(); + private final AtomicInteger queueSize = new AtomicInteger(); + private final int maxQueueSize; + private final T stopItemMarker; + + QueueSynchronizer( int maxQueueSize, T stopItemMarker ) + { + this.maxQueueSize = maxQueueSize; + this.stopItemMarker = stopItemMarker; + } + + private class SyncT1 extends AbstractQueuedSynchronizer + { + private static final long serialVersionUID = 1L; + + @Override + protected int tryAcquireShared( int arg ) + { + return queueSize.get() == 0 ? -1 : 1; + } + + @Override + protected boolean tryReleaseShared( int arg ) + { + return true; + } + + void waitIfZero() throws InterruptedException + { + acquireSharedInterruptibly( 1 ); + } + + void release() + { + releaseShared( 0 ); + } + } + + private class SyncT2 extends AbstractQueuedSynchronizer + { + private static final long serialVersionUID = 1L; + + @Override + protected int tryAcquireShared( int arg ) + { + return queueSize.get() < maxQueueSize ? 1 : -1; + } + + @Override + protected boolean tryReleaseShared( int arg ) + { + return true; + } + + void awaitMax() + { + acquireShared( 1 ); + } + + void tryRelease() + { + if ( queueSize.get() == 0 ) + { + releaseShared( 0 ); + } + } + } + + void markStopped() + { + addNext( stopItemMarker ); + } + + void pushNext( T t ) + { + t2.awaitMax(); + addNext( t ); + } + + T awaitNext() throws InterruptedException + { + t2.tryRelease(); + t1.waitIfZero(); + queueSize.decrementAndGet(); + return queue.pollFirst(); + } + + boolean isEmptyQueue() + { + return queue.isEmpty(); + } + + void clearQueue() + { + queue.clear(); + } + + private void addNext( T t ) + { + queue.addLast( t ); + if ( queueSize.getAndIncrement() == 0 ) + { + t1.release(); + } + } + } } diff --git a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java index 415aab1..a859c76 100644 --- a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java +++ b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java @@ -19,19 +19,18 @@ package org.apache.maven.plugin.surefire.booterclient.output; * under the License. */ +import org.apache.maven.plugin.surefire.booterclient.output.ThreadedStreamConsumer.QueueSynchronizer; import org.apache.maven.surefire.api.event.Event; import org.apache.maven.surefire.api.event.StandardStreamOutWithNewLineEvent; import org.apache.maven.surefire.extensions.EventHandler; -import org.junit.Ignore; import org.junit.Test; import javax.annotation.Nonnull; -import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.locks.AbstractQueuedSynchronizer; +import java.util.concurrent.FutureTask; +import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.maven.surefire.api.report.RunMode.NORMAL_RUN; import static org.fest.assertions.Assertions.assertThat; @@ -42,11 +41,10 @@ import static org.fest.assertions.Assertions.assertThat; public class ThreadedStreamConsumerTest { @Test - public void test5() throws Exception + public void testQueueSynchronizer() throws Exception { final CountDownLatch countDown = new CountDownLatch( 5_000_000 ); - final QueueSynchronizer<String> sync = new QueueSynchronizer<>( 5_000_000 ); - final AtomicInteger idx = new AtomicInteger(); + final QueueSynchronizer<Integer> sync = new QueueSynchronizer<>( 8 * 1024, null ); Thread t = new Thread() { @@ -55,21 +53,10 @@ public class ThreadedStreamConsumerTest { while ( true ) { - if (sync.queueSize.get() == 0){ - //System.out.println("zero at " + idx.get()); - } try { - String s = sync.awaitNext(); - if (s == null){ - System.out.println(s); - } - //System.out.println( i.get() + " " + s ); + sync.awaitNext(); countDown.countDown(); - if ( idx.incrementAndGet() % 11_000 == 0 ) - { - //TimeUnit.MILLISECONDS.sleep( 10L ); - } } catch ( InterruptedException e ) { @@ -78,64 +65,59 @@ public class ThreadedStreamConsumerTest } } }; + t.setDaemon( true ); t.start(); + SECONDS.sleep( 1 ); System.gc(); - TimeUnit.SECONDS.sleep( 2 ); + SECONDS.sleep( 2 ); long t1 = System.currentTimeMillis(); for ( int i = 0; i < 5_000_000; i++ ) { - sync.pushNext( i + "" ); + sync.pushNext( i ); } - assertThat( countDown.await( 10L, TimeUnit.MINUTES ) ).isTrue(); - long t2 = System.currentTimeMillis(); - System.out.println( ( t2 - t1 ) + " millis" ); - TimeUnit.SECONDS.sleep( 2 ); + assertThat( countDown.await( 3L, SECONDS ) ) + .isTrue(); - System.out.println( idx.get() ); - System.out.println("countDown " + countDown.getCount()); - System.out.println("queue size " + sync.queue.size()); - System.out.println("queue size " + sync.queueSize.get()); + long t2 = System.currentTimeMillis(); + System.out.println( ( t2 - t1 ) + " millis in testQueueSynchronizer()" ); } @Test - public void test() throws Exception + public void testThreadedStreamConsumer() throws Exception { - final CountDownLatch countDown = new CountDownLatch( 1000_000 ); + final CountDownLatch countDown = new CountDownLatch( 5_000_000 ); EventHandler<Event> handler = new EventHandler<Event>() { - private final AtomicInteger i = new AtomicInteger(); - @Override public void handleEvent( @Nonnull Event event ) { - //System.out.println(i.get()); countDown.countDown(); - try - { - if ( i.incrementAndGet() % 11_000 == 0 ) - { - TimeUnit.MILLISECONDS.sleep( 10L ); - } - } - catch ( InterruptedException e ) - { - throw new IllegalStateException( e ); - } } }; ThreadedStreamConsumer streamConsumer = new ThreadedStreamConsumer( handler ); - for ( int i = 0; i < 1000_000; i++ ) + SECONDS.sleep( 1 ); + System.gc(); + SECONDS.sleep( 2 ); + + long t1 = System.currentTimeMillis(); + + Event event = new StandardStreamOutWithNewLineEvent( NORMAL_RUN, "" ); + for ( int i = 0; i < 5_000_000; i++ ) { - streamConsumer.handleEvent( new StandardStreamOutWithNewLineEvent( NORMAL_RUN, "" ) ); + streamConsumer.handleEvent( event ); } - assertThat( countDown.await( 10L, TimeUnit.MINUTES ) ).isTrue(); + assertThat( countDown.await( 3L, SECONDS ) ) + .isTrue(); + + long t2 = System.currentTimeMillis(); + System.out.println( ( t2 - t1 ) + " millis in testThreadedStreamConsumer()" ); streamConsumer.close(); } @@ -143,108 +125,27 @@ public class ThreadedStreamConsumerTest @Test public void test3() throws Exception { - QueueSynchronizer<String> sync = new QueueSynchronizer<>( 2 ); + final QueueSynchronizer<String> sync = new QueueSynchronizer<>( 2, null ); sync.pushNext( "1" ); sync.pushNext( "2" ); - //sync.pushNext( "3" ); String s1 = sync.awaitNext(); String s2 = sync.awaitNext(); - //String s3 = sync.awaitNext(); - } - - static class QueueSynchronizer<T> - { - private final AtomicInteger queueSize = new AtomicInteger(); - - QueueSynchronizer( int max ) - { - this.max = max; - } - - private class SyncT1 extends AbstractQueuedSynchronizer - { - private static final long serialVersionUID = 1L; - - @Override - protected int tryAcquireShared( int arg ) - { - return queueSize.get() == 0 ? -1 : 1; - } - - @Override - protected boolean tryReleaseShared( int arg ) - { - return true; - } - - void waitIfZero() throws InterruptedException - { - acquireSharedInterruptibly( 1 ); - } - - void release() - { - releaseShared( 0 ); - } - } - - private class SyncT2 extends AbstractQueuedSynchronizer + assertThat( s1 ).isEqualTo( "1" ); + assertThat( s2 ).isEqualTo( "2" ); + FutureTask<Void> future = new FutureTask<>( new Callable<Void>() { - private static final long serialVersionUID = 1L; - @Override - protected int tryAcquireShared( int arg ) - { - return queueSize.get() < max ? 1 : -1; - } - - @Override - protected boolean tryReleaseShared( int arg ) - { - return true; - } - - void awaitMax() - { - acquireShared( 1 ); - } - - void tryRelease() + public Void call() throws Exception { - if ( queueSize.get() == 0 ) - { - releaseShared( 0 ); - } + sync.awaitNext(); + return null; } - } - - private final SyncT1 t1 = new SyncT1(); - private final SyncT2 t2 = new SyncT2(); - private final ConcurrentLinkedDeque<T> queue = new ConcurrentLinkedDeque<>(); - private final int max; - - void pushNext( T t ) - { - t2.awaitMax(); - int previousCount = queueSize.get(); - if ( previousCount == 0 ) - { - t1.release(); - } - queue.addLast( t ); - previousCount = queueSize.getAndIncrement(); - if ( previousCount == 0 ) - { - t1.release(); - } - } - - T awaitNext() throws InterruptedException - { - t2.tryRelease(); - t1.waitIfZero(); - queueSize.decrementAndGet(); - return queue.pollFirst(); - } + } ); + Thread t = new Thread( future ); + t.setDaemon( true ); + t.start(); + SECONDS.sleep( 3L ); + assertThat( t.getState() ) + .isEqualTo( Thread.State.WAITING ); } } diff --git a/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java b/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java index 36425ed..9770f8a 100644 --- a/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java +++ b/maven-surefire-common/src/test/java/org/apache/maven/surefire/JUnit4SuiteTest.java @@ -41,6 +41,7 @@ import org.apache.maven.plugin.surefire.booterclient.ModularClasspathForkConfigu import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.TestLessInputStreamBuilderTest; import org.apache.maven.plugin.surefire.booterclient.lazytestprovider.TestProvidingInputStreamTest; import org.apache.maven.plugin.surefire.booterclient.output.ForkClientTest; +import org.apache.maven.plugin.surefire.booterclient.output.ThreadedStreamConsumerTest; import org.apache.maven.plugin.surefire.extensions.ConsoleOutputReporterTest; import org.apache.maven.plugin.surefire.extensions.E2ETest; import org.apache.maven.plugin.surefire.extensions.ForkedProcessEventNotifierTest; @@ -112,6 +113,7 @@ public class JUnit4SuiteTest extends TestCase suite.addTest( new JUnit4TestAdapter( ForkChannelTest.class ) ); suite.addTest( new JUnit4TestAdapter( StreamFeederTest.class ) ); suite.addTest( new JUnit4TestAdapter( E2ETest.class ) ); + suite.addTest( new JUnit4TestAdapter( ThreadedStreamConsumerTest.class ) ); return suite; } }