This is an automated email from the ASF dual-hosted git repository. jerrypeng pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new c4098d6 Added more unit tests to the JavaInstanceTest class (#10369) c4098d6 is described below commit c4098d6845194ca78f3ddbc6748ec3c3bb313cb9 Author: David Kjerrumgaard <35466513+david-stream...@users.noreply.github.com> AuthorDate: Fri May 14 09:47:37 2021 -0700 Added more unit tests to the JavaInstanceTest class (#10369) --- .../functions/instance/JavaInstanceTest.java | 96 ++++++++++++++++++++++ 1 file changed, 96 insertions(+) diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java index 59f36d9..b1e7cc7 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java @@ -21,6 +21,8 @@ package org.apache.pulsar.functions.instance; import static org.mockito.Mockito.mock; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; +import static org.testng.Assert.assertNull; +import static org.testng.Assert.assertSame; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -54,6 +56,35 @@ public class JavaInstanceTest { assertEquals(new String(testString + "-lambda"), result.get().getResult()); instance.close(); } + + @Test + public void testNullReturningFunction() throws Exception { + JavaInstance instance = new JavaInstance( + mock(ContextImpl.class), + (Function<String, String>) (input, context) -> null, + new InstanceConfig()); + String testString = "ABC123"; + CompletableFuture<JavaExecutionResult> result = instance.handleMessage(mock(Record.class), testString); + assertNull(result.get().getResult()); + instance.close(); + } + + @Test + public void testUserExceptionThrowingFunction() throws Exception { + final UserException userException = new UserException("Boom"); + Function<String, String> func = (input, context) -> { + throw userException; + }; + + JavaInstance instance = new JavaInstance( + mock(ContextImpl.class), + func, + new InstanceConfig()); + String testString = "ABC123"; + CompletableFuture<JavaExecutionResult> result = instance.handleMessage(mock(Record.class), testString); + assertSame(userException, result.get().getUserException()); + instance.close(); + } @Test public void testAsyncFunction() throws Exception { @@ -86,6 +117,64 @@ public class JavaInstanceTest { assertEquals(new String(testString + "-lambda"), result.get().getResult()); instance.close(); } + + @Test + public void testNullReturningAsyncFunction() throws Exception { + InstanceConfig instanceConfig = new InstanceConfig(); + @Cleanup("shutdownNow") + ExecutorService executor = Executors.newCachedThreadPool(); + + Function<String, CompletableFuture<String>> function = (input, context) -> { + log.info("input string: {}", input); + CompletableFuture<String> result = new CompletableFuture<>(); + executor.submit(() -> { + try { + Thread.sleep(500); + result.complete(null); + } catch (Exception e) { + result.completeExceptionally(e); + } + }); + + return result; + }; + + JavaInstance instance = new JavaInstance( + mock(ContextImpl.class), + function, + instanceConfig); + String testString = "ABC123"; + CompletableFuture<JavaExecutionResult> result = instance.handleMessage(mock(Record.class), testString); + assertNull(result.get().getResult()); + instance.close(); + } + + @Test + public void testUserExceptionThrowingAsyncFunction() throws Exception { + final UserException userException = new UserException("Boom"); + InstanceConfig instanceConfig = new InstanceConfig(); + @Cleanup("shutdownNow") + ExecutorService executor = Executors.newCachedThreadPool(); + + Function<String, CompletableFuture<String>> function = (input, context) -> { + log.info("input string: {}", input); + CompletableFuture<String> result = new CompletableFuture<>(); + executor.submit(() -> { + result.completeExceptionally(userException); + }); + + return result; + }; + + JavaInstance instance = new JavaInstance( + mock(ContextImpl.class), + function, + instanceConfig); + String testString = "ABC123"; + CompletableFuture<JavaExecutionResult> result = instance.handleMessage(mock(Record.class), testString); + assertSame(userException, result.get().getUserException().getCause()); + instance.close(); + } @Test public void testAsyncFunctionMaxPending() throws Exception { @@ -137,4 +226,11 @@ public class JavaInstanceTest { log.info("start:{} end:{} during:{}", startTime, endTime, endTime - startTime); instance.close(); } + + @SuppressWarnings("serial") + private static class UserException extends Exception { + public UserException(String msg) { + super(msg); + } + } }