This is an automated email from the ASF dual-hosted git repository.

sanjeevrk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new da0cc00  Made Window Context up to date with Context (#2932)
da0cc00 is described below

commit da0cc00e07001d3aab7dcc940812b1c4df67b642
Author: Sanjeev Kulkarni <sanjee...@gmail.com>
AuthorDate: Tue Nov 6 09:11:17 2018 -0800

    Made Window Context up to date with Context (#2932)
    
    * Made Window Context up to date with Context
    
    * Added state interfaces as well
---
 .../pulsar/functions/windowing/WindowContext.java  | 105 ++++++++++++++++++---
 .../functions/windowing/WindowContextImpl.java     |  69 ++++++++++++--
 2 files changed, 153 insertions(+), 21 deletions(-)

diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java
index 63e395c..2f1f2e7 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContext.java
@@ -20,9 +20,25 @@ package org.apache.pulsar.functions.windowing;
 
 import org.slf4j.Logger;
 
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 public interface WindowContext {
+
+    /**
+     * The tenant this function belongs to
+     * @return the tenant this function belongs to
+     */
+    String getTenant();
+
+    /**
+     * The namespace this function belongs to
+     * @return the namespace this function belongs to
+     */
+    String getNamespace();
+
     /**
      * The name of the function that we are executing
      * @return The Function name
@@ -43,29 +59,35 @@ public interface WindowContext {
     int getInstanceId();
 
     /**
+     * Get the number of instances that invoke this function.
+     *
+     * @return the number of instances that invoke this function.
+     */
+    int getNumInstances();
+
+    /**
      * The version of the function that we are executing
      * @return The version id
      */
     String getFunctionVersion();
 
     /**
-     * The memory limit that this function is limited to
-     * @return Memory limit in bytes
+     * Get a list of all input topics
+     * @return a list of all input topics
      */
-    long getMemoryLimit();
+    Collection<String> getInputTopics();
 
     /**
-     * The time budget in ms that the function is limited to.
-     * @return Time budget in msecs.
+     * Get the output topic of the function
+     * @return output topic name
      */
-    long getTimeBudgetInMs();
+    String getOutputTopic();
 
     /**
-     * The time in ms remaining for this function execution to complete before it
-     * will be flagged as an error
-     * @return Time remaining in ms.
+     * Get output schema builtin type or custom class name
+     * @return output schema builtin type or custom class name
      */
-    long getRemainingTimeInMs();
+    String getOutputSchemaType();
 
     /**
      * The logger object that can be used to log in a function
@@ -74,6 +96,43 @@ public interface WindowContext {
     Logger getLogger();
 
     /**
+     * Increment the builtin distributed counter referred by key
+     * @param key The name of the key
+     * @param amount The amount to be incremented
+     */
+    void incrCounter(String key, long amount);
+
+    /**
+     * Retrieve the counter value for the key.
+     *
+     * @param key name of the key
+     * @return the amount of the counter value for this key
+     */
+    long getCounter(String key);
+
+    /**
+     * Update the state value for the key.
+     *
+     * @param key name of the key
+     * @param value state value of the key
+     */
+    void putState(String key, ByteBuffer value);
+
+    /**
+     * Retrieve the state value for the key.
+     *
+     * @param key name of the key
+     * @return the state value for the key.
+     */
+    ByteBuffer getState(String key);
+
+    /**
+     * Get a map of all user-defined key/value configs for the function
+     * @return The full map of user-defined config values
+     */
+    Map<String, Object> getUserConfigMap();
+
+    /**
      * Get Any user defined key/value
      * @param key The key
      * @return The value specified by the user for that key. null if no such key
@@ -81,6 +140,14 @@ public interface WindowContext {
     String getUserConfigValue(String key);
 
     /**
+     * Get any user-defined key/value or a default value if none is present
+     * @param key
+     * @param defaultValue
+     * @return Either the user config value associated with a given key or a supplied default value
+     */
+    Object getUserConfigValueOrDefault(String key, Object defaultValue);
+
+    /**
      * Record a user defined metric
      * @param metricName The name of the metric
      * @param value The value of the metric
@@ -89,10 +156,22 @@ public interface WindowContext {
 
     /**
      * Publish an object using serDe for serializing to the topic
+     *
+     * @param topicName
+     *            The name of the topic for publishing
+     * @param object
+     *            The object that needs to be published
+     * @param schemaOrSerdeClassName
+     *            Either a builtin schema type (eg: "avro", "json", "protobuf") or the class name of the custom schema class
+     * @return A future that completes when the framework is done publishing the message
+     */
+    <O> CompletableFuture<Void> publish(String topicName, O object, String schemaOrSerdeClassName);
+
+    /**
+     * Publish an object to the topic using default schemas
      * @param topicName The name of the topic for publishing
      * @param object The object that needs to be published
-     * @param serDeClassName The class name of the class that needs to be used to serialize the object before publishing
-     * @return
+     * @return A future that completes when the framework is done publishing the message
      */
-    CompletableFuture<Void> publish(String topicName, Object object, String serDeClassName);
+    <O> CompletableFuture<Void> publish(String topicName, O object);
 }
diff --git a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
index e03ed97..41e8ebe 100644
--- a/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
+++ b/pulsar-functions/instance/src/main/java/org/apache/pulsar/functions/windowing/WindowContextImpl.java
@@ -21,6 +21,9 @@ package org.apache.pulsar.functions.windowing;
 import org.apache.pulsar.functions.api.Context;
 import org.slf4j.Logger;
 
+import java.nio.ByteBuffer;
+import java.util.Collection;
+import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 
 public class WindowContextImpl implements WindowContext {
@@ -32,6 +35,16 @@ public class WindowContextImpl implements WindowContext {
     }
 
     @Override
+    public String getTenant() {
+        return this.context.getTenant();
+    }
+
+    @Override
+    public String getNamespace() {
+        return this.context.getNamespace();
+    }
+
+    @Override
     public String getFunctionName() {
         return this.context.getFunctionName();
     }
@@ -47,23 +60,28 @@ public class WindowContextImpl implements WindowContext {
     }
 
     @Override
+    public int getNumInstances() {
+        return this.context.getNumInstances();
+    }
+
+    @Override
     public String getFunctionVersion() {
         return this.context.getFunctionVersion(); // delegate to the wrapped Context; calling this.getFunctionVersion() here would recurse until StackOverflowError
     }
 
     @Override
-    public long getMemoryLimit() {
-        return this.getMemoryLimit();
+    public Collection<String> getInputTopics() {
+        return this.context.getInputTopics();
     }
 
     @Override
-    public long getTimeBudgetInMs() {
-        return this.getTimeBudgetInMs();
+    public String getOutputTopic() {
+        return this.context.getOutputTopic();
     }
 
     @Override
-    public long getRemainingTimeInMs() {
-        return this.getRemainingTimeInMs();
+    public String getOutputSchemaType() {
+        return this.context.getOutputSchemaType();
     }
 
     @Override
@@ -72,17 +90,52 @@ public class WindowContextImpl implements WindowContext {
     }
 
     @Override
+    public void incrCounter(String key, long amount) {
+        this.context.incrCounter(key, amount);
+    }
+
+    @Override
+    public long getCounter(String key) {
+        return this.context.getCounter(key);
+    }
+
+    @Override
+    public void putState(String key, ByteBuffer value) {
+        this.context.putState(key, value);
+    }
+
+    @Override
+    public ByteBuffer getState(String key) {
+        return this.context.getState(key);
+    }
+
+    @Override
+    public Map<String, Object> getUserConfigMap() {
+        return this.context.getUserConfigMap();
+    }
+
+    @Override
     public String getUserConfigValue(String key) {
         return this.getUserConfigValue(key); // NOTE(review): this calls the same method, not this.context, so it recurses unconditionally; presumably it should delegate to the wrapped Context — confirm Context.getUserConfigValue's return type first
     }
 
     @Override
+    public Object getUserConfigValueOrDefault(String key, Object defaultValue) {
+        return this.context.getUserConfigValueOrDefault(key, defaultValue);
+    }
+
+    @Override
     public void recordMetric(String metricName, double value) {
         this.context.recordMetric(metricName, value);
     }
 
     @Override
-    public CompletableFuture<Void> publish(String topicName, Object object, String serDeClassName) {
-        return this.context.publish(topicName, object, serDeClassName);
+    public <O> CompletableFuture<Void> publish(String topicName, O object) {
+        return this.context.publish(topicName, object);
+    }
+
+    @Override
+    public CompletableFuture<Void> publish(String topicName, Object object, String schemaOrSerdeClassName) {
+        return this.context.publish(topicName, object, schemaOrSerdeClassName);
     }
 }

Reply via email to