http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/brooklyn/util/task/BasicTasksFutureTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/util/task/BasicTasksFutureTest.java b/core/src/test/java/brooklyn/util/task/BasicTasksFutureTest.java deleted file mode 100644 index f1c1332..0000000 --- a/core/src/test/java/brooklyn/util/task/BasicTasksFutureTest.java +++ /dev/null @@ -1,224 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; - -import org.apache.brooklyn.api.management.Task; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -import com.google.common.base.Stopwatch; - -public class BasicTasksFutureTest { - - private static final Logger log = LoggerFactory.getLogger(BasicTasksFutureTest.class); - - private BasicExecutionManager em; - private BasicExecutionContext ec; - private Map<Object,Object> data; - private ExecutorService ex; - private Semaphore started; - private Semaphore waitInTask; - private Semaphore cancelledWhileSleeping; - - @BeforeMethod(alwaysRun=true) - public void setUp() { - em = new BasicExecutionManager("mycontext"); - ec = new BasicExecutionContext(em); - ex = Executors.newCachedThreadPool(); - data = Collections.synchronizedMap(new LinkedHashMap<Object,Object>()); - started = new Semaphore(0); - waitInTask = new Semaphore(0); - cancelledWhileSleeping = new Semaphore(0); - } - - @AfterMethod(alwaysRun=true) - public void tearDown() throws Exception { - if (em != null) em.shutdownNow(); - if (ex != null) ex.shutdownNow(); - } - - @Test - public void testBlockAndGetWithTimeoutsAndListenableFuture() throws InterruptedException { - Task<String> t = waitForSemaphore(Duration.FIVE_SECONDS, true, "x"); - - Assert.assertFalse(t.blockUntilEnded(Duration.millis(1))); - Assert.assertFalse(t.blockUntilEnded(Duration.ZERO)); - boolean didNotThrow = false; - - try { t.getUnchecked(Duration.millis(1)); didNotThrow = true; } - catch (Exception e) { /* expected */ } - Assert.assertFalse(didNotThrow); - - try { t.getUnchecked(Duration.ZERO); didNotThrow = true; } - catch (Exception e) { /* expected */ } - Assert.assertFalse(didNotThrow); - - addFutureListener(t, "before"); - ec.submit(t); - - Assert.assertFalse(t.blockUntilEnded(Duration.millis(1))); - Assert.assertFalse(t.blockUntilEnded(Duration.ZERO)); - - try { t.getUnchecked(Duration.millis(1)); didNotThrow = true; } - catch (Exception e) { /* expected */ } - Assert.assertFalse(didNotThrow); - - try { t.getUnchecked(Duration.ZERO); didNotThrow = true; } - catch (Exception e) { /* expected */ } - Assert.assertFalse(didNotThrow); - - addFutureListener(t, "during"); - - synchronized (data) { - // now let it finish - waitInTask.release(); - Assert.assertTrue(t.blockUntilEnded(Duration.TEN_SECONDS)); - - Assert.assertEquals(t.getUnchecked(Duration.millis(1)), "x"); - Assert.assertEquals(t.getUnchecked(Duration.ZERO), "x"); - - Assert.assertNull(data.get("before")); - Assert.assertNull(data.get("during")); - // can't set the data(above) until we release the lock (in assert call below) - assertSoonGetsData("before"); - assertSoonGetsData("during"); - } - - // and see that a listener added late also runs - synchronized (data) { - addFutureListener(t, "after"); - Assert.assertNull(data.get("after")); - assertSoonGetsData("after"); - } - } - - private void addFutureListener(Task<String> t, final String key) { - t.addListener(new Runnable() { public void run() { - synchronized (data) { - log.info("notifying for "+key); - data.notifyAll(); - data.put(key, true); - } - }}, ex); - } - - private void assertSoonGetsData(String key) throws InterruptedException { - for (int i=0; i<10; i++) { - if (Boolean.TRUE.equals(data.get(key))) { - log.info("got data for "+key); - return; - } - data.wait(Duration.ONE_SECOND.toMilliseconds()); - } - Assert.fail("did not get data for '"+key+"' in time"); - } - - private <T> Task<T> waitForSemaphore(final Duration time, final boolean requireSemaphore, final T result) { - return Tasks.<T>builder().body(new Callable<T>() { - public T call() { - try { - started.release(); - log.info("waiting up to "+time+" to acquire before returning "+result); - if (!waitInTask.tryAcquire(time.toMilliseconds(), TimeUnit.MILLISECONDS)) { - log.info("did not get semaphore"); - if (requireSemaphore) Assert.fail("task did not get semaphore"); - } else { - log.info("got semaphore"); - } - } catch (Exception e) { - log.info("cancelled before returning "+result); - cancelledWhileSleeping.release(); - throw Exceptions.propagate(e); - } - log.info("task returning "+result); - return result; - } - }).build(); - } - - @Test - public void testCancelAfterStartTriggersListenableFuture() throws Exception { - doTestCancelTriggersListenableFuture(Duration.millis(50)); - } - @Test - public void testCancelImmediateTriggersListenableFuture() throws Exception { - // if cancel fires after submit but before it passes to the executor, - // that needs handling separately; this doesn't guarantee this code path, - // but it happens sometimes (and it should be handled) - doTestCancelTriggersListenableFuture(Duration.ZERO); - } - public void doTestCancelTriggersListenableFuture(Duration delay) throws Exception { - Task<String> t = waitForSemaphore(Duration.TEN_SECONDS, true, "x"); - addFutureListener(t, "before"); - - Stopwatch watch = Stopwatch.createStarted(); - ec.submit(t); - - addFutureListener(t, "during"); - - log.info("test cancelling "+t+" ("+t.getClass()+") after "+delay); - // NB: two different code paths (callers to this method) for notifying futures - // depending whether task is started - Time.sleep(delay); - - synchronized (data) { - t.cancel(true); - - assertSoonGetsData("before"); - assertSoonGetsData("during"); - - addFutureListener(t, "after"); - Assert.assertNull(data.get("after")); - assertSoonGetsData("after"); - } - - Assert.assertTrue(t.isDone()); - Assert.assertTrue(t.isCancelled()); - try { - t.get(); - Assert.fail("should have thrown CancellationException"); - } catch (CancellationException e) { /* expected */ } - - Assert.assertTrue(watch.elapsed(TimeUnit.MILLISECONDS) < Duration.FIVE_SECONDS.toMilliseconds(), - Time.makeTimeStringRounded(watch.elapsed(TimeUnit.MILLISECONDS))+" is too long; should have cancelled very quickly"); - - if (started.tryAcquire()) - // if the task is begun, this should get released - Assert.assertTrue(cancelledWhileSleeping.tryAcquire(5, TimeUnit.SECONDS)); - } - -}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/brooklyn/util/task/CompoundTaskExecutionTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/util/task/CompoundTaskExecutionTest.java b/core/src/test/java/brooklyn/util/task/CompoundTaskExecutionTest.java deleted file mode 100644 index 9fe4ba0..0000000 --- a/core/src/test/java/brooklyn/util/task/CompoundTaskExecutionTest.java +++ /dev/null @@ -1,252 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; - -import java.util.HashSet; -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Semaphore; - -import org.apache.brooklyn.api.management.Task; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterClass; -import org.testng.annotations.BeforeClass; -import org.testng.annotations.Test; -import org.testng.collections.Lists; - -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; - -/** - * Test the operation of the {@link CompoundTask} class. - */ -public class CompoundTaskExecutionTest { - - private static final Logger LOG = LoggerFactory.getLogger(CompoundTaskExecutionTest.class); - - BasicExecutionManager em; - BasicExecutionContext ec; - - @BeforeClass - public void setup() { - em = new BasicExecutionManager("mycontext"); - ec = new BasicExecutionContext(em); - } - - @AfterClass - public void teardown() { - if (em != null) em.shutdownNow(); - em = null; - } - - private BasicTask<String> taskReturning(final String val) { - return new BasicTask<String>(new Callable<String>() { - @Override public String call() { - return val; - } - }); - } - - private BasicTask<String> slowTaskReturning(final String val, final Duration pauseTime) { - return new BasicTask<String>(new Callable<String>() { - @Override public String call() { - Time.sleep(pauseTime); - return val; - } - }); - } - - - @Test - public void runSequenceTask() throws Exception { - BasicTask<String> t1 = taskReturning("a"); - BasicTask<String> t2 = taskReturning("b"); - BasicTask<String> t3 = taskReturning("c"); - BasicTask<String> t4 = taskReturning("d"); - Task<List<String>> tSequence = ec.submit(new SequentialTask<String>(t1, t2, t3, t4)); - assertEquals(tSequence.get(), ImmutableList.of("a", "b", "c", "d")); - } - - @Test - public void testSequentialTaskFailsWhenIntermediateTaskThrowsException() throws Exception { - BasicTask<String> t1 = taskReturning("a"); - BasicTask<String> t2 = new BasicTask<String>(new Callable<String>() { - @Override public String call() throws Exception { - throw new IllegalArgumentException("forced exception"); - } - }); - BasicTask<String> t3 = taskReturning("c"); - SequentialTask<String> task = new SequentialTask<String>(t1, t2, t3); - Task<List<String>> tSequence = ec.submit(task); - - try { - tSequence.get(); - fail("t2 should have thrown an exception"); - } catch (Exception e) {} - - assertTrue(task.isDone()); - assertTrue(task.isError()); - assertTrue(t1.isDone()); - assertFalse(t1.isError()); - assertTrue(t2.isDone()); - assertTrue(t2.isError()); - // t3 not run because of t2 exception - assertFalse(t3.isDone()); - assertFalse(t3.isBegun()); - } - - @Test - public void testParallelTaskFailsWhenIntermediateTaskThrowsException() throws Exception { - // differs from test above of SequentialTask in that expect t3 to be executed, - // despite t2 failing. - // TODO Do we expect tSequence.get() to block for everything to either fail or complete, - // and then to throw exception? Currently it does *not* do that so test was previously failing. - - BasicTask<String> t1 = taskReturning("a"); - BasicTask<String> t2 = new BasicTask<String>(new Callable<String>() { - @Override public String call() throws Exception { - throw new IllegalArgumentException("forced exception"); - } - }); - BasicTask<String> t3 = slowTaskReturning("c", Duration.millis(100)); - ParallelTask<String> task = new ParallelTask<String>(t1, t2, t3); - Task<List<String>> tSequence = ec.submit(task); - - try { - tSequence.get(); - fail("t2 should have thrown an exception"); - } catch (Exception e) {} - - assertTrue(task.isDone()); - assertTrue(task.isError()); - assertTrue(t1.isDone()); - assertFalse(t1.isError()); - assertTrue(t2.isDone()); - assertTrue(t2.isError()); - assertTrue(t3.isBegun()); - assertTrue(t3.isDone()); - assertFalse(t3.isError()); - } - - @Test - public void runParallelTask() throws Exception { - BasicTask<String> t1 = taskReturning("a"); - BasicTask<String> t2 = taskReturning("b"); - BasicTask<String> t3 = taskReturning("c"); - BasicTask<String> t4 = taskReturning("d"); - Task<List<String>> tSequence = ec.submit(new ParallelTask<String>(t4, t2, t1, t3)); - assertEquals(new HashSet<String>(tSequence.get()), ImmutableSet.of("a", "b", "c", "d")); - } - - @Test - public void runParallelTaskWithDelay() throws Exception { - final Semaphore locker = new Semaphore(0); - BasicTask<String> t1 = new BasicTask<String>(new Callable<String>() { - @Override public String call() { - try { - locker.acquire(); - } catch (InterruptedException e) { - throw Throwables.propagate(e); - } - return "a"; - } - }); - BasicTask<String> t2 = taskReturning("b"); - BasicTask<String> t3 = taskReturning("c"); - BasicTask<String> t4 = taskReturning("d"); - final Task<List<String>> tSequence = ec.submit(new ParallelTask<String>(t4, t2, t1, t3)); - - assertEquals(ImmutableSet.of(t2.get(), t3.get(), t4.get()), ImmutableSet.of("b", "c", "d")); - assertFalse(t1.isDone()); - assertFalse(tSequence.isDone()); - - // get blocks until tasks have completed - Thread t = new Thread() { - @Override public void run() { - try { - tSequence.get(); - } catch (Exception e) { - throw Throwables.propagate(e); - } - locker.release(); - } - }; - t.start(); - Thread.sleep(30); - assertTrue(t.isAlive()); - - locker.release(); - - assertEquals(new HashSet<String>(tSequence.get()), ImmutableSet.of("a", "b", "c", "d")); - assertTrue(t1.isDone()); - assertTrue(tSequence.isDone()); - - locker.acquire(); - } - - @Test - public void testComplexOrdering() throws Exception { - List<String> data = new CopyOnWriteArrayList<String>(); - SequentialTask<String> taskA = new SequentialTask<String>( - appendAfterDelay(data, "a1"), appendAfterDelay(data, "a2"), appendAfterDelay(data, "a3"), appendAfterDelay(data, "a4")); - SequentialTask<String> taskB = new SequentialTask<String>( - appendAfterDelay(data, "b1"), appendAfterDelay(data, "b2"), appendAfterDelay(data, "b3"), appendAfterDelay(data, "b4")); - Task<List<String>> t = ec.submit(new ParallelTask<String>(taskA, taskB)); - t.get(); - - LOG.debug("Tasks happened in order: {}", data); - assertEquals(data.size(), 8); - assertEquals(new HashSet<String>(data), ImmutableSet.of("a1", "a2", "a3", "a4", "b1", "b2", "b3", "b4")); - - // a1, ..., a4 should be in order - List<String> as = Lists.newArrayList(), bs = Lists.newArrayList(); - for (String value : data) { - ((value.charAt(0) == 'a') ? as : bs).add(value); - } - assertEquals(as, ImmutableList.of("a1", "a2", "a3", "a4")); - assertEquals(bs, ImmutableList.of("b1", "b2", "b3", "b4")); - } - - private BasicTask<String> appendAfterDelay(final List<String> list, final String value) { - return new BasicTask<String>(new Callable<String>() { - @Override public String call() { - try { - Thread.sleep((int) (100 * Math.random())); - } catch (InterruptedException e) { - throw Throwables.propagate(e); - } - LOG.debug("running {}", value); - list.add(value); - return value; - } - }); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/brooklyn/util/task/DynamicSequentialTaskTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/util/task/DynamicSequentialTaskTest.java b/core/src/test/java/brooklyn/util/task/DynamicSequentialTaskTest.java deleted file mode 100644 index a5985fe..0000000 --- a/core/src/test/java/brooklyn/util/task/DynamicSequentialTaskTest.java +++ /dev/null @@ -1,365 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; - -import org.apache.brooklyn.api.management.HasTaskChildren; -import org.apache.brooklyn.api.management.Task; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import brooklyn.test.Asserts; -import brooklyn.util.collections.MutableList; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.time.CountdownTimer; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -import com.google.common.base.Stopwatch; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - -public class DynamicSequentialTaskTest { - - private static final Logger log = LoggerFactory.getLogger(DynamicSequentialTaskTest.class); - - public static final Duration TIMEOUT = Duration.TEN_SECONDS; - public static final Duration TINY_TIME = Duration.millis(20); - - BasicExecutionManager em; - BasicExecutionContext ec; - List<String> messages; - Semaphore cancellations; - Stopwatch stopwatch; - Map<String,Semaphore> monitorableJobSemaphoreMap; - Map<String,Task<String>> monitorableTasksMap; - - @BeforeMethod(alwaysRun=true) - public void setUp() { - em = new BasicExecutionManager("mycontext"); - ec = new BasicExecutionContext(em); - cancellations = new Semaphore(0); - messages = new ArrayList<String>(); - monitorableJobSemaphoreMap = MutableMap.of(); - monitorableTasksMap = MutableMap.of(); - monitorableTasksMap.clear(); - stopwatch = Stopwatch.createStarted(); - } - - @AfterMethod(alwaysRun=true) - public void tearDown() throws Exception { - if (em != null) em.shutdownNow(); - } - - @Test - public void testSimple() throws InterruptedException, ExecutionException { - Callable<String> mainJob = new Callable<String>() { - public String call() { - log.info("main job - "+Tasks.current()); - messages.add("main"); - DynamicTasks.queue( sayTask("world") ); - return "bye"; - } - }; - DynamicSequentialTask<String> t = new DynamicSequentialTask<String>(mainJob); - // this should be added before anything added when the task is invoked - t.queue(sayTask("hello")); - - Assert.assertEquals(messages, Lists.newArrayList()); - Assert.assertEquals(t.isBegun(), false); - Assert.assertEquals(Iterables.size(t.getChildren()), 1); - - ec.submit(t); - Assert.assertEquals(t.isSubmitted(), true); - Assert.assertEquals(t.getUnchecked(Duration.ONE_SECOND), "bye"); - long elapsed = t.getEndTimeUtc() - t.getSubmitTimeUtc(); - Assert.assertTrue(elapsed < 1000, "elapsed time should have been less than 1s but was "+ - Time.makeTimeString(elapsed, true)); - Assert.assertEquals(Iterables.size(t.getChildren()), 2); - Assert.assertEquals(messages.size(), 3, "expected 3 entries, but had "+messages); - // either main or hello can be first, but world should be last - Assert.assertEquals(messages.get(2), "world"); - } - - public Callable<String> sayCallable(final String message, final Duration duration, final String message2) { - return new Callable<String>() { - public String call() { - try { - if (message != null) { - log.info("saying: "+message+ " - "+Tasks.current()); - synchronized (messages) { - messages.add(message); - messages.notifyAll(); - } - } - if (message2 != null) { - log.info("will say "+message2+" after "+duration); - } - if (duration != null && duration.toMilliseconds() > 0) { - Thread.sleep(duration.toMillisecondsRoundingUp()); - } - } catch (InterruptedException e) { - cancellations.release(); - throw Exceptions.propagate(e); - } - if (message2 != null) { - log.info("saying: "+message2+ " - "+Tasks.current()); - synchronized (messages) { - messages.add(message2); - messages.notifyAll(); - } - } - return message; - } - }; - } - - public Task<String> sayTask(String message) { - return sayTask(message, null, null); - } - - public Task<String> sayTask(String message, Duration duration, String message2) { - return Tasks.<String>builder().body(sayCallable(message, duration, message2)).build(); - } - - @Test - public void testComplex() throws InterruptedException, ExecutionException { - Task<List<?>> t = Tasks.sequential( - sayTask("1"), - sayTask("2"), - Tasks.parallel(sayTask("4"), sayTask("3")), - sayTask("5") - ); - ec.submit(t); - Assert.assertEquals(t.get().size(), 4); - Asserts.assertEqualsIgnoringOrder((List<?>)t.get().get(2), ImmutableSet.of("3", "4")); - Assert.assertTrue(messages.equals(Arrays.asList("1", "2", "3", "4", "5")) || messages.equals(Arrays.asList("1", "2", "4", "3", "5")), "messages="+messages); - } - - @Test - public void testCancelled() throws InterruptedException, ExecutionException { - Task<List<?>> t = Tasks.sequential( - sayTask("1"), - sayTask("2a", Duration.THIRTY_SECONDS, "2b"), - sayTask("3")); - ec.submit(t); - synchronized (messages) { - while (messages.size() <= 1) - messages.wait(); - } - Assert.assertEquals(messages, Arrays.asList("1", "2a")); - Time.sleep(Duration.millis(50)); - t.cancel(true); - Assert.assertTrue(t.isDone()); - // 2 should get cancelled, and invoke the cancellation semaphore - // 3 should get cancelled and not run at all - Assert.assertEquals(messages, Arrays.asList("1", "2a")); - - // Need to ensure that 2 has been started; race where we might cancel it before its run method - // is even begun. Hence doing "2a; pause; 2b" where nothing is interruptable before pause. - Assert.assertTrue(cancellations.tryAcquire(10, TimeUnit.SECONDS)); - - Iterator<Task<?>> ci = ((HasTaskChildren)t).getChildren().iterator(); - Assert.assertEquals(ci.next().get(), "1"); - Task<?> task2 = ci.next(); - Assert.assertTrue(task2.isBegun()); - Assert.assertTrue(task2.isDone()); - Assert.assertTrue(task2.isCancelled()); - - Task<?> task3 = ci.next(); - Assert.assertFalse(task3.isBegun()); - Assert.assertTrue(task2.isDone()); - Assert.assertTrue(task2.isCancelled()); - - // but we do _not_ get a mutex from task3 as it does not run (is not interrupted) - Assert.assertEquals(cancellations.availablePermits(), 0); - } - - protected Task<String> monitorableTask(final String id) { - return monitorableTask(null, id, null); - } - protected Task<String> monitorableTask(final Runnable pre, final String id, final Callable<String> post) { - Task<String> t = Tasks.<String>builder().body(monitorableJob(pre, id, post)).build(); - monitorableTasksMap.put(id, t); - return t; - } - protected Callable<String> monitorableJob(final String id) { - return monitorableJob(null, id, null); - } - protected Callable<String> monitorableJob(final Runnable pre, final String id, final Callable<String> post) { - monitorableJobSemaphoreMap.put(id, new Semaphore(0)); - return new Callable<String>() { - @Override - public String call() throws Exception { - if (pre!=null) pre.run(); - // wait for semaphore - if (!monitorableJobSemaphoreMap.get(id).tryAcquire(1, TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS)) - throw new IllegalStateException("timeout for "+id); - synchronized (messages) { - messages.add(id); - messages.notifyAll(); - } - if (post!=null) return post.call(); - return id; - } - }; - } - protected void releaseMonitorableJob(final String id) { - monitorableJobSemaphoreMap.get(id).release(); - } - protected void waitForMessage(final String id) { - CountdownTimer timer = CountdownTimer.newInstanceStarted(TIMEOUT); - synchronized (messages) { - while (!timer.isExpired()) { - if (messages.contains(id)) return; - timer.waitOnForExpiryUnchecked(messages); - } - } - Assert.fail("Did not see message "+id); - } - protected void releaseAndWaitForMonitorableJob(final String id) { - releaseMonitorableJob(id); - waitForMessage(id); - } - - @Test - public void testChildrenRunConcurrentlyWithPrimary() { - Task<String> t = Tasks.<String>builder().dynamic(true) - .body(monitorableJob("main")) - .add(monitorableTask("1")).add(monitorableTask("2")).build(); - ec.submit(t); - releaseAndWaitForMonitorableJob("1"); - releaseAndWaitForMonitorableJob("main"); - Assert.assertFalse(t.blockUntilEnded(TINY_TIME)); - releaseMonitorableJob("2"); - - Assert.assertTrue(t.blockUntilEnded(TIMEOUT)); - Assert.assertEquals(messages, MutableList.of("1", "main", "2")); - Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch); - Assert.assertFalse(t.isError()); - } - - protected static class FailRunnable implements Runnable { - @Override public void run() { throw new RuntimeException("Planned exception for test"); } - } - protected static class FailCallable implements Callable<String> { - @Override public String call() { throw new RuntimeException("Planned exception for test"); } - } - - @Test - public void testByDefaultChildrenFailureAbortsSecondaryFailsPrimaryButNotAbortsPrimary() { - Task<String> t1 = monitorableTask(null, "1", new FailCallable()); - Task<String> t = Tasks.<String>builder().dynamic(true) - .body(monitorableJob("main")) - .add(t1).add(monitorableTask("2")).build(); - ec.submit(t); - releaseAndWaitForMonitorableJob("1"); - Assert.assertFalse(t.blockUntilEnded(TINY_TIME)); - releaseMonitorableJob("main"); - - Assert.assertTrue(t.blockUntilEnded(TIMEOUT)); - Assert.assertEquals(messages, MutableList.of("1", "main")); - Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch); - Assert.assertTrue(t.isError()); - Assert.assertTrue(t1.isError()); - } - - @Test - public void testWhenSwallowingChildrenFailureDoesNotAbortSecondaryOrFailPrimary() { - Task<String> t1 = monitorableTask(null, "1", new FailCallable()); - Task<String> t = Tasks.<String>builder().dynamic(true) - .body(monitorableJob("main")) - .add(t1).add(monitorableTask("2")).swallowChildrenFailures(true).build(); - ec.submit(t); - releaseAndWaitForMonitorableJob("1"); - Assert.assertFalse(t.blockUntilEnded(TINY_TIME)); - releaseAndWaitForMonitorableJob("2"); - Assert.assertFalse(t.blockUntilEnded(TINY_TIME)); - releaseMonitorableJob("main"); - Assert.assertTrue(t.blockUntilEnded(TIMEOUT)); - Assert.assertEquals(messages, MutableList.of("1", "2", "main")); - Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch); - Assert.assertFalse(t.isError()); - Assert.assertTrue(t1.isError()); - } - - @Test - public void testInessentialChildrenFailureDoesNotAbortSecondaryOrFailPrimary() { - Task<String> t1 = monitorableTask(null, "1", new FailCallable()); - TaskTags.markInessential(t1); - Task<String> t = Tasks.<String>builder().dynamic(true) - .body(monitorableJob("main")) - .add(t1).add(monitorableTask("2")).build(); - ec.submit(t); - releaseAndWaitForMonitorableJob("1"); - Assert.assertFalse(t.blockUntilEnded(TINY_TIME)); - releaseAndWaitForMonitorableJob("2"); - Assert.assertFalse(t.blockUntilEnded(TINY_TIME)); - releaseMonitorableJob("main"); - Assert.assertTrue(t.blockUntilEnded(TIMEOUT)); - Assert.assertEquals(messages, MutableList.of("1", "2", "main")); - Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch); - Assert.assertFalse(t.isError()); - Assert.assertTrue(t1.isError()); - } - - @Test - public void testTaskBuilderUsingAddVarargChildren() { - Task<String> t = Tasks.<String>builder().dynamic(true) - .body(monitorableJob("main")) - .add(monitorableTask("1"), monitorableTask("2")) - .build(); - ec.submit(t); - releaseAndWaitForMonitorableJob("1"); - releaseAndWaitForMonitorableJob("2"); - releaseAndWaitForMonitorableJob("main"); - - Assert.assertEquals(messages, MutableList.of("1", "2", "main")); - } - - @Test - public void testTaskBuilderUsingAddAllChildren() { - Task<String> t = Tasks.<String>builder().dynamic(true) - .body(monitorableJob("main")) - .addAll(ImmutableList.of(monitorableTask("1"), monitorableTask("2"))) - .build(); - ec.submit(t); - releaseAndWaitForMonitorableJob("1"); - releaseAndWaitForMonitorableJob("2"); - releaseAndWaitForMonitorableJob("main"); - - Assert.assertEquals(messages, MutableList.of("1", "2", "main")); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/brooklyn/util/task/NonBasicTaskExecutionTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/util/task/NonBasicTaskExecutionTest.java b/core/src/test/java/brooklyn/util/task/NonBasicTaskExecutionTest.java deleted file mode 100644 index 82e3919..0000000 --- a/core/src/test/java/brooklyn/util/task/NonBasicTaskExecutionTest.java +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertSame; -import static org.testng.Assert.assertTrue; - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - -import org.apache.brooklyn.api.management.Task; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import brooklyn.test.Asserts; -import brooklyn.util.collections.MutableMap; - -/** - * Test the operation of the {@link BasicTask} class. - * - * TODO clarify test purpose - */ -public class NonBasicTaskExecutionTest { - private static final Logger log = LoggerFactory.getLogger(NonBasicTaskExecutionTest.class); - - private static final int TIMEOUT_MS = 10*1000; - - public static class ConcreteForwardingTask<T> extends ForwardingTask<T> { - private final TaskInternal<T> delegate; - - ConcreteForwardingTask(TaskInternal<T> delegate) { - this.delegate = delegate; - } - - @Override - protected TaskInternal<T> delegate() { - return delegate; - } - } - - private BasicExecutionManager em; - private Map<Integer,String> data; - - @BeforeMethod(alwaysRun=true) - public void setUp() throws Exception { - em = new BasicExecutionManager("mycontext"); - data = Collections.synchronizedMap(new HashMap<Integer,String>()); - } - - @AfterMethod(alwaysRun=true) - public void tearDown() throws Exception { - if (em != null) em.shutdownNow(); - } - - @Test - public void runSimpleTask() throws Exception { - TaskInternal<Object> t = new ConcreteForwardingTask<Object>(new BasicTask<Object>(new Callable<Object>() { - @Override public Object call() { - return data.put(1, "b"); - }})); - data.put(1, "a"); - Task<?> t2 = em.submit(MutableMap.of("tag", "A"), t); - assertEquals("a", t.get()); - assertEquals("a", t2.get()); - assertSame(t, t2, "t="+t+"; t2="+t2); - assertEquals("b", data.get(1)); - } - - @Test - public void runBasicTaskWithWaits() throws Exception { - final CountDownLatch signalStarted = new CountDownLatch(1); - final CountDownLatch allowCompletion = new CountDownLatch(1); - final TaskInternal<Object> t = new ConcreteForwardingTask<Object>(new BasicTask<Object>(new Callable<Object>() { - @Override public Object call() throws Exception { - Object result = data.put(1, "b"); - signalStarted.countDown(); - assertTrue(allowCompletion.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)); - return result; - }})); - data.put(1, "a"); - - Task<?> t2 = em.submit(MutableMap.of("tag", "A"), t); - assertEquals(t, t2); - assertFalse(t.isDone()); - - assertTrue(signalStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)); - assertEquals("b", data.get(1)); - assertFalse(t.isDone()); - - log.debug("runBasicTaskWithWaits, BasicTask status: {}", t.getStatusDetail(false)); - - Asserts.succeedsEventually(new Runnable() { - @Override public void run() { - t.getStatusDetail(false).toLowerCase().contains("waiting"); - }}); - // "details="+t.getStatusDetail(false)) - - allowCompletion.countDown(); - assertEquals("a", t.get()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/brooklyn/util/task/ScheduledExecutionTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/util/task/ScheduledExecutionTest.java b/core/src/test/java/brooklyn/util/task/ScheduledExecutionTest.java deleted file mode 100644 index 1ae65f2..0000000 --- a/core/src/test/java/brooklyn/util/task/ScheduledExecutionTest.java +++ /dev/null @@ -1,287 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; - -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.apache.brooklyn.api.management.Task; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.Test; - -import brooklyn.test.Asserts; -import brooklyn.util.collections.MutableMap; -import brooklyn.util.exceptions.Exceptions; -import brooklyn.util.exceptions.RuntimeInterruptedException; -import brooklyn.util.javalang.JavaClassNames; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -import com.google.common.base.Stopwatch; -import com.google.common.collect.Lists; - -@SuppressWarnings({"unchecked","rawtypes"}) -public class ScheduledExecutionTest { - - public static final Logger log = LoggerFactory.getLogger(ScheduledExecutionTest.class); - - @Test - public void testScheduledTask() throws Exception { - int PERIOD = 20; - BasicExecutionManager m = new BasicExecutionManager("mycontextid"); - final AtomicInteger i = new AtomicInteger(0); - ScheduledTask t = new ScheduledTask(MutableMap.of("delay", 2*PERIOD, "period", PERIOD, "maxIterations", 5), new Callable<Task<?>>() { - public Task<?> call() throws Exception { - return new BasicTask<Integer>(new Callable<Integer>() { - public Integer call() { - log.debug("task running: "+Tasks.current()+" "+Tasks.current().getStatusDetail(false)); - return i.incrementAndGet(); - }}); - }}); - - log.info("submitting {} {}", t, t.getStatusDetail(false)); - m.submit(t); - log.info("submitted {} {}", t, t.getStatusDetail(false)); - Integer interimResult = (Integer) t.get(); - log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)}); - assertTrue(i.get() > 0, "i="+i); - t.blockUntilEnded(); - Integer finalResult = (Integer) t.get(); - log.info("ended ({}) {} {}", new Object[] {finalResult, t, t.getStatusDetail(false)}); - assertEquals(finalResult, (Integer)5); - assertEquals(i.get(), 5); - } - - /** like testScheduledTask but the loop is terminated by the task itself adjusting the period */ - @Test - public void testScheduledTaskSelfEnding() throws Exception { - int PERIOD = 20; - BasicExecutionManager m = new BasicExecutionManager("mycontextid"); - final AtomicInteger i = new AtomicInteger(0); - ScheduledTask t = new ScheduledTask(MutableMap.of("delay", 2*PERIOD, "period", PERIOD), new Callable<Task<?>>() { - public Task<?> call() throws Exception { - return new BasicTask<Integer>(new Callable<Integer>() { - public Integer call() { - ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).getSubmittedByTask(); - if (i.get() >= 4) submitter.period = null; - log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false)); - return i.incrementAndGet(); - }}); - }}); - - log.info("submitting {} {}", t, t.getStatusDetail(false)); - m.submit(t); - log.info("submitted {} {}", t, t.getStatusDetail(false)); - Integer interimResult = (Integer) t.get(); - log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)}); - assertTrue(i.get() > 0); - t.blockUntilEnded(); - Integer finalResult = (Integer) t.get(); - log.info("ended ({}) {} {}", new Object[] {finalResult, t, t.getStatusDetail(false)}); - assertEquals(finalResult, (Integer)5); - assertEquals(i.get(), 5); - } - - @Test - public void testScheduledTaskCancelEnding() throws Exception { - Duration PERIOD = Duration.millis(20); - BasicExecutionManager m = new BasicExecutionManager("mycontextid"); - final AtomicInteger i = new AtomicInteger(); - ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD.times(2), "period", PERIOD), new Callable<Task<?>>() { - public Task<?> call() throws Exception { - return new BasicTask<Integer>(new Callable<Integer>() { - public Integer call() { - log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false)); - ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).getSubmittedByTask(); - i.incrementAndGet(); - if (i.get() >= 5) submitter.cancel(); - return i.get(); - }}); - }}); - - log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false)); - m.submit(t); - log.info("submitted {} {}", t, t.getStatusDetail(false)); - Integer interimResult = (Integer) t.get(); - log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)}); - assertTrue(i.get() > 0); - t.blockUntilEnded(); -// int finalResult = t.get() - log.info("ended ({}) {} {}", new Object[] {i, t, t.getStatusDetail(false)}); -// assertEquals(finalResult, 5) - assertEquals(i.get(), 5); - } - - @Test(groups="Integration") - public void testScheduledTaskCancelOuter() throws Exception { - final Duration PERIOD = Duration.millis(20); - final Duration CYCLE_DELAY = Duration.ONE_SECOND; - // this should be enough to start the next cycle, but not so much that the cycle ends; - // and enough that when a task is interrupted it terminates within this period - final Duration SMALL_FRACTION_OF_CYCLE_DELAY = PERIOD.add(CYCLE_DELAY.multiply(0.1)); - - BasicExecutionManager m = new BasicExecutionManager("mycontextid"); - final AtomicInteger i = new AtomicInteger(); - ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD.times(2), "period", PERIOD), new Callable<Task<?>>() { - public Task<?> call() throws Exception { - return new BasicTask<Integer>(new Callable<Integer>() { - public Integer call() { - log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false)); - Time.sleep(CYCLE_DELAY); - i.incrementAndGet(); - return i.get(); - }}); - }}); - - log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false)); - m.submit(t); - log.info("submitted {} {}", t, t.getStatusDetail(false)); - Integer interimResult = (Integer) t.get(); - log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)}); - assertEquals(i.get(), 1); - - Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY); - assertEquals(t.get(), 2); - - Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY); - Stopwatch timer = Stopwatch.createUnstarted(); - t.cancel(true); - t.blockUntilEnded(); -// int finalResult = t.get() - log.info("blocked until ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)}); - try { - t.get(); - Assert.fail("Should have failed getting result of cancelled "+t); - } catch (Exception e) { - /* expected */ - } - assertEquals(i.get(), 2); - log.info("ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)}); - Assert.assertTrue(Duration.of(timer).isShorterThan(SMALL_FRACTION_OF_CYCLE_DELAY)); - } - - @Test(groups="Integration") - public void testScheduledTaskCancelInterrupts() throws Exception { - final Duration PERIOD = Duration.millis(20); - final Duration CYCLE_DELAY = Duration.ONE_SECOND; - // this should be enough to start the next cycle, but not so much that the cycle ends; - // and enough that when a task is interrupted it terminates within this period - final Duration SMALL_FRACTION_OF_CYCLE_DELAY = PERIOD.add(CYCLE_DELAY.multiply(0.1)); - - BasicExecutionManager m = new BasicExecutionManager("mycontextid"); - final Semaphore interruptedSemaphore = new Semaphore(0); - final AtomicInteger i = new AtomicInteger(); - ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD.times(2), "period", PERIOD), new Callable<Task<?>>() { - public Task<?> call() throws Exception { - return new BasicTask<Integer>(new Callable<Integer>() { - public Integer call() { - try { - log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false)); - Time.sleep(CYCLE_DELAY); - i.incrementAndGet(); - return i.get(); - } catch (RuntimeInterruptedException e) { - interruptedSemaphore.release(); - throw Exceptions.propagate(e); - } - }}); - }}); - - log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false)); - m.submit(t); - log.info("submitted {} {}", t, t.getStatusDetail(false)); - Integer interimResult = (Integer) t.get(); - log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)}); - assertEquals(i.get(), 1); - - Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY); - assertEquals(t.get(), 2); - - Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY); - Stopwatch timer = Stopwatch.createUnstarted(); - t.cancel(true); - t.blockUntilEnded(); -// int finalResult = t.get() - log.info("blocked until ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)}); - try { - t.get(); - Assert.fail("Should have failed getting result of cancelled "+t); - } catch (Exception e) { - /* expected */ - } - assertEquals(i.get(), 2); - Assert.assertTrue(interruptedSemaphore.tryAcquire(1, SMALL_FRACTION_OF_CYCLE_DELAY.toMilliseconds(), TimeUnit.MILLISECONDS), "child thread was not interrupted"); - log.info("ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)}); - Assert.assertTrue(Duration.of(timer).isShorterThan(SMALL_FRACTION_OF_CYCLE_DELAY)); - } - - @Test(groups="Integration") - public void testScheduledTaskTakesLongerThanPeriod() throws Exception { - final int PERIOD = 1; - final int SLEEP_TIME = 100; - final int EARLY_RETURN_GRACE = 10; - BasicExecutionManager m = new BasicExecutionManager("mycontextid"); - final List<Long> execTimes = new CopyOnWriteArrayList<Long>(); - - ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD, "period", PERIOD), new Callable<Task<?>>() { - public Task<?> call() throws Exception { - return new BasicTask<Void>(new Runnable() { - public void run() { - execTimes.add(System.currentTimeMillis()); - try { - Thread.sleep(100); - } catch (InterruptedException e) { - throw Exceptions.propagate(e); - } - }}); - }}); - - m.submit(t); - - Asserts.succeedsEventually(new Runnable() { - public void run() { - assertTrue(execTimes.size() > 3, "size="+execTimes.size()); - }}); - - List<Long> timeDiffs = Lists.newArrayList(); - long prevExecTime = -1; - for (Long execTime : execTimes) { - if (prevExecTime == -1) { - prevExecTime = execTime; - } else { - timeDiffs.add(execTime - prevExecTime); - prevExecTime = execTime; - } - } - - for (Long timeDiff : timeDiffs) { - if (timeDiff < (SLEEP_TIME - EARLY_RETURN_GRACE)) fail("timeDiffs="+timeDiffs+"; execTimes="+execTimes); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/brooklyn/util/task/SingleThreadedSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/util/task/SingleThreadedSchedulerTest.java b/core/src/test/java/brooklyn/util/task/SingleThreadedSchedulerTest.java deleted file mode 100644 index 265956d..0000000 --- a/core/src/test/java/brooklyn/util/task/SingleThreadedSchedulerTest.java +++ /dev/null @@ -1,192 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.fail; - -import java.util.List; -import java.util.concurrent.Callable; -import java.util.concurrent.CancellationException; -import java.util.concurrent.CopyOnWriteArrayList; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import brooklyn.test.Asserts; -import brooklyn.util.collections.MutableMap; - -import com.google.common.util.concurrent.Callables; - -public class SingleThreadedSchedulerTest { - - private static final Logger log = LoggerFactory.getLogger(SingleThreadedSchedulerTest.class); - - private BasicExecutionManager em; - - @BeforeMethod - public void setUp() { - em = new BasicExecutionManager("mycontextid"); - em.setTaskSchedulerForTag("category1", SingleThreadedScheduler.class); - } - - @AfterMethod - public void tearDown() { - if (em != null) em.shutdownNow(); - } - - @Test - public void testExecutesInOrder() throws Exception { - final int NUM_TIMES = 1000; - final List<Integer> result = new CopyOnWriteArrayList<Integer>(); - for (int i = 0; i < NUM_TIMES; i++) { - final int counter = i; - em.submit(MutableMap.of("tag", "category1"), new Runnable() { - public void run() { - result.add(counter); - }}); - } - - Asserts.succeedsEventually(new Runnable() { - @Override public void run() { - assertEquals(result.size(), NUM_TIMES); - }}); - - for (int i = 0; i < NUM_TIMES; i++) { - assertEquals(result.get(i), (Integer)i); - } - } - - @Test - public void testLargeQueueDoesNotConsumeTooManyThreads() throws Exception { - final int NUM_TIMES = 3000; - final CountDownLatch latch = new CountDownLatch(1); - BasicTask<Void> blockingTask = new BasicTask<Void>(newLatchAwaiter(latch)); - em.submit(MutableMap.of("tag", "category1"), blockingTask); - - final AtomicInteger counter = new AtomicInteger(0); - for (int i = 0; i < NUM_TIMES; i++) { - BasicTask<Void> t = new BasicTask<Void>(new Runnable() { - public void run() { - counter.incrementAndGet(); - }}); - em.submit(MutableMap.of("tag", "category1"), t); - if (i % 500 == 0) log.info("Submitted "+i+" jobs..."); - } - - Thread.sleep(100); // give it more of a chance to create the threads before we let them execute - latch.countDown(); - - Asserts.succeedsEventually(new Runnable() { - @Override public void run() { - assertEquals(counter.get(), NUM_TIMES); - }}); - } - - @Test - public void testGetResultOfQueuedTaskBeforeItExecutes() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - em.submit(MutableMap.of("tag", "category1"), newLatchAwaiter(latch)); - - BasicTask<Integer> t = new BasicTask<Integer>(Callables.returning(123)); - Future<Integer> future = em.submit(MutableMap.of("tag", "category1"), t); - - Thread thread = new Thread(new Runnable() { - public void run() { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - latch.countDown(); - }}); - thread.start(); - assertEquals(future.get(), (Integer)123); - } - - @Test - public void testGetResultOfQueuedTaskBeforeItExecutesWithTimeout() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - em.submit(MutableMap.of("tag", "category1"), newLatchAwaiter(latch)); - - BasicTask<Integer> t = new BasicTask<Integer>(Callables.returning(123)); - Future<Integer> future = em.submit(MutableMap.of("tag", "category1"), t); - - try { - assertEquals(future.get(10, TimeUnit.MILLISECONDS), (Integer)123); - fail(); - } catch (TimeoutException e) { - // success - } - } - - @Test - public void testCancelQueuedTaskBeforeItExecutes() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - em.submit(MutableMap.of("tag", "category1"), newLatchAwaiter(latch)); - - final AtomicBoolean executed = new AtomicBoolean(); - BasicTask<?> t = new BasicTask<Void>(new Runnable() { - public void run() { - executed.set(true); - }}); - Future<?> future = em.submit(MutableMap.of("tag", "category1"), t); - - future.cancel(true); - latch.countDown(); - Thread.sleep(10); - try { - future.get(); - } catch (CancellationException e) { - // success - } - assertFalse(executed.get()); - } - - @Test - public void testGetResultOfQueuedTaskAfterItExecutes() throws Exception { - final CountDownLatch latch = new CountDownLatch(1); - em.submit(MutableMap.of("tag", "category1"), newLatchAwaiter(latch)); - - BasicTask<Integer> t = new BasicTask<Integer>(Callables.returning(123)); - Future<Integer> future = em.submit(MutableMap.of("tag", "category1"), t); - - latch.countDown(); - assertEquals(future.get(), (Integer)123); - } - - private Callable<Void> newLatchAwaiter(final CountDownLatch latch) { - return new Callable<Void>() { - public Void call() throws Exception { - latch.await(); - return null; - } - }; - } -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/brooklyn/util/task/TaskFinalizationTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/util/task/TaskFinalizationTest.java b/core/src/test/java/brooklyn/util/task/TaskFinalizationTest.java deleted file mode 100644 index 51750ca..0000000 --- a/core/src/test/java/brooklyn/util/task/TaskFinalizationTest.java +++ /dev/null @@ -1,62 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import java.util.concurrent.Semaphore; -import java.util.concurrent.TimeUnit; - -import org.apache.brooklyn.api.management.Task; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.Test; - -import brooklyn.util.time.Time; - -import com.google.common.base.Stopwatch; - -public class TaskFinalizationTest { - - private static final Logger log = LoggerFactory.getLogger(TaskFinalizationTest.class); - - // integration because it can take a while (and finalizers aren't even guaranteed) - @Test(groups="Integration") - public void testFinalizerInvoked() throws InterruptedException { - BasicTask<?> t = new BasicTask<Void>(new Runnable() { public void run() { /* no op */ }}); - final Semaphore x = new Semaphore(0); - t.setFinalizer(new BasicTask.TaskFinalizer() { - public void onTaskFinalization(Task<?> t) { - synchronized (x) { - x.release(); - } - } - }); - t = null; - Stopwatch watch = Stopwatch.createStarted(); - for (int i=0; i<30; i++) { - System.gc(); System.gc(); - if (x.tryAcquire(1, TimeUnit.SECONDS)) { - log.info("finalizer ran after "+Time.makeTimeStringRounded(watch)); - return; - } - } - Assert.fail("finalizer did not run in time"); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/brooklyn/util/task/TasksTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/util/task/TasksTest.java b/core/src/test/java/brooklyn/util/task/TasksTest.java deleted file mode 100644 index 58ce24f..0000000 --- a/core/src/test/java/brooklyn/util/task/TasksTest.java +++ /dev/null @@ -1,181 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import static brooklyn.event.basic.DependentConfiguration.attributeWhenReady; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertTrue; - -import java.util.Map; -import java.util.Set; -import java.util.concurrent.Callable; - -import org.apache.brooklyn.api.management.ExecutionContext; -import org.apache.brooklyn.api.management.Task; -import org.apache.brooklyn.test.entity.TestApplication; -import org.apache.brooklyn.test.entity.TestEntity; -import org.testng.Assert; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import brooklyn.entity.BrooklynAppUnitTestSupport; -import brooklyn.entity.basic.EntityFunctions; -import brooklyn.util.guava.Functionals; -import brooklyn.util.repeat.Repeater; -import brooklyn.util.time.Duration; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Iterables; -import com.google.common.util.concurrent.Callables; - - -public class TasksTest extends BrooklynAppUnitTestSupport { - - private ExecutionContext executionContext; - - @BeforeMethod(alwaysRun=true) - @Override - public void setUp() throws Exception { - super.setUp(); - executionContext = app.getExecutionContext(); - } - - @Test - public void testResolveNull() throws Exception { - assertResolvesValue(null, String.class, null); - } - - @Test - public void testResolveValueCastsToType() throws Exception { - assertResolvesValue(123, String.class, "123"); - } - - @Test - public void testResolvesAttributeWhenReady() throws Exception { - app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval"); - assertResolvesValue(attributeWhenReady(app, TestApplication.MY_ATTRIBUTE), String.class, "myval"); - } - - @Test - public void testResolvesMapWithAttributeWhenReady() throws Exception { - app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval"); - Map<?,?> orig = ImmutableMap.of("mykey", attributeWhenReady(app, TestApplication.MY_ATTRIBUTE)); - Map<?,?> expected = ImmutableMap.of("mykey", "myval"); - assertResolvesValue(orig, String.class, expected); - } - - @Test - public void testResolvesSetWithAttributeWhenReady() throws Exception { - app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval"); - Set<?> orig = ImmutableSet.of(attributeWhenReady(app, TestApplication.MY_ATTRIBUTE)); - Set<?> expected = ImmutableSet.of("myval"); - assertResolvesValue(orig, String.class, expected); - } - - @Test - public void testResolvesMapOfMapsWithAttributeWhenReady() throws Exception { - app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval"); - Map<?,?> orig = ImmutableMap.of("mykey", ImmutableMap.of("mysubkey", attributeWhenReady(app, TestApplication.MY_ATTRIBUTE))); - Map<?,?> expected = ImmutableMap.of("mykey", ImmutableMap.of("mysubkey", "myval")); - assertResolvesValue(orig, String.class, expected); - } - - @SuppressWarnings("unchecked") - @Test - public void testResolvesIterableOfMapsWithAttributeWhenReady() throws Exception { - app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval"); - // using Iterables.concat so that orig is of type FluentIterable rather than List etc - Iterable<?> orig = Iterables.concat(ImmutableList.of(ImmutableMap.of("mykey", attributeWhenReady(app, TestApplication.MY_ATTRIBUTE)))); - Iterable<Map<?,?>> expected = ImmutableList.<Map<?,?>>of(ImmutableMap.of("mykey", "myval")); - assertResolvesValue(orig, String.class, expected); - } - - private void assertResolvesValue(Object actual, Class<?> type, Object expected) throws Exception { - Object result = Tasks.resolveValue(actual, type, executionContext); - assertEquals(result, expected); - } - - @Test - public void testErrorsResolvingPropagatesOrSwallowedAllCorrectly() throws Exception { - app.setConfig(TestEntity.CONF_OBJECT, ValueResolverTest.newThrowTask(Duration.ZERO)); - Task<Object> t = Tasks.builder().body(Functionals.callable(EntityFunctions.config(TestEntity.CONF_OBJECT), app)).build(); - ValueResolver<Object> v = Tasks.resolving(t).as(Object.class).context(app.getExecutionContext()); - - ValueResolverTest.assertThrowsOnMaybe(v); - ValueResolverTest.assertThrowsOnGet(v); - - v.swallowExceptions(); - ValueResolverTest.assertMaybeIsAbsent(v); - ValueResolverTest.assertThrowsOnGet(v); - - v.defaultValue("foo"); - ValueResolverTest.assertMaybeIsAbsent(v); - assertEquals(v.clone().get(), "foo"); - assertResolvesValue(v, Object.class, "foo"); - } - - @Test - public void testRepeater() throws Exception { - Task<?> t; - - t = Tasks.requiring(Repeater.create().until(Callables.returning(true)).every(Duration.millis(1))).build(); - app.getExecutionContext().submit(t); - t.get(Duration.TEN_SECONDS); - - t = Tasks.testing(Repeater.create().until(Callables.returning(true)).every(Duration.millis(1))).build(); - app.getExecutionContext().submit(t); - Assert.assertEquals(t.get(Duration.TEN_SECONDS), true); - - t = Tasks.requiring(Repeater.create().until(Callables.returning(false)).limitIterationsTo(2).every(Duration.millis(1))).build(); - app.getExecutionContext().submit(t); - try { - t.get(Duration.TEN_SECONDS); - Assert.fail("Should have failed"); - } catch (Exception e) { - // expected - } - - t = Tasks.testing(Repeater.create().until(Callables.returning(false)).limitIterationsTo(2).every(Duration.millis(1))).build(); - app.getExecutionContext().submit(t); - Assert.assertEquals(t.get(Duration.TEN_SECONDS), false); - } - - @Test - public void testRepeaterDescription() throws Exception{ - final String description = "task description"; - Repeater repeater = Repeater.create(description) - .repeat(Callables.returning(null)) - .every(Duration.ONE_MILLISECOND) - .limitIterationsTo(1) - .until(new Callable<Boolean>() { - @Override - public Boolean call() { - TaskInternal<?> current = (TaskInternal<?>)Tasks.current(); - assertEquals(current.getBlockingDetails(), description); - return true; - } - }); - Task<Boolean> t = Tasks.testing(repeater).build(); - app.getExecutionContext().submit(t); - assertTrue(t.get(Duration.TEN_SECONDS)); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/brooklyn/util/task/ValueResolverTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/util/task/ValueResolverTest.java b/core/src/test/java/brooklyn/util/task/ValueResolverTest.java deleted file mode 100644 index d50ff54..0000000 --- a/core/src/test/java/brooklyn/util/task/ValueResolverTest.java +++ /dev/null @@ -1,132 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task; - -import java.util.concurrent.Callable; - -import org.apache.brooklyn.api.management.ExecutionContext; -import org.apache.brooklyn.api.management.Task; -import org.testng.Assert; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import brooklyn.entity.BrooklynAppUnitTestSupport; -import brooklyn.util.guava.Maybe; -import brooklyn.util.time.Duration; -import brooklyn.util.time.Time; - -/** - * see also {@link TasksTest} for more tests - */ -@Test -public class ValueResolverTest extends BrooklynAppUnitTestSupport { - - private ExecutionContext executionContext; - - @BeforeMethod(alwaysRun=true) - @Override - public void setUp() throws Exception { - super.setUp(); - executionContext = app.getExecutionContext(); - } - - public static final Task<String> newSleepTask(final Duration timeout, final String result) { - return Tasks.<String>builder().body(new Callable<String>() { - public String call() { - Time.sleep(timeout); - return result; - }} - ).build(); - } - - public static final Task<String> newThrowTask(final Duration timeout) { - return Tasks.<String>builder().body(new Callable<String>() { - public String call() { - Time.sleep(timeout); - throw new IllegalStateException("intended, during tests"); - }} - ).build(); - } - - public void testTimeoutZero() { - Maybe<String> result = Tasks.resolving(newSleepTask(Duration.TEN_SECONDS, "foo")).as(String.class).context(executionContext).timeout(Duration.ZERO).getMaybe(); - Assert.assertFalse(result.isPresent()); - } - - public void testTimeoutBig() { - Maybe<String> result = Tasks.resolving(newSleepTask(Duration.ZERO, "foo")).as(String.class).context(executionContext).timeout(Duration.TEN_SECONDS).getMaybe(); - Assert.assertEquals(result.get(), "foo"); - } - - public void testNoExecutionContextOnCompleted() { - Task<String> t = newSleepTask(Duration.ZERO, "foo"); - executionContext.submit(t).getUnchecked(); - Maybe<String> result = Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe(); - Assert.assertEquals(result.get(), "foo"); - } - - public static Throwable assertThrowsOnMaybe(ValueResolver<?> result) { - try { - result = result.clone(); - result.getMaybe(); - Assert.fail("should have thrown"); - return null; - } catch (Exception e) { return e; } - } - public static Throwable assertThrowsOnGet(ValueResolver<?> result) { - result = result.clone(); - try { - result.get(); - Assert.fail("should have thrown"); - return null; - } catch (Exception e) { return e; } - } - public static <T> Maybe<T> assertMaybeIsAbsent(ValueResolver<T> result) { - result = result.clone(); - Maybe<T> maybe = result.getMaybe(); - Assert.assertFalse(maybe.isPresent()); - return maybe; - } - - public void testSwallowError() { - ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(executionContext).swallowExceptions(); - assertMaybeIsAbsent(result); - assertThrowsOnGet(result); - } - - - public void testDontSwallowError() { - ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(executionContext); - assertThrowsOnMaybe(result); - assertThrowsOnGet(result); - } - - public void testDefaultWhenSwallowError() { - ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(executionContext).swallowExceptions().defaultValue("foo"); - assertMaybeIsAbsent(result); - Assert.assertEquals(result.get(), "foo"); - } - - public void testDefaultBeforeDelayAndError() { - ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.TEN_SECONDS)).as(String.class).context(executionContext).timeout(Duration.ZERO).defaultValue("foo"); - assertMaybeIsAbsent(result); - Assert.assertEquals(result.get(), "foo"); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/brooklyn/util/task/ssh/SshTasksTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/util/task/ssh/SshTasksTest.java b/core/src/test/java/brooklyn/util/task/ssh/SshTasksTest.java deleted file mode 100644 index 578164f..0000000 --- a/core/src/test/java/brooklyn/util/task/ssh/SshTasksTest.java +++ /dev/null @@ -1,207 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package brooklyn.util.task.ssh; - -import java.io.File; -import java.io.IOException; - -import org.apache.brooklyn.api.location.LocationSpec; -import org.apache.brooklyn.api.management.ManagementContext; -import org.apache.brooklyn.core.management.internal.LocalManagementContext; -import org.apache.commons.io.FileUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; - -import brooklyn.entity.basic.BrooklynConfigKeys; -import brooklyn.entity.basic.Entities; - -import org.apache.brooklyn.location.basic.LocalhostMachineProvisioningLocation; -import org.apache.brooklyn.location.basic.SshMachineLocation; - -import brooklyn.util.net.Urls; -import brooklyn.util.os.Os; -import brooklyn.util.ssh.BashCommandsIntegrationTest; -import brooklyn.util.task.system.ProcessTaskFactory; -import brooklyn.util.task.system.ProcessTaskWrapper; - -/** - * Some tests for {@link SshTasks}. Note more tests in {@link BashCommandsIntegrationTest}, - * {@link SshEffectorTasksTest}, and {@link SoftwareEffectorTest}. - */ -public class SshTasksTest { - - private static final Logger log = LoggerFactory.getLogger(SshTasksTest.class); - - ManagementContext mgmt; - SshMachineLocation host; - File tempDir; - - boolean failureExpected; - - @BeforeMethod(alwaysRun=true) - public void setup() throws Exception { - mgmt = new LocalManagementContext(); - - LocalhostMachineProvisioningLocation lhc = mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class)); - host = lhc.obtain(); - clearExpectedFailure(); - tempDir = Os.newTempDir(getClass()); - } - - @AfterMethod(alwaysRun=true) - public void tearDown() throws Exception { - if (mgmt != null) Entities.destroyAll(mgmt); - mgmt = null; - tempDir = Os.deleteRecursively(tempDir).asNullOrThrowing(); - checkExpectedFailure(); - } - - protected void checkExpectedFailure() { - if (failureExpected) { - clearExpectedFailure(); - Assert.fail("Test should have thrown an exception but it did not."); - } - } - - protected void clearExpectedFailure() { - failureExpected = false; - } - - protected void setExpectingFailure() { - failureExpected = true; - } - - - protected <T> ProcessTaskWrapper<T> submit(final ProcessTaskFactory<T> tf) { - tf.machine(host); - ProcessTaskWrapper<T> t = tf.newTask(); - mgmt.getExecutionManager().submit(t); - return t; - } - - protected SshPutTaskWrapper submit(final SshPutTaskFactory tf) { - SshPutTaskWrapper t = tf.newTask(); - mgmt.getExecutionManager().submit(t); - return t; - } - - @Test(groups="Integration") - public void testSshEchoHello() { - ProcessTaskWrapper<Integer> t = submit(SshTasks.newSshExecTaskFactory(host, "sleep 1 ; echo hello world")); - Assert.assertFalse(t.isDone()); - Assert.assertEquals(t.get(), (Integer)0); - Assert.assertEquals(t.getTask().getUnchecked(), (Integer)0); - Assert.assertEquals(t.getStdout().trim(), "hello world"); - } - - @Test(groups="Integration") - public void testCopyTo() throws IOException { - String fn = Urls.mergePaths(tempDir.getPath(), "f1"); - SshPutTaskWrapper t = submit(SshTasks.newSshPutTaskFactory(host, fn).contents("hello world")); - t.block(); - Assert.assertEquals(FileUtils.readFileToString(new File(fn)), "hello world"); - // and make sure this doesn't throw - Assert.assertTrue(t.isDone()); - Assert.assertTrue(t.isSuccessful()); - Assert.assertEquals(t.get(), null); - Assert.assertEquals(t.getExitCode(), (Integer)0); - } - - @Test(groups="Integration") - public void testCopyToFailBadSubdir() throws IOException { - String fn = Urls.mergePaths(tempDir.getPath(), "non-existent-subdir/file"); - SshPutTaskWrapper t = submit(SshTasks.newSshPutTaskFactory(host, fn).contents("hello world")); - //this doesn't fail - t.block(); - Assert.assertTrue(t.isDone()); - setExpectingFailure(); - try { - // but this does - t.get(); - } catch (Exception e) { - log.info("The error if file cannot be written is: "+e); - clearExpectedFailure(); - } - checkExpectedFailure(); - // and the results indicate failure - Assert.assertFalse(t.isSuccessful()); - Assert.assertNotNull(t.getException()); - Assert.assertNotEquals(t.getExitCode(), (Integer)0); - } - - @Test(groups="Integration") - public void testCopyToFailBadSubdirAllow() throws IOException { - String fn = Urls.mergePaths(tempDir.getPath(), "non-existent-subdir/file"); - SshPutTaskWrapper t = submit(SshTasks.newSshPutTaskFactory(host, fn).contents("hello world").allowFailure()); - //this doesn't fail - t.block(); - Assert.assertTrue(t.isDone()); - // and this doesn't fail either - Assert.assertEquals(t.get(), null); - // but it's not successful - Assert.assertNotNull(t.getException()); - Assert.assertFalse(t.isSuccessful()); - // exit code probably null, but won't be zero - Assert.assertNotEquals(t.getExitCode(), (Integer)0); - } - - @Test(groups="Integration") - public void testCopyToFailBadSubdirCreate() throws IOException { - String fn = Urls.mergePaths(tempDir.getPath(), "non-existent-subdir-to-create/file"); - SshPutTaskWrapper t = submit(SshTasks.newSshPutTaskFactory(host, fn).contents("hello world").createDirectory()); - t.block(); - // directory should be created, and file readable now - Assert.assertEquals(FileUtils.readFileToString(new File(fn)), "hello world"); - Assert.assertEquals(t.getExitCode(), (Integer)0); - } - - @Test(groups="Integration") - public void testSshFetch() throws IOException { - String fn = Urls.mergePaths(tempDir.getPath(), "f2"); - FileUtils.write(new File(fn), "hello fetched world"); - - SshFetchTaskFactory tf = SshTasks.newSshFetchTaskFactory(host, fn); - SshFetchTaskWrapper t = tf.newTask(); - mgmt.getExecutionManager().submit(t); - - t.block(); - Assert.assertTrue(t.isDone()); - Assert.assertEquals(t.get(), "hello fetched world"); - Assert.assertEquals(t.getBytes(), "hello fetched world".getBytes()); - } - - @Test(groups="Integration") - public void testSshWithHeaderProperty() { - host.setConfig(BrooklynConfigKeys.SSH_CONFIG_SCRIPT_HEADER, "#!/bin/bash -e\necho foo\n"); - ProcessTaskWrapper<Integer> t = submit(SshTasks.newSshExecTaskFactory(host, "echo bar")); - Assert.assertTrue(t.block().getStdout().trim().matches("foo\\s+bar"), "mismatched output was: "+t.getStdout()); - } - - @Test(groups="Integration") - public void testSshIgnoringHeaderProperty() { - host.setConfig(BrooklynConfigKeys.SSH_CONFIG_SCRIPT_HEADER, "#!/bin/bash -e\necho foo\n"); - ProcessTaskWrapper<Integer> t = submit(SshTasks.newSshExecTaskFactory(host, false, "echo bar")); - Assert.assertTrue(t.block().getStdout().trim().matches("bar"), "mismatched output was: "+t.getStdout()); - } - -}
