This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 13f870c  Add Async State manipulation methods (#3978)
13f870c is described below

commit 13f870c3bafedb7f73449a2a5b25f07b4f6667e9
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Thu Apr 4 11:26:16 2019 -0700

    Add Async State manipulation methods (#3978)
    
    * Add Async State manipulation methods
    
    * Fix build
    
    * Fixed unittest
---
 .../org/apache/pulsar/functions/api/Context.java   | 34 ++++++++++++++++++
 .../pulsar/functions/instance/ContextImpl.java     | 33 +++++++++++++++---
 .../functions/instance/state/StateContext.java     |  9 ++---
 .../functions/instance/state/StateContextImpl.java | 40 ++++++++++++----------
 .../pulsar/functions/instance/ContextImplTest.java | 14 ++++----
 .../instance/state/StateContextImplTest.java       |  8 ++---
 6 files changed, 101 insertions(+), 37 deletions(-)

diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
index 63cbc9e..17f989e 100644
--- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
+++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java
@@ -125,6 +125,15 @@ public interface Context {
     void incrCounter(String key, long amount);
 
     /**
+     * Increment the builtin distributed counter referred by key
+     * but dont wait for the completion of the increment operation
+     *
+     * @param key The name of the key
+     * @param amount The amount to be incremented
+     */
+    CompletableFuture<Void> incrCounterAsync(String key, long amount);
+
+    /**
      * Retrieve the counter value for the key.
      *
      * @param key name of the key
@@ -133,6 +142,15 @@ public interface Context {
     long getCounter(String key);
 
     /**
+     * Retrieve the counter value for the key, but don't wait
+     * for the operation to be completed
+     *
+     * @param key name of the key
+     * @return the amount of the counter value for this key
+     */
+    CompletableFuture<Long> getCounterAsync(String key);
+
+    /**
      * Update the state value for the key.
      *
      * @param key name of the key
@@ -141,6 +159,14 @@ public interface Context {
     void putState(String key, ByteBuffer value);
 
     /**
+     * Update the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @param value state value of the key
+     */
+    CompletableFuture<Void> putStateAsync(String key, ByteBuffer value);
+
+    /**
      * Retrieve the state value for the key.
      *
      * @param key name of the key
@@ -149,6 +175,14 @@ public interface Context {
     ByteBuffer getState(String key);
 
     /**
+     * Retrieve the state value for the key, but don't wait for the operation to be completed
+     *
+     * @param key name of the key
+     * @return the state value for the key.
+     */
+    CompletableFuture<ByteBuffer> getStateAsync(String key);
+
+    /**
      * Get a map of all user-defined key/value configs for the function.
      *
      * @return The full map of user-defined config values
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
index bb70b41..dc99f60 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java
@@ -61,6 +61,7 @@ import java.util.concurrent.TimeUnit;
 
 import static com.google.common.base.Preconditions.checkState;
 import static org.apache.pulsar.functions.instance.stats.FunctionStatsManager.USER_METRIC_PREFIX;
+import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 
 /**
  * This class implements the Context interface exposed to the user.
@@ -263,40 +264,64 @@ class ContextImpl implements Context, SinkContext, SourceContext {
     }
 
     @Override
+    public CompletableFuture<Void> incrCounterAsync(String key, long amount) {
+        ensureStateEnabled();
+        return stateContext.incrCounter(key, amount);
+    }
+
+    @Override
     public void incrCounter(String key, long amount) {
         ensureStateEnabled();
         try {
-            stateContext.incr(key, amount);
+            result(stateContext.incrCounter(key, amount));
         } catch (Exception e) {
             throw new RuntimeException("Failed to increment key '" + key + "' by amount '" + amount + "'", e);
         }
     }
 
     @Override
+    public CompletableFuture<Long> getCounterAsync(String key) {
+        ensureStateEnabled();
+        return stateContext.getCounter(key);
+    }
+
+    @Override
     public long getCounter(String key) {
         ensureStateEnabled();
         try {
-            return stateContext.getAmount(key);
+            return result(stateContext.getCounter(key));
         } catch (Exception e) {
             throw new RuntimeException("Failed to retrieve counter from key '" + key + "'");
         }
     }
 
     @Override
+    public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) {
+        ensureStateEnabled();
+        return stateContext.put(key, value);
+    }
+
+    @Override
     public void putState(String key, ByteBuffer value) {
         ensureStateEnabled();
         try {
-            stateContext.put(key, value);
+            result(stateContext.put(key, value));
         } catch (Exception e) {
             throw new RuntimeException("Failed to update the state value for key '" + key + "'");
         }
     }
 
     @Override
+    public CompletableFuture<ByteBuffer> getStateAsync(String key) {
+        ensureStateEnabled();
+        return stateContext.get(key);
+    }
+
+    @Override
     public ByteBuffer getState(String key) {
         ensureStateEnabled();
         try {
-            return stateContext.getValue(key);
+            return result(stateContext.get(key));
         } catch (Exception e) {
             throw new RuntimeException("Failed to retrieve the state value for key '" + key + "'");
         }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContext.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContext.java
index c17e8b6..90e85f4 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContext.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContext.java
@@ -19,6 +19,7 @@
 package org.apache.pulsar.functions.instance.state;
 
 import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
 
 /**
  * A state context per function.
@@ -31,7 +32,7 @@ public interface StateContext {
      * @param key key to increment
      * @param amount the amount incremented
      */
-    void incr(String key, long amount) throws Exception;
+    CompletableFuture<Void> incrCounter(String key, long amount) throws Exception;
 
     /**
      * Update the given <i>key</i> to the provide <i>value</i>.
@@ -49,7 +50,7 @@ public interface StateContext {
      * @param key key to update.
      * @param value value to update
      */
-    void put(String key, ByteBuffer value) throws Exception;
+    CompletableFuture<Void> put(String key, ByteBuffer value) throws Exception;
 
     /**
      * Get the value of a given <i>key</i>.
@@ -57,7 +58,7 @@ public interface StateContext {
      * @param key key to retrieve
      * @return a completable future representing the retrieve result.
      */
-    ByteBuffer getValue(String key) throws Exception;
+    CompletableFuture<ByteBuffer> get(String key) throws Exception;
 
     /**
      * Get the amount of a given <i>key</i>.
@@ -65,6 +66,6 @@ public interface StateContext {
      * @param key key to retrieve
      * @return a completable future representing the retrieve result.
      */
-    long getAmount(String key) throws Exception;
+    CompletableFuture<Long> getCounter(String key) throws Exception;
 
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
index 1a2c26d..4e60814 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java
@@ -19,11 +19,12 @@
 package org.apache.pulsar.functions.instance.state;
 
 import static java.nio.charset.StandardCharsets.UTF_8;
-import static org.apache.bookkeeper.common.concurrent.FutureUtils.result;
 
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.Unpooled;
 import java.nio.ByteBuffer;
+import java.util.concurrent.CompletableFuture;
+
 import org.apache.bookkeeper.api.kv.Table;
 
 /**
@@ -40,35 +41,38 @@ public class StateContextImpl implements StateContext {
     }
 
     @Override
-    public void incr(String key, long amount) throws Exception {
+    public CompletableFuture<Void> incrCounter(String key, long amount) {
         // TODO: this can be optimized with a batch operation.
-        result(table.increment(
+        return table.increment(
             Unpooled.wrappedBuffer(key.getBytes(UTF_8)),
-            amount));
+            amount);
     }
 
     @Override
-    public void put(String key, ByteBuffer value) throws Exception {
-        result(table.put(
+    public CompletableFuture<Void> put(String key, ByteBuffer value) {
+        return table.put(
             Unpooled.wrappedBuffer(key.getBytes(UTF_8)),
-            Unpooled.wrappedBuffer(value)));
+            Unpooled.wrappedBuffer(value));
     }
 
     @Override
-    public ByteBuffer getValue(String key) throws Exception {
-        ByteBuf data = result(table.get(Unpooled.wrappedBuffer(key.getBytes(UTF_8))));
-        try {
-            ByteBuffer result = ByteBuffer.allocate(data.readableBytes());
-            data.readBytes(result);
-            return result;
-        } finally {
-            data.release();
-        }
+    public CompletableFuture<ByteBuffer> get(String key) {
+        return table.get(Unpooled.wrappedBuffer(key.getBytes(UTF_8))).thenApply(
+                data -> {
+                    try {
+                        ByteBuffer result = ByteBuffer.allocate(data.readableBytes());
+                        data.readBytes(result);
+                        return result;
+                    } finally {
+                        data.release();
+                    }
+                }
+        );
     }
 
     @Override
-    public long getAmount(String key) throws Exception {
-        return result(table.getNumber(Unpooled.wrappedBuffer(key.getBytes(UTF_8))));
+    public CompletableFuture<Long> getCounter(String key) {
+        return table.getNumber(Unpooled.wrappedBuffer(key.getBytes(UTF_8)));
     }
 
 }
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
index a898e54..fd541f3 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java
@@ -104,16 +104,16 @@ public class ContextImplTest {
     public void testIncrCounterStateEnabled() throws Exception {
         StateContextImpl stateContext = mock(StateContextImpl.class);
         context.setStateContext(stateContext);
-        context.incrCounter("test-key", 10L);
-        verify(stateContext, times(1)).incr(eq("test-key"), eq(10L));
+        context.incrCounterAsync("test-key", 10L);
+        verify(stateContext, times(1)).incrCounter(eq("test-key"), eq(10L));
     }
 
     @Test
     public void testGetCounterStateEnabled() throws Exception {
         StateContextImpl stateContext = mock(StateContextImpl.class);
         context.setStateContext(stateContext);
-        context.getCounter("test-key");
-        verify(stateContext, times(1)).getAmount(eq("test-key"));
+        context.getCounterAsync("test-key");
+        verify(stateContext, times(1)).getCounter(eq("test-key"));
     }
 
     @Test
@@ -121,7 +121,7 @@ public class ContextImplTest {
         StateContextImpl stateContext = mock(StateContextImpl.class);
         context.setStateContext(stateContext);
         ByteBuffer buffer = ByteBuffer.wrap("test-value".getBytes(UTF_8));
-        context.putState("test-key", buffer);
+        context.putStateAsync("test-key", buffer);
         verify(stateContext, times(1)).put(eq("test-key"), same(buffer));
     }
 
@@ -129,8 +129,8 @@ public class ContextImplTest {
     public void testGetStateStateEnabled() throws Exception {
         StateContextImpl stateContext = mock(StateContextImpl.class);
         context.setStateContext(stateContext);
-        context.getState("test-key");
-        verify(stateContext, times(1)).getValue(eq("test-key"));
+        context.getStateAsync("test-key");
+        verify(stateContext, times(1)).get(eq("test-key"));
     }
 
     @Test
diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
index 338f64e..2805a3d 100644
--- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
+++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java
@@ -55,7 +55,7 @@ public class StateContextImplTest {
     public void testIncr() throws Exception {
         when(mockTable.increment(any(ByteBuf.class), anyLong()))
             .thenReturn(FutureUtils.Void());
-        stateContext.incr("test-key", 10L);
+        stateContext.incrCounter("test-key", 10L).get();
         verify(mockTable, times(1)).increment(
             eq(Unpooled.copiedBuffer("test-key", UTF_8)),
             eq(10L)
@@ -66,7 +66,7 @@ public class StateContextImplTest {
     public void testPut() throws Exception {
         when(mockTable.put(any(ByteBuf.class), any(ByteBuf.class)))
             .thenReturn(FutureUtils.Void());
-        stateContext.put("test-key", ByteBuffer.wrap("test-value".getBytes(UTF_8)));
+        stateContext.put("test-key", ByteBuffer.wrap("test-value".getBytes(UTF_8))).get();
         verify(mockTable, times(1)).put(
             eq(Unpooled.copiedBuffer("test-key", UTF_8)),
             eq(Unpooled.copiedBuffer("test-value", UTF_8))
@@ -78,7 +78,7 @@ public class StateContextImplTest {
         ByteBuf returnedValue = Unpooled.copiedBuffer("test-value", UTF_8);
         when(mockTable.get(any(ByteBuf.class)))
             .thenReturn(FutureUtils.value(returnedValue));
-        ByteBuffer result = stateContext.getValue("test-key");
+        ByteBuffer result = stateContext.get("test-key").get();
         assertEquals("test-value", new String(result.array(), UTF_8));
         verify(mockTable, times(1)).get(
             eq(Unpooled.copiedBuffer("test-key", UTF_8))
@@ -89,7 +89,7 @@ public class StateContextImplTest {
     public void testGetAmount() throws Exception {
         when(mockTable.getNumber(any(ByteBuf.class)))
             .thenReturn(FutureUtils.value(10L));
-        assertEquals(10L, stateContext.getAmount("test-key"));
+        assertEquals((Long)10L, stateContext.getCounter("test-key").get());
         verify(mockTable, times(1)).getNumber(
             eq(Unpooled.copiedBuffer("test-key", UTF_8))
         );

Reply via email to