Repository: oozie
Updated Branches:
  refs/heads/master a9fba2cde -> 68bcd3d38


OOZIE-3352 [tests] TestCallableQueueService#testPriorityExecutionOrder() is 
flaky (pbacsko)


Project: http://git-wip-us.apache.org/repos/asf/oozie/repo
Commit: http://git-wip-us.apache.org/repos/asf/oozie/commit/68bcd3d3
Tree: http://git-wip-us.apache.org/repos/asf/oozie/tree/68bcd3d3
Diff: http://git-wip-us.apache.org/repos/asf/oozie/diff/68bcd3d3

Branch: refs/heads/master
Commit: 68bcd3d38f8247f3f1aa06aa0e907d961332bf74
Parents: a9fba2c
Author: Andras Piros <andras.pi...@cloudera.com>
Authored: Tue Sep 25 09:48:24 2018 +0200
Committer: Andras Piros <andras.pi...@cloudera.com>
Committed: Tue Sep 25 09:48:24 2018 +0200

----------------------------------------------------------------------
 .../service/TestAsyncXCommandExecutor.java      | 64 +++++++++++++++++---
 .../oozie/service/TestCallableQueueService.java | 59 ------------------
 release-log.txt                                 |  1 +
 3 files changed, 55 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/oozie/blob/68bcd3d3/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java 
b/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
index f9ec4d6..2dce409 100644
--- a/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
+++ b/core/src/test/java/org/apache/oozie/service/TestAsyncXCommandExecutor.java
@@ -18,6 +18,7 @@
 
 package org.apache.oozie.service;
 
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
@@ -30,6 +31,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import 
org.apache.oozie.service.AsyncXCommandExecutor.AccessibleRunnableScheduledFuture;
+import org.apache.oozie.service.AsyncXCommandExecutor.PriorityComparator;
 import org.apache.oozie.service.AsyncXCommandExecutor.ScheduledXCallable;
 import org.apache.oozie.service.CallableQueueService.CallableWrapper;
 import org.apache.oozie.util.XCallable;
@@ -94,10 +96,11 @@ public class TestAsyncXCommandExecutor {
     @Before
     public void setup() {
         activeCommands = new AtomicInteger(0);
-        priorityBlockingQueue = new PriorityBlockingQueue<>();
+        priorityBlockingQueue = new PriorityBlockingQueue<>(100, new 
PriorityComparator());
         pendingCommandsPerType = new ConcurrentHashMap<>();
         delayQueue = new LinkedBlockingQueue<>();  // in reality it's not LBQ, 
but it's fine here
-        asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, 
DEFAULT_MAX_ACTIVE_COMMANDS, DEFAULT_MAXWAIT);
+        asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, 
DEFAULT_MAX_ACTIVE_COMMANDS, DEFAULT_MAXWAIT,
+                TEST_PRIORITIES);
         when(callableWrapper.filterDuplicates()).thenReturn(true);
         when(callableWrapper.getElement().getKey()).thenReturn("key");
         when(callableWrapper.getElement().getType()).thenReturn(DEFAULT_TYPE);
@@ -155,7 +158,7 @@ public class TestAsyncXCommandExecutor {
 
     @Test
     public void 
testSubmissionSuccessfulAfterDelayWhenMaxConcurrencyCheckDisabled() {
-        asyncExecutor = createExecutor(false, 2, DEFAULT_MAXWAIT);
+        asyncExecutor = createExecutor(false, 2, DEFAULT_MAXWAIT, 
TEST_PRIORITIES);
         when(callableWrapper.getInitialDelay()).thenReturn(100L);
         
when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(50L);
         XCallable<?> wrappedCommand = mock(XCallable.class);
@@ -239,7 +242,7 @@ public class TestAsyncXCommandExecutor {
 
     @Test
     public void testSubmissionWhenQueueIsFull() {
-        asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT);
+        asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT, 
TEST_PRIORITIES);
         callableWrapper = mock(CallableWrapper.class, 
Mockito.RETURNS_DEEP_STUBS);
         when(callableWrapper.filterDuplicates()).thenReturn(true);
         when(callableWrapper.getElement().getKey()).thenReturn("key");
@@ -254,7 +257,7 @@ public class TestAsyncXCommandExecutor {
 
     @Test
     public void testSubmissionWhenQueueSizeIsIgnored() {
-        asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT);
+        asyncExecutor = createExecutor(true, 2, DEFAULT_MAXWAIT, 
TEST_PRIORITIES);
         callableWrapper = mock(CallableWrapper.class, 
Mockito.RETURNS_DEEP_STUBS);
         when(callableWrapper.filterDuplicates()).thenReturn(true);
         when(callableWrapper.getElement().getKey()).thenReturn("key");
@@ -366,7 +369,7 @@ public class TestAsyncXCommandExecutor {
 
     @Test
     public void testAntiStarvationWhenDelayIsAboveMaxWait() {
-        asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, 
DEFAULT_MAX_ACTIVE_COMMANDS, 500);
+        asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, 
DEFAULT_MAX_ACTIVE_COMMANDS, 500, TEST_PRIORITIES);
         
when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-40000L);
         when(callableWrapper.getPriority()).thenReturn(0);
         pendingCommandsPerType.put(DEFAULT_TYPE, 
Sets.newHashSet(callableWrapper));
@@ -391,7 +394,7 @@ public class TestAsyncXCommandExecutor {
 
     @Test
     public void testAntiStarvationWhenPriorityIsHighest() {
-        asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, 
DEFAULT_MAX_ACTIVE_COMMANDS, 500);
+        asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, 
DEFAULT_MAX_ACTIVE_COMMANDS, 500, TEST_PRIORITIES);
         
when(callableWrapper.getDelay(eq(TimeUnit.MILLISECONDS))).thenReturn(-1000L);
         when(callableWrapper.getPriority()).thenReturn(MAX_PRIORITY);
         pendingCommandsPerType.put(DEFAULT_TYPE, 
Sets.newHashSet(callableWrapper));
@@ -414,6 +417,47 @@ public class TestAsyncXCommandExecutor {
         verify(scheduledExecutor).awaitTermination(eq(1000L), 
eq(TimeUnit.MILLISECONDS));
     }
 
+    @Test
+    public void testPriorityHandling() {
+        asyncExecutor = createExecutor(DEFAULT_ENABLE_CONCURRENCY_CHECK, 100, 
DEFAULT_MAXWAIT, 100);
+        doAnswer(new Answer<Void>() {
+            @Override
+            public Void answer(InvocationOnMock invocation) throws Throwable {
+                CallableWrapper<?> wrapper = (CallableWrapper<?>) 
invocation.getArguments()[0];
+                priorityBlockingQueue.add(wrapper);
+                return null;
+            }
+        }).when(executor).execute(any(Runnable.class));
+
+        List<CallableWrapper<?>> mockedWrappers = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            CallableWrapper<?> wrapper = mock(CallableWrapper.class, 
Mockito.RETURNS_DEEP_STUBS);
+            when(wrapper.getPriority()).thenReturn(i);
+            when(wrapper.getInitialDelay()).thenReturn(0L);
+            when(wrapper.filterDuplicates()).thenReturn(true);
+            when(wrapper.getElement().getName()).thenReturn(String.valueOf(i));
+            mockedWrappers.add(wrapper);
+        }
+
+        for (CallableWrapper<?> callable : mockedWrappers) {
+            asyncExecutor.queue(callable, false);
+        }
+
+        CallableWrapper<?> firstElement = priorityBlockingQueue.poll();
+
+        CallableWrapper<?> lastElement = null;
+        CallableWrapper<?> previous = null;
+
+        do {
+            previous = lastElement;
+            lastElement = priorityBlockingQueue.poll();
+        } while (lastElement != null);
+        lastElement = previous;
+
+        assertEquals("Priority - first element", 99, 
firstElement.getPriority());
+        assertEquals("Priority - last element", 0, lastElement.getPriority());
+    }
+
     private void testIllegalPriority(int prio) {
         when(callableWrapper.getPriority()).thenReturn(prio);
 
@@ -446,7 +490,7 @@ public class TestAsyncXCommandExecutor {
     }
 
     private AsyncXCommandExecutor createExecutor(boolean 
needMaxConcurrencyCheck, int maxActiveCallables,
-            long maxWait) {
+            long maxWait, int priorities) {
         return new AsyncXCommandExecutor(needMaxConcurrencyCheck,
                 callableQueueService,
                 maxActiveCallables,
@@ -456,7 +500,7 @@ public class TestAsyncXCommandExecutor {
                 delayQueue,
                 pendingCommandsPerType,
                 activeCommands,
-                DEFAULT_MAXWAIT,
-                TEST_PRIORITIES);
+                maxWait,
+                priorities);
     }
 }

http://git-wip-us.apache.org/repos/asf/oozie/blob/68bcd3d3/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java 
b/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
index 5d546ff..aec1765 100644
--- a/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
+++ b/core/src/test/java/org/apache/oozie/service/TestCallableQueueService.java
@@ -944,65 +944,6 @@ public class TestCallableQueueService extends XTestCase {
         assertTrue(uniquesAfter.toString(), uniquesAfter.isEmpty());
     }
 
-    public void testPriorityExecutionOrder() throws InterruptedException, 
ServiceException {
-        Services.get().destroy();
-        setSystemProperty(CallableQueueService.CONF_THREADS, "1");
-        setSystemProperty(CallableQueueService.CONF_QUEUE_SIZE, "1000000");
-        new Services().init();
-
-        final int taskCount = 999_999;
-        Multimap<Integer, Long> executions = 
Multimaps.synchronizedMultimap(ArrayListMultimap.create());
-        List<BookingCallable> callables = new ArrayList<>(taskCount);
-
-        for (int i = 2; i >= 0; i--) {
-            String type = String.valueOf(i);
-            for (int j = 0; j < taskCount / 3; j++) {
-                String key = type + "_" + UUID.randomUUID().toString();
-                BookingCallable dc = new BookingCallable(executions, 
taskCount, key, type, i, 0);
-                callables.add(dc);
-            }
-        }
-
-        CallableQueueService queueservice = 
Services.get().get(CallableQueueService.class);
-
-        for (int i = 0; i < taskCount; i++) {
-            queueservice.queue(callables.get(i));
-        }
-
-        try {
-            finished.await(10, TimeUnit.SECONDS);
-        } catch (Exception e) {
-            log.error("Error", e);
-            fail("Exception during test: " + e.getMessage());
-        }
-        // It's necessary because after finished.await() returns, the last 
XCallable
-        // could still be running
-        waitFor(1000, new Predicate() {
-            @Override
-            public boolean evaluate() throws Exception {
-                return queueservice.queueSize() == 0;
-            }
-        });
-
-        Map<Integer, Long> minTime = new HashMap<>();
-        Map<Integer, Long> maxTime = new HashMap<>();
-
-        for (Map.Entry<Integer, Collection<Long>> entry : 
executions.asMap().entrySet()) {
-            int prio = entry.getKey();
-            Collection<Long> values = entry.getValue();
-            minTime.put(prio, Collections.min(values));
-            maxTime.put(prio, Collections.max(values));
-        }
-
-        // Expected timeline of execution times:
-        // --> [min] Prio #2 [max] --> [min] Prio #1 [max] --> [min] Prio #0 
[max]
-
-        assertTrue("Failed: maxTime prio #2: " + maxTime.get(2) + " / minTime 
prio #1: " + minTime.get(1),
-                maxTime.get(2) <= minTime.get(1));
-        assertTrue("Failed: maxTime prio #1: " + maxTime.get(1) + " / minTime 
prio #0: " + minTime.get(0),
-                maxTime.get(1) <= minTime.get(0));
-    }
-
     public void testMaxConcurrencyReached() throws Exception {
         Services.get().destroy();
         setSystemProperty(CallableQueueService.CONF_QUEUE_SIZE, "100000");

http://git-wip-us.apache.org/repos/asf/oozie/blob/68bcd3d3/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index bb8702a..a99c399 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -1,5 +1,6 @@
 -- Oozie 5.1.0 release (trunk - unreleased)
 
+OOZIE-3352 [tests] TestCallableQueueService#testPriorityExecutionOrder() is 
flaky (pbacsko)
 OOZIE-3351 [tests] Flaky test TestMemoryLocks#testWriteLockSameThreadNoWait() 
(pbacsko)
 OOZIE-3229 [client] [ui] Improved SLA filtering options (asalamon74, 
andras.piros)
 OOZIE-3346 [examples] [action] Fix Git example. PrepareActionsHandler should 
support XML namespace prefixes (asalamon74, andras.piros)

Reply via email to