This is an automated email from the ASF dual-hosted git repository.

tibordigana pushed a commit to branch faster-queue
in repository https://gitbox.apache.org/repos/asf/maven-surefire.git

The following commit(s) were added to refs/heads/faster-queue by this push: new 78339d3 custom QueueSynchronizer instead of ReentrantLock+Semaphore+CyclicBarrier 78339d3 is described below commit 78339d3c7ff19bdb29d8761b720d88d65fd840ba Author: tibordigana <tibordig...@apache.org> AuthorDate: Tue Jul 14 02:22:31 2020 +0200 custom QueueSynchronizer instead of ReentrantLock+Semaphore+CyclicBarrier --- .../output/ThreadedStreamConsumer.java | 50 ++++---- .../output/ThreadedStreamConsumerTest.java | 129 +++++++++++++++++++-- 2 files changed, 147 insertions(+), 32 deletions(-) diff --git a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java index 34d3a31..5e1968f 100644 --- a/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java +++ b/maven-surefire-common/src/main/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumer.java @@ -28,40 +28,34 @@ import javax.annotation.Nonnull; import java.io.Closeable; import java.io.IOException; import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.Semaphore; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; -import static java.util.concurrent.TimeUnit.SECONDS; - /** * Knows how to reconstruct *all* the state transmitted over stdout by the forked process. 
* * @author Kristian Rosenvold */ public final class ThreadedStreamConsumer - implements EventHandler<Event>, Closeable + implements EventHandler<Event>, Closeable { private static final int QUEUE_MAX_ITEMS = 10_000; private static final Event END_ITEM = new FinalEvent(); private final ConcurrentLinkedDeque<Event> queue = new ConcurrentLinkedDeque<>(); - private final ReentrantLock queueLock = new ReentrantLock(); - private final Condition queueCondition = queueLock.newCondition(); - + private final Semaphore producerBarrier = new Semaphore( 0 ); private final AtomicInteger queueSize = new AtomicInteger(); - private final AtomicBoolean stop = new AtomicBoolean(); - private final Thread thread; - private final Pumper pumper; final class Pumper - implements Runnable + implements Runnable { private final EventHandler<Event> target; @@ -86,9 +80,10 @@ public final class ThreadedStreamConsumer @Override public void run() { + queueLock.lock(); + //todo CyclicBarrier here try { - queueLock.lock(); while ( !stop.get() || !queue.isEmpty() ) { try @@ -97,7 +92,8 @@ public final class ThreadedStreamConsumer if ( item == null ) { - queueCondition.await( 1L, SECONDS ); + queueCondition.await(); + producerBarrier.release(); continue; } else @@ -145,6 +141,7 @@ public final class ThreadedStreamConsumer @Override public void handleEvent( @Nonnull Event event ) { + //todo CyclicBarrier here if ( stop.get() ) { return; @@ -156,17 +153,27 @@ public final class ThreadedStreamConsumer } int count = queueSize.get(); - if ( count == 0 || count >= QUEUE_MAX_ITEMS ) + boolean min = count == 0; + boolean max = count >= QUEUE_MAX_ITEMS; + if ( min || max ) { + queueLock.lock(); try { - queueLock.lock(); - updateAndNotifyReader( event ); + queueSize.incrementAndGet(); + queue.addLast( event ); + producerBarrier.drainPermits(); + queueCondition.signal(); } finally { queueLock.unlock(); } + + if ( max ) + { + producerBarrier.acquireUninterruptibly(); + } } else { @@ -175,29 +182,24 @@ public 
final class ThreadedStreamConsumer } } - private void updateAndNotifyReader( @Nonnull Event event ) - { - queueSize.incrementAndGet(); - queue.addLast( event ); - queueCondition.signal(); - } - @Override public void close() - throws IOException + throws IOException { if ( stop.compareAndSet( false, true ) ) { queue.addLast( END_ITEM ); + queueLock.lock(); try { - queueLock.lock(); queueCondition.signal(); } finally { queueLock.unlock(); } + + producerBarrier.release( 2 ); } if ( pumper.hasErrors() ) diff --git a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java index 54491a4..83b6c50 100644 --- a/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java +++ b/maven-surefire-common/src/test/java/org/apache/maven/plugin/surefire/booterclient/output/ThreadedStreamConsumerTest.java @@ -22,13 +22,18 @@ package org.apache.maven.plugin.surefire.booterclient.output; import org.apache.maven.surefire.api.event.Event; import org.apache.maven.surefire.api.event.StandardStreamOutWithNewLineEvent; import org.apache.maven.surefire.extensions.EventHandler; +import org.junit.Ignore; import org.junit.Test; import javax.annotation.Nonnull; - +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.AbstractQueuedSynchronizer; import static org.apache.maven.surefire.api.report.RunMode.NORMAL_RUN; +import static org.fest.assertions.Assertions.assertThat; /** * @@ -37,38 +42,146 @@ import static org.apache.maven.surefire.api.report.RunMode.NORMAL_RUN; public class ThreadedStreamConsumerTest { @Test + @Ignore public void test() throws Exception { + final CountDownLatch 
countDown = new CountDownLatch( 100_000 ); EventHandler<Event> handler = new EventHandler<Event>() { - private int i; + private final AtomicInteger i = new AtomicInteger(); @Override public void handleEvent( @Nonnull Event event ) { + //System.out.println(i.get()); + countDown.countDown(); try { - System.out.println( Thread.currentThread() ); - if ( i++ % 5000 == 0 ) + if ( i.incrementAndGet() % 11_000 == 0 ) { - TimeUnit.MILLISECONDS.sleep( 500L ); + TimeUnit.MILLISECONDS.sleep( 10L ); } } catch ( InterruptedException e ) { - e.printStackTrace(); + throw new IllegalStateException( e ); } } }; ThreadedStreamConsumer streamConsumer = new ThreadedStreamConsumer( handler ); - for ( int i = 0; i < 11_000; i++ ) + for ( int i = 0; i < 100_000; i++ ) { - System.out.println( i ); streamConsumer.handleEvent( new StandardStreamOutWithNewLineEvent( NORMAL_RUN, "" ) ); } + assertThat( countDown.await( 10L, TimeUnit.MINUTES ) ).isTrue(); + streamConsumer.close(); } + + @Test + public void test3() throws Exception + { + QueueSynchronizer<String> sync = new QueueSynchronizer<>( 2 ); + sync.pushNext( "1" ); + sync.pushNext( "2" ); + //sync.pushNext( "3" ); + String s1 = sync.awaitNext(); + String s2 = sync.awaitNext(); + //String s3 = sync.awaitNext(); + } + + static class QueueSynchronizer<T> + { + private final AtomicInteger queueSize = new AtomicInteger(); + + QueueSynchronizer( int max ) + { + this.max = max; + } + + private class SyncT1 extends AbstractQueuedSynchronizer + { + private static final long serialVersionUID = 1L; + + @Override + protected int tryAcquireShared( int arg ) + { + return queueSize.get() == 0 ? 
-1 : 1; + } + + @Override + protected boolean tryReleaseShared( int arg ) + { + return true; + } + + void waitIfZero() throws InterruptedException + { + acquireSharedInterruptibly( 1 ); + } + + void release() + { + releaseShared( 0 ); + } + } + + private class SyncT2 extends AbstractQueuedSynchronizer + { + private static final long serialVersionUID = 1L; + + @Override + protected int tryAcquireShared( int arg ) + { + return queueSize.get() < max ? 1 : -1; + } + + @Override + protected boolean tryReleaseShared( int arg ) + { + return true; + } + + void awaitMax() + { + acquireShared( 1 ); + } + + void tryRelease() + { + if ( queueSize.get() == 0 ) + { + releaseShared( 0 ); + } + } + } + + private final SyncT1 t1 = new SyncT1(); + private final SyncT2 t2 = new SyncT2(); + private final ConcurrentLinkedDeque<T> queue = new ConcurrentLinkedDeque<>(); + private final int max; + + void pushNext( T t ) + { + queue.addLast( t ); + int previousCount = queueSize.get(); + t2.awaitMax(); + queueSize.incrementAndGet(); + if ( previousCount == 0 ) + { + t1.release(); + } + } + + T awaitNext() throws InterruptedException + { + t2.tryRelease(); + t1.waitIfZero(); + queueSize.decrementAndGet(); + return queue.pollFirst(); + } + } }