Michael Kublin has uploaded a new change for review.

Change subject: engine: Remove latch usage throughout the code
......................................................................

engine: Remove latch usage throughout the code

The engine code has a requirement to run several tasks simultaneously and
block the main thread until all of them have finished.
The previous solution was to use a latch, which required writing the same
code repeatedly and keeping some logic for increasing or reducing the latch
count.
The new solution provides a generic way to handle this problem by using the
standard Java API (ExecutorService.invokeAll, exposed through
ThreadPoolUtil.invokeAll), which is meant to solve exactly this problem.

Change-Id: I34ffc04f7abf57f3d27322a5615f89d07dee16ef
Signed-off-by: Michael Kublin <[email protected]>
---
M 
backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/IsoDomainListSyncronizer.java
M 
backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/MultipleActionsRunner.java
M 
backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/storage/ReconstructMasterDomainCommand.java
M 
backend/manager/modules/dal/src/test/java/org/ovirt/engine/core/dao/MultiThreadedDAOTest.java
M 
backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/SyncronizeNumberOfAsyncOperations.java
D 
backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableExecutor.java
D 
backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapper.java
M 
backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/threadpool/ThreadPoolUtil.java
D 
backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java
9 files changed, 126 insertions(+), 329 deletions(-)


  git pull ssh://gerrit.ovirt.org:29418/ovirt-engine refs/changes/37/11037/1

diff --git 
a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/IsoDomainListSyncronizer.java
 
b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/IsoDomainListSyncronizer.java
index 6fe591b..94b817e 100644
--- 
a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/IsoDomainListSyncronizer.java
+++ 
b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/IsoDomainListSyncronizer.java
@@ -5,9 +5,9 @@
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantLock;
@@ -104,34 +104,28 @@
                         StorageDomainStatus.Active,
                         VDSStatus.Up);
 
-        // Set count down latch to size of map for multiple threaded.
-        final CountDownLatch latch = new CountDownLatch(repofileList.size());
-
         resetProblematicList();
         // Iterate for each storage domain.
+        List<Callable<Void>> tasks = new ArrayList<Callable<Void>>();
         for (final RepoFileMetaData repoFileMetaData : repofileList) {
             // If the list should be refreshed and the refresh from the VDSM 
was succeeded, fetch the file list again
             // from the DB.
             if (shouldRefreshIsoDomain(repoFileMetaData.getLastRefreshed())) {
-                ThreadPoolUtil.execute(new Runnable() {
+                tasks.add(new Callable<Void>() {
                     @Override
-                    public void run() {
-                        updateCachedIsoFileListFromVdsm(repoFileMetaData, 
latch);
+                    public Void call() {
+                        updateCachedIsoFileListFromVdsm(repoFileMetaData);
+                        return null;
                     }
                 });
             } else {
-                latch.countDown();
                 log.debugFormat("Automatic refresh process for {0} file type 
in storage domain id {1} was not performed since refresh time out did not 
passed yet.",
                         repoFileMetaData.getFileType(),
                         repoFileMetaData.getRepoDomainId());
             }
         }
 
-        try {
-            latch.await();
-        } catch (InterruptedException e) {
-            log.error("Automatic refresh process encounter a problem.", e);
-        }
+        ThreadPoolUtil.invokeAll(tasks);
 
         // After refresh for all Iso domains finished, handle the log.
         handleErrorLog(problematicRepoFileList);
@@ -522,7 +516,7 @@
      * If refresh from VDSM has encounter problems, we update the problematic 
domain list.
      * @param repoFileMetaData
      */
-    private void updateCachedIsoFileListFromVdsm(RepoFileMetaData 
repoFileMetaData, final CountDownLatch latch)
+    private void updateCachedIsoFileListFromVdsm(RepoFileMetaData 
repoFileMetaData)
     {
         boolean isRefreshed = false;
         try {
@@ -533,8 +527,6 @@
                             repoFileMetaData.getFileType());
             addRepoFileToProblematicList(problematicRepoFileList);
         } finally {
-            // At any case count down the latch, and print log message.
-            latch.countDown();
             log.infoFormat("Finished automatic refresh process for {0} file 
type with {1}, for storage domain id {2}.",
                     repoFileMetaData.getFileType(),
                     isRefreshed ? "success"
diff --git 
a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/MultipleActionsRunner.java
 
b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/MultipleActionsRunner.java
index 662bbea..5ccc016 100644
--- 
a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/MultipleActionsRunner.java
+++ 
b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/MultipleActionsRunner.java
@@ -5,7 +5,7 @@
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Callable;
 
 import org.apache.commons.lang.StringUtils;
 import org.ovirt.engine.core.bll.job.ExecutionContext;
@@ -113,66 +113,45 @@
      * the same time the number of threads is managed by java
      *
      * @param returnValues
-     * @param executorService
      */
     private void CheckCanDoActionsAsyncroniousely(
                                                   
ArrayList<VdcReturnValueBase> returnValues) {
         for (int i = 0; i < getCommands().size(); i += CONCURRENT_ACTIONS) {
             int handleSize = Math.min(CONCURRENT_ACTIONS, getCommands().size() 
- i);
-            CountDownLatch latch = new CountDownLatch(handleSize);
 
             int fixedSize = i + handleSize;
+            List<Callable<VdcReturnValueBase>> canDoActionTasks = new 
ArrayList<Callable<VdcReturnValueBase>>();
             for (int j = i; j < fixedSize; j++) {
-                RunCanDoActionAsyncroniousely(returnValues, j, fixedSize, 
latch);
+                canDoActionTasks.add(buildCanDoActionAsyncroniousely(j, 
fixedSize));
             }
-            try {
-                latch.await();
-            } catch (InterruptedException e) {
-            }
+            returnValues.addAll(ThreadPoolUtil.invokeAll(canDoActionTasks));
         }
     }
 
-    private void RunCanDoActionAsyncroniousely(
-                                               final 
ArrayList<VdcReturnValueBase> returnValues,
-                                               final int currentCanDoActionId, 
final int totalSize, final CountDownLatch latch) {
-        ThreadPoolUtil.execute(new Runnable() {
+    private Callable<VdcReturnValueBase> buildCanDoActionAsyncroniousely(
+            final int currentCanDoActionId, final int totalSize) {
+        return new Callable<VdcReturnValueBase>() {
+
             @Override
-            public void run() {
+            public VdcReturnValueBase call() {
                 CommandBase<?> command = 
getCommands().get(currentCanDoActionId);
-                if (command != null) {
-                    String actionType = command.getActionType().toString();
-                    try {
-                        log.infoFormat("Start time: {0}. Start running 
CanDoAction for command number {1}/{2} (Command type: {3})",
-                                new Date(),
-                                currentCanDoActionId + 1,
-                                totalSize,
-                                actionType);
-                        VdcReturnValueBase returnValue = 
command.canDoActionOnly();
-                        synchronized (returnValues) {
-                            returnValues.add(returnValue);
-                        }
-                    } catch (RuntimeException e) {
-                        log.errorFormat("Failed to execute CanDoAction() for 
command number {0}/{1} (Command type: {2}), Error: {3}",
-                                currentCanDoActionId + 1,
-                                totalSize,
-                                actionType,
-                                e);
-                    } finally {
-                        latch.countDown();
-                        log.infoFormat("End time: {0}. Finish handling 
CanDoAction for command number {1}/{2} (Command type: {3})",
-                                new Date(),
-                                currentCanDoActionId + 1,
-                                totalSize,
-                                actionType);
-                    }
-                }
-                else {
-                    log.errorFormat("Failed to execute CanDoAction() for 
command number {0}/{1}. Command is null.",
+                String actionType = command.getActionType().toString();
+                try {
+                    log.infoFormat("Start time: {0}. Start running CanDoAction 
for command number {1}/{2} (Command type: {3})",
+                            new Date(),
                             currentCanDoActionId + 1,
-                            totalSize);
+                            totalSize,
+                            actionType);
+                    return command.canDoActionOnly();
+                } finally {
+                    log.infoFormat("End time: {0}. Finish handling CanDoAction 
for command number {1}/{2} (Command type: {3})",
+                            new Date(),
+                            currentCanDoActionId + 1,
+                            totalSize,
+                            actionType);
                 }
             }
-        });
+        };
     }
 
     protected void RunCommands() {
diff --git 
a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/storage/ReconstructMasterDomainCommand.java
 
b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/storage/ReconstructMasterDomainCommand.java
index 10d8bd8..89929a3 100644
--- 
a/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/storage/ReconstructMasterDomainCommand.java
+++ 
b/backend/manager/modules/bll/src/main/java/org/ovirt/engine/core/bll/storage/ReconstructMasterDomainCommand.java
@@ -1,7 +1,8 @@
 package org.ovirt.engine.core.bll.storage;
 
-import java.util.LinkedList;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
 
 import org.ovirt.engine.core.bll.Backend;
 import org.ovirt.engine.core.bll.NonTransactiveCommandAttribute;
@@ -28,7 +29,7 @@
 import org.ovirt.engine.core.compat.Guid;
 import org.ovirt.engine.core.dal.VdcBllMessages;
 import org.ovirt.engine.core.dal.dbbroker.DbFacade;
-import org.ovirt.engine.core.utils.thread.LatchedRunnableExecutor;
+import org.ovirt.engine.core.utils.threadpool.ThreadPoolUtil;
 import org.ovirt.engine.core.utils.transaction.TransactionMethod;
 
 @SuppressWarnings("serial")
@@ -208,11 +209,12 @@
         final boolean isPerformConnectOps = !_isLastMaster && commandSucceeded;
         final boolean isPerformDisconnect = !getParameters().isInactive();
         if (isPerformConnectOps || isPerformDisconnect) {
-            List<Runnable> tasks = new LinkedList<Runnable>();
+            List<Callable<Void>> tasks = new ArrayList<Callable<Void>>();
             for (final VDS vds : getAllRunningVdssInPool()) {
-                tasks.add(new Runnable() {
+                tasks.add(new Callable<Void>() {
+
                     @Override
-                    public void run() {
+                    public Void call() {
                         try {
                             if (isPerformConnectOps && 
connectVdsToNewMaster(vds)) {
                                 try {
@@ -256,10 +258,11 @@
                                     vds.getId(),
                                     e.getMessage());
                         }
+                        return null;
                     }
                 });
             }
-            new LatchedRunnableExecutor(tasks).execute();
+            ThreadPoolUtil.invokeAll(tasks);
         }
     }
 
diff --git 
a/backend/manager/modules/dal/src/test/java/org/ovirt/engine/core/dao/MultiThreadedDAOTest.java
 
b/backend/manager/modules/dal/src/test/java/org/ovirt/engine/core/dao/MultiThreadedDAOTest.java
index 71a40eb..f17cd6d 100644
--- 
a/backend/manager/modules/dal/src/test/java/org/ovirt/engine/core/dao/MultiThreadedDAOTest.java
+++ 
b/backend/manager/modules/dal/src/test/java/org/ovirt/engine/core/dao/MultiThreadedDAOTest.java
@@ -12,7 +12,6 @@
 import org.ovirt.engine.core.compat.Guid;
 import org.ovirt.engine.core.utils.log.Log;
 import org.ovirt.engine.core.utils.log.LogFactory;
-import org.ovirt.engine.core.utils.thread.LatchedRunnableWrapper;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNull;
 
diff --git 
a/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/SyncronizeNumberOfAsyncOperations.java
 
b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/SyncronizeNumberOfAsyncOperations.java
index f977fe1..c7809e8 100644
--- 
a/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/SyncronizeNumberOfAsyncOperations.java
+++ 
b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/SyncronizeNumberOfAsyncOperations.java
@@ -1,6 +1,8 @@
 package org.ovirt.engine.core.utils;
 
-import java.util.concurrent.CountDownLatch;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
 import org.ovirt.engine.core.utils.threadpool.ThreadPoolUtil;
 
 public final class SyncronizeNumberOfAsyncOperations {
@@ -14,36 +16,32 @@
         _factory.initialize(parameters);
     }
 
-    private class AsyncOpThread implements Runnable {
-        private CountDownLatch latch;
+    public void Execute() {
+
+        List<AsyncOpThread<Void>> operations = new 
ArrayList<AsyncOpThread<Void>>();
+        for (int i = 0; i < _numberOfOperations; i++) {
+            operations.add(new AsyncOpThread<Void>(i));
+        }
+
+        if (_numberOfOperations > 0) {
+            ThreadPoolUtil.invokeAll(operations);
+        }
+    }
+
+    private class AsyncOpThread<V> implements Callable<V> {
+
         private int currentEventId;
 
-        public AsyncOpThread(CountDownLatch latch, int currentEventId) {
-            this.latch = latch;
+        public AsyncOpThread(int currentEventId) {
             this.currentEventId = currentEventId;
         }
 
         @Override
-        public void run() {
-            try {
-                ISingleAsyncOperation operation = 
_factory.createSingleAsyncOperation();
-                operation.execute(currentEventId);
-            } finally {
-                latch.countDown();
-            }
+        public V call() {
+            ISingleAsyncOperation operation = 
_factory.createSingleAsyncOperation();
+            operation.execute(currentEventId);
+            return null;
         }
     }
 
-    public void Execute() {
-        CountDownLatch latch = new CountDownLatch(_numberOfOperations);
-
-        for (int i = 0; i < _numberOfOperations; i++) {
-            ThreadPoolUtil.execute(new AsyncOpThread(latch, i));
-        }
-
-        try {
-            latch.await();
-        } catch (InterruptedException e) {
-        }
-    }
 }
diff --git 
a/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableExecutor.java
 
b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableExecutor.java
deleted file mode 100644
index 13488aa..0000000
--- 
a/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableExecutor.java
+++ /dev/null
@@ -1,47 +0,0 @@
-package org.ovirt.engine.core.utils.thread;
-
-import java.util.Collection;
-import java.util.concurrent.CountDownLatch;
-
-import org.ovirt.engine.core.utils.threadpool.ThreadPoolUtil;
-
-public class LatchedRunnableExecutor {
-
-    private Collection<Runnable> runnables;
-
-    /**
-     * @param runnables
-     *            - list of runnables for execution
-     */
-    public LatchedRunnableExecutor(Collection<Runnable> runnables) {
-        this.runnables = runnables;
-    }
-
-    protected void executeRunnable(LatchedRunnableWrapper runnable) {
-        ThreadPoolUtil.execute(runnable);
-    }
-
-    protected LatchedRunnableWrapper createLatchedRunnableWrapper(Runnable 
runnable, CountDownLatch latch) {
-        return new LatchedRunnableWrapper(runnable, latch);
-    }
-
-    protected CountDownLatch createCountDownLatch() {
-        return new CountDownLatch(runnables.size());
-    }
-
-    /**
-     * executes the list of Runnable provided to this executer during 
creations and waits till the execution of all
-     * runnables is done.
-     */
-    public void execute() {
-        CountDownLatch latch = createCountDownLatch();
-        for (Runnable runnable : runnables) {
-            LatchedRunnableWrapper latchWrapper = 
createLatchedRunnableWrapper(runnable, latch);
-            executeRunnable(latchWrapper);
-        }
-        try {
-            latch.await();
-        } catch (InterruptedException e) {
-        }
-    }
-}
diff --git 
a/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapper.java
 
b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapper.java
deleted file mode 100644
index 98a5aab..0000000
--- 
a/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapper.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package org.ovirt.engine.core.utils.thread;
-
-import java.util.concurrent.CountDownLatch;
-
-/**
- * Wrapper for a runnable that waits until countdown latch is satisfied (all 
other threads running code with a call to
- * CountdownLatch.await)
- */
-public class LatchedRunnableWrapper implements Runnable {
-    private Runnable runnable;
-    private CountDownLatch latch;
-
-    public LatchedRunnableWrapper(Runnable runnable, CountDownLatch latch) {
-        this.runnable = runnable;
-        this.latch = latch;
-    }
-
-    @Override
-    public void run() {
-        try {
-            runnable.run();
-        } finally {
-            latch.countDown();
-        }
-    }
-}
diff --git 
a/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/threadpool/ThreadPoolUtil.java
 
b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/threadpool/ThreadPoolUtil.java
index 01851f2..b8037e0 100644
--- 
a/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/threadpool/ThreadPoolUtil.java
+++ 
b/backend/manager/modules/utils/src/main/java/org/ovirt/engine/core/utils/threadpool/ThreadPoolUtil.java
@@ -1,5 +1,8 @@
 package org.ovirt.engine.core.utils.threadpool;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
@@ -70,6 +73,33 @@
 
     }
 
+    private static class InternalCallable<V> implements Callable<V> {
+
+        private Callable<V> job;
+        private IVdcUser vdcUser;
+        private String httpSessionId;
+
+        /**
+         * Identifies the correlation-id associated with the thread invoker
+         */
+        private String correlationId;
+
+        public InternalCallable(Callable<V> job) {
+            this.job = job;
+            this.vdcUser = ThreadLocalParamsContainer.getVdcUser();
+            this.httpSessionId = ThreadLocalParamsContainer.getHttpSessionId();
+            this.correlationId = ThreadLocalParamsContainer.getCorrelationId();
+        }
+
+        @Override
+        public V call() throws Exception {
+            ThreadLocalParamsContainer.setVdcUser(vdcUser);
+            ThreadLocalParamsContainer.setHttpSessionId(httpSessionId);
+            ThreadLocalParamsContainer.setCorrelationId(correlationId);
+            return job.call();
+        }
+    }
+
     private static final ExecutorService es = new InternalThreadExecutor();
 
     /**
@@ -78,9 +108,17 @@
      * execution results
      * @return
      */
-    public static <V> ExecutorCompletionService<V> createCompletionService() {
+    private static <V> ExecutorCompletionService<V> createCompletionService() {
         return new ExecutorCompletionService<V>(es);
-     }
+    }
+
+    private static <T> List<Callable<T>> buildSessionTasks(Collection<? 
extends Callable<T>> tasks) {
+        List<Callable<T>> sessionedTask = new ArrayList<Callable<T>>();
+        for (Callable<T> task : tasks) {
+            sessionedTask.add(new InternalCallable<T>(task));
+        }
+        return sessionedTask;
+    }
 
     /**
      * Creates a completion service to allow launching of tasks (callable 
objects)
@@ -119,4 +157,28 @@
             throw e;
         }
     }
+
+    /**
+     * Executes the given tasks and returns a list of their results once all
+     * have completed; if the task list is null or empty, null is returned.
+     * @param tasks
+     * @return
+     */
+    public static <T> List<T> invokeAll(Collection<? extends Callable<T>> 
tasks) {
+        if (tasks != null && !tasks.isEmpty()) {
+            try {
+                List<Callable<T>> sessionedTask = buildSessionTasks(tasks);
+                List<Future<T>> resultFutureList = es.invokeAll(sessionedTask);
+                List<T> resultList = new ArrayList<T>();
+                for (Future<T> future : resultFutureList) {
+                    resultList.add(future.get());
+                }
+                return resultList;
+            } catch (Exception e) {
+                log.warnFormat("The thread pool failed to execute list of 
tasks");
+                throw new RuntimeException(e);
+            }
+        }
+        return null;
+    }
 }
diff --git 
a/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java
 
b/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java
deleted file mode 100644
index 8c633fe..0000000
--- 
a/backend/manager/modules/utils/src/test/java/org/ovirt/engine/core/utils/thread/LatchedRunnableWrapperTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/**
- *
- */
-package org.ovirt.engine.core.utils.thread;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.RejectedExecutionException;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.exceptions.base.MockitoException;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-public class LatchedRunnableWrapperTest {
-    private static final int THREADS_NUMBER = 20;
-
-    private AtomicInteger counter;
-    private LatchedRunnableExecutor latchedRunnableExecuter;
-    private CountDownLatch latch;
-    private ExecutorService threadPool;
-    private RunnableCreator runnableCreator;
-
-    private interface RunnableCreator {
-        Runnable createRunnable();
-    }
-
-    private class DummyRunnable implements Runnable {
-        @Override
-        public void run() {
-            counter.incrementAndGet();
-        }
-    }
-
-    @Before
-    public void setup() {
-        counter = new AtomicInteger();
-        threadPool = Executors.newFixedThreadPool(THREADS_NUMBER);
-        runnableCreator = new RunnableCreator() {
-
-            @Override
-            public Runnable createRunnable() {
-                return new DummyRunnable();
-            }
-        };
-    }
-
-    @Test
-    public void regularExecution() {
-        prepareMocks(THREADS_NUMBER);
-        latchedRunnableExecuter.execute();
-        assertEquals("the counter wasn't incremented the expected number of 
times", THREADS_NUMBER, counter.intValue());
-        verifyCommonExecutionChecks();
-    }
-
-    @Test
-    public void submitFullFailure() {
-        boolean gotException = false;
-        prepareMocks(0);
-        try {
-            latchedRunnableExecuter.execute();
-        } catch (RejectedExecutionException e) {
-            gotException = true;
-        }
-        assertTrue("expected RejectedExecutionException wasn't thrown", 
gotException);
-        assertEquals("the counter was incremented more times then expected", 
0, counter.intValue());
-        assertEquals("latch counter wasn't in the expected value", 
THREADS_NUMBER, latch.getCount());
-        verifyCommonFailureChecks();
-    }
-
-    @Test
-    public void submitPartialFailure() {
-        int expectedToRun = THREADS_NUMBER - 5;
-        prepareMocks(expectedToRun);
-        boolean gotException = false;
-        try {
-            latchedRunnableExecuter.execute();
-        } catch (RejectedExecutionException e) {
-            gotException = true;
-        }
-        assertTrue("expected RejectedExecutionException wasn't thrown", 
gotException);
-        assertFalse("the counter wasn't incremented the expected number of 
times", expectedToRun < counter.intValue());
-        assertTrue("latch counter value was lower than expected", 
latch.getCount() > 0);
-        assertTrue("latch counter value was greater than expected", 
latch.getCount() < THREADS_NUMBER);
-        verifyCommonFailureChecks();
-    }
-
-    /**
-     * @param runnableCreator
-     * @param isSubmitRetry
-     * @param isExecuteOnFirstRun
-     */
-    private void prepareMocks(final int countToExecute) {
-        List<Runnable> runnables = new LinkedList<Runnable>();
-        for (int index = 0; index < THREADS_NUMBER; index++) {
-            runnables.add(runnableCreator.createRunnable());
-        }
-
-        latchedRunnableExecuter = spy(new LatchedRunnableExecutor(runnables));
-        latch = spy(latchedRunnableExecuter.createCountDownLatch());
-
-        doReturn(latch).when(latchedRunnableExecuter).createCountDownLatch();
-
-        final HashSet<Runnable> executedRunnables = new HashSet<Runnable>();
-
-        doAnswer(new Answer<LatchedRunnableWrapper>() {
-            @Override
-            public LatchedRunnableWrapper answer(InvocationOnMock invocation) 
throws Throwable {
-                final LatchedRunnableWrapper toReturn =
-                        new LatchedRunnableWrapper((Runnable) 
invocation.getArguments()[0],
-                                (CountDownLatch) invocation.getArguments()[1]);
-                doAnswer(new Answer<Void>() {
-                    @Override
-                    public Void answer(InvocationOnMock invocation) throws 
Throwable {
-                        if (executedRunnables.size() < countToExecute) {
-                            threadPool.execute(toReturn);
-                            executedRunnables.add(toReturn);
-                        } else {
-                            throw new RejectedExecutionException();
-                        }
-                        return null;
-                    }
-                }).when(latchedRunnableExecuter).executeRunnable(toReturn);
-                return toReturn;
-            }
-        
}).when(latchedRunnableExecuter).createLatchedRunnableWrapper(any(Runnable.class),
 any(CountDownLatch.class));
-    }
-
-    private void verifyCommonExecutionChecks() {
-        verify(latch, times(THREADS_NUMBER)).countDown();
-        assertEquals("latch counter value wasn't in the expected value", 0, 
latch.getCount());
-        try {
-            verify(latch, times(1)).await();
-        } catch (InterruptedException e) {
-            throw new MockitoException(e.toString());
-        }
-    }
-
-    private void verifyCommonFailureChecks() {
-        try {
-            verify(latch, never()).await();
-        } catch (InterruptedException e) {
-            throw new MockitoException(e.toString());
-        }
-    }
-}


--
To view, visit http://gerrit.ovirt.org/11037
To unsubscribe, visit http://gerrit.ovirt.org/settings

Gerrit-MessageType: newchange
Gerrit-Change-Id: I34ffc04f7abf57f3d27322a5615f89d07dee16ef
Gerrit-PatchSet: 1
Gerrit-Project: ovirt-engine
Gerrit-Branch: master
Gerrit-Owner: Michael Kublin <[email protected]>
_______________________________________________
Engine-patches mailing list
[email protected]
http://lists.ovirt.org/mailman/listinfo/engine-patches

Reply via email to