This is an automated email from the ASF dual-hosted git repository.

jerrypeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new c4098d6  Added more unit tests to the JavaInstanceTest class (#10369)
c4098d6 is described below

commit c4098d6845194ca78f3ddbc6748ec3c3bb313cb9
Author: David Kjerrumgaard <35466513+david-stream...@users.noreply.github.com>
AuthorDate: Fri May 14 09:47:37 2021 -0700

    Added more unit tests to the JavaInstanceTest class (#10369)
---
 .../functions/instance/JavaInstanceTest.java       | 96 ++++++++++++++++++++++
 1 file changed, 96 insertions(+)

diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
index 59f36d9..b1e7cc7 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/JavaInstanceTest.java
@@ -21,6 +21,8 @@ package org.apache.pulsar.functions.instance;
 import static org.mockito.Mockito.mock;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertNotNull;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertSame;
 
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
@@ -54,6 +56,35 @@ public class JavaInstanceTest {
         assertEquals(new String(testString + "-lambda"), result.get().getResult());
         instance.close();
     }
+    
+    @Test
+    public void testNullReturningFunction() throws Exception  {
+       JavaInstance instance = new JavaInstance(
+                mock(ContextImpl.class),
+                (Function<String, String>) (input, context) -> null,
+                new InstanceConfig());
+       String testString = "ABC123";
+       CompletableFuture<JavaExecutionResult> result = instance.handleMessage(mock(Record.class), testString);
+       assertNull(result.get().getResult());
+       instance.close();
+    }
+
+    @Test
+    public void testUserExceptionThrowingFunction() throws Exception  {
+       final UserException userException = new UserException("Boom");
+       Function<String, String> func = (input, context) -> {
+               throw userException;
+       };
+
+       JavaInstance instance = new JavaInstance(
+                mock(ContextImpl.class),
+                func,
+                new InstanceConfig());
+       String testString = "ABC123";
+       CompletableFuture<JavaExecutionResult> result = instance.handleMessage(mock(Record.class), testString);
+       assertSame(userException, result.get().getUserException());
+       instance.close();
+    }
 
     @Test
     public void testAsyncFunction() throws Exception {
@@ -86,6 +117,64 @@ public class JavaInstanceTest {
         assertEquals(new String(testString + "-lambda"), result.get().getResult());
         instance.close();
     }
+    
+    @Test
+    public void testNullReturningAsyncFunction() throws Exception {
+        InstanceConfig instanceConfig = new InstanceConfig();
+        @Cleanup("shutdownNow")
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        Function<String, CompletableFuture<String>> function = (input, context) -> {
+            log.info("input string: {}", input);
+            CompletableFuture<String> result  = new CompletableFuture<>();
+            executor.submit(() -> {
+                try {
+                    Thread.sleep(500);
+                    result.complete(null);
+                } catch (Exception e) {
+                    result.completeExceptionally(e);
+                }
+            });
+
+            return result;
+        };
+
+        JavaInstance instance = new JavaInstance(
+                mock(ContextImpl.class),
+                function,
+                instanceConfig);
+        String testString = "ABC123";
+        CompletableFuture<JavaExecutionResult> result = instance.handleMessage(mock(Record.class), testString);
+        assertNull(result.get().getResult());
+        instance.close();
+    }
+
+    @Test
+    public void testUserExceptionThrowingAsyncFunction() throws Exception {
+       final UserException userException = new UserException("Boom");
+        InstanceConfig instanceConfig = new InstanceConfig();
+        @Cleanup("shutdownNow")
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        Function<String, CompletableFuture<String>> function = (input, context) -> {
+            log.info("input string: {}", input);
+            CompletableFuture<String> result  = new CompletableFuture<>();
+            executor.submit(() -> {
+               result.completeExceptionally(userException);
+            });
+
+            return result;
+        };
+
+        JavaInstance instance = new JavaInstance(
+                mock(ContextImpl.class),
+                function,
+                instanceConfig);
+        String testString = "ABC123";
+        CompletableFuture<JavaExecutionResult> result = instance.handleMessage(mock(Record.class), testString);
+        assertSame(userException, result.get().getUserException().getCause());
+        instance.close();
+    }
 
     @Test
     public void testAsyncFunctionMaxPending() throws Exception {
@@ -137,4 +226,11 @@ public class JavaInstanceTest {
         log.info("start:{} end:{} during:{}", startTime, endTime, endTime - startTime);
         instance.close();
     }
+    
+    @SuppressWarnings("serial")
+       private static class UserException extends Exception {
+       public UserException(String msg) {
+               super(msg);
+       }
+    }
 }

Reply via email to