This is an automated email from the ASF dual-hosted git repository. sanjeevrk pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new da0cc00 Made Window Context up to date with Context (#2932) da0cc00 is described below commit da0cc00e07001d3aab7dcc940812b1c4df67b642 Author: Sanjeev Kulkarni <sanjee...@gmail.com> AuthorDate: Tue Nov 6 09:11:17 2018 -0800 Made Window Context up to date with Context (#2932) * Made Window Context up to date with Context * Added state interfaces as well --- .../pulsar/functions/windowing/WindowContext.java | 105 ++++++++++++++++++--- .../functions/windowing/WindowContextImpl.java | 69 ++++++++++++-- 2 files changed, 153 insertions(+), 21 deletions(-) diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java index 63e395c..2f1f2e7 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java @@ -20,9 +20,25 @@ package org.apache.pulsar.functions.windowing; import org.slf4j.Logger; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Map; import java.util.concurrent.CompletableFuture; public interface WindowContext { + + /** + * The tenant this function belongs to + * @return the tenant this function belongs to + */ + String getTenant(); + + /** + * The namespace this function belongs to + * @return the namespace this function belongs to + */ + String getNamespace(); + /** * The name of the function that we are executing * @return The Function name @@ -43,29 +59,35 @@ public interface WindowContext { int getInstanceId(); /** + * Get the number of instances that invoke this function. + * + * @return the number of instances that invoke this function. 
+ */ + int getNumInstances(); + + /** * The version of the function that we are executing * @return The version id */ String getFunctionVersion(); /** - * The memory limit that this function is limited to - * @return Memory limit in bytes + * Get a list of all input topics + * @return a list of all input topics */ - long getMemoryLimit(); + Collection<String> getInputTopics(); /** - * The time budget in ms that the function is limited to. - * @return Time budget in msecs. + * Get the output topic of the function + * @return output topic name */ - long getTimeBudgetInMs(); + String getOutputTopic(); /** - * The time in ms remaining for this function execution to complete before it - * will be flagged as an error - * @return Time remaining in ms. + * Get output schema builtin type or custom class name + * @return output schema builtin type or custom class name */ - long getRemainingTimeInMs(); + String getOutputSchemaType(); /** * The logger object that can be used to log in a function @@ -74,6 +96,43 @@ public interface WindowContext { Logger getLogger(); /** + * Increment the builtin distributed counter referred by key + * @param key The name of the key + * @param amount The amount to be incremented + */ + void incrCounter(String key, long amount); + + /** + * Retrieve the counter value for the key. + * + * @param key name of the key + * @return the amount of the counter value for this key + */ + long getCounter(String key); + + /** + * Update the state value for the key. + * + * @param key name of the key + * @param value state value of the key + */ + void putState(String key, ByteBuffer value); + + /** + * Retrieve the state value for the key. + * + * @param key name of the key + * @return the state value for the key. 
+ */ + ByteBuffer getState(String key); + + /** + * Get a map of all user-defined key/value configs for the function + * @return The full map of user-defined config values + */ + Map<String, Object> getUserConfigMap(); + + /** * Get Any user defined key/value * @param key The key * @return The value specified by the user for that key. null if no such key @@ -81,6 +140,14 @@ public interface WindowContext { String getUserConfigValue(String key); /** + * Get any user-defined key/value or a default value if none is present + * @param key + * @param defaultValue + * @return Either the user config value associated with a given key or a supplied default value + */ + Object getUserConfigValueOrDefault(String key, Object defaultValue); + + /** * Record a user defined metric * @param metricName The name of the metric * @param value The value of the metric @@ -89,10 +156,22 @@ public interface WindowContext { /** * Publish an object using serDe for serializing to the topic + * + * @param topicName + * The name of the topic for publishing + * @param object + * The object that needs to be published + * @param schemaOrSerdeClassName + * Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name of the custom schema class + * @return A future that completes when the framework is done publishing the message + */ + <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName); + + /** + * Publish an object to the topic using default schemas * @param topicName The name of the topic for publishing * @param object The object that needs to be published - * @param serDeClassName The class name of the class that needs to be used to serialize the object before publishing - * @return + * @return A future that completes when the framework is done publishing the message */ - CompletableFuture<Void> publish(String topicName, Object object, String serDeClassName); + <O> CompletableFuture<Void> publish(String topicName, O object); } diff 
--git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java index e03ed97..41e8ebe 100644 --- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java +++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java @@ -21,6 +21,9 @@ package org.apache.pulsar.functions.windowing; import org.apache.pulsar.functions.api.Context; import org.slf4j.Logger; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.Map; import java.util.concurrent.CompletableFuture; public class WindowContextImpl implements WindowContext { @@ -32,6 +35,16 @@ public class WindowContextImpl implements WindowContext { } @Override + public String getTenant() { + return this.context.getTenant(); + } + + @Override + public String getNamespace() { + return this.context.getNamespace(); + } + + @Override public String getFunctionName() { return this.context.getFunctionName(); } @@ -47,23 +60,28 @@ public class WindowContextImpl implements WindowContext { } @Override + public int getNumInstances() { + return this.context.getNumInstances(); + } + + @Override public String getFunctionVersion() { return this.getFunctionVersion(); } @Override - public long getMemoryLimit() { - return this.getMemoryLimit(); + public Collection<String> getInputTopics() { + return this.context.getInputTopics(); } @Override - public long getTimeBudgetInMs() { - return this.getTimeBudgetInMs(); + public String getOutputTopic() { + return this.context.getOutputTopic(); } @Override - public long getRemainingTimeInMs() { - return this.getRemainingTimeInMs(); + public String getOutputSchemaType() { + return this.context.getOutputSchemaType(); } @Override @@ -72,17 +90,52 @@ public class WindowContextImpl implements WindowContext { } @Override + public void 
incrCounter(String key, long amount) { + this.context.incrCounter(key, amount); + } + + @Override + public long getCounter(String key) { + return this.context.getCounter(key); + } + + @Override + public void putState(String key, ByteBuffer value) { + this.context.putState(key, value); + } + + @Override + public ByteBuffer getState(String key) { + return this.context.getState(key); + } + + @Override + public Map<String, Object> getUserConfigMap() { + return this.context.getUserConfigMap(); + } + + @Override public String getUserConfigValue(String key) { return this.getUserConfigValue(key); } @Override + public Object getUserConfigValueOrDefault(String key, Object defaultValue) { + return this.context.getUserConfigValueOrDefault(key, defaultValue); + } + + @Override public void recordMetric(String metricName, double value) { this.context.recordMetric(metricName, value); } @Override - public CompletableFuture<Void> publish(String topicName, Object object, String serDeClassName) { - return this.context.publish(topicName, object, serDeClassName); + public <O> CompletableFuture<Void> publish(String topicName, O object) { + return this.context.publish(topicName, object); + } + + @Override + public CompletableFuture<Void> publish(String topicName, Object object, String schemaOrSerdeClassName) { + return this.context.publish(topicName, object, schemaOrSerdeClassName); } }