This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new f8349e2 Expose state to sources and sinks (#4364) f8349e2 is described below commit f8349e2235a340a1ee9a416a07a4d2648dfe4f52 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Fri May 24 17:37:43 2019 -0700 Expose state to sources and sinks (#4364) * Expose state to sources and sinks * Fix unittest * Fix unittest --- .../apache/pulsar/io/common/IOConfigUtilsTest.java | 79 ++++++++++++++++++++++ .../org/apache/pulsar/io/core/SinkContext.java | 69 +++++++++++++++++++ .../org/apache/pulsar/io/core/SourceContext.java | 70 +++++++++++++++++++ .../io/kafka/sink/KafkaAbstractSinkTest.java | 42 ++++++++++++ .../io/kafka/source/KafkaAbstractSourceTest.java | 42 ++++++++++++ 5 files changed, 302 insertions(+) diff --git a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java index af62c7f..2296dae 100644 --- a/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java +++ b/pulsar-io/common/src/test/java/org/apache/pulsar/io/common/IOConfigUtilsTest.java @@ -27,9 +27,11 @@ import org.slf4j.Logger; import org.testng.Assert; import org.testng.annotations.Test; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.CompletableFuture; @Slf4j public class IOConfigUtilsTest { @@ -115,6 +117,44 @@ public class IOConfigUtilsTest { public String getSecret(String secretName) { return secretsMap.get(secretName); } + + @Override + public void incrCounter(String key, long amount) { } + + @Override + public CompletableFuture<Void> incrCounterAsync(String key, long amount) { + return null; + } + + @Override + public long getCounter(String key) { + return 0; + } + + @Override + public CompletableFuture<Long> getCounterAsync(String key) { + return null; + } + + @Override + public void putState(String key, ByteBuffer value) { + + } + + @Override + public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) { + return null; + } + + @Override + public ByteBuffer getState(String key) { + return null; + } + + @Override + public CompletableFuture<ByteBuffer> getStateAsync(String key) { + return null; + } } @Test @@ -189,6 +229,45 @@ public class IOConfigUtilsTest { public String getSecret(String secretName) { return secretsMap.get(secretName); } + + @Override + public void incrCounter(String key, long amount) { + } + + @Override + public CompletableFuture<Void> incrCounterAsync(String key, long amount) { + return null; + } + + @Override + public long getCounter(String key) { + return 0; + } + + @Override + public CompletableFuture<Long> getCounterAsync(String key) { + return null; + } + + @Override + public void putState(String key, ByteBuffer value) { + + } + + @Override + public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) { + return null; + } + + @Override + public ByteBuffer getState(String key) { + return null; + } + + @Override + public CompletableFuture<ByteBuffer> getStateAsync(String key) { + return null; + } } @Test diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java index d30ff7b..1a8a859 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SinkContext.java @@ -20,7 +20,9 @@ package org.apache.pulsar.io.core; import org.slf4j.Logger; +import java.nio.ByteBuffer; import java.util.Collection; +import java.util.concurrent.CompletableFuture; public interface SinkContext { @@ -81,4 +83,71 @@ public interface SinkContext { * @return The secret if anything was found or null */ String getSecret(String secretName); + + /** + * Increment the builtin distributed counter referred by key. + * + * @param key The name of the key + * @param amount The amount to be incremented + */ + void incrCounter(String key, long amount); + + + /** + * Increment the builtin distributed counter referred by key + * but dont wait for the completion of the increment operation + * + * @param key The name of the key + * @param amount The amount to be incremented + */ + CompletableFuture<Void> incrCounterAsync(String key, long amount); + + /** + * Retrieve the counter value for the key. + * + * @param key name of the key + * @return the amount of the counter value for this key + */ + long getCounter(String key); + + /** + * Retrieve the counter value for the key, but don't wait + * for the operation to be completed + * + * @param key name of the key + * @return the amount of the counter value for this key + */ + CompletableFuture<Long> getCounterAsync(String key); + + /** + * Update the state value for the key. + * + * @param key name of the key + * @param value state value of the key + */ + void putState(String key, ByteBuffer value); + + /** + * Update the state value for the key, but don't wait for the operation to be completed + * + * @param key name of the key + * @param value state value of the key + */ + CompletableFuture<Void> putStateAsync(String key, ByteBuffer value); + + /** + * Retrieve the state value for the key. + * + * @param key name of the key + * @return the state value for the key. + */ + ByteBuffer getState(String key); + + /** + * Retrieve the state value for the key, but don't wait for the operation to be completed + * + * @param key name of the key + * @return the state value for the key. + */ + CompletableFuture<ByteBuffer> getStateAsync(String key); } diff --git a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java index b758220..a27d05f 100644 --- a/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java +++ b/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/SourceContext.java @@ -20,6 +20,9 @@ package org.apache.pulsar.io.core; import org.slf4j.Logger; +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; + public interface SourceContext { /** @@ -79,4 +82,71 @@ public interface SourceContext { * @return The secret if anything was found or null */ String getSecret(String secretName); + + /** + * Increment the builtin distributed counter referred by key. + * + * @param key The name of the key + * @param amount The amount to be incremented + */ + void incrCounter(String key, long amount); + + + /** + * Increment the builtin distributed counter referred by key + * but dont wait for the completion of the increment operation + * + * @param key The name of the key + * @param amount The amount to be incremented + */ + CompletableFuture<Void> incrCounterAsync(String key, long amount); + + /** + * Retrieve the counter value for the key. + * + * @param key name of the key + * @return the amount of the counter value for this key + */ + long getCounter(String key); + + /** + * Retrieve the counter value for the key, but don't wait + * for the operation to be completed + * + * @param key name of the key + * @return the amount of the counter value for this key + */ + CompletableFuture<Long> getCounterAsync(String key); + + /** + * Update the state value for the key. + * + * @param key name of the key + * @param value state value of the key + */ + void putState(String key, ByteBuffer value); + + /** + * Update the state value for the key, but don't wait for the operation to be completed + * + * @param key name of the key + * @param value state value of the key + */ + CompletableFuture<Void> putStateAsync(String key, ByteBuffer value); + + /** + * Retrieve the state value for the key. + * + * @param key name of the key + * @return the state value for the key. + */ + ByteBuffer getState(String key); + + /** + * Retrieve the state value for the key, but don't wait for the operation to be completed + * + * @param key name of the key + * @return the state value for the key. + */ + CompletableFuture<ByteBuffer> getStateAsync(String key); } diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java index 9e74b89..c4522d5 100644 --- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java +++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/sink/KafkaAbstractSinkTest.java @@ -32,10 +32,12 @@ import org.testng.annotations.Test; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import static org.testng.Assert.*; @@ -115,6 +117,46 @@ public class KafkaAbstractSinkTest { @Override public String getSecret(String key) { return null; } + + @Override + public void incrCounter(String key, long amount) { + + } + + @Override + public CompletableFuture<Void> incrCounterAsync(String key, long amount) { + return null; + } + + @Override + public long getCounter(String key) { + return 0; + } + + @Override + public CompletableFuture<Long> getCounterAsync(String key) { + return null; + } + + @Override + public void putState(String key, ByteBuffer value) { + + } + + @Override + public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) { + return null; + } + + @Override + public ByteBuffer getState(String key) { + return null; + } + + @Override + public CompletableFuture<ByteBuffer> getStateAsync(String key) { + return null; + } }; ThrowingRunnable openAndClose = ()->{ try { diff --git a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java index 2cee062..3bfd358 100644 --- a/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java +++ b/pulsar-io/kafka/src/test/java/org/apache/pulsar/io/kafka/source/KafkaAbstractSourceTest.java @@ -31,9 +31,11 @@ import org.testng.annotations.Test; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.util.HashMap; import java.util.Map; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNotNull; @@ -116,6 +118,46 @@ public class KafkaAbstractSourceTest { @Override public String getSecret(String key) { return null; } + + @Override + public void incrCounter(String key, long amount) { + + } + + @Override + public CompletableFuture<Void> incrCounterAsync(String key, long amount) { + return null; + } + + @Override + public long getCounter(String key) { + return 0; + } + + @Override + public CompletableFuture<Long> getCounterAsync(String key) { + return null; + } + + @Override + public void putState(String key, ByteBuffer value) { + + } + + @Override + public CompletableFuture<Void> putStateAsync(String key, ByteBuffer value) { + return null; + } + + @Override + public ByteBuffer getState(String key) { + return null; + } + + @Override + public CompletableFuture<ByteBuffer> getStateAsync(String key) { + return null; + } }; Map<String, Object> config = new HashMap<>(); ThrowingRunnable openAndClose = ()->{