// http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/DynamicSequentialTaskTest.java
// ----------------------------------------------------------------------
// diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/DynamicSequentialTaskTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/DynamicSequentialTaskTest.java
// new file mode 100644
// index 0000000..c4f8d4c
// --- /dev/null
// +++ b/core/src/test/java/org/apache/brooklyn/core/util/task/DynamicSequentialTaskTest.java
// @@ -0,0 +1,371 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.core.util.task;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.apache.brooklyn.api.management.HasTaskChildren;
import org.apache.brooklyn.api.management.Task;
import org.apache.brooklyn.core.util.task.BasicExecutionContext;
import org.apache.brooklyn.core.util.task.BasicExecutionManager;
import org.apache.brooklyn.core.util.task.DynamicSequentialTask;
import org.apache.brooklyn.core.util.task.DynamicTasks;
import org.apache.brooklyn.core.util.task.TaskTags;
import org.apache.brooklyn.core.util.task.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import brooklyn.test.Asserts;
import brooklyn.util.collections.MutableList;
import brooklyn.util.collections.MutableMap;
import brooklyn.util.exceptions.Exceptions;
import brooklyn.util.time.CountdownTimer;
import brooklyn.util.time.Duration;
import brooklyn.util.time.Time;

import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

/**
 * Tests for {@link DynamicSequentialTask} and for the sequential / parallel / dynamic
 * tasks produced by {@link Tasks}.
 * <p>
 * Jobs record what they did by appending to {@link #messages}. All writes from task
 * threads are made while holding the monitor of that list, and writers call
 * {@code notifyAll()} so that the test thread can wait for a given message.
 */
public class DynamicSequentialTaskTest {

    private static final Logger log = LoggerFactory.getLogger(DynamicSequentialTaskTest.class);

    public static final Duration TIMEOUT = Duration.TEN_SECONDS;
    public static final Duration TINY_TIME = Duration.millis(20);

    BasicExecutionManager em;
    BasicExecutionContext ec;
    /** Messages emitted by jobs, in emission order; guarded by its own monitor. */
    List<String> messages;
    /** Receives one permit each time a sleeping "say" job is interrupted. */
    Semaphore cancellations;
    /** Started in {@link #setUp()}; used to check a test finished within {@link #TIMEOUT}. */
    Stopwatch stopwatch;
    /** Per-job gate: a monitorable job blocks until its semaphore is released. */
    Map<String,Semaphore> monitorableJobSemaphoreMap;
    Map<String,Task<String>> monitorableTasksMap;

    @BeforeMethod(alwaysRun=true)
    public void setUp() {
        em = new BasicExecutionManager("mycontext");
        ec = new BasicExecutionContext(em);
        cancellations = new Semaphore(0);
        messages = new ArrayList<String>();
        // fresh maps per test method; nothing to clear
        monitorableJobSemaphoreMap = MutableMap.of();
        monitorableTasksMap = MutableMap.of();
        stopwatch = Stopwatch.createStarted();
    }

    @AfterMethod(alwaysRun=true)
    public void tearDown() throws Exception {
        if (em != null) em.shutdownNow();
    }

    /** A child queued before submission and a child queued from inside the main job both run. */
    @Test
    public void testSimple() throws InterruptedException, ExecutionException {
        Callable<String> mainJob = new Callable<String>() {
            @Override
            public String call() {
                log.info("main job - "+Tasks.current());
                // the pre-queued "hello" child may be running at the same time and it
                // writes under this monitor, so the main job must take it as well
                synchronized (messages) {
                    messages.add("main");
                    messages.notifyAll();
                }
                DynamicTasks.queue( sayTask("world") );
                return "bye";
            }
        };
        DynamicSequentialTask<String> t = new DynamicSequentialTask<String>(mainJob);
        // this should be added before anything added when the task is invoked
        t.queue(sayTask("hello"));

        Assert.assertEquals(messages, Lists.newArrayList());
        Assert.assertEquals(t.isBegun(), false);
        Assert.assertEquals(Iterables.size(t.getChildren()), 1);

        ec.submit(t);
        Assert.assertEquals(t.isSubmitted(), true);
        Assert.assertEquals(t.getUnchecked(Duration.ONE_SECOND), "bye");
        long elapsed = t.getEndTimeUtc() - t.getSubmitTimeUtc();
        Assert.assertTrue(elapsed < 1000, "elapsed time should have been less than 1s but was "+
                Time.makeTimeString(elapsed, true));
        Assert.assertEquals(Iterables.size(t.getChildren()), 2);
        Assert.assertEquals(messages.size(), 3, "expected 3 entries, but had "+messages);
        // either main or hello can be first, but world should be last
        Assert.assertEquals(messages.get(2), "world");
    }

    /**
     * Builds a job which records {@code message} (if non-null), sleeps for {@code duration}
     * (if non-null and positive), then records {@code message2} (if non-null).
     * If the sleep is interrupted a permit is released on {@link #cancellations} and the
     * interruption is propagated, so {@code message2} is not recorded.
     *
     * @return a callable whose result is {@code message}
     */
    public Callable<String> sayCallable(final String message, final Duration duration, final String message2) {
        return new Callable<String>() {
            @Override
            public String call() {
                try {
                    if (message != null) {
                        log.info("saying: "+message+ " - "+Tasks.current());
                        record(message);
                    }
                    if (message2 != null) {
                        log.info("will say "+message2+" after "+duration);
                    }
                    if (duration != null && duration.toMilliseconds() > 0) {
                        Thread.sleep(duration.toMillisecondsRoundingUp());
                    }
                } catch (InterruptedException e) {
                    cancellations.release();
                    throw Exceptions.propagate(e);
                }
                if (message2 != null) {
                    log.info("saying: "+message2+ " - "+Tasks.current());
                    record(message2);
                }
                return message;
            }

            /** Appends to the shared list and wakes any thread waiting on it. */
            private void record(String text) {
                synchronized (messages) {
                    messages.add(text);
                    messages.notifyAll();
                }
            }
        };
    }

    /** Task which just records {@code message}. */
    public Task<String> sayTask(String message) {
        return sayTask(message, null, null);
    }

    /** Task wrapping {@link #sayCallable(String, Duration, String)}. */
    public Task<String> sayTask(String message, Duration duration, String message2) {
        return Tasks.<String>builder().body(sayCallable(message, duration, message2)).build();
    }

    /** A parallel group nested in a sequence: only the order inside the group may vary. */
    @Test
    public void testComplex() throws InterruptedException, ExecutionException {
        Task<List<?>> t = Tasks.sequential(
                sayTask("1"),
                sayTask("2"),
                Tasks.parallel(sayTask("4"), sayTask("3")),
                sayTask("5")
            );
        ec.submit(t);
        Assert.assertEquals(t.get().size(), 4);
        Asserts.assertEqualsIgnoringOrder((List<?>)t.get().get(2), ImmutableSet.of("3", "4"));
        Assert.assertTrue(messages.equals(Arrays.asList("1", "2", "3", "4", "5")) || messages.equals(Arrays.asList("1", "2", "4", "3", "5")), "messages="+messages);
    }

    /** Cancelling the parent interrupts the running child and cancels the one not yet started. */
    @Test
    public void testCancelled() throws InterruptedException, ExecutionException {
        Task<List<?>> t = Tasks.sequential(
                sayTask("1"),
                sayTask("2a", Duration.THIRTY_SECONDS, "2b"),
                sayTask("3"));
        ec.submit(t);
        synchronized (messages) {
            while (messages.size() <= 1)
                messages.wait();
        }
        Assert.assertEquals(messages, Arrays.asList("1", "2a"));
        Time.sleep(Duration.millis(50));
        t.cancel(true);
        Assert.assertTrue(t.isDone());
        // 2 should get cancelled, and invoke the cancellation semaphore
        // 3 should get cancelled and not run at all
        Assert.assertEquals(messages, Arrays.asList("1", "2a"));

        // Need to ensure that 2 has been started; race where we might cancel it before its run method
        // is even begun. Hence doing "2a; pause; 2b" where nothing is interruptable before pause.
        Assert.assertTrue(cancellations.tryAcquire(10, TimeUnit.SECONDS));

        Iterator<Task<?>> ci = ((HasTaskChildren)t).getChildren().iterator();
        Assert.assertEquals(ci.next().get(), "1");
        Task<?> task2 = ci.next();
        Assert.assertTrue(task2.isBegun());
        Assert.assertTrue(task2.isDone());
        Assert.assertTrue(task2.isCancelled());

        Task<?> task3 = ci.next();
        Assert.assertFalse(task3.isBegun());
        // these previously re-checked task2 (copy-paste slip), leaving task3's state unverified
        Assert.assertTrue(task3.isDone());
        Assert.assertTrue(task3.isCancelled());

        // but we do _not_ get a mutex from task3 as it does not run (is not interrupted)
        Assert.assertEquals(cancellations.availablePermits(), 0);
    }

    /** Monitorable task with no pre/post hooks; see {@link #monitorableJob(Runnable, String, Callable)}. */
    protected Task<String> monitorableTask(final String id) {
        return monitorableTask(null, id, null);
    }

    /** Wraps a monitorable job in a task and registers the task under {@code id}. */
    protected Task<String> monitorableTask(final Runnable pre, final String id, final Callable<String> post) {
        Task<String> t = Tasks.<String>builder().body(monitorableJob(pre, id, post)).build();
        monitorableTasksMap.put(id, t);
        return t;
    }

    /** Monitorable job with no pre/post hooks. */
    protected Callable<String> monitorableJob(final String id) {
        return monitorableJob(null, id, null);
    }

    /**
     * Creates a job that the test can step through: it runs {@code pre} (if any), blocks until
     * {@link #releaseMonitorableJob(String)} is called for {@code id} (failing with
     * {@link IllegalStateException} after {@link #TIMEOUT}), records {@code id} in
     * {@link #messages}, and finally returns the result of {@code post} if given, else {@code id}.
     */
    protected Callable<String> monitorableJob(final Runnable pre, final String id, final Callable<String> post) {
        monitorableJobSemaphoreMap.put(id, new Semaphore(0));
        return new Callable<String>() {
            @Override
            public String call() throws Exception {
                if (pre!=null) pre.run();
                // wait for semaphore
                if (!monitorableJobSemaphoreMap.get(id).tryAcquire(1, TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS))
                    throw new IllegalStateException("timeout for "+id);
                synchronized (messages) {
                    messages.add(id);
                    messages.notifyAll();
                }
                if (post!=null) return post.call();
                return id;
            }
        };
    }

    /** Lets the monitorable job registered under {@code id} proceed past its gate. */
    protected void releaseMonitorableJob(final String id) {
        monitorableJobSemaphoreMap.get(id).release();
    }

    /** Blocks until {@code id} appears in {@link #messages}; fails the test after {@link #TIMEOUT}. */
    protected void waitForMessage(final String id) {
        CountdownTimer timer = CountdownTimer.newInstanceStarted(TIMEOUT);
        synchronized (messages) {
            while (!timer.isExpired()) {
                if (messages.contains(id)) return;
                timer.waitOnForExpiryUnchecked(messages);
            }
        }
        Assert.fail("Did not see message "+id);
    }

    /** Releases the job for {@code id} and waits until it has recorded its message. */
    protected void releaseAndWaitForMonitorableJob(final String id) {
        releaseMonitorableJob(id);
        waitForMessage(id);
    }

    /** The parent does not end until its last child does, even after the main body has finished. */
    @Test
    public void testChildrenRunConcurrentlyWithPrimary() {
        Task<String> t = Tasks.<String>builder().dynamic(true)
            .body(monitorableJob("main"))
            .add(monitorableTask("1")).add(monitorableTask("2")).build();
        ec.submit(t);
        releaseAndWaitForMonitorableJob("1");
        releaseAndWaitForMonitorableJob("main");
        Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
        releaseMonitorableJob("2");

        Assert.assertTrue(t.blockUntilEnded(TIMEOUT));
        Assert.assertEquals(messages, MutableList.of("1", "main", "2"));
        Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch);
        Assert.assertFalse(t.isError());
    }

    protected static class FailRunnable implements Runnable {
        @Override public void run() { throw new RuntimeException("Planned exception for test"); }
    }
    protected static class FailCallable implements Callable<String> {
        @Override public String call() { throw new RuntimeException("Planned exception for test"); }
    }

    @Test
    public void testByDefaultChildrenFailureAbortsSecondaryFailsPrimaryButNotAbortsPrimary() {
        Task<String> t1 = monitorableTask(null, "1", new FailCallable());
        Task<String> t = Tasks.<String>builder().dynamic(true)
            .body(monitorableJob("main"))
            .add(t1).add(monitorableTask("2")).build();
        ec.submit(t);
        releaseAndWaitForMonitorableJob("1");
        Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
        releaseMonitorableJob("main");

        Assert.assertTrue(t.blockUntilEnded(TIMEOUT));
        // "2" is never released and never records: the failure of "1" aborted it
        Assert.assertEquals(messages, MutableList.of("1", "main"));
        Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch);
        Assert.assertTrue(t.isError());
        Assert.assertTrue(t1.isError());
    }

    @Test
    public void testWhenSwallowingChildrenFailureDoesNotAbortSecondaryOrFailPrimary() {
        Task<String> t1 = monitorableTask(null, "1", new FailCallable());
        Task<String> t = Tasks.<String>builder().dynamic(true)
            .body(monitorableJob("main"))
            .add(t1).add(monitorableTask("2")).swallowChildrenFailures(true).build();
        ec.submit(t);
        releaseAndWaitForMonitorableJob("1");
        Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
        releaseAndWaitForMonitorableJob("2");
        Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
        releaseMonitorableJob("main");
        Assert.assertTrue(t.blockUntilEnded(TIMEOUT));
        Assert.assertEquals(messages, MutableList.of("1", "2", "main"));
        Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch);
        Assert.assertFalse(t.isError());
        Assert.assertTrue(t1.isError());
    }

    @Test
    public void testInessentialChildrenFailureDoesNotAbortSecondaryOrFailPrimary() {
        Task<String> t1 = monitorableTask(null, "1", new FailCallable());
        TaskTags.markInessential(t1);
        Task<String> t = Tasks.<String>builder().dynamic(true)
            .body(monitorableJob("main"))
            .add(t1).add(monitorableTask("2")).build();
        ec.submit(t);
        releaseAndWaitForMonitorableJob("1");
        Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
        releaseAndWaitForMonitorableJob("2");
        Assert.assertFalse(t.blockUntilEnded(TINY_TIME));
        releaseMonitorableJob("main");
        Assert.assertTrue(t.blockUntilEnded(TIMEOUT));
        Assert.assertEquals(messages, MutableList.of("1", "2", "main"));
        Assert.assertTrue(stopwatch.elapsed(TimeUnit.MILLISECONDS) < TIMEOUT.toMilliseconds(), "took too long: "+stopwatch);
        Assert.assertFalse(t.isError());
        Assert.assertTrue(t1.isError());
    }

    @Test
    public void testTaskBuilderUsingAddVarargChildren() {
        Task<String> t = Tasks.<String>builder().dynamic(true)
            .body(monitorableJob("main"))
            .add(monitorableTask("1"), monitorableTask("2"))
            .build();
        ec.submit(t);
        releaseAndWaitForMonitorableJob("1");
        releaseAndWaitForMonitorableJob("2");
        releaseAndWaitForMonitorableJob("main");

        Assert.assertEquals(messages, MutableList.of("1", "2", "main"));
    }

    @Test
    public void testTaskBuilderUsingAddAllChildren() {
        Task<String> t = Tasks.<String>builder().dynamic(true)
            .body(monitorableJob("main"))
            .addAll(ImmutableList.of(monitorableTask("1"), monitorableTask("2")))
            .build();
        ec.submit(t);
        releaseAndWaitForMonitorableJob("1");
        releaseAndWaitForMonitorableJob("2");
        releaseAndWaitForMonitorableJob("main");

        Assert.assertEquals(messages, MutableList.of("1", "2", "main"));
    }
}
// http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/NonBasicTaskExecutionTest.java
// ----------------------------------------------------------------------
// diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/NonBasicTaskExecutionTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/NonBasicTaskExecutionTest.java
// new file mode 100644
// index 0000000..980a701
// --- /dev/null
// +++ b/core/src/test/java/org/apache/brooklyn/core/util/task/NonBasicTaskExecutionTest.java
// @@ -0,0 +1,130 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.core.util.task;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertSame;
import static org.testng.Assert.assertTrue;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.brooklyn.api.management.Task;
import org.apache.brooklyn.core.util.task.BasicExecutionManager;
import org.apache.brooklyn.core.util.task.BasicTask;
import org.apache.brooklyn.core.util.task.ForwardingTask;
import org.apache.brooklyn.core.util.task.TaskInternal;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import brooklyn.test.Asserts;
import brooklyn.util.collections.MutableMap;

/**
 * Checks that a task which is <em>not</em> itself a {@link BasicTask} can be submitted to a
 * {@link BasicExecutionManager}: every task used here is a {@link ForwardingTask} that
 * delegates to a wrapped {@link BasicTask}.
 */
public class NonBasicTaskExecutionTest {
    private static final Logger log = LoggerFactory.getLogger(NonBasicTaskExecutionTest.class);

    private static final int TIMEOUT_MS = 10*1000;

    /** Minimal concrete {@link ForwardingTask}: forwards everything to the task given at construction. */
    public static class ConcreteForwardingTask<T> extends ForwardingTask<T> {
        private final TaskInternal<T> delegate;

        ConcreteForwardingTask(TaskInternal<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        protected TaskInternal<T> delegate() {
            return delegate;
        }
    }

    private BasicExecutionManager em;
    /** Shared state mutated by the task bodies; synchronized because tasks run on other threads. */
    private Map<Integer,String> data;

    @BeforeMethod(alwaysRun=true)
    public void setUp() throws Exception {
        em = new BasicExecutionManager("mycontext");
        data = Collections.synchronizedMap(new HashMap<Integer,String>());
    }

    @AfterMethod(alwaysRun=true)
    public void tearDown() throws Exception {
        if (em != null) em.shutdownNow();
    }

    /** The submitted forwarding task is returned as-is and yields the delegate's result. */
    @Test
    public void runSimpleTask() throws Exception {
        TaskInternal<Object> t = new ConcreteForwardingTask<Object>(new BasicTask<Object>(new Callable<Object>() {
            @Override public Object call() {
                // Map.put returns the previous value, i.e. "a"
                return data.put(1, "b");
            }}));
        data.put(1, "a");
        Task<?> t2 = em.submit(MutableMap.of("tag", "A"), t);
        // TestNG convention: actual first, expected second
        assertEquals(t.get(), "a");
        assertEquals(t2.get(), "a");
        assertSame(t, t2, "t="+t+"; t2="+t2);
        assertEquals(data.get(1), "b");
    }

    /** While the body is blocked on a latch the task is not done and reports that it is waiting. */
    @Test
    public void runBasicTaskWithWaits() throws Exception {
        final CountDownLatch signalStarted = new CountDownLatch(1);
        final CountDownLatch allowCompletion = new CountDownLatch(1);
        final TaskInternal<Object> t = new ConcreteForwardingTask<Object>(new BasicTask<Object>(new Callable<Object>() {
            @Override public Object call() throws Exception {
                Object result = data.put(1, "b");
                signalStarted.countDown();
                assertTrue(allowCompletion.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
                return result;
            }}));
        data.put(1, "a");

        Task<?> t2 = em.submit(MutableMap.of("tag", "A"), t);
        assertEquals(t, t2);
        assertFalse(t.isDone());

        assertTrue(signalStarted.await(TIMEOUT_MS, TimeUnit.MILLISECONDS));
        assertEquals(data.get(1), "b");
        assertFalse(t.isDone());

        log.debug("runBasicTaskWithWaits, BasicTask status: {}", t.getStatusDetail(false));

        Asserts.succeedsEventually(new Runnable() {
            @Override public void run() {
                // the boolean was previously computed and thrown away, so nothing was verified
                String details = t.getStatusDetail(false);
                assertTrue(details != null && details.toLowerCase().contains("waiting"), "details="+details);
            }});

        allowCompletion.countDown();
        assertEquals(t.get(), "a");
    }
}

// http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/ScheduledExecutionTest.java
// ----------------------------------------------------------------------
// diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/ScheduledExecutionTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/ScheduledExecutionTest.java
// new file mode 100644
// index 0000000..3b338da
// --- /dev/null
// +++ b/core/src/test/java/org/apache/brooklyn/core/util/task/ScheduledExecutionTest.java
// @@ -0,0 +1,291 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.core.util.task;

import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.brooklyn.api.management.Task;
import org.apache.brooklyn.core.util.task.BasicExecutionManager;
import org.apache.brooklyn.core.util.task.BasicTask;
import org.apache.brooklyn.core.util.task.ScheduledTask;
import org.apache.brooklyn.core.util.task.Tasks;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import brooklyn.test.Asserts;
import brooklyn.util.collections.MutableMap;
import brooklyn.util.exceptions.Exceptions;
import brooklyn.util.exceptions.RuntimeInterruptedException;
import brooklyn.util.javalang.JavaClassNames;
import brooklyn.util.time.Duration;
import brooklyn.util.time.Time;

import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;

/**
 * Tests for {@link ScheduledTask}: iteration limits, self-termination, cancellation
 * (from inside and outside the scheduled job) and jobs that outlast their period.
 */
@SuppressWarnings({"unchecked","rawtypes"})
public class ScheduledExecutionTest {

    public static final Logger log = LoggerFactory.getLogger(ScheduledExecutionTest.class);

    /** Created per test method and shut down afterwards so scheduled jobs do not outlive their test. */
    private BasicExecutionManager em;

    @BeforeMethod(alwaysRun=true)
    public void setUp() {
        em = new BasicExecutionManager("mycontextid");
    }

    @AfterMethod(alwaysRun=true)
    public void tearDown() {
        if (em != null) em.shutdownNow();
    }

    /** With {@code maxIterations} set to 5 the job runs exactly five times. */
    @Test
    public void testScheduledTask() throws Exception {
        int PERIOD = 20;
        final AtomicInteger i = new AtomicInteger(0);
        ScheduledTask t = new ScheduledTask(MutableMap.of("delay", 2*PERIOD, "period", PERIOD, "maxIterations", 5), new Callable<Task<?>>() {
            @Override
            public Task<?> call() throws Exception {
                return new BasicTask<Integer>(new Callable<Integer>() {
                    @Override
                    public Integer call() {
                        log.debug("task running: "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
                        return i.incrementAndGet();
                    }});
            }});

        log.info("submitting {} {}", t, t.getStatusDetail(false));
        em.submit(t);
        log.info("submitted {} {}", t, t.getStatusDetail(false));
        Integer interimResult = (Integer) t.get();
        log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
        assertTrue(i.get() > 0, "i="+i);
        t.blockUntilEnded();
        Integer finalResult = (Integer) t.get();
        log.info("ended ({}) {} {}", new Object[] {finalResult, t, t.getStatusDetail(false)});
        assertEquals(finalResult, (Integer)5);
        assertEquals(i.get(), 5);
    }

    /** like testScheduledTask but the loop is terminated by the task itself adjusting the period */
    @Test
    public void testScheduledTaskSelfEnding() throws Exception {
        int PERIOD = 20;
        final AtomicInteger i = new AtomicInteger(0);
        ScheduledTask t = new ScheduledTask(MutableMap.of("delay", 2*PERIOD, "period", PERIOD), new Callable<Task<?>>() {
            @Override
            public Task<?> call() throws Exception {
                return new BasicTask<Integer>(new Callable<Integer>() {
                    @Override
                    public Integer call() {
                        ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).getSubmittedByTask();
                        // clearing the period on the fifth run stops any further scheduling
                        if (i.get() >= 4) submitter.period = null;
                        log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
                        return i.incrementAndGet();
                    }});
            }});

        log.info("submitting {} {}", t, t.getStatusDetail(false));
        em.submit(t);
        log.info("submitted {} {}", t, t.getStatusDetail(false));
        Integer interimResult = (Integer) t.get();
        log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
        assertTrue(i.get() > 0);
        t.blockUntilEnded();
        Integer finalResult = (Integer) t.get();
        log.info("ended ({}) {} {}", new Object[] {finalResult, t, t.getStatusDetail(false)});
        assertEquals(finalResult, (Integer)5);
        assertEquals(i.get(), 5);
    }

    /** The job cancels its own scheduler on the fifth run. */
    @Test
    public void testScheduledTaskCancelEnding() throws Exception {
        Duration PERIOD = Duration.millis(20);
        final AtomicInteger i = new AtomicInteger();
        ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD.times(2), "period", PERIOD), new Callable<Task<?>>() {
            @Override
            public Task<?> call() throws Exception {
                return new BasicTask<Integer>(new Callable<Integer>() {
                    @Override
                    public Integer call() {
                        log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
                        ScheduledTask submitter = (ScheduledTask) ((BasicTask)Tasks.current()).getSubmittedByTask();
                        i.incrementAndGet();
                        if (i.get() >= 5) submitter.cancel();
                        return i.get();
                    }});
            }});

        log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false));
        em.submit(t);
        log.info("submitted {} {}", t, t.getStatusDetail(false));
        Integer interimResult = (Integer) t.get();
        log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
        assertTrue(i.get() > 0);
        t.blockUntilEnded();
        // the final result is not fetched here (the original left t.get() commented out);
        // only the iteration count is checked
        log.info("ended ({}) {} {}", new Object[] {i, t, t.getStatusDetail(false)});
        assertEquals(i.get(), 5);
    }

    /** Cancelling the scheduler from outside ends it promptly, mid-way through the third cycle. */
    @Test(groups="Integration")
    public void testScheduledTaskCancelOuter() throws Exception {
        final Duration PERIOD = Duration.millis(20);
        final Duration CYCLE_DELAY = Duration.ONE_SECOND;
        // this should be enough to start the next cycle, but not so much that the cycle ends;
        // and enough that when a task is interrupted it terminates within this period
        final Duration SMALL_FRACTION_OF_CYCLE_DELAY = PERIOD.add(CYCLE_DELAY.multiply(0.1));

        final AtomicInteger i = new AtomicInteger();
        ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD.times(2), "period", PERIOD), new Callable<Task<?>>() {
            @Override
            public Task<?> call() throws Exception {
                return new BasicTask<Integer>(new Callable<Integer>() {
                    @Override
                    public Integer call() {
                        log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
                        Time.sleep(CYCLE_DELAY);
                        i.incrementAndGet();
                        return i.get();
                    }});
            }});

        log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false));
        em.submit(t);
        log.info("submitted {} {}", t, t.getStatusDetail(false));
        Integer interimResult = (Integer) t.get();
        log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
        assertEquals(i.get(), 1);

        Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
        assertEquals(t.get(), 2);

        Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
        // must be *started*: an unstarted stopwatch never runs, making the final timing check vacuous
        Stopwatch timer = Stopwatch.createStarted();
        t.cancel(true);
        t.blockUntilEnded();
        log.info("blocked until ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
        try {
            t.get();
            Assert.fail("Should have failed getting result of cancelled "+t);
        } catch (Exception e) {
            /* expected */
        }
        assertEquals(i.get(), 2);
        log.info("ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
        Assert.assertTrue(Duration.of(timer).isShorterThan(SMALL_FRACTION_OF_CYCLE_DELAY));
    }

    /** As {@link #testScheduledTaskCancelOuter()}, additionally checking the running job is interrupted. */
    @Test(groups="Integration")
    public void testScheduledTaskCancelInterrupts() throws Exception {
        final Duration PERIOD = Duration.millis(20);
        final Duration CYCLE_DELAY = Duration.ONE_SECOND;
        // this should be enough to start the next cycle, but not so much that the cycle ends;
        // and enough that when a task is interrupted it terminates within this period
        final Duration SMALL_FRACTION_OF_CYCLE_DELAY = PERIOD.add(CYCLE_DELAY.multiply(0.1));

        final Semaphore interruptedSemaphore = new Semaphore(0);
        final AtomicInteger i = new AtomicInteger();
        ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD.times(2), "period", PERIOD), new Callable<Task<?>>() {
            @Override
            public Task<?> call() throws Exception {
                return new BasicTask<Integer>(new Callable<Integer>() {
                    @Override
                    public Integer call() {
                        try {
                            log.info("task running ("+i+"): "+Tasks.current()+" "+Tasks.current().getStatusDetail(false));
                            Time.sleep(CYCLE_DELAY);
                            i.incrementAndGet();
                            return i.get();
                        } catch (RuntimeInterruptedException e) {
                            interruptedSemaphore.release();
                            throw Exceptions.propagate(e);
                        }
                    }});
            }});

        log.info(JavaClassNames.niceClassAndMethod()+" - submitting {} {}", t, t.getStatusDetail(false));
        em.submit(t);
        log.info("submitted {} {}", t, t.getStatusDetail(false));
        Integer interimResult = (Integer) t.get();
        log.info("done one ({}) {} {}", new Object[] {interimResult, t, t.getStatusDetail(false)});
        assertEquals(i.get(), 1);

        Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
        assertEquals(t.get(), 2);

        Time.sleep(SMALL_FRACTION_OF_CYCLE_DELAY);
        // must be *started*: an unstarted stopwatch never runs, making the final timing check vacuous
        Stopwatch timer = Stopwatch.createStarted();
        t.cancel(true);
        t.blockUntilEnded();
        log.info("blocked until ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
        try {
            t.get();
            Assert.fail("Should have failed getting result of cancelled "+t);
        } catch (Exception e) {
            /* expected */
        }
        assertEquals(i.get(), 2);
        Assert.assertTrue(interruptedSemaphore.tryAcquire(1, SMALL_FRACTION_OF_CYCLE_DELAY.toMilliseconds(), TimeUnit.MILLISECONDS), "child thread was not interrupted");
        log.info("ended ({}) {} {}, in {}", new Object[] {i, t, t.getStatusDetail(false), Duration.of(timer)});
        Assert.assertTrue(Duration.of(timer).isShorterThan(SMALL_FRACTION_OF_CYCLE_DELAY));
    }

    /** When a run takes longer than the period, runs must not overlap or start early. */
    @Test(groups="Integration")
    public void testScheduledTaskTakesLongerThanPeriod() throws Exception {
        final int PERIOD = 1;
        final int SLEEP_TIME = 100;
        final int EARLY_RETURN_GRACE = 10;
        final List<Long> execTimes = new CopyOnWriteArrayList<Long>();

        ScheduledTask t = new ScheduledTask(MutableMap.of("delay", PERIOD, "period", PERIOD), new Callable<Task<?>>() {
            @Override
            public Task<?> call() throws Exception {
                return new BasicTask<Void>(new Runnable() {
                    @Override
                    public void run() {
                        execTimes.add(System.currentTimeMillis());
                        try {
                            // same constant the gap check below is based on
                            Thread.sleep(SLEEP_TIME);
                        } catch (InterruptedException e) {
                            throw Exceptions.propagate(e);
                        }
                    }});
            }});

        em.submit(t);

        Asserts.succeedsEventually(new Runnable() {
            @Override
            public void run() {
                assertTrue(execTimes.size() > 3, "size="+execTimes.size());
            }});

        // gaps between consecutive start times
        List<Long> timeDiffs = Lists.newArrayList();
        long prevExecTime = -1;
        for (Long execTime : execTimes) {
            if (prevExecTime != -1) {
                timeDiffs.add(execTime - prevExecTime);
            }
            prevExecTime = execTime;
        }

        for (Long timeDiff : timeDiffs) {
            if (timeDiff < (SLEEP_TIME - EARLY_RETURN_GRACE)) fail("timeDiffs="+timeDiffs+"; execTimes="+execTimes);
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/SingleThreadedSchedulerTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/SingleThreadedSchedulerTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/SingleThreadedSchedulerTest.java new file mode 100644 index 0000000..e3420c8 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/util/task/SingleThreadedSchedulerTest.java @@ -0,0 +1,195 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.task; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.fail; + +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.brooklyn.core.util.task.BasicExecutionManager; +import org.apache.brooklyn.core.util.task.BasicTask; +import org.apache.brooklyn.core.util.task.SingleThreadedScheduler; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.test.Asserts; +import brooklyn.util.collections.MutableMap; + +import com.google.common.util.concurrent.Callables; + +/** + * Tests for tasks submitted with the tag "category1", for which {@link #setUp()} registers a {@link SingleThreadedScheduler}. + * Several tests first submit a task that waits on a latch, so that tasks submitted after it stay queued until the latch is released. + */ +public class SingleThreadedSchedulerTest { + + private static final Logger log = LoggerFactory.getLogger(SingleThreadedSchedulerTest.class); + + private BasicExecutionManager em; + + @BeforeMethod + public void setUp() { + em = new BasicExecutionManager("mycontextid"); + em.setTaskSchedulerForTag("category1", SingleThreadedScheduler.class); + } + + @AfterMethod + public void tearDown() { + if (em != null) em.shutdownNow(); + } + + @Test 
+ public void testExecutesInOrder() throws Exception { + final int NUM_TIMES = 1000; + final List<Integer> result = new CopyOnWriteArrayList<Integer>(); + for (int i = 0; i < NUM_TIMES; i++) { + final int counter = i; + em.submit(MutableMap.of("tag", "category1"), new Runnable() { + public void run() { + result.add(counter); + }}); + } + + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + assertEquals(result.size(), NUM_TIMES); + }}); + + for (int i = 0; i < NUM_TIMES; i++) { + assertEquals(result.get(i), (Integer)i); + } + } + + @Test + public void testLargeQueueDoesNotConsumeTooManyThreads() throws Exception { + final int NUM_TIMES = 3000; + final CountDownLatch latch = new CountDownLatch(1); + BasicTask<Void> blockingTask = new BasicTask<Void>(newLatchAwaiter(latch)); + em.submit(MutableMap.of("tag", "category1"), blockingTask); + + final AtomicInteger counter = new AtomicInteger(0); + for (int i = 0; i < NUM_TIMES; i++) { + BasicTask<Void> t = new BasicTask<Void>(new Runnable() { + public void run() { + counter.incrementAndGet(); + }}); + em.submit(MutableMap.of("tag", "category1"), t); + if (i % 500 == 0) log.info("Submitted "+i+" jobs..."); + } + + Thread.sleep(100); // give it more of a chance to create the threads before we let them execute + latch.countDown(); + + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + assertEquals(counter.get(), NUM_TIMES); + }}); + } + + @Test + public void testGetResultOfQueuedTaskBeforeItExecutes() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + em.submit(MutableMap.of("tag", "category1"), newLatchAwaiter(latch)); + + BasicTask<Integer> t = new BasicTask<Integer>(Callables.returning(123)); + Future<Integer> future = em.submit(MutableMap.of("tag", "category1"), t); + + Thread thread = new Thread(new Runnable() { + public void run() { + try { + Thread.sleep(10); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + 
latch.countDown(); + }}); + thread.start(); + assertEquals(future.get(), (Integer)123); + } + + @Test + public void testGetResultOfQueuedTaskBeforeItExecutesWithTimeout() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + em.submit(MutableMap.of("tag", "category1"), newLatchAwaiter(latch)); + + BasicTask<Integer> t = new BasicTask<Integer>(Callables.returning(123)); + Future<Integer> future = em.submit(MutableMap.of("tag", "category1"), t); + + try { + assertEquals(future.get(10, TimeUnit.MILLISECONDS), (Integer)123); + fail(); + } catch (TimeoutException e) { + // success + } + } + + @Test + public void testCancelQueuedTaskBeforeItExecutes() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + em.submit(MutableMap.of("tag", "category1"), newLatchAwaiter(latch)); + + final AtomicBoolean executed = new AtomicBoolean(); + BasicTask<?> t = new BasicTask<Void>(new Runnable() { + public void run() { + executed.set(true); + }}); + Future<?> future = em.submit(MutableMap.of("tag", "category1"), t); + + future.cancel(true); + latch.countDown(); + Thread.sleep(10); + try { + future.get(); + // without this the test would also pass if get() returned normally for a cancelled task + fail("expected CancellationException when getting the result of a cancelled task"); + } catch (CancellationException e) { + // success + } + assertFalse(executed.get()); + } + + @Test + public void testGetResultOfQueuedTaskAfterItExecutes() throws Exception { + final CountDownLatch latch = new CountDownLatch(1); + em.submit(MutableMap.of("tag", "category1"), newLatchAwaiter(latch)); + + BasicTask<Integer> t = new BasicTask<Integer>(Callables.returning(123)); + Future<Integer> future = em.submit(MutableMap.of("tag", "category1"), t); + + latch.countDown(); + assertEquals(future.get(), (Integer)123); + } + + /** Returns a callable which waits until the given latch reaches zero and then returns null. */ + private Callable<Void> newLatchAwaiter(final CountDownLatch latch) { + return new Callable<Void>() { + public Void call() throws Exception { + latch.await(); + return null; + } + }; + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/TaskFinalizationTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/TaskFinalizationTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/TaskFinalizationTest.java new file mode 100644 index 0000000..1ff181b --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/util/task/TaskFinalizationTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task; + +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; + +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.core.util.task.BasicTask; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.Test; + +import brooklyn.util.time.Time; + +import com.google.common.base.Stopwatch; + +/** + * Checks that a finalizer registered on a {@link BasicTask} is invoked once the task is no longer referenced. + */ +public class TaskFinalizationTest { + + private static final Logger log = LoggerFactory.getLogger(TaskFinalizationTest.class); + + // integration because it can take a while (and finalizers aren't even guaranteed) + @Test(groups="Integration") + public void testFinalizerInvoked() throws InterruptedException { + BasicTask<?> t = new BasicTask<Void>(new Runnable() { public void run() { /* no op */ }}); + final Semaphore x = new Semaphore(0); + t.setFinalizer(new BasicTask.TaskFinalizer() { + public void onTaskFinalization(Task<?> t) { + // Semaphore is itself thread-safe, so no monitor is taken around the release + x.release(); + } + }); + // drop the only reference held here so that the task becomes eligible for collection + t = null; + Stopwatch watch = Stopwatch.createStarted(); + // request GC repeatedly, waiting up to a second each time (30 attempts) for the finalizer to signal + for (int i=0; i<30; i++) { + System.gc(); System.gc(); + if (x.tryAcquire(1, TimeUnit.SECONDS)) { + log.info("finalizer ran after "+Time.makeTimeStringRounded(watch)); + return; + } + } + Assert.fail("finalizer did not run in time"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/TasksTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/TasksTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/TasksTest.java new file mode 100644 index 0000000..0800984 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/util/task/TasksTest.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.task; + +import static brooklyn.event.basic.DependentConfiguration.attributeWhenReady; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +import org.apache.brooklyn.api.management.ExecutionContext; +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.core.util.task.TaskInternal; +import org.apache.brooklyn.core.util.task.Tasks; +import org.apache.brooklyn.core.util.task.ValueResolver; +import org.apache.brooklyn.test.entity.TestApplication; +import org.apache.brooklyn.test.entity.TestEntity; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.BrooklynAppUnitTestSupport; +import brooklyn.entity.basic.EntityFunctions; +import brooklyn.util.guava.Functionals; +import brooklyn.util.repeat.Repeater; +import brooklyn.util.time.Duration; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Iterables; +import com.google.common.util.concurrent.Callables; + + +/** + * Tests for {@link Tasks}: resolving values (including attribute-when-ready tasks nested in maps, sets and iterables), + * error handling when resolving, and tasks built from a {@link Repeater}. + */ +public class TasksTest 
extends BrooklynAppUnitTestSupport { + + private ExecutionContext executionContext; + + @BeforeMethod(alwaysRun=true) + @Override + public void setUp() throws Exception { + super.setUp(); + executionContext = app.getExecutionContext(); + } + + @Test + public void testResolveNull() throws Exception { + assertResolvesValue(null, String.class, null); + } + + @Test + public void testResolveValueCastsToType() throws Exception { + assertResolvesValue(123, String.class, "123"); + } + + @Test + public void testResolvesAttributeWhenReady() throws Exception { + app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval"); + assertResolvesValue(attributeWhenReady(app, TestApplication.MY_ATTRIBUTE), String.class, "myval"); + } + + @Test + public void testResolvesMapWithAttributeWhenReady() throws Exception { + app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval"); + Map<?,?> orig = ImmutableMap.of("mykey", attributeWhenReady(app, TestApplication.MY_ATTRIBUTE)); + Map<?,?> expected = ImmutableMap.of("mykey", "myval"); + assertResolvesValue(orig, String.class, expected); + } + + @Test + public void testResolvesSetWithAttributeWhenReady() throws Exception { + app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval"); + Set<?> orig = ImmutableSet.of(attributeWhenReady(app, TestApplication.MY_ATTRIBUTE)); + Set<?> expected = ImmutableSet.of("myval"); + assertResolvesValue(orig, String.class, expected); + } + + @Test + public void testResolvesMapOfMapsWithAttributeWhenReady() throws Exception { + app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval"); + Map<?,?> orig = ImmutableMap.of("mykey", ImmutableMap.of("mysubkey", attributeWhenReady(app, TestApplication.MY_ATTRIBUTE))); + Map<?,?> expected = ImmutableMap.of("mykey", ImmutableMap.of("mysubkey", "myval")); + assertResolvesValue(orig, String.class, expected); + } + + @SuppressWarnings("unchecked") + @Test + public void testResolvesIterableOfMapsWithAttributeWhenReady() throws Exception { + 
app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval"); + // using Iterables.concat so that orig is of type FluentIterable rather than List etc + Iterable<?> orig = Iterables.concat(ImmutableList.of(ImmutableMap.of("mykey", attributeWhenReady(app, TestApplication.MY_ATTRIBUTE)))); + Iterable<Map<?,?>> expected = ImmutableList.<Map<?,?>>of(ImmutableMap.of("mykey", "myval")); + assertResolvesValue(orig, String.class, expected); + } + + /** + * Resolves {@code actual} to {@code type} with {@link Tasks#resolveValue} in this test's execution context + * and asserts that the result equals {@code expected}. Note that {@code actual} is the unresolved input value. + */ + private void assertResolvesValue(Object actual, Class<?> type, Object expected) throws Exception { + Object result = Tasks.resolveValue(actual, type, executionContext); + assertEquals(result, expected); + } + + /** + * Resolves a task reading a config value which is itself a throwing task, stepping the same resolver through + * three configurations in order: default, then with exceptions swallowed, then additionally with a default value. + */ + @Test + public void testErrorsResolvingPropagatesOrSwallowedAllCorrectly() throws Exception { + app.setConfig(TestEntity.CONF_OBJECT, ValueResolverTest.newThrowTask(Duration.ZERO)); + Task<Object> t = Tasks.builder().body(Functionals.callable(EntityFunctions.config(TestEntity.CONF_OBJECT), app)).build(); + ValueResolver<Object> v = Tasks.resolving(t).as(Object.class).context(app.getExecutionContext()); + + ValueResolverTest.assertThrowsOnMaybe(v); + ValueResolverTest.assertThrowsOnGet(v); + + v.swallowExceptions(); + ValueResolverTest.assertMaybeIsAbsent(v); + ValueResolverTest.assertThrowsOnGet(v); + + v.defaultValue("foo"); + ValueResolverTest.assertMaybeIsAbsent(v); + assertEquals(v.clone().get(), "foo"); + assertResolvesValue(v, Object.class, "foo"); + } + + /** + * Exercises tasks built from a {@link Repeater}. With a condition that holds, both the {@code requiring} and + * {@code testing} forms complete (the latter returning true); with a condition that never holds within the + * iteration limit, {@code requiring} fails on get whereas {@code testing} returns false. + */ + @Test + public void testRepeater() throws Exception { + Task<?> t; + + t = Tasks.requiring(Repeater.create().until(Callables.returning(true)).every(Duration.millis(1))).build(); + app.getExecutionContext().submit(t); + t.get(Duration.TEN_SECONDS); + + t = Tasks.testing(Repeater.create().until(Callables.returning(true)).every(Duration.millis(1))).build(); + app.getExecutionContext().submit(t); + Assert.assertEquals(t.get(Duration.TEN_SECONDS), true); + + t = 
Tasks.requiring(Repeater.create().until(Callables.returning(false)).limitIterationsTo(2).every(Duration.millis(1))).build(); + app.getExecutionContext().submit(t); + try { + t.get(Duration.TEN_SECONDS); + Assert.fail("Should have failed"); + } catch (Exception e) { + // expected + } + + t = Tasks.testing(Repeater.create().until(Callables.returning(false)).limitIterationsTo(2).every(Duration.millis(1))).build(); + app.getExecutionContext().submit(t); + Assert.assertEquals(t.get(Duration.TEN_SECONDS), false); + } + + /** Checks that, while the repeater's condition runs, the current task's blocking details equal the repeater's description. */ + @Test + public void testRepeaterDescription() throws Exception{ + final String description = "task description"; + Repeater repeater = Repeater.create(description) + .repeat(Callables.returning(null)) + .every(Duration.ONE_MILLISECOND) + .limitIterationsTo(1) + .until(new Callable<Boolean>() { + @Override + public Boolean call() { + TaskInternal<?> current = (TaskInternal<?>)Tasks.current(); + assertEquals(current.getBlockingDetails(), description); + return true; + } + }); + Task<Boolean> t = Tasks.testing(repeater).build(); + app.getExecutionContext().submit(t); + assertTrue(t.get(Duration.TEN_SECONDS)); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/ValueResolverTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/ValueResolverTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/ValueResolverTest.java new file mode 100644 index 0000000..9f65bc4 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/util/task/ValueResolverTest.java @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.task; + +import java.util.concurrent.Callable; + +import org.apache.brooklyn.api.management.ExecutionContext; +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.core.util.task.Tasks; +import org.apache.brooklyn.core.util.task.ValueResolver; +import org.testng.Assert; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.BrooklynAppUnitTestSupport; +import brooklyn.util.guava.Maybe; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +/** + * see also {@link TasksTest} for more tests + */ +@Test +public class ValueResolverTest extends BrooklynAppUnitTestSupport { + + private ExecutionContext executionContext; + + @BeforeMethod(alwaysRun=true) + @Override + public void setUp() throws Exception { + super.setUp(); + executionContext = app.getExecutionContext(); + } + + /** Builds (without submitting) a task which sleeps for the given duration and then returns the given result. */ + public static final Task<String> newSleepTask(final Duration timeout, final String result) { + return Tasks.<String>builder().body(new Callable<String>() { + public String call() { + Time.sleep(timeout); + return result; + }} + ).build(); + } + + /** Builds (without submitting) a task which sleeps for the given duration and then throws an {@link IllegalStateException}. */ + public static final Task<String> newThrowTask(final Duration timeout) { + return Tasks.<String>builder().body(new Callable<String>() { + public String call() { + Time.sleep(timeout); + throw new IllegalStateException("intended, during 
tests"); + }} + ).build(); + } + + public void testTimeoutZero() { + Maybe<String> result = Tasks.resolving(newSleepTask(Duration.TEN_SECONDS, "foo")).as(String.class).context(executionContext).timeout(Duration.ZERO).getMaybe(); + Assert.assertFalse(result.isPresent()); + } + + public void testTimeoutBig() { + Maybe<String> result = Tasks.resolving(newSleepTask(Duration.ZERO, "foo")).as(String.class).context(executionContext).timeout(Duration.TEN_SECONDS).getMaybe(); + Assert.assertEquals(result.get(), "foo"); + } + + public void testNoExecutionContextOnCompleted() { + Task<String> t = newSleepTask(Duration.ZERO, "foo"); + executionContext.submit(t).getUnchecked(); + Maybe<String> result = Tasks.resolving(t).as(String.class).timeout(Duration.ZERO).getMaybe(); + Assert.assertEquals(result.get(), "foo"); + } + + /** + * Asserts that {@code getMaybe()} on a clone of the given resolver throws, returning the exception thrown. + * The clone is taken outside the try block, as in {@link #assertThrowsOnGet(ValueResolver)}, so that only a + * failure of the resolution itself counts as the expected exception. + */ + public static Throwable assertThrowsOnMaybe(ValueResolver<?> result) { + result = result.clone(); + try { + result.getMaybe(); + Assert.fail("should have thrown"); + return null; + } catch (Exception e) { return e; } + } + /** Asserts that {@code get()} on a clone of the given resolver throws, returning the exception thrown. */ + public static Throwable assertThrowsOnGet(ValueResolver<?> result) { + result = result.clone(); + try { + result.get(); + Assert.fail("should have thrown"); + return null; + } catch (Exception e) { return e; } + } + /** Asserts that {@code getMaybe()} on a clone of the given resolver returns an absent value, which is returned. */ + public static <T> Maybe<T> assertMaybeIsAbsent(ValueResolver<T> result) { + result = result.clone(); + Maybe<T> maybe = result.getMaybe(); + Assert.assertFalse(maybe.isPresent()); + return maybe; + } + + public void testSwallowError() { + ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(executionContext).swallowExceptions(); + assertMaybeIsAbsent(result); + assertThrowsOnGet(result); + } + + + public void testDontSwallowError() { + ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(executionContext); + assertThrowsOnMaybe(result); + assertThrowsOnGet(result); + } + + public void testDefaultWhenSwallowError() { + 
ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.ZERO)).as(String.class).context(executionContext).swallowExceptions().defaultValue("foo"); + assertMaybeIsAbsent(result); + Assert.assertEquals(result.get(), "foo"); + } + + public void testDefaultBeforeDelayAndError() { + ValueResolver<String> result = Tasks.resolving(newThrowTask(Duration.TEN_SECONDS)).as(String.class).context(executionContext).timeout(Duration.ZERO).defaultValue("foo"); + assertMaybeIsAbsent(result); + Assert.assertEquals(result.get(), "foo"); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/ssh/SshTasksTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/ssh/SshTasksTest.java b/core/src/test/java/org/apache/brooklyn/core/util/task/ssh/SshTasksTest.java new file mode 100644 index 0000000..94fe3e6 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/core/util/task/ssh/SshTasksTest.java @@ -0,0 +1,213 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task.ssh; + +import java.io.File; +import java.io.IOException; + +import org.apache.brooklyn.api.location.LocationSpec; +import org.apache.brooklyn.api.management.ManagementContext; +import org.apache.brooklyn.core.management.internal.LocalManagementContext; +import org.apache.brooklyn.core.util.ssh.BashCommandsIntegrationTest; +import org.apache.brooklyn.core.util.task.ssh.SshFetchTaskFactory; +import org.apache.brooklyn.core.util.task.ssh.SshFetchTaskWrapper; +import org.apache.brooklyn.core.util.task.ssh.SshPutTaskFactory; +import org.apache.brooklyn.core.util.task.ssh.SshPutTaskWrapper; +import org.apache.brooklyn.core.util.task.ssh.SshTasks; +import org.apache.brooklyn.core.util.task.ssh.SshTasksTest; +import org.apache.brooklyn.core.util.task.system.ProcessTaskFactory; +import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper; +import org.apache.commons.io.FileUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.BrooklynConfigKeys; +import brooklyn.entity.basic.Entities; + +import org.apache.brooklyn.location.basic.LocalhostMachineProvisioningLocation; +import org.apache.brooklyn.location.basic.SshMachineLocation; + +import brooklyn.util.net.Urls; +import brooklyn.util.os.Os; + +/** + * Some tests for {@link SshTasks}. Note more tests in {@link BashCommandsIntegrationTest}, + * {@link SshEffectorTasksTest}, and {@link SoftwareEffectorTest}. 
+ */ +public class SshTasksTest { + + private static final Logger log = LoggerFactory.getLogger(SshTasksTest.class); + + ManagementContext mgmt; + SshMachineLocation host; + File tempDir; + + /* set by a test before a call it expects to throw, and cleared when the exception is caught; tearDown fails the test if it is still set */ + boolean failureExpected; + + @BeforeMethod(alwaysRun=true) + public void setup() throws Exception { + mgmt = new LocalManagementContext(); + + LocalhostMachineProvisioningLocation lhc = mgmt.getLocationManager().createLocation(LocationSpec.create(LocalhostMachineProvisioningLocation.class)); + host = lhc.obtain(); + clearExpectedFailure(); + tempDir = Os.newTempDir(getClass()); + } + + @AfterMethod(alwaysRun=true) + public void tearDown() throws Exception { + if (mgmt != null) Entities.destroyAll(mgmt); + mgmt = null; + tempDir = Os.deleteRecursively(tempDir).asNullOrThrowing(); + checkExpectedFailure(); + } + + /** Fails the test if an expected failure was flagged but never cleared, resetting the flag first. */ + protected void checkExpectedFailure() { + if (failureExpected) { + clearExpectedFailure(); + Assert.fail("Test should have thrown an exception but it did not."); + } + } + + protected void clearExpectedFailure() { + failureExpected = false; + } + + protected void setExpectingFailure() { + failureExpected = true; + } + + + /** Points the given factory at {@link #host}, creates its task, submits it to the execution manager and returns the wrapper. */ + protected <T> ProcessTaskWrapper<T> submit(final ProcessTaskFactory<T> tf) { + tf.machine(host); + ProcessTaskWrapper<T> t = tf.newTask(); + mgmt.getExecutionManager().submit(t); + return t; + } + + /** Creates the put task from the given factory, submits it to the execution manager and returns the wrapper. */ + protected SshPutTaskWrapper submit(final SshPutTaskFactory tf) { + SshPutTaskWrapper t = tf.newTask(); + mgmt.getExecutionManager().submit(t); + return t; + } + + @Test(groups="Integration") + public void testSshEchoHello() { + ProcessTaskWrapper<Integer> t = submit(SshTasks.newSshExecTaskFactory(host, "sleep 1 ; echo hello world")); + Assert.assertFalse(t.isDone()); + Assert.assertEquals(t.get(), (Integer)0); + Assert.assertEquals(t.getTask().getUnchecked(), (Integer)0); + Assert.assertEquals(t.getStdout().trim(), "hello world"); + } + + @Test(groups="Integration") + public void testCopyTo() throws IOException { + String fn = 
Urls.mergePaths(tempDir.getPath(), "f1"); + SshPutTaskWrapper t = submit(SshTasks.newSshPutTaskFactory(host, fn).contents("hello world")); + t.block(); + Assert.assertEquals(FileUtils.readFileToString(new File(fn)), "hello world"); + // and make sure this doesn't throw + Assert.assertTrue(t.isDone()); + Assert.assertTrue(t.isSuccessful()); + Assert.assertEquals(t.get(), null); + Assert.assertEquals(t.getExitCode(), (Integer)0); + } + + @Test(groups="Integration") + public void testCopyToFailBadSubdir() throws IOException { + String fn = Urls.mergePaths(tempDir.getPath(), "non-existent-subdir/file"); + SshPutTaskWrapper t = submit(SshTasks.newSshPutTaskFactory(host, fn).contents("hello world")); + //this doesn't fail + t.block(); + Assert.assertTrue(t.isDone()); + setExpectingFailure(); + try { + // but this does + t.get(); + } catch (Exception e) { + log.info("The error if file cannot be written is: "+e); + clearExpectedFailure(); + } + checkExpectedFailure(); + // and the results indicate failure + Assert.assertFalse(t.isSuccessful()); + Assert.assertNotNull(t.getException()); + Assert.assertNotEquals(t.getExitCode(), (Integer)0); + } + + @Test(groups="Integration") + public void testCopyToFailBadSubdirAllow() throws IOException { + String fn = Urls.mergePaths(tempDir.getPath(), "non-existent-subdir/file"); + SshPutTaskWrapper t = submit(SshTasks.newSshPutTaskFactory(host, fn).contents("hello world").allowFailure()); + //this doesn't fail + t.block(); + Assert.assertTrue(t.isDone()); + // and this doesn't fail either + Assert.assertEquals(t.get(), null); + // but it's not successful + Assert.assertNotNull(t.getException()); + Assert.assertFalse(t.isSuccessful()); + // exit code probably null, but won't be zero + Assert.assertNotEquals(t.getExitCode(), (Integer)0); + } + + @Test(groups="Integration") + public void testCopyToFailBadSubdirCreate() throws IOException { + String fn = Urls.mergePaths(tempDir.getPath(), "non-existent-subdir-to-create/file"); + 
SshPutTaskWrapper t = submit(SshTasks.newSshPutTaskFactory(host, fn).contents("hello world").createDirectory()); + t.block(); + // directory should be created, and file readable now + Assert.assertEquals(FileUtils.readFileToString(new File(fn)), "hello world"); + Assert.assertEquals(t.getExitCode(), (Integer)0); + } + + @Test(groups="Integration") + public void testSshFetch() throws IOException { + String fn = Urls.mergePaths(tempDir.getPath(), "f2"); + FileUtils.write(new File(fn), "hello fetched world"); + + SshFetchTaskFactory tf = SshTasks.newSshFetchTaskFactory(host, fn); + SshFetchTaskWrapper t = tf.newTask(); + mgmt.getExecutionManager().submit(t); + + t.block(); + Assert.assertTrue(t.isDone()); + Assert.assertEquals(t.get(), "hello fetched world"); + Assert.assertEquals(t.getBytes(), "hello fetched world".getBytes()); + } + + /** With a script header configured on the host, the header's own output ("foo") is expected before the command's output. */ + @Test(groups="Integration") + public void testSshWithHeaderProperty() { + host.setConfig(BrooklynConfigKeys.SSH_CONFIG_SCRIPT_HEADER, "#!/bin/bash -e\necho foo\n"); + ProcessTaskWrapper<Integer> t = submit(SshTasks.newSshExecTaskFactory(host, false, "echo bar").getClass() == null ? null : SshTasks.newSshExecTaskFactory(host, "echo bar")); + Assert.assertTrue(t.block().getStdout().trim().matches("foo\\s+bar"), "mismatched output was: "+t.getStdout()); + } + + @Test(groups="Integration") + public void testSshIgnoringHeaderProperty() { + host.setConfig(BrooklynConfigKeys.SSH_CONFIG_SCRIPT_HEADER, "#!/bin/bash -e\necho foo\n"); + ProcessTaskWrapper<Integer> t = submit(SshTasks.newSshExecTaskFactory(host, false, "echo bar")); + Assert.assertTrue(t.block().getStdout().trim().matches("bar"), "mismatched output was: "+t.getStdout()); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/a4c0e5fd/core/src/test/java/org/apache/brooklyn/core/util/task/system/SystemTasksTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/util/task/system/SystemTasksTest.java 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.core.util.task.system;

import java.io.File;

import org.apache.brooklyn.api.management.ManagementContext;
import org.apache.brooklyn.core.management.internal.LocalManagementContext;
import org.apache.brooklyn.core.util.task.ssh.SshTasks;
import org.apache.brooklyn.core.util.task.system.ProcessTaskFactory;
import org.apache.brooklyn.core.util.task.system.ProcessTaskWrapper;
import org.apache.brooklyn.core.util.task.system.SystemTasks;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

import brooklyn.entity.basic.Entities;
import brooklyn.util.os.Os;

/**
 * Some tests for {@link SystemTasks}. See {@link SshTasks}.
 * <p>
 * Each test gets a fresh {@link LocalManagementContext} and a fresh temporary
 * directory; both are released again in {@link #tearDown()}.
 */
public class SystemTasksTest {

    ManagementContext mgmt;
    File tempDir;

    /** Set by {@link #setExpectingFailure()}; if still set at tear-down the test is failed. */
    boolean failureExpected;

    /** Creates the management context and temp dir, and resets the expected-failure flag. */
    @BeforeMethod(alwaysRun=true)
    public void setup() throws Exception {
        mgmt = new LocalManagementContext();

        clearExpectedFailure();
        tempDir = Os.newTempDir(getClass());
    }

    /**
     * Destroys the management context and deletes the temp dir, then verifies that no
     * expected failure is still outstanding.
     * <p>
     * The temp dir is removed in a {@code finally} so it is not leaked when destroying the
     * management context throws, and the delete is skipped when {@link #setup()} failed
     * before the directory was created (this method runs regardless, via {@code alwaysRun}).
     */
    @AfterMethod(alwaysRun=true)
    public void tearDown() throws Exception {
        try {
            if (mgmt != null) Entities.destroyAll(mgmt);
        } finally {
            mgmt = null;
            if (tempDir != null) {
                tempDir = Os.deleteRecursively(tempDir).asNullOrThrowing();
            }
        }
        checkExpectedFailure();
    }

    /** Fails the test (after clearing the flag) if a failure was expected but never consumed. */
    protected void checkExpectedFailure() {
        if (failureExpected) {
            clearExpectedFailure();
            Assert.fail("Test should have thrown an exception but it did not.");
        }
    }

    protected void clearExpectedFailure() {
        failureExpected = false;
    }

    protected void setExpectingFailure() {
        failureExpected = true;
    }

    /**
     * Creates a task from the given factory and submits it to the execution manager.
     *
     * @param tf factory producing the process task
     * @return the wrapper for the submitted task
     */
    protected <T> ProcessTaskWrapper<T> submit(final ProcessTaskFactory<T> tf) {
        ProcessTaskWrapper<T> t = tf.newTask();
        mgmt.getExecutionManager().submit(t);
        return t;
    }

    @Test(groups="Integration")
    public void testExecEchoHello() {
        ProcessTaskWrapper<Integer> t = submit(SystemTasks.exec("sleep 1 ; echo hello world"));
        // the leading "sleep 1" is what makes it reasonable to expect the task is still running here
        Assert.assertFalse(t.isDone());
        Assert.assertEquals(t.get(), (Integer)0);
        Assert.assertEquals(t.getTask().getUnchecked(), (Integer)0);
        Assert.assertEquals(t.getStdout().trim(), "hello world");
    }

    // FIXME Behaviour of Bash shell changes from 3.x to 4.x so test is disabled
    @Test(groups="Integration", enabled=false)
    public void testSubshellExitScriptDoesNotExit() {
        checkSubshellExitDoesNotExit(taskSubshellExit().runAsScript());
    }

    @Test(groups="Integration")
    public void testSubshellExitCommandDoesNotExit() {
        checkSubshellExitDoesNotExit(taskSubshellExit().runAsCommand());
    }

    /** Commands where a non-zero {@code exit} is issued inside a {@code ( ... )} subshell. */
    public ProcessTaskFactory<Integer> taskSubshellExit() {
        return SystemTasks.exec("echo hello", "( exit 1 )", "echo bye code $?");
    }

    /** Asserts the task finished with exit code 0 and that the final echo ran, reporting code 1. */
    public void checkSubshellExitDoesNotExit(ProcessTaskFactory<Integer> task) {
        ProcessTaskWrapper<Integer> t = submit(task);
        t.block();
        Assert.assertEquals(t.get(), (Integer)0);
        Assert.assertTrue(t.getStdout().contains("bye code 1"), "stdout is: "+t.getStdout());
    }

    // NB: despite the "DoesNotExit" suffix in the two test names below, they assert that a
    // "{ exit 1 ; }" group DOES terminate the run (see checkGroupExitDoesExit); the names are
    // kept unchanged so existing test selections by method name continue to work.
    @Test(groups="Integration")
    public void testGroupExitScriptDoesNotExit() {
        checkGroupExitDoesExit(taskGroupExit().runAsScript());
    }

    @Test(groups="Integration")
    public void testGroupExitCommandDoesNotExit() {
        checkGroupExitDoesExit(taskGroupExit().runAsCommand());
    }

    /** Commands where a non-zero {@code exit} is issued inside a <code>{ ... ; }</code> group. */
    public ProcessTaskFactory<Integer> taskGroupExit() {
        return SystemTasks.exec("echo hello", "{ exit 1 ; }", "echo bye code $?");
    }

    /** Asserts the task finished with exit code 1 and that the final echo never ran. */
    public void checkGroupExitDoesExit(ProcessTaskFactory<Integer> task) {
        ProcessTaskWrapper<Integer> t = submit(task);
        t.block();
        Assert.assertEquals(t.get(), (Integer)1);
        Assert.assertFalse(t.getStdout().contains("bye"), "stdout is: "+t.getStdout());
    }

}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.core.util.text;

import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

import org.apache.brooklyn.core.util.text.DataUriSchemeParser;
import org.bouncycastle.util.encoders.Base64;
import org.testng.Assert;
import org.testng.annotations.Test;

/**
 * Tests for {@link DataUriSchemeParser}, covering plain, mime-typed and base64 {@code data:} URIs.
 * <p>
 * All byte/String conversions name their charset explicitly so the expected values do not
 * vary with the platform default encoding.
 */
public class DataUriSchemeParserTest {

    /** A bare {@code data:,<text>} URI yields the text, via both the instance and static APIs. */
    @Test
    public void testSimple() {
        Assert.assertEquals(new DataUriSchemeParser("data:,hello").parse().getDataAsString(), "hello");
        Assert.assertEquals(DataUriSchemeParser.toString("data:,hello"), "hello");
    }

    /** A URI with a mime type and URL-encoded payload exposes both the type and the decoded bytes. */
    @Test
    public void testMimeType() throws UnsupportedEncodingException {
        String json = "{ }";
        DataUriSchemeParser p = new DataUriSchemeParser("data:application/json,"+URLEncoder.encode(json, "US-ASCII")).parse();
        Assert.assertEquals(p.getMimeType(), "application/json");
        // same charset as used for the URL-encoding above, rather than the platform default
        Assert.assertEquals(p.getData(), json.getBytes(StandardCharsets.US_ASCII));
    }

    /** A {@code ;base64} URI is decoded back to the original text. */
    @Test
    public void testBase64() {
        // Base64 output is pure ASCII, so decode the encoder's bytes as such
        String encoded = new String(Base64.encode("hello".getBytes(StandardCharsets.US_ASCII)), StandardCharsets.US_ASCII);
        Assert.assertEquals(DataUriSchemeParser.toString("data:;base64,"+encoded), "hello");
    }

    // TODO test pictures, etc

}
b/core/src/test/java/org/apache/brooklyn/core/util/text/TemplateProcessorTest.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.text; + +import static org.testng.Assert.assertEquals; + +import org.apache.brooklyn.api.entity.proxying.EntitySpec; +import org.apache.brooklyn.core.management.internal.ManagementContextInternal; +import org.apache.brooklyn.core.util.text.TemplateProcessor; +import org.apache.brooklyn.test.entity.TestApplication; +import org.apache.brooklyn.test.entity.TestEntity; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.BrooklynAppUnitTestSupport; +import brooklyn.event.basic.DependentConfiguration; +import brooklyn.test.FixedLocaleTest; + +import com.google.common.collect.ImmutableMap; + +public class TemplateProcessorTest extends BrooklynAppUnitTestSupport { + private FixedLocaleTest localeFix = new FixedLocaleTest(); + + @BeforeMethod(alwaysRun=true) + public void setUp() throws Exception { + super.setUp(); + localeFix.setUp(); + } + + @AfterMethod(alwaysRun=true) + public void tearDown() throws 
Exception { + super.tearDown(); + localeFix.tearDown(); + } + + @Test + public void testAdditionalArgs() { + String templateContents = "${mykey}"; + String result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.of("mykey", "myval")); + assertEquals(result, "myval"); + } + + @Test + public void testEntityConfig() { + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + .configure(TestEntity.CONF_NAME, "myval")); + String templateContents = "${config['"+TestEntity.CONF_NAME.getName()+"']}"; + String result = TemplateProcessor.processTemplateContents(templateContents, entity, ImmutableMap.<String,Object>of()); + assertEquals(result, "myval"); + } + + @Test + public void testEntityConfigNumber() { + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + .configure(TestEntity.CONF_OBJECT, 123456)); + String templateContents = "${config['"+TestEntity.CONF_OBJECT.getName()+"']}"; + String result = TemplateProcessor.processTemplateContents(templateContents, entity, ImmutableMap.<String,Object>of()); + assertEquals(result, "123,456"); + } + + @Test + public void testEntityConfigNumberUnadorned() { + // ?c is needed to avoid commas (i always forget this!) 
+ TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + .configure(TestEntity.CONF_OBJECT, 123456)); + String templateContents = "${config['"+TestEntity.CONF_OBJECT.getName()+"']?c}"; + String result = TemplateProcessor.processTemplateContents(templateContents, entity, ImmutableMap.<String,Object>of()); + assertEquals(result, "123456"); + } + + @Test + public void testGetSysProp() { + System.setProperty("testGetSysProp", "myval"); + + String templateContents = "${javaSysProps['testGetSysProp']}"; + String result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of()); + assertEquals(result, "myval"); + } + + @Test + public void testEntityGetterMethod() { + String templateContents = "${entity.id}"; + String result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of()); + assertEquals(result, app.getId()); + } + + @Test + public void testManagementContextConfig() { + mgmt.getBrooklynProperties().put("globalmykey", "myval"); + String templateContents = "${mgmt.globalmykey}"; + String result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of()); + assertEquals(result, "myval"); + } + + @Test + public void testManagementContextDefaultValue() { + String templateContents = "${(missing)!\"defval\"}"; + Object result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of()); + assertEquals(result, "defval"); + } + + @Test + public void testManagementContextDefaultValueInDotMissingValue() { + String templateContents = "${(mgmt.missing.more_missing)!\"defval\"}"; + Object result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of()); + assertEquals(result, "defval"); + } + + @Test + public void testManagementContextConfigWithDot() { + mgmt.getBrooklynProperties().put("global.mykey", "myval"); + String templateContents = 
"${mgmt['global.mykey']}"; + String result = TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of()); + assertEquals(result, "myval"); + } + + @Test + public void testManagementContextErrors() { + try { + // NB: dot has special meaning so this should fail; must be accessed using bracket notation as above + mgmt.getBrooklynProperties().put("global.mykey", "myval"); + String templateContents = "${mgmt.global.mykey}"; + TemplateProcessor.processTemplateContents(templateContents, app, ImmutableMap.<String,Object>of()); + Assert.fail("Should not have found value with intermediate dot"); + } catch (Exception e) { + Assert.assertTrue(e.toString().contains("global"), "Should have mentioned missing key 'global' in error"); + } + } + + @Test + public void testApplyTemplatedConfigWithAttributeWhenReady() { + app.setAttribute(TestApplication.MY_ATTRIBUTE, "myval"); + + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + .configure(TestEntity.CONF_NAME, DependentConfiguration.attributeWhenReady(app, TestApplication.MY_ATTRIBUTE))); + + String templateContents = "${config['"+TestEntity.CONF_NAME.getName()+"']}"; + String result = TemplateProcessor.processTemplateContents(templateContents, entity, ImmutableMap.<String,Object>of()); + assertEquals(result, "myval"); + } + + @Test + public void testDotSeparatedKey() { + String templateContents = "${a.b}"; + String result = TemplateProcessor.processTemplateContents(templateContents, (ManagementContextInternal)null, + ImmutableMap.<String,Object>of("a.b", "myval")); + assertEquals(result, "myval"); + } + + @Test + public void testDotSeparatedKeyCollisionFailure() { + String templateContents = "${aaa.bbb}"; + try { + TemplateProcessor.processTemplateContents(templateContents, (ManagementContextInternal)null, + ImmutableMap.<String,Object>of("aaa.bbb", "myval", "aaa", "blocker")); + Assert.fail("Should not have found value with intermediate dot where prefix is 
overridden"); + } catch (Exception e) { + Assert.assertTrue(e.toString().contains("aaa"), "Should have mentioned missing key 'aaa' in error"); + } + } + +}
