This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 13f870c Add Async State manipulation methods (#3978) 13f870c is described below commit 13f870c3bafedb7f73449a2a5b25f07b4f6667e9 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Thu Apr 4 11:26:16 2019 -0700 Add Async State manipulation methods (#3978) * Add Async State manipulation methods * Fix build * Fixed unittest --- .../org/apache/pulsar/functions/api/Context.java | 34 ++++++++++++++++++ .../pulsar/functions/instance/ContextImpl.java | 33 +++++++++++++++--- .../functions/instance/state/StateContext.java | 9 ++--- .../functions/instance/state/StateContextImpl.java | 40 ++++++++++++---------- .../pulsar/functions/instance/ContextImplTest.java | 14 ++++---- .../instance/state/StateContextImplTest.java | 8 ++--- 6 files changed, 101 insertions(+), 37 deletions(-) diff --git a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java index 63cbc9e..17f989e 100644 --- a/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java +++ b/pulsar-functions/api-java/src/main/java/org/apache/pulsar/functions/api/Context.java @@ -125,6 +125,15 @@ public interface Context { void incrCounter(String key, long amount); /** + * Increment the builtin distributed counter referred to by key + * but don't wait for the completion of the increment operation + * + * @param key The name of the key + * @param amount The amount to be incremented + */ + CompletableFuture<Void> incrCounterAsync(String key, long amount); + + /** * Retrieve the counter value for the key. 
* * @param key name of the key @@ -133,6 +142,15 @@ public interface Context { long getCounter(String key); /** + * Retrieve the counter value for the key, but don't wait + * for the operation to be completed + * + * @param key name of the key + * @return the amount of the counter value for this key + */ + CompletableFuture<Long> getCounterAsync(String key); + + /** * Update the state value for the key. * * @param key name of the key @@ -141,6 +159,14 @@ public interface Context { void putState(String key, ByteBuffer value); /** + * Update the state value for the key, but don't wait for the operation to be completed + * + * @param key name of the key + * @param value state value of the key + */ + CompletableFuture<Void> putStateAsync(String key, ByteBuffer value); + + /** * Retrieve the state value for the key. * * @param key name of the key @@ -149,6 +175,14 @@ public interface Context { ByteBuffer getState(String key); /** + * Retrieve the state value for the key, but don't wait for the operation to be completed + * + * @param key name of the key + * @return the state value for the key. + */ + CompletableFuture<ByteBuffer> getStateAsync(String key); + + /** * Get a map of all user-defined key/value configs for the function. 
* * @return The full map of user-defined config values diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java index bb70b41..dc99f60 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/ContextImpl.java @@ -61,6 +61,7 @@ import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkState; import static org.apache.pulsar.functions.instance.stats.FunctionStatsManager.USER_METRIC_PREFIX; +import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; /** * This class implements the Context interface exposed to the user. @@ -263,40 +264,64 @@ class ContextImpl implements Context, SinkContext, SourceContext { } @Override + public CompletableFuture<Void> incrCounterAsync(String key, long amount) { + ensureStateEnabled(); + return stateContext.incrCounter(key, amount); + } + + @Override public void incrCounter(String key, long amount) { ensureStateEnabled(); try { - stateContext.incr(key, amount); + result(stateContext.incrCounter(key, amount)); } catch (Exception e) { throw new RuntimeException("Failed to increment key '" + key + "' by amount '" + amount + "'", e); } } @Override + public CompletableFuture<Long> getCounterAsync(String key) { + ensureStateEnabled(); + return stateContext.getCounter(key); + } + + @Override public long getCounter(String key) { ensureStateEnabled(); try { - return stateContext.getAmount(key); + return result(stateContext.getCounter(key)); } catch (Exception e) { throw new RuntimeException("Failed to retrieve counter from key '" + key + "'"); } } @Override + public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) { + ensureStateEnabled(); + return stateContext.put(key, value); + } + + @Override public 
void putState(String key, ByteBuffer value) { ensureStateEnabled(); try { - stateContext.put(key, value); + result(stateContext.put(key, value)); } catch (Exception e) { throw new RuntimeException("Failed to update the state value for key '" + key + "'"); } } @Override + public CompletableFuture<ByteBuffer> getStateAsync(String key) { + ensureStateEnabled(); + return stateContext.get(key); + } + + @Override public ByteBuffer getState(String key) { ensureStateEnabled(); try { - return stateContext.getValue(key); + return result(stateContext.get(key)); } catch (Exception e) { throw new RuntimeException("Failed to retrieve the state value for key '" + key + "'"); } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContext.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContext.java index c17e8b6..90e85f4 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContext.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContext.java @@ -19,6 +19,7 @@ package org.apache.pulsar.functions.instance.state; import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; /** * A state context per function. @@ -31,7 +32,7 @@ public interface StateContext { * @param key key to increment * @param amount the amount incremented */ - void incr(String key, long amount) throws Exception; + CompletableFuture<Void> incrCounter(String key, long amount) throws Exception; /** * Update the given <i>key</i> to the provide <i>value</i>. @@ -49,7 +50,7 @@ public interface StateContext { * @param key key to update. * @param value value to update */ - void put(String key, ByteBuffer value) throws Exception; + CompletableFuture<Void> put(String key, ByteBuffer value) throws Exception; /** * Get the value of a given <i>key</i>. 
@@ -57,7 +58,7 @@ public interface StateContext { * @param key key to retrieve * @return a completable future representing the retrieve result. */ - ByteBuffer getValue(String key) throws Exception; + CompletableFuture<ByteBuffer> get(String key) throws Exception; /** * Get the amount of a given <i>key</i>. @@ -65,6 +66,6 @@ public interface StateContext { * @param key key to retrieve * @return a completable future representing the retrieve result. */ - long getAmount(String key) throws Exception; + CompletableFuture<Long> getCounter(String key) throws Exception; } diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java index 1a2c26d..4e60814 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/instance/state/StateContextImpl.java @@ -19,11 +19,12 @@ package org.apache.pulsar.functions.instance.state; import static java.nio.charset.StandardCharsets.UTF_8; -import static org.apache.bookkeeper.common.concurrent.FutureUtils.result; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; + import org.apache.bookkeeper.api.kv.Table; /** @@ -40,35 +41,38 @@ public class StateContextImpl implements StateContext { } @Override - public void incr(String key, long amount) throws Exception { + public CompletableFuture<Void> incrCounter(String key, long amount) { // TODO: this can be optimized with a batch operation. 
- result(table.increment( + return table.increment( Unpooled.wrappedBuffer(key.getBytes(UTF_8)), - amount)); + amount); } @Override - public void put(String key, ByteBuffer value) throws Exception { - result(table.put( + public CompletableFuture<Void> put(String key, ByteBuffer value) { + return table.put( Unpooled.wrappedBuffer(key.getBytes(UTF_8)), - Unpooled.wrappedBuffer(value))); + Unpooled.wrappedBuffer(value)); } @Override - public ByteBuffer getValue(String key) throws Exception { - ByteBuf data = result(table.get(Unpooled.wrappedBuffer(key.getBytes(UTF_8)))); - try { - ByteBuffer result = ByteBuffer.allocate(data.readableBytes()); - data.readBytes(result); - return result; - } finally { - data.release(); - } + public CompletableFuture<ByteBuffer> get(String key) { + return table.get(Unpooled.wrappedBuffer(key.getBytes(UTF_8))).thenApply( + data -> { + try { + ByteBuffer result = ByteBuffer.allocate(data.readableBytes()); + data.readBytes(result); + return result; + } finally { + data.release(); + } + } + ); } @Override - public long getAmount(String key) throws Exception { - return result(table.getNumber(Unpooled.wrappedBuffer(key.getBytes(UTF_8)))); + public CompletableFuture<Long> getCounter(String key) { + return table.getNumber(Unpooled.wrappedBuffer(key.getBytes(UTF_8))); } } diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java index a898e54..fd541f3 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/ContextImplTest.java @@ -104,16 +104,16 @@ public class ContextImplTest { public void testIncrCounterStateEnabled() throws Exception { StateContextImpl stateContext = mock(StateContextImpl.class); context.setStateContext(stateContext); - 
context.incrCounter("test-key", 10L); - verify(stateContext, times(1)).incr(eq("test-key"), eq(10L)); + context.incrCounterAsync("test-key", 10L); + verify(stateContext, times(1)).incrCounter(eq("test-key"), eq(10L)); } @Test public void testGetCounterStateEnabled() throws Exception { StateContextImpl stateContext = mock(StateContextImpl.class); context.setStateContext(stateContext); - context.getCounter("test-key"); - verify(stateContext, times(1)).getAmount(eq("test-key")); + context.getCounterAsync("test-key"); + verify(stateContext, times(1)).getCounter(eq("test-key")); } @Test @@ -121,7 +121,7 @@ public class ContextImplTest { StateContextImpl stateContext = mock(StateContextImpl.class); context.setStateContext(stateContext); ByteBuffer buffer = ByteBuffer.wrap("test-value".getBytes(UTF_8)); - context.putState("test-key", buffer); + context.putStateAsync("test-key", buffer); verify(stateContext, times(1)).put(eq("test-key"), same(buffer)); } @@ -129,8 +129,8 @@ public class ContextImplTest { public void testGetStateStateEnabled() throws Exception { StateContextImpl stateContext = mock(StateContextImpl.class); context.setStateContext(stateContext); - context.getState("test-key"); - verify(stateContext, times(1)).getValue(eq("test-key")); + context.getStateAsync("test-key"); + verify(stateContext, times(1)).get(eq("test-key")); } @Test diff --git a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java index 338f64e..2805a3d 100644 --- a/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java +++ b/pulsar-functions/instance/src/test/java/org/apache/pulsar/functions/instance/state/StateContextImplTest.java @@ -55,7 +55,7 @@ public class StateContextImplTest { public void testIncr() throws Exception { when(mockTable.increment(any(ByteBuf.class), 
anyLong())) .thenReturn(FutureUtils.Void()); - stateContext.incr("test-key", 10L); + stateContext.incrCounter("test-key", 10L).get(); verify(mockTable, times(1)).increment( eq(Unpooled.copiedBuffer("test-key", UTF_8)), eq(10L) @@ -66,7 +66,7 @@ public class StateContextImplTest { public void testPut() throws Exception { when(mockTable.put(any(ByteBuf.class), any(ByteBuf.class))) .thenReturn(FutureUtils.Void()); - stateContext.put("test-key", ByteBuffer.wrap("test-value".getBytes(UTF_8))); + stateContext.put("test-key", ByteBuffer.wrap("test-value".getBytes(UTF_8))).get(); verify(mockTable, times(1)).put( eq(Unpooled.copiedBuffer("test-key", UTF_8)), eq(Unpooled.copiedBuffer("test-value", UTF_8)) @@ -78,7 +78,7 @@ public class StateContextImplTest { ByteBuf returnedValue = Unpooled.copiedBuffer("test-value", UTF_8); when(mockTable.get(any(ByteBuf.class))) .thenReturn(FutureUtils.value(returnedValue)); - ByteBuffer result = stateContext.getValue("test-key"); + ByteBuffer result = stateContext.get("test-key").get(); assertEquals("test-value", new String(result.array(), UTF_8)); verify(mockTable, times(1)).get( eq(Unpooled.copiedBuffer("test-key", UTF_8)) @@ -89,7 +89,7 @@ public class StateContextImplTest { public void testGetAmount() throws Exception { when(mockTable.getNumber(any(ByteBuf.class))) .thenReturn(FutureUtils.value(10L)); - assertEquals(10L, stateContext.getAmount("test-key")); + assertEquals((Long)10L, stateContext.getCounter("test-key").get()); verify(mockTable, times(1)).getNumber( eq(Unpooled.copiedBuffer("test-key", UTF_8)) );